Commit db4c347a authored by Benjamin Lee's avatar Benjamin Lee 💬
Browse files

Detect lost packets directly.

parent 768aba7b
......@@ -17,6 +17,10 @@ pub struct NetworkStats {
pub bytes_out: u32,
/// Number of bytes received since the last recorded stats.
pub bytes_in: u32,
/// Number of packets sent since the last recorded stats.
pub packets_sent: u16,
/// Number of packets marked lost since the last recorded stats.
pub packets_lost: u16,
/// Estimated round trip time.
pub rtt: f32,
}
......@@ -41,6 +45,7 @@ pub struct DebugState {
network_rx: Receiver<NetworkStats>,
bandwidth_in_history: [f32; NETWORK_HISTORY_LENGTH],
bandwidth_out_history: [f32; NETWORK_HISTORY_LENGTH],
packet_loss_history: [f32; NETWORK_HISTORY_LENGTH],
rtt_history: [f32; NETWORK_HISTORY_LENGTH],
frame_time_history: [f32; FRAME_TIME_HISTORY_LENGTH],
}
......@@ -56,6 +61,7 @@ impl Default for DebugState {
network_rx,
bandwidth_in_history: [0.0; NETWORK_HISTORY_LENGTH],
bandwidth_out_history: [0.0; NETWORK_HISTORY_LENGTH],
packet_loss_history: [0.0; NETWORK_HISTORY_LENGTH],
rtt_history: [0.0; NETWORK_HISTORY_LENGTH],
frame_time_history: [0.0; FRAME_TIME_HISTORY_LENGTH],
}
......@@ -80,14 +86,20 @@ impl DebugState {
// Copy elements to make room for new ones.
self.bandwidth_in_history.copy_within(size.., 0);
self.bandwidth_out_history.copy_within(size.., 0);
self.packet_loss_history.copy_within(size.., 0);
self.rtt_history.copy_within(size.., 0);
let start = NETWORK_HISTORY_LENGTH - size;
for (i, stats) in self.network_rx.try_iter().enumerate() {
let bandwidth_in = stats.bytes_in as f32 / NETWORK_STATS_RATE.as_float_secs() as f32;
let bandwidth_out = stats.bytes_out as f32 / NETWORK_STATS_RATE.as_float_secs() as f32;
let bandwidth_in = stats.bytes_in as f32 /
NETWORK_STATS_RATE.as_float_secs() as f32;
let bandwidth_out = stats.bytes_out as f32 /
NETWORK_STATS_RATE.as_float_secs() as f32;
let packet_loss =
stats.packets_lost as f32 / stats.packets_sent as f32;
// Convert to KB
self.bandwidth_in_history[start + i] = bandwidth_in / 1000.0;
self.bandwidth_out_history[start + i] = bandwidth_out / 1000.0;
self.packet_loss_history[start + i] = packet_loss * 100.0;
self.rtt_history[start + i] = stats.rtt * 1000.0;
}
}
......@@ -105,6 +117,7 @@ impl DebugState {
let bandwidth_in = *self.bandwidth_in_history.last().unwrap();
let bandwidth_out = *self.bandwidth_out_history.last().unwrap();
let rtt = *self.rtt_history.last().unwrap();
let packet_loss = *self.packet_loss_history.last().unwrap();
ui.plot_lines(
im_str!("Bandwidth in"),
......@@ -136,6 +149,15 @@ impl DebugState {
.overlay_text(&ImString::new(format!("{:.2} ms", rtt)))
.build();
ui.plot_lines(
im_str!("Packet loss"),
&self.packet_loss_history,
)
.scale_max(100.0)
.scale_min(0.0)
.overlay_text(&ImString::new(format!("{:.0} %", packet_loss)))
.build();
ui.checkbox(
im_str!("Draw latest snapshot"),
&mut self.draw_latest_snapshot,
......
......@@ -335,9 +335,10 @@ impl Client {
break;
}
},
// Pretty sure this never happens?
Ok(bytes_written) => {
self.stats.packets_sent += 1;
self.stats.bytes_out += bytes_written as u32;
// Pretty sure this never happens?
if bytes_written < packet.len() {
error!(
"Only wrote {} out of {} bytes for client packet: \
......@@ -422,9 +423,10 @@ impl Client {
ref cursor,
} => {
// Assumed to be a handshake packet.
let (handshake, ..) = self
let (handshake, _, lost) = self
.connection
.decode::<_, ServerHandshake>(Cursor::new(packet))?;
self.stats.packets_lost += lost.len() as u16;
let (game, game_handle) = Game::new(
handshake.players,
handshake.snapshot,
......@@ -455,8 +457,9 @@ impl Client {
ref mut rtt,
..
} => {
let (packet, sequence, _) =
let (packet, sequence, lost) =
self.connection.decode(Cursor::new(packet))?;
self.stats.packets_lost += lost.len() as u16;
match packet {
ServerPacket::Event(event) => game.event(event),
ServerPacket::Pong(sequence) => {
......
// TODO: implement detecting and resending lost messages
use crate::networking::Error;
use byteorder::{ReadBytesExt, WriteBytesExt, BE};
use serde::de::DeserializeOwned;
use smallvec::SmallVec;
use std::io::{Read, Write};
// 4 u32s
......@@ -21,6 +20,7 @@ pub struct Acks {
pub struct Connection {
pub local_sequence: u32,
pub acks: Acks,
pub remote_acks: Acks,
}
impl Acks {
......@@ -39,19 +39,45 @@ impl Acks {
}
}
/// Combines this with another set of acks, and returns the set of
/// packets that can now be considered lost.
pub fn combine(&mut self, new: Acks) -> SmallVec<[u32; 4]> {
let mut lost = SmallVec::new();
if new.ack > self.ack {
// Anything that is outside the range of the new ack can
// be considered lost.
let mask = !(!0 >> (new.ack - self.ack));
lost.extend(
Acks {
ack_bits: !self.ack_bits & mask,
ack: self.ack,
}
.iter(),
);
// Shift everything.
self.ack_bits <<= new.ack - self.ack;
self.ack_bits |= new.ack;
self.ack = new.ack;
} else if self.ack - new.ack <= 32 {
self.ack_bits |= new.ack << (self.ack - new.ack);
};
lost
}
/// Checks if a particular sequence number is present in this set
/// of acks.
pub fn contains(self, sequence: u32) -> bool {
if self.ack < sequence {
return false;
}
self.ack_bits | (1 << (self.ack - sequence)) != 0
self.ack_bits & (1 << (self.ack - sequence)) != 0
}
/// Returns an iterator over the acked packets.
pub fn iter(self) -> impl Iterator<Item = u32> {
(0..32).filter_map(move |offset| {
if self.ack_bits | (1 << offset) != 0 {
(0..32.min(self.ack)).filter_map(move |offset| {
if self.ack_bits & (1 << offset) != 0 {
Some(self.ack - offset)
} else {
None
......@@ -66,19 +92,17 @@ impl Connection {
pub fn recv_header<B: Read>(
&mut self,
mut packet: B,
) -> Result<(u32, Acks), Error> {
) -> Result<(u32, SmallVec<[u32; 4]>), Error> {
let sequence = packet.read_u32::<BE>().map_err(Error::header_read)?;
let ack = packet.read_u32::<BE>().map_err(Error::header_read)?;
let ack_bits = packet.read_u32::<BE>().map_err(Error::header_read)?;
self.acks.ack(sequence);
Ok((
sequence,
Acks {
ack_bits,
ack,
},
))
let lost = self.remote_acks.combine(Acks {
ack_bits,
ack,
});
Ok((sequence, lost))
}
pub fn send_header<B: Write>(
......@@ -96,14 +120,15 @@ impl Connection {
}
/// Reads the header of a packet, and then deserializes the
/// contents with serde.
/// contents with serde. Returns the sequence numbers of packets
/// that are now considered lost.
pub fn decode<B: Read, P: DeserializeOwned>(
&mut self,
mut read: B,
) -> Result<(P, u32, Acks), Error> {
let (sequence, acks) = self.recv_header(&mut read)?;
) -> Result<(P, u32, SmallVec<[u32; 4]>), Error> {
let (sequence, lost) = self.recv_header(&mut read)?;
let packet =
bincode::deserialize_from(&mut read).map_err(Error::deserialize)?;
Ok((packet, sequence, acks))
bincode::deserialize_from(read).map_err(Error::deserialize)?;
Ok((packet, sequence, lost))
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment