Commit bdd9852d authored by Benjamin Lee's avatar Benjamin Lee 💬
Browse files

Resend important lost packets from the server.

parent db4c347a
......@@ -114,6 +114,7 @@ dependencies = [
"crossbeam 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ctrlc 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"enum-kinds 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"gfx-backend-vulkan 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"gfx-hal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -366,6 +367,15 @@ name = "either"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "enum-kinds"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"quote 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.12.15 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "failure"
version = "0.1.5"
......@@ -942,6 +952,14 @@ name = "pkg-config"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "proc-macro2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "proc-macro2"
version = "0.4.24"
......@@ -950,6 +968,14 @@ dependencies = [
"unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "quote"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "quote"
version = "0.6.10"
......@@ -1286,6 +1312,16 @@ dependencies = [
"syn 0.15.25 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "syn"
version = "0.12.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "syn"
version = "0.14.9"
......@@ -1672,6 +1708,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum dlib 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77e51249a9d823a4cb79e3eca6dcd756153e8ed0157b6c04775d04bf1b13b76a"
"checksum downcast-rs 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "18df8ce4470c189d18aa926022da57544f31e154631eb4cfe796aea97051fe6c"
"checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0"
"checksum enum-kinds 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d0f21c374dea848c19071b1504ca5ad03c9ad0d03d2e509e68f6623b8fcac4b5"
"checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2"
"checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1"
"checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
......@@ -1737,7 +1774,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum phf_generator 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662"
"checksum phf_shared 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0"
"checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c"
"checksum proc-macro2 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cd07deb3c6d1d9ff827999c7f9b04cdfd66b1b17ae508e14fe47b620f2282ae0"
"checksum proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)" = "77619697826f31a02ae974457af0b29b723e5619e113e9397b8b82c6bd253f09"
"checksum quote 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1eca14c727ad12702eb4b6bfb5a232287dcf8385cb8ca83a3eeaf6519c44c408"
"checksum quote 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "53fa22a1994bd0f9372d7a816207d8a2677ad0325b073f5c5332760f0fb62b5c"
"checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c"
"checksum rand 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "3906503e80ac6cbcacb2c2973fa8e473f24d7e2747c8c92bb230c2441cad96b5"
......@@ -1777,6 +1816,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550"
"checksum structopt 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "670ad348dc73012fcf78c71f06f9d942232cdd4c859d4b6975e27836c3efc0c3"
"checksum structopt-derive 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "ef98172b1a00b0bec738508d3726540edcbd186d50dfd326f2b1febbb3559f04"
"checksum syn 0.12.15 (registry+https://github.com/rust-lang/crates.io-index)" = "c97c05b8ebc34ddd6b967994d5c6e9852fa92f8b82b3858c39451f97346dcce5"
"checksum syn 0.14.9 (registry+https://github.com/rust-lang/crates.io-index)" = "261ae9ecaa397c42b960649561949d69311f08eeaea86a65696e6e46517cf741"
"checksum syn 0.15.25 (registry+https://github.com/rust-lang/crates.io-index)" = "71b7693d9626935a362a3d1d4e59380800a919ebfa478d77a4f49e2a6d2c3ad5"
"checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015"
......
......@@ -37,6 +37,7 @@ ord_subset = "3.1.1"
nalgebra = { version = "0.16.12", features = ["serde-serialize"] }
either = "1.5.0"
crossbeam = "0.6.0"
enum-kinds = "0.4.1"
[build-dependencies]
shaderc = "0.3.12"
......
use crate::graphics::Circle;
use enum_kinds::EnumKind;
use nalgebra::{self, Point2, Vector2};
use palette::LinSrgb;
use serde::{Deserialize, Serialize};
......@@ -20,7 +21,8 @@ const BALL_START_SPEED: f32 = 1.0;
pub type PlayerId = u16;
/// Finite state machine for the round state.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, EnumKind)]
#[enum_kind(RoundStateKind)]
pub enum RoundState {
/// Less than two players, so nothing happens.
Lobby,
......
......@@ -44,7 +44,7 @@ pub struct Player {
#[derive(Clone, Debug, Default)]
pub struct Game {
pub players: HashMap<PlayerId, Player>,
round: RoundState,
pub round: RoundState,
next_id: PlayerId,
}
......
......@@ -5,7 +5,7 @@ use crate::game::{
};
use crate::networking::connection::{Connection, HEADER_BYTES};
use crate::networking::event_loop::{run_event_loop, EventHandler};
use crate::networking::server::{ServerHandshake, ServerPacket};
use crate::networking::server::ServerPacket;
use crate::networking::tick::Interval;
use crate::networking::{
Error,
......@@ -45,18 +45,16 @@ enum TimeoutState {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ClientPacket {
Handshake {
/// Cursor position when connecting.
cursor: Point2<f32>,
},
Input(Input),
Disconnect,
Ping,
Pong(u32),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClientHandshake {
/// Cursor position when connecting.
pub cursor: Point2<f32>,
}
pub enum ClientState {
Connecting {
done: Sender<Result<Game, Error>>,
......@@ -288,7 +286,7 @@ impl Client {
};
// Send handshake
client.send(&ClientHandshake {
client.send(&ClientPacket::Handshake {
cursor,
})?;
......@@ -416,52 +414,60 @@ impl Client {
return Err(Error::PacketTooLarge(bytes_read));
}
let packet = &self.recv_buffer[0..bytes_read];
let (packet, sequence, _, lost) =
self.connection.decode(Cursor::new(packet))?;
self.stats.packets_lost += lost.len() as u16;
let transition = match self.state {
ClientState::Connecting {
ref mut done,
ref cursor,
} => {
// Assumed to be a handshake packet.
let (handshake, _, lost) = self
.connection
.decode::<_, ServerHandshake>(Cursor::new(packet))?;
self.stats.packets_lost += lost.len() as u16;
let (game, game_handle) = Game::new(
handshake.players,
handshake.snapshot,
handshake.id,
*cursor,
);
let tick = Interval::new(TICK_RATE);
let ping = Interval::new(PING_RATE);
// Start the timer for sending input ticks and pings.
self.timer.set_timeout(tick.interval(), TimeoutState::Tick);
self.timer.set_timeout(ping.interval(), TimeoutState::Ping);
// Signal the main thread that connection finished.
done.send(Ok(game)).unwrap();
info!("completed connection to server");
// Transition to connected state.
Some(ClientState::Connected {
game: game_handle,
tick,
ping,
rtt: RttEstimator::default(),
})
match packet {
ServerPacket::Handshake {
players,
snapshot,
id,
} => {
let (game, game_handle) =
Game::new(players, snapshot, id, *cursor);
let tick = Interval::new(TICK_RATE);
let ping = Interval::new(PING_RATE);
// Start the timer for sending input ticks and pings.
self.timer
.set_timeout(tick.interval(), TimeoutState::Tick);
self.timer
.set_timeout(ping.interval(), TimeoutState::Ping);
// Signal the main thread that connection finished.
done.send(Ok(game)).unwrap();
info!("completed connection to server");
// Transition to connected state.
Some(ClientState::Connected {
game: game_handle,
tick,
ping,
rtt: RttEstimator::default(),
})
},
// Ignore non-handshake packets
_ => {
warn!("received {:?} before handshake", packet);
None
},
}
},
ClientState::Connected {
ref mut game,
ref mut rtt,
..
} => {
let (packet, sequence, lost) =
self.connection.decode(Cursor::new(packet))?;
self.stats.packets_lost += lost.len() as u16;
match packet {
ServerPacket::Event(event) => game.event(event),
ServerPacket::Handshake {
..
} => warn!("received a second handshake packet"),
ServerPacket::Pong(sequence) => {
rtt.pong(sequence);
},
......@@ -481,7 +487,7 @@ impl Client {
Ok(())
}
fn send<P: Serialize>(&mut self, contents: &P) -> Result<u32, Error> {
fn send(&mut self, contents: &ClientPacket) -> Result<u32, Error> {
// Don't send any additional packets while shutting down.
if self.needs_shutdown {
return Err(Error::ShuttingDown);
......
......@@ -88,21 +88,23 @@ impl Acks {
impl Connection {
/// Processes the header of a received packet and returns it's
/// sequence number, as well as acknowledged packets.
/// sequence number, as well as acknowledged packets and lost
/// packets.
pub fn recv_header<B: Read>(
&mut self,
mut packet: B,
) -> Result<(u32, SmallVec<[u32; 4]>), Error> {
) -> Result<(u32, Acks, SmallVec<[u32; 4]>), Error> {
let sequence = packet.read_u32::<BE>().map_err(Error::header_read)?;
let ack = packet.read_u32::<BE>().map_err(Error::header_read)?;
let ack_bits = packet.read_u32::<BE>().map_err(Error::header_read)?;
self.acks.ack(sequence);
let lost = self.remote_acks.combine(Acks {
let acks = Acks {
ack_bits,
ack,
});
Ok((sequence, lost))
};
let lost = self.remote_acks.combine(acks);
Ok((sequence, acks, lost))
}
pub fn send_header<B: Write>(
......@@ -125,10 +127,10 @@ impl Connection {
pub fn decode<B: Read, P: DeserializeOwned>(
&mut self,
mut read: B,
) -> Result<(P, u32, SmallVec<[u32; 4]>), Error> {
let (sequence, lost) = self.recv_header(&mut read)?;
) -> Result<(P, u32, Acks, SmallVec<[u32; 4]>), Error> {
let (sequence, acks, lost) = self.recv_header(&mut read)?;
let packet =
bincode::deserialize_from(read).map_err(Error::deserialize)?;
Ok((packet, sequence, lost))
Ok((packet, sequence, acks, lost))
}
}
......@@ -4,10 +4,11 @@ use crate::game::{
Event,
GetPlayer,
PlayerId,
RoundStateKind,
Snapshot,
StaticPlayerState,
};
use crate::networking::client::{ClientHandshake, ClientPacket};
use crate::networking::client::ClientPacket;
use crate::networking::connection::{Connection, HEADER_BYTES};
use crate::networking::event_loop::{run_event_loop, EventHandler};
use crate::networking::tick::Interval;
......@@ -23,6 +24,7 @@ use log::{debug, error, info, trace, warn};
use mio::net::UdpSocket;
use mio::{self, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
use mio_extras::timer::{self, Timeout, Timer};
use nalgebra::Point2;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::io;
......@@ -51,13 +53,11 @@ pub enum ServerPacket {
Event(Event),
Ping,
Pong(u32),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ServerHandshake {
pub id: PlayerId,
pub players: HashMap<PlayerId, StaticPlayerState>,
pub snapshot: Snapshot,
Handshake {
id: PlayerId,
players: HashMap<PlayerId, StaticPlayerState>,
snapshot: Snapshot,
},
}
struct Client {
......@@ -65,26 +65,15 @@ struct Client {
connection: Connection,
timeout: Timeout,
rtt: RttEstimator,
}
/// Contains a message and a list of clients to send the message to.
struct Packet {
/// Current client with the correct header.
pub client: SocketAddr,
/// Clients left to send to.
remaining: Vec<SocketAddr>,
/// The packet is assumed to have room reserved at the front for a
/// header. Every time a new client is sent to, these bytes get
/// overwritten with the new header. This means that the previous
/// `packet` stops being correct.
pub packet: Vec<u8>,
last_input: u32,
reliable: HashMap<u32, ServerPacket>,
}
pub struct Server {
socket: UdpSocket,
timer: Timer<TimeoutState>,
recv_buffer: [u8; MAX_PACKET_SIZE],
send_queue: VecDeque<Packet>,
send_queue: VecDeque<(SocketAddr, Vec<u8>)>,
clients: HashMap<SocketAddr, Client>,
game: Game,
send_tick: Interval,
......@@ -129,77 +118,59 @@ impl Drop for ServerHandle {
}
}
impl Packet {
/// Constructs a new packet for a single client.
pub fn single_client<P: Serialize>(
addr: SocketAddr,
client: &mut Client,
contents: &P,
) -> Result<(Packet, u32), Error> {
// Write header and contents
let size = bincode::serialized_size(contents)
.map_err(Error::serialize)? as usize;
let mut packet = Vec::with_capacity(size + HEADER_BYTES);
let sequence = client.connection.send_header(&mut packet)?;
bincode::serialize_into(&mut packet, contents)
.map_err(Error::serialize)?;
Ok((
Packet {
client: addr,
remaining: Vec::new(),
packet,
impl ServerPacket {
fn reliable(&self) -> bool {
match self {
ServerPacket::Event(event) => {
match event {
Event::NewPlayer {
..
} => true,
Event::RemovePlayer(_) => true,
Event::RoundState(_) => true,
Event::Snapshot(_) => false,
}
},
sequence,
))
ServerPacket::Handshake {
..
} => true,
ServerPacket::Ping => false,
ServerPacket::Pong(_) => false,
}
}
/// Poossibly constructs a new packet, but returns `None` if
/// `clients` is empty.
pub fn new<I: IntoIterator<Item = SocketAddr>, P: Serialize>(
clients: I,
contents: &P,
clients_state: &mut HashMap<SocketAddr, Client>,
) -> Result<Option<Packet>, Error> {
// Determine first client, to write the header for.
let mut clients = clients.into_iter();
let client = match clients.next() {
Some(client) => client,
None => return Ok(None),
};
// Write header and contents
let size = bincode::serialized_size(contents)
.map_err(Error::serialize)? as usize;
let mut packet = Vec::with_capacity(size + HEADER_BYTES);
let client_state = clients_state.get_mut(&client).unwrap();
client_state.connection.send_header(&mut packet)?;
bincode::serialize_into(&mut packet, contents)
.map_err(Error::serialize)?;
Ok(Some(Packet {
client,
remaining: clients.collect(),
packet,
}))
fn resend(&self, game: &Game) -> bool {
match self {
ServerPacket::Event(Event::RoundState(round)) => {
// Only resend if the round state hasn't
// changed again since it was sent.
RoundStateKind::from(round) == RoundStateKind::from(game.round)
},
// Everything else is simple.
_ => self.reliable(),
}
}
}
pub fn next_packet(
impl Client {
/// Encodes a packet and possibly saves it in the reliable packet
/// buffer.
///
/// Returns the sequence number.
fn encode(
&mut self,
clients_state: &mut HashMap<SocketAddr, Client>,
) -> Result<bool, Error> {
match self.remaining.pop() {
Some(client) => {
self.client = client;
// Write the new header.
let connection =
&mut clients_state.get_mut(&client).unwrap().connection;
let cursor = Cursor::new(&mut self.packet[..HEADER_BYTES]);
connection.send_header(cursor)?;
Ok(true)
},
None => Ok(false),
packet: &ServerPacket,
) -> Result<(Vec<u8>, u32), Error> {
let size = bincode::serialized_size(packet).map_err(Error::serialize)?
as usize;
let mut data = Vec::with_capacity(size + HEADER_BYTES);
let sequence = self.connection.send_header(&mut data)?;
bincode::serialize_into(&mut data, packet).map_err(Error::serialize)?;
if packet.reliable() {
self.reliable.insert(sequence, packet.clone());
}
Ok((data, sequence))
}
}
......@@ -315,44 +286,36 @@ impl Server {
}
fn socket_writable(&mut self) {
while let Some(packet) = self.send_queue.front_mut() {
match self.socket.send_to(&packet.packet, &packet.client) {
while let Some(&(ref addr, ref packet)) = self.send_queue.front() {
match self.socket.send_to(packet, addr) {
Err(err) => {
if err.kind() != io::ErrorKind::WouldBlock {
error!(
"error sending packet to {} ({}): {:?}",
&packet.client, err, &packet.packet
addr, err, packet
);
self.send_queue.pop_front();
}
break;
},
// Pretty sure this never happens?
Ok(bytes_written) => {
if bytes_written < packet.packet.len() {
if bytes_written < packet.len() {
error!(
"only wrote {} out of {} bytes for packet to {}: \
{:?}",
bytes_written,
packet.packet.len(),
&packet.client,
&packet.packet
packet.len(),
addr,
packet
)
}
},
}
// If it got here, the packet must have actually been sent.
match packet.next_packet(&mut self.clients) {
Ok(true) => {
// Continue as usual.
},
Ok(false) => {
self.send_queue.pop_front().unwrap();
},
Err(err) => {
error!("error writing header for packet: {}", err);
},
}
// Only pop after making sure it didn't return
// WouldBlock.
self.send_queue.pop_front();
}
if self.send_queue.is_empty() {
......@@ -380,23 +343,22 @@ impl Server {
self.timer.set_timeout(interval, TimeoutState::Ping);
for (&addr, client) in self.clients.iter_mut() {
let (packet, sequence) = match Packet::single_client(
addr,
client,
&ServerPacket::Ping,
) {
let (packet, sequence) = match client.encode(&ServerPacket::Ping) {
Ok(result) => result,
Err(err) => {
error!(
"error initializing ping packet for client {}: {}",
"error encoding ping packet for client {}: {}",
addr, err
);
continue;
},
};
self.send_queue.push_back(packet);
self.send_queue.push_back((addr, packet));
client.rtt.ping(sequence, now);
}
if let Err(err) = self.reregister_socket(true) {
error!("failed to reregister socket as writable: {}", err);
}
}
fn send_snapshot(&mut self) {
......@@ -431,7 +393,7 @@ impl Server {
&mut self,
addr: SocketAddr,
connection: Connection,
handshake: &ClientHandshake,
cursor: Point2<f32>,
) -> Result<(), Error> {
info!("new player from {}", addr);
......@@ -440,23 +402,21 @@ impl Server {
TimeoutState::LostConnection(addr),
);
let (player_id, events) =
self.game.add_player(clamp_cursor(handshake.cursor));
let (player_id, events) = self.game.add_player(clamp_cursor(cursor));
self.send_events(events)?;
// Now start processing this client.
self.clients.insert(
addr,
Client {
timeout,
player: player_id,
connection,
rtt: RttEstimator::default(),
},
);
let client = self.clients.entry(addr).or_insert(Client {
timeout,