File p2p.cpp
File List > backends > p2p.cpp
Go to the documentation of this file
/* -----------------------------------------------------------------------
* GGPO.net (http://ggpo.net) - Copyright 2009 GroundStorm Studios, LLC.
*
* Use of this software is governed by the MIT license that can be found
* in the LICENSE file.
*/
#include "p2p.h"
#include "GGPOUE4.h"
static const int RECOMMENDATION_INTERVAL = 240;
static const int DEFAULT_DISCONNECT_TIMEOUT = 5000;
static const int DEFAULT_DISCONNECT_NOTIFY_START = 750;
Peer2PeerBackend::Peer2PeerBackend(GGPOSessionCallbacks *cb,
const char *gamename,
ConnectionManager* connection_manager,
int num_players,
int input_size) :
_sync(_local_connect_status),
_num_spectators(0),
_input_size(input_size),
_num_players(num_players),
_next_spectator_frame(0),
_disconnect_timeout(DEFAULT_DISCONNECT_TIMEOUT),
_disconnect_notify_start(DEFAULT_DISCONNECT_NOTIFY_START)
{
_callbacks = *cb;
_synchronizing = true;
_next_recommended_sleep = 0;
/*
* Initialize the synchronziation layer
*/
Sync::Config config = Sync::Config();
config.num_players = num_players;
config.input_size = input_size;
config.callbacks = _callbacks;
config.num_prediction_frames = MAX_PREDICTION_FRAMES;
_sync.Init(config);
/*
* Initialize the UDP port
*/
_udp.Init(&_poll, this, connection_manager);
_endpoints = new UdpProtocol[_num_players];
memset(_local_connect_status, 0, sizeof(_local_connect_status));
for (int i = 0; i < ARRAY_SIZE(_local_connect_status); i++) {
_local_connect_status[i].last_frame = -1;
}
/*
* Preload the ROM
*/
_callbacks.begin_game(gamename);
}
Peer2PeerBackend::~Peer2PeerBackend()
{
delete [] _endpoints;
}
void
Peer2PeerBackend::AddRemotePlayer(int connection_id,
int queue)
{
/*
* Start the state machine (xxx: no)
*/
_synchronizing = true;
remoteplayerId = connection_id;
remoteplayerQueueu = queue;
_endpoints[queue].Init(&_udp, _poll, queue, connection_id, _local_connect_status);
_endpoints[queue].SetDisconnectTimeout(_disconnect_timeout);
_endpoints[queue].SetDisconnectNotifyStart(_disconnect_notify_start);
_endpoints[queue].Synchronize();
}
GGPOErrorCode Peer2PeerBackend::AddSpectator(int connection_id)
{
if (_num_spectators == GGPO_MAX_SPECTATORS) {
return GGPO_ERRORCODE_TOO_MANY_SPECTATORS;
}
/*
* Currently, we can only add spectators before the game starts.
*/
if (!_synchronizing) {
return GGPO_ERRORCODE_INVALID_REQUEST;
}
int queue = _num_spectators++;
_spectators[queue].Init(&_udp, _poll, queue + 1000, connection_id, _local_connect_status);
_spectators[queue].SetDisconnectTimeout(_disconnect_timeout);
_spectators[queue].SetDisconnectNotifyStart(_disconnect_notify_start);
_spectators[queue].Synchronize();
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::DoPoll(int timeout)
{
if (!_sync.InRollback()) {
_poll.Pump(0);
PollUdpProtocolEvents();
if (!_synchronizing) {
_sync.CheckSimulation(timeout);
// notify all of our endpoints of their local frame number for their
// next connection quality report
int current_frame = _sync.GetFrameCount();
for (int i = 0; i < _num_players; i++) {
_endpoints[i].SetLocalFrameNumber(current_frame);
}
int total_min_confirmed;
if (_num_players <= 2) {
total_min_confirmed = Poll2Players(current_frame);
} else {
total_min_confirmed = PollNPlayers(current_frame);
}
Log("last confirmed frame in p2p backend is %d.\n", total_min_confirmed);
if (total_min_confirmed >= 0) {
ASSERT(total_min_confirmed != INT_MAX);
if (_num_spectators > 0) {
while (_next_spectator_frame <= total_min_confirmed) {
Log("pushing frame %d to spectators.\n", _next_spectator_frame);
GameInput input;
input.frame = _next_spectator_frame;
input.size = _input_size * _num_players;
_sync.GetConfirmedInputs(input.bits, _input_size * _num_players, _next_spectator_frame);
for (int i = 0; i < _num_spectators; i++) {
_spectators[i].SendInput(input);
}
_next_spectator_frame++;
}
}
Log("setting confirmed frame in sync to %d.\n", total_min_confirmed);
_sync.SetLastConfirmedFrame(total_min_confirmed);
}
// send timesync notifications if now is the proper time
if (current_frame > _next_recommended_sleep) {
int interval = 0;
for (int i = 0; i < _num_players; i++) {
interval = MAX(interval, _endpoints[i].RecommendFrameDelay());
}
if (interval > 0) {
GGPOEvent info;
info.code = GGPO_EVENTCODE_TIMESYNC;
info.u.timesync.frames_ahead = interval;
_callbacks.on_event(&info);
_next_recommended_sleep = current_frame + RECOMMENDATION_INTERVAL;
}
}
// XXX: this is obviously a farce...
if (timeout) {
Platform::SleepMS(1);
}
}
}
return GGPO_OK;
}
int Peer2PeerBackend::Poll2Players(int current_frame)
{
int i;
// discard confirmed frames as appropriate
int total_min_confirmed = MAX_INT;
for (i = 0; i < _num_players; i++) {
bool queue_connected = true;
if (_endpoints[i].IsRunning()) {
int ignore;
queue_connected = _endpoints[i].GetPeerConnectStatus(i, &ignore);
}
if (!_local_connect_status[i].disconnected) {
total_min_confirmed = MIN(_local_connect_status[i].last_frame, total_min_confirmed);
}
Log(" local endp: connected = %d, last_received = %d, total_min_confirmed = %d.\n", !_local_connect_status[i].disconnected, _local_connect_status[i].last_frame, total_min_confirmed);
if (!queue_connected && !_local_connect_status[i].disconnected) {
Log("disconnecting i %d by remote request.\n", i);
DisconnectPlayerQueue(i, total_min_confirmed);
}
Log(" total_min_confirmed = %d.\n", total_min_confirmed);
}
return total_min_confirmed;
}
int Peer2PeerBackend::PollNPlayers(int current_frame)
{
int i, queue, last_received;
// discard confirmed frames as appropriate
int total_min_confirmed = MAX_INT;
for (queue = 0; queue < _num_players; queue++) {
bool queue_connected = true;
int queue_min_confirmed = MAX_INT;
Log("considering queue %d.\n", queue);
for (i = 0; i < _num_players; i++) {
// we're going to do a lot of logic here in consideration of endpoint i.
// keep accumulating the minimum confirmed point for all n*n packets and
// throw away the rest.
if (_endpoints[i].IsRunning()) {
bool connected = _endpoints[i].GetPeerConnectStatus(queue, &last_received);
queue_connected = queue_connected && connected;
queue_min_confirmed = MIN(last_received, queue_min_confirmed);
Log(" endpoint %d: connected = %d, last_received = %d, queue_min_confirmed = %d.\n", i, connected, last_received, queue_min_confirmed);
} else {
Log(" endpoint %d: ignoring... not running.\n", i);
}
}
// merge in our local status only if we're still connected!
if (!_local_connect_status[queue].disconnected) {
queue_min_confirmed = MIN(_local_connect_status[queue].last_frame, queue_min_confirmed);
}
Log(" local endp: connected = %d, last_received = %d, queue_min_confirmed = %d.\n", !_local_connect_status[queue].disconnected, _local_connect_status[queue].last_frame, queue_min_confirmed);
if (queue_connected) {
total_min_confirmed = MIN(queue_min_confirmed, total_min_confirmed);
} else {
// check to see if this disconnect notification is further back than we've been before. If
// so, we need to re-adjust. This can happen when we detect our own disconnect at frame n
// and later receive a disconnect notification for frame n-1.
if (!_local_connect_status[queue].disconnected || _local_connect_status[queue].last_frame > queue_min_confirmed) {
Log("disconnecting queue %d by remote request.\n", queue);
DisconnectPlayerQueue(queue, queue_min_confirmed);
}
}
Log(" total_min_confirmed = %d.\n", total_min_confirmed);
}
return total_min_confirmed;
}
GGPOErrorCode
Peer2PeerBackend::AddPlayer(GGPOPlayer *player,
GGPOPlayerHandle *handle)
{
if (player->type == GGPO_PLAYERTYPE_SPECTATOR) {
return AddSpectator(player->connection_id);
}
int queue = player->player_num - 1;
if (player->player_num < 1 || player->player_num > _num_players) {
return GGPO_ERRORCODE_PLAYER_OUT_OF_RANGE;
}
*handle = QueueToPlayerHandle(queue);
if (player->type == GGPO_PLAYERTYPE_REMOTE) {
AddRemotePlayer(player->connection_id, queue);
}
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::AddLocalInput(GGPOPlayerHandle player,
void *values,
int size)
{
int queue;
GameInput input;
GGPOErrorCode result;
if (_sync.InRollback()) {
return GGPO_ERRORCODE_IN_ROLLBACK;
}
if (_synchronizing) {
return GGPO_ERRORCODE_NOT_SYNCHRONIZED;
}
result = PlayerHandleToQueue(player, &queue);
if (!GGPO_SUCCEEDED(result)) {
return result;
}
input.init(-1, (char *)values, size);
// Feed the input for the current frame into the synchronzation layer.
if (!_sync.AddLocalInput(queue, input)) {
return GGPO_ERRORCODE_PREDICTION_THRESHOLD;
}
if (input.frame != GameInput::NullFrame) { // xxx: <- comment why this is the case
// Update the local connect status state to indicate that we've got a
// confirmed local frame for this player. this must come first so it
// gets incorporated into the next packet we send.
Log("setting local connect status for local queue %d to %d", queue, input.frame);
_local_connect_status[queue].last_frame = input.frame;
// Send the input to all the remote players.
for (int i = 0; i < _num_players; i++) {
if (_endpoints[i].IsInitialized()) {
_endpoints[i].SendInput(input);
}
}
}
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::SyncInput(void *values,
int size,
int *disconnect_flags)
{
int flags;
// Wait until we've started to return inputs.
if (_synchronizing) {
return GGPO_ERRORCODE_NOT_SYNCHRONIZED;
}
flags = _sync.SynchronizeInputs(values, size);
if (disconnect_flags) {
*disconnect_flags = flags;
}
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::IncrementFrame(void)
{
Log("End of frame (%d)...\n", _sync.GetFrameCount());
_sync.IncrementFrame();
DoPoll(0);
PollSyncEvents();
return GGPO_OK;
}
void
Peer2PeerBackend::PollSyncEvents(void)
{
Sync::Event e;
while (_sync.GetEvent(e)) {
OnSyncEvent(e);
}
return;
}
void
Peer2PeerBackend::PollUdpProtocolEvents(void)
{
UdpProtocol::Event evt;
for (int i = 0; i < _num_players; i++) {
while (_endpoints[i].GetEvent(evt)) {
OnUdpProtocolPeerEvent(evt, i);
}
}
for (int i = 0; i < _num_spectators; i++) {
while (_spectators[i].GetEvent(evt)) {
OnUdpProtocolSpectatorEvent(evt, i);
}
}
}
void
Peer2PeerBackend::OnUdpProtocolPeerEvent(UdpProtocol::Event &evt, int queue)
{
OnUdpProtocolEvent(evt, QueueToPlayerHandle(queue));
switch (evt.type) {
case UdpProtocol::Event::Input:
if (!_local_connect_status[queue].disconnected) {
int current_remote_frame = _local_connect_status[queue].last_frame;
int new_remote_frame = evt.u.input.input.frame;
ASSERT(current_remote_frame == -1 || new_remote_frame == (current_remote_frame + 1));
_sync.AddRemoteInput(queue, evt.u.input.input);
// Notify the other endpoints which frame we received from a peer
Log("setting remote connect status for queue %d to %d\n", queue, evt.u.input.input.frame);
_local_connect_status[queue].last_frame = evt.u.input.input.frame;
}
break;
case UdpProtocol::Event::Disconnected:
DisconnectPlayer(QueueToPlayerHandle(queue));
break;
}
}
void
Peer2PeerBackend::OnUdpProtocolSpectatorEvent(UdpProtocol::Event &evt, int queue)
{
GGPOPlayerHandle handle = QueueToSpectatorHandle(queue);
OnUdpProtocolEvent(evt, handle);
GGPOEvent info;
switch (evt.type) {
case UdpProtocol::Event::Disconnected:
_spectators[queue].Disconnect();
info.code = GGPO_EVENTCODE_DISCONNECTED_FROM_PEER;
info.u.disconnected.player = handle;
_callbacks.on_event(&info);
break;
}
}
void
Peer2PeerBackend::OnUdpProtocolEvent(UdpProtocol::Event &evt, GGPOPlayerHandle handle)
{
GGPOEvent info;
switch (evt.type) {
case UdpProtocol::Event::Connected:
info.code = GGPO_EVENTCODE_CONNECTED_TO_PEER;
info.u.connected.player = handle;
_callbacks.on_event(&info);
break;
case UdpProtocol::Event::Synchronizing:
info.code = GGPO_EVENTCODE_SYNCHRONIZING_WITH_PEER;
info.u.synchronizing.player = handle;
info.u.synchronizing.count = evt.u.synchronizing.count;
info.u.synchronizing.total = evt.u.synchronizing.total;
_callbacks.on_event(&info);
break;
case UdpProtocol::Event::Synchronzied:
info.code = GGPO_EVENTCODE_SYNCHRONIZED_WITH_PEER;
info.u.synchronized.player = handle;
_callbacks.on_event(&info);
CheckInitialSync();
break;
case UdpProtocol::Event::NetworkInterrupted:
info.code = GGPO_EVENTCODE_CONNECTION_INTERRUPTED;
info.u.connection_interrupted.player = handle;
info.u.connection_interrupted.disconnect_timeout = evt.u.network_interrupted.disconnect_timeout;
_callbacks.on_event(&info);
break;
case UdpProtocol::Event::NetworkResumed:
info.code = GGPO_EVENTCODE_CONNECTION_RESUMED;
info.u.connection_resumed.player = handle;
_callbacks.on_event(&info);
break;
}
}
/*
* Called only as the result of a local decision to disconnect. The remote
* decisions to disconnect are a result of us parsing the peer_connect_settings
* blob in every endpoint periodically.
*/
GGPOErrorCode
Peer2PeerBackend::DisconnectPlayer(GGPOPlayerHandle player)
{
int queue;
GGPOErrorCode result;
result = PlayerHandleToQueue(player, &queue);
if (!GGPO_SUCCEEDED(result)) {
return result;
}
if (_local_connect_status[queue].disconnected) {
return GGPO_ERRORCODE_PLAYER_DISCONNECTED;
}
if (!_endpoints[queue].IsInitialized()) {
int current_frame = _sync.GetFrameCount();
// xxx: we should be tracking who the local player is, but for now assume
// that if the endpoint is not initalized, this must be the local player.
Log("Disconnecting local player %d at frame %d by user request.\n", queue, _local_connect_status[queue].last_frame);
for (int i = 0; i < _num_players; i++) {
if (_endpoints[i].IsInitialized()) {
DisconnectPlayerQueue(i, current_frame);
}
}
} else {
Log("Disconnecting queue %d at frame %d by user request.\n", queue, _local_connect_status[queue].last_frame);
DisconnectPlayerQueue(queue, _local_connect_status[queue].last_frame);
}
return GGPO_OK;
}
void
Peer2PeerBackend::DisconnectPlayerQueue(int queue, int syncto)
{
GGPOEvent info;
int framecount = _sync.GetFrameCount();
_endpoints[queue].Disconnect();
Log("Changing queue %d local connect status for last frame from %d to %d on disconnect request (current: %d).\n",
queue, _local_connect_status[queue].last_frame, syncto, framecount);
_local_connect_status[queue].disconnected = 1;
_local_connect_status[queue].last_frame = syncto;
if (syncto < framecount) {
Log("adjusting simulation to account for the fact that %d disconnected @ %d.\n", queue, syncto);
_sync.AdjustSimulation(syncto);
Log("finished adjusting simulation.\n");
}
info.code = GGPO_EVENTCODE_DISCONNECTED_FROM_PEER;
info.u.disconnected.player = QueueToPlayerHandle(queue);
_callbacks.on_event(&info);
CheckInitialSync();
}
GGPOErrorCode
Peer2PeerBackend::GetNetworkStats(FGGPONetworkStats *stats, GGPOPlayerHandle player)
{
int queue;
GGPOErrorCode result;
result = PlayerHandleToQueue(player, &queue);
if (!GGPO_SUCCEEDED(result)) {
return result;
}
memset(stats, 0, sizeof *stats);
_endpoints[queue].GetNetworkStats(stats);
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::SetFrameDelay(GGPOPlayerHandle player, int delay)
{
int queue;
GGPOErrorCode result;
result = PlayerHandleToQueue(player, &queue);
if (!GGPO_SUCCEEDED(result)) {
return result;
}
_sync.SetFrameDelay(queue, delay);
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::SetDisconnectTimeout(int timeout)
{
_disconnect_timeout = timeout;
for (int i = 0; i < _num_players; i++) {
if (_endpoints[i].IsInitialized()) {
_endpoints[i].SetDisconnectTimeout(_disconnect_timeout);
}
}
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::SetDisconnectNotifyStart(int timeout)
{
_disconnect_notify_start = timeout;
for (int i = 0; i < _num_players; i++) {
if (_endpoints[i].IsInitialized()) {
_endpoints[i].SetDisconnectNotifyStart(_disconnect_notify_start);
}
}
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::TrySynchronizeLocal()
{
if (_num_players <= 1 && _num_spectators == 0)
{
if (_num_players == 0 || !_endpoints[0].IsInitialized())
CheckInitialSync();
}
if (_synchronizing)
return GGPO_ERRORCODE_NOT_SYNCHRONIZED;
UE_LOG(GGPOLOG, Verbose, TEXT("Peer2PeerBackend::TrySynchronizeLocal Synchronized local-only simulation."))
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::PlayerHandleToQueue(GGPOPlayerHandle player, int *queue)
{
int offset = ((int)player - 1);
if (offset < 0 || offset >= _num_players) {
return GGPO_ERRORCODE_INVALID_PLAYER_HANDLE;
}
*queue = offset;
return GGPO_OK;
}
void
Peer2PeerBackend::OnMsg(int connection_id, UdpMsg *msg, int len)
{
for (int i = 0; i < _num_players; i++) {
if (_endpoints[i].HandlesMsg(connection_id, msg)) {
_endpoints[i].OnMsg(msg, len);
return;
}
}
for (int i = 0; i < _num_spectators; i++) {
if (_spectators[i].HandlesMsg(connection_id, msg)) {
_spectators[i].OnMsg(msg, len);
return;
}
}
}
void
Peer2PeerBackend::CheckInitialSync()
{
int i;
if (_synchronizing) {
// Check to see if everyone is now synchronized. If so,
// go ahead and tell the client that we're ok to accept input.
for (i = 0; i < _num_players; i++) {
// xxx: IsInitialized() must go... we're actually using it as a proxy for "represents the local player"
if (_endpoints[i].IsInitialized() && !_endpoints[i].IsSynchronized() && !_local_connect_status[i].disconnected) {
return;
}
}
for (i = 0; i < _num_spectators; i++) {
if (_spectators[i].IsInitialized() && !_spectators[i].IsSynchronized()) {
return;
}
}
GGPOEvent info;
info.code = GGPO_EVENTCODE_RUNNING;
_callbacks.on_event(&info);
_synchronizing = false;
}
}