File udp_proto.cpp
File List > GGPOUE4 > Private > network > udp_proto.cpp
Go to the documentation of this file
/* -----------------------------------------------------------------------
* GGPO.net (http://ggpo.net) - Copyright 2009 GroundStorm Studios, LLC.
*
* Use of this software is governed by the MIT license that can be found
* in the LICENSE file.
*/
#include "udp_proto.h"
#include "types.h"
#include "bitvector.h"
static const int UDP_HEADER_SIZE = 28; /* Size of IP + UDP headers */
static const int NUM_SYNC_PACKETS = 5;
static const int SYNC_RETRY_INTERVAL = 2000;
static const int SYNC_FIRST_RETRY_INTERVAL = 500;
static const int RUNNING_RETRY_INTERVAL = 200;
static const int KEEP_ALIVE_INTERVAL = 200;
static const int QUALITY_REPORT_INTERVAL = 1000;
static const int NETWORK_STATS_INTERVAL = 1000;
static const int UDP_SHUTDOWN_TIMER = 5000;
static const int MAX_SEQ_DISTANCE = (1 << 15);
UdpProtocol::UdpProtocol() :
_udp(NULL),
_magic_number(0),
_queue(-1),
_remote_magic_number(0),
_connected(false),
_packets_sent(0),
_bytes_sent(0),
_stats_start_time(0),
_local_frame_advantage(0),
_remote_frame_advantage(0),
_last_send_time(0),
_shutdown_timeout(0),
_disconnect_event_sent(false),
_disconnect_timeout(0),
_disconnect_notify_start(0),
_disconnect_notify_sent(false),
_next_send_seq(0),
_next_recv_seq(0)
{
_last_sent_input.init(-1, NULL, 1);
_last_received_input.init(-1, NULL, 1);
_last_acked_input.init(-1, NULL, 1);
memset(&_state, 0, sizeof _state);
memset(_peer_connect_status, 0, sizeof(_peer_connect_status));
for (int i = 0; i < ARRAY_SIZE(_peer_connect_status); i++) {
_peer_connect_status[i].last_frame = -1;
}
_oo_packet.msg = NULL;
_send_latency = 2;
_oop_percent = 0;
}
UdpProtocol::~UdpProtocol()
{
ClearSendQueue();
}
void
UdpProtocol::Init(Udp *udp,
Poll &poll,
int queue,
int connection_id,
UdpMsg::connect_status *status)
{
_udp = udp;
_queue = queue;
_local_connect_status = status;
_connection_id = connection_id;
do {
_magic_number = (uint16)rand();
} while (_magic_number == 0);
poll.RegisterLoop(this);
}
void
UdpProtocol::SendInput(GameInput &input)
{
if (_udp) {
if (_current_state == Running) {
/*
* Check to see if this is a good time to adjust for the rift...
*/
_timesync.advance_frame(input, _local_frame_advantage, _remote_frame_advantage);
/*
* Save this input packet
*
* XXX: This queue may fill up for spectators who do not ack input packets in a timely
* manner. When this happens, we can either resize the queue (ug) or disconnect them
* (better, but still ug). For the meantime, make this queue really big to decrease
* the odds of this happening...
*/
_pending_output.push(input);
}
SendPendingOutput();
}
}
void
UdpProtocol::SendPendingOutput()
{
UdpMsg *msg = new UdpMsg(UdpMsg::Input);
int i, j, offset = 0;
uint8 *bits;
GameInput last;
if (_pending_output.size()) {
last = _last_acked_input;
bits = msg->u.input.bits;
msg->u.input.start_frame = _pending_output.front().frame;
msg->u.input.input_size = (uint8)_pending_output.front().size;
ASSERT(last.frame == -1 || last.frame + 1 == msg->u.input.start_frame);
for (j = 0; j < _pending_output.size(); j++) {
GameInput ¤t = _pending_output.item(j);
if (memcmp(current.bits, last.bits, current.size) != 0) {
ASSERT((GAMEINPUT_MAX_BYTES * GAMEINPUT_MAX_PLAYERS * 8) < (1 << BITVECTOR_NIBBLE_SIZE));
for (i = 0; i < current.size * 8; i++) {
ASSERT(i < (1 << BITVECTOR_NIBBLE_SIZE));
if (current.value(i) != last.value(i)) {
BitVector_SetBit(msg->u.input.bits, &offset);
(current.value(i) ? BitVector_SetBit : BitVector_ClearBit)(bits, &offset);
BitVector_WriteNibblet(bits, i, &offset);
}
}
}
BitVector_ClearBit(msg->u.input.bits, &offset);
last = _last_sent_input = current;
}
} else {
msg->u.input.start_frame = 0;
msg->u.input.input_size = 0;
}
msg->u.input.ack_frame = _last_received_input.frame;
msg->u.input.num_bits = (uint16)offset;
msg->u.input.disconnect_requested = _current_state == Disconnected;
if (_local_connect_status) {
memcpy(msg->u.input.peer_connect_status, _local_connect_status, sizeof(UdpMsg::connect_status) * UDP_MSG_MAX_PLAYERS);
} else {
memset(msg->u.input.peer_connect_status, 0, sizeof(UdpMsg::connect_status) * UDP_MSG_MAX_PLAYERS);
}
ASSERT(offset < MAX_COMPRESSED_BITS);
SendMsg(msg);
}
void
UdpProtocol::SendInputAck()
{
UdpMsg *msg = new UdpMsg(UdpMsg::InputAck);
msg->u.input_ack.ack_frame = _last_received_input.frame;
SendMsg(msg);
}
bool
UdpProtocol::GetEvent(UdpProtocol::Event &e)
{
if (_event_queue.size() == 0) {
return false;
}
e = _event_queue.front();
_event_queue.pop();
return true;
}
bool
UdpProtocol::OnLoopPoll(void *cookie)
{
if (!_udp) {
return true;
}
unsigned int now = Platform::GetCurrentTimeMS();
unsigned int next_interval;
PumpSendQueue();
switch (_current_state) {
case Syncing:
next_interval = (_state.sync.roundtrips_remaining == NUM_SYNC_PACKETS) ? SYNC_FIRST_RETRY_INTERVAL : SYNC_RETRY_INTERVAL;
if (_last_send_time && _last_send_time + next_interval < now) {
Log("No luck syncing after %d ms... Re-queueing sync packet.\n", next_interval);
SendSyncRequest();
}
break;
case Running:
// xxx: rig all this up with a timer wrapper
if (!_state.running.last_input_packet_recv_time || _state.running.last_input_packet_recv_time + RUNNING_RETRY_INTERVAL < now) {
Log("Haven't exchanged packets in a while (last received:%d last sent:%d). Resending.\n", _last_received_input.frame, _last_sent_input.frame);
SendPendingOutput();
_state.running.last_input_packet_recv_time = now;
}
if (!_state.running.last_quality_report_time || _state.running.last_quality_report_time + QUALITY_REPORT_INTERVAL < now) {
UdpMsg *msg = new UdpMsg(UdpMsg::QualityReport);
msg->u.quality_report.ping = Platform::GetCurrentTimeMS();
msg->u.quality_report.frame_advantage = (uint8)_local_frame_advantage;
SendMsg(msg);
_state.running.last_quality_report_time = now;
}
if (!_state.running.last_network_stats_interval || _state.running.last_network_stats_interval + NETWORK_STATS_INTERVAL < now) {
UpdateNetworkStats();
_state.running.last_network_stats_interval = now;
}
if (_last_send_time && _last_send_time + KEEP_ALIVE_INTERVAL < now) {
Log("Sending keep alive packet\n");
SendMsg(new UdpMsg(UdpMsg::KeepAlive));
}
if (_disconnect_timeout && _disconnect_notify_start &&
!_disconnect_notify_sent && (_last_recv_time + _disconnect_notify_start < now)) {
Log("Endpoint has stopped receiving packets for %d ms. Sending notification.\n", _disconnect_notify_start);
Event e(Event::NetworkInterrupted);
e.u.network_interrupted.disconnect_timeout = _disconnect_timeout - _disconnect_notify_start;
QueueEvent(e);
_disconnect_notify_sent = true;
}
if (_disconnect_timeout && (_last_recv_time + _disconnect_timeout < now)) {
if (!_disconnect_event_sent) {
Log("Endpoint has stopped receiving packets for %d ms. Disconnecting.\n", _disconnect_timeout);
QueueEvent(Event(Event::Disconnected));
_disconnect_event_sent = true;
}
}
break;
case Disconnected:
if (_shutdown_timeout < now) {
Log("Shutting down udp connection.\n");
_udp = NULL;
_shutdown_timeout = 0;
}
}
return true;
}
void
UdpProtocol::Disconnect()
{
_current_state = Disconnected;
_shutdown_timeout = Platform::GetCurrentTimeMS() + UDP_SHUTDOWN_TIMER;
}
void
UdpProtocol::SendSyncRequest()
{
_state.sync.random = rand() & 0xFFFF;
UdpMsg *msg = new UdpMsg(UdpMsg::SyncRequest);
msg->u.sync_request.random_request = _state.sync.random;
SendMsg(msg);
}
void
UdpProtocol::SendMsg(UdpMsg *msg)
{
LogMsg("send", msg);
_packets_sent++;
_last_send_time = Platform::GetCurrentTimeMS();
_bytes_sent += msg->PacketSize();
msg->hdr.magic = _magic_number;
msg->hdr.sequence_number = _next_send_seq++;
_send_queue.push(QueueEntry(Platform::GetCurrentTimeMS(), _connection_id, msg));
PumpSendQueue();
}
bool
UdpProtocol::HandlesMsg(int connection_id,
UdpMsg *msg)
{
if (!_udp) {
return false;
}
return _connection_id == connection_id;
}
void
UdpProtocol::OnMsg(UdpMsg *msg, int len)
{
bool handled = false;
typedef bool (UdpProtocol::*DispatchFn)(UdpMsg *msg, int len);
static const DispatchFn table[] = {
&UdpProtocol::OnInvalid, /* Invalid */
&UdpProtocol::OnSyncRequest, /* SyncRequest */
&UdpProtocol::OnSyncReply, /* SyncReply */
&UdpProtocol::OnInput, /* Input */
&UdpProtocol::OnQualityReport, /* QualityReport */
&UdpProtocol::OnQualityReply, /* QualityReply */
&UdpProtocol::OnKeepAlive, /* KeepAlive */
&UdpProtocol::OnInputAck, /* InputAck */
};
// filter out messages that don't match what we expect
uint16 seq = msg->hdr.sequence_number;
if (msg->hdr.type != UdpMsg::SyncRequest &&
msg->hdr.type != UdpMsg::SyncReply) {
if (msg->hdr.magic != _remote_magic_number) {
LogMsg("recv rejecting", msg);
return;
}
// filter out out-of-order packets
uint16 skipped = (uint16)((int)seq - (int)_next_recv_seq);
// Log("checking sequence number -> next - seq : %d - %d = %d\n", seq, _next_recv_seq, skipped);
if (skipped > MAX_SEQ_DISTANCE) {
Log("dropping out of order packet (seq: %d, last seq:%d)\n", seq, _next_recv_seq);
return;
}
}
_next_recv_seq = seq;
LogMsg("recv", msg);
if (msg->hdr.type >= ARRAY_SIZE(table)) {
OnInvalid(msg, len);
} else {
handled = (this->*(table[msg->hdr.type]))(msg, len);
}
if (handled) {
_last_recv_time = Platform::GetCurrentTimeMS();
if (_disconnect_notify_sent && _current_state == Running) {
QueueEvent(Event(Event::NetworkResumed));
_disconnect_notify_sent = false;
}
}
}
void
UdpProtocol::UpdateNetworkStats(void)
{
int now = Platform::GetCurrentTimeMS();
if (_stats_start_time == 0) {
_stats_start_time = now;
}
int total_bytes_sent = _bytes_sent + (UDP_HEADER_SIZE * _packets_sent);
float seconds = (float)((now - _stats_start_time) / 1000.0);
float Bps = total_bytes_sent / seconds;
float udp_overhead = (float)(100.0 * (UDP_HEADER_SIZE * _packets_sent) / _bytes_sent);
_kbps_sent = int(Bps / 1024);
Log("Network Stats -- Bandwidth: %.2f KBps Packets Sent: %5d (%.2f pps) "
"KB Sent: %.2f UDP Overhead: %.2f %%.\n",
_kbps_sent,
_packets_sent,
(float)_packets_sent * 1000 / (now - _stats_start_time),
total_bytes_sent / 1024.0,
udp_overhead);
}
void
UdpProtocol::QueueEvent(const UdpProtocol::Event &evt)
{
LogEvent("Queuing event", evt);
_event_queue.push(evt);
}
void
UdpProtocol::Synchronize()
{
if (_udp) {
_current_state = Syncing;
_state.sync.roundtrips_remaining = NUM_SYNC_PACKETS;
SendSyncRequest();
}
}
bool
UdpProtocol::GetPeerConnectStatus(int id, int *frame)
{
*frame = _peer_connect_status[id].last_frame;
return !_peer_connect_status[id].disconnected;
}
void
UdpProtocol::Log(const char *fmt, ...)
{
char buf[1024];
size_t offset;
va_list args;
sprintf(buf, "udpproto%d | ", _queue);
offset = strlen(buf);
va_start(args, fmt);
vsnprintf(buf + offset, ARRAY_SIZE(buf) - offset - 1, fmt, args);
buf[ARRAY_SIZE(buf)-1] = '\0';
::Log(buf);
va_end(args);
}
void
UdpProtocol::LogMsg(const char *prefix, UdpMsg *msg)
{
switch (msg->hdr.type) {
case UdpMsg::SyncRequest:
Log("%s sync-request (%d).\n", prefix,
msg->u.sync_request.random_request);
break;
case UdpMsg::SyncReply:
Log("%s sync-reply (%d).\n", prefix,
msg->u.sync_reply.random_reply);
break;
case UdpMsg::QualityReport:
Log("%s quality report.\n", prefix);
break;
case UdpMsg::QualityReply:
Log("%s quality reply.\n", prefix);
break;
case UdpMsg::KeepAlive:
Log("%s keep alive.\n", prefix);
break;
case UdpMsg::Input:
Log("%s game-compressed-input %d (+ %d bits).\n", prefix, msg->u.input.start_frame, msg->u.input.num_bits);
break;
case UdpMsg::InputAck:
Log("%s input ack.\n", prefix);
break;
default:
ASSERT(false && "Unknown UdpMsg type.");
}
}
void
UdpProtocol::LogEvent(const char *prefix, const UdpProtocol::Event &evt)
{
switch (evt.type) {
case UdpProtocol::Event::Synchronzied:
Log("%s (event: Synchronzied).\n", prefix);
break;
}
}
bool
UdpProtocol::OnInvalid(UdpMsg *msg, int len)
{
ASSERT(false && "Invalid msg in UdpProtocol");
return false;
}
bool
UdpProtocol::OnSyncRequest(UdpMsg *msg, int len)
{
if (_remote_magic_number != 0 && msg->hdr.magic != _remote_magic_number) {
Log("Ignoring sync request from unknown endpoint (%d != %d).\n",
msg->hdr.magic, _remote_magic_number);
return false;
}
UdpMsg *reply = new UdpMsg(UdpMsg::SyncReply);
reply->u.sync_reply.random_reply = msg->u.sync_request.random_request;
SendMsg(reply);
return true;
}
bool
UdpProtocol::OnSyncReply(UdpMsg *msg, int len)
{
if (_current_state != Syncing) {
Log("Ignoring SyncReply while not synching.\n");
return msg->hdr.magic == _remote_magic_number;
}
if (msg->u.sync_reply.random_reply != _state.sync.random) {
Log("sync reply %d != %d. Keep looking...\n",
msg->u.sync_reply.random_reply, _state.sync.random);
return false;
}
if (!_connected) {
QueueEvent(Event(Event::Connected));
_connected = true;
}
Log("Checking sync state (%d round trips remaining).\n", _state.sync.roundtrips_remaining);
if (--_state.sync.roundtrips_remaining == 0) {
Log("Synchronized!\n");
QueueEvent(UdpProtocol::Event(UdpProtocol::Event::Synchronzied));
_current_state = Running;
_last_received_input.frame = -1;
_remote_magic_number = msg->hdr.magic;
} else {
UdpProtocol::Event evt(UdpProtocol::Event::Synchronizing);
evt.u.synchronizing.total = NUM_SYNC_PACKETS;
evt.u.synchronizing.count = NUM_SYNC_PACKETS - _state.sync.roundtrips_remaining;
QueueEvent(evt);
SendSyncRequest();
}
return true;
}
bool
UdpProtocol::OnInput(UdpMsg *msg, int len)
{
/*
* If a disconnect is requested, go ahead and disconnect now.
*/
bool disconnect_requested = msg->u.input.disconnect_requested;
if (disconnect_requested) {
if (_current_state != Disconnected && !_disconnect_event_sent) {
Log("Disconnecting endpoint on remote request.\n");
QueueEvent(Event(Event::Disconnected));
_disconnect_event_sent = true;
}
} else {
/*
* Update the peer connection status if this peer is still considered to be part
* of the network.
*/
UdpMsg::connect_status* remote_status = msg->u.input.peer_connect_status;
for (int i = 0; i < ARRAY_SIZE(_peer_connect_status); i++) {
ASSERT(remote_status[i].last_frame >= _peer_connect_status[i].last_frame);
_peer_connect_status[i].disconnected = _peer_connect_status[i].disconnected || remote_status[i].disconnected;
_peer_connect_status[i].last_frame = MAX(_peer_connect_status[i].last_frame, remote_status[i].last_frame);
}
}
/*
* Decompress the input.
*/
int last_received_frame_number = _last_received_input.frame;
if (msg->u.input.num_bits) {
int offset = 0;
uint8 *bits = (uint8 *)msg->u.input.bits;
int numBits = msg->u.input.num_bits;
int currentFrame = msg->u.input.start_frame;
_last_received_input.size = msg->u.input.input_size;
if (_last_received_input.frame < 0) {
_last_received_input.frame = msg->u.input.start_frame - 1;
}
while (offset < numBits) {
/*
* Keep walking through the frames (parsing bits) until we reach
* the inputs for the frame right after the one we're on.
*/
ASSERT(currentFrame <= (_last_received_input.frame + 1));
bool useInputs = currentFrame == _last_received_input.frame + 1;
while (BitVector_ReadBit(bits, &offset)) {
int on = BitVector_ReadBit(bits, &offset);
int button = BitVector_ReadNibblet(bits, &offset);
if (useInputs) {
if (on) {
_last_received_input.set(button);
} else {
_last_received_input.clear(button);
}
}
}
ASSERT(offset <= numBits);
/*
* Now if we want to use these inputs, go ahead and send them to
* the emulator.
*/
if (useInputs) {
/*
* Move forward 1 frame in the stream.
*/
char desc[1024];
ASSERT(currentFrame == _last_received_input.frame + 1);
_last_received_input.frame = currentFrame;
/*
* Send the event to the emualtor
*/
UdpProtocol::Event evt(UdpProtocol::Event::Input);
evt.u.input.input = _last_received_input;
_last_received_input.desc(desc, ARRAY_SIZE(desc));
_state.running.last_input_packet_recv_time = Platform::GetCurrentTimeMS();
Log("Sending frame %d to emu queue %d (%s).\n", _last_received_input.frame, _queue, desc);
QueueEvent(evt);
} else {
Log("Skipping past frame:(%d) current is %d.\n", currentFrame, _last_received_input.frame);
}
/*
* Move forward 1 frame in the input stream.
*/
currentFrame++;
}
}
ASSERT(_last_received_input.frame >= last_received_frame_number);
/*
* Get rid of our buffered input
*/
while (_pending_output.size() && _pending_output.front().frame < msg->u.input.ack_frame) {
Log("Throwing away pending output frame %d\n", _pending_output.front().frame);
_last_acked_input = _pending_output.front();
_pending_output.pop();
}
return true;
}
bool
UdpProtocol::OnInputAck(UdpMsg *msg, int len)
{
/*
* Get rid of our buffered input
*/
while (_pending_output.size() && _pending_output.front().frame < msg->u.input_ack.ack_frame) {
Log("Throwing away pending output frame %d\n", _pending_output.front().frame);
_last_acked_input = _pending_output.front();
_pending_output.pop();
}
return true;
}
bool
UdpProtocol::OnQualityReport(UdpMsg *msg, int len)
{
// send a reply so the other side can compute the round trip transmit time.
UdpMsg *reply = new UdpMsg(UdpMsg::QualityReply);
reply->u.quality_reply.pong = msg->u.quality_report.ping;
SendMsg(reply);
_remote_frame_advantage = msg->u.quality_report.frame_advantage;
return true;
}
bool
UdpProtocol::OnQualityReply(UdpMsg *msg, int len)
{
_round_trip_time = Platform::GetCurrentTimeMS() - msg->u.quality_reply.pong;
return true;
}
bool
UdpProtocol::OnKeepAlive(UdpMsg *msg, int len)
{
return true;
}
void
UdpProtocol::GetNetworkStats(struct FGGPONetworkStats *s)
{
s->network.ping = _round_trip_time;
s->network.send_queue_len = _pending_output.size();
s->network.kbps_sent = _kbps_sent;
s->timesync.remote_frames_behind = _remote_frame_advantage;
s->timesync.local_frames_behind = _local_frame_advantage;
}
void
UdpProtocol::SetLocalFrameNumber(int localFrame)
{
/*
* Estimate which frame the other guy is one by looking at the
* last frame they gave us plus some delta for the one-way packet
* trip time.
*/
int remoteFrame = _last_received_input.frame + (_round_trip_time * 60 / 1000);
/*
* Our frame advantage is how many frames *behind* the other guy
* we are. Counter-intuative, I know. It's an advantage because
* it means they'll have to predict more often and our moves will
* pop more frequenetly.
*/
_local_frame_advantage = remoteFrame - localFrame;
}
int
UdpProtocol::RecommendFrameDelay()
{
// XXX: require idle input should be a configuration parameter
return _timesync.recommend_frame_wait_duration(false);
}
void
UdpProtocol::SetDisconnectTimeout(int timeout)
{
_disconnect_timeout = timeout;
}
void
UdpProtocol::SetDisconnectNotifyStart(int timeout)
{
_disconnect_notify_start = timeout;
}
void
UdpProtocol::PumpSendQueue()
{
while (!_send_queue.empty()) {
QueueEntry &entry = _send_queue.front();
if (_send_latency) {
// should really come up with a gaussian distributation based on the configured
// value, but this will do for now.
int jitter = (_send_latency * 2 / 3) + ((rand() % _send_latency) / 3);
if (Platform::GetCurrentTimeMS() < _send_queue.front().queue_time + jitter) {
break;
}
}
if (_oop_percent && !_oo_packet.msg && ((rand() % 100) < _oop_percent)) {
int delay = rand() % (_send_latency * 10 + 1000);
Log("creating rogue oop (seq: %d delay: %d)\n", entry.msg->hdr.sequence_number, delay);
_oo_packet.send_time = Platform::GetCurrentTimeMS() + delay;
_oo_packet.msg = entry.msg;
_oo_packet.connection_id = entry.connection_id;
} else {
_udp->SendTo((char *)entry.msg, entry.msg->PacketSize(), 0, entry.connection_id);
delete entry.msg;
}
_send_queue.pop();
}
if (_oo_packet.msg && _oo_packet.send_time < Platform::GetCurrentTimeMS()) {
Log("sending rogue oop!");
_udp->SendTo((char *)_oo_packet.msg, _oo_packet.msg->PacketSize(), 0, _oo_packet.connection_id);
delete _oo_packet.msg;
_oo_packet.msg = NULL;
}
}
void
UdpProtocol::ClearSendQueue()
{
while (!_send_queue.empty()) {
delete _send_queue.front().msg;
_send_queue.pop();
}
}