summaryrefslogtreecommitdiffstats
path: root/source
diff options
context:
space:
mode:
authormadmaxoft@gmail.com <madmaxoft@gmail.com@0a769ca7-a7f5-676a-18bf-c427514a06d6>2012-02-26 01:36:51 +0100
committermadmaxoft@gmail.com <madmaxoft@gmail.com@0a769ca7-a7f5-676a-18bf-c427514a06d6>2012-02-26 01:36:51 +0100
commit0e33c919dd5c954e0e9d266924c1650237bb95a1 (patch)
tree32f4f3c136da43a9edaaa4921979ac4c0518743c /source
parentExtended SocketThreads for writing support (unusable in cClientHandle due to too many deadlock possibilities) (diff)
downloadcuberite-0e33c919dd5c954e0e9d266924c1650237bb95a1.tar
cuberite-0e33c919dd5c954e0e9d266924c1650237bb95a1.tar.gz
cuberite-0e33c919dd5c954e0e9d266924c1650237bb95a1.tar.bz2
cuberite-0e33c919dd5c954e0e9d266924c1650237bb95a1.tar.lz
cuberite-0e33c919dd5c954e0e9d266924c1650237bb95a1.tar.xz
cuberite-0e33c919dd5c954e0e9d266924c1650237bb95a1.tar.zst
cuberite-0e33c919dd5c954e0e9d266924c1650237bb95a1.zip
Diffstat (limited to '')
-rw-r--r--source/cClientHandle.cpp188
-rw-r--r--source/cClientHandle.h11
-rw-r--r--source/cServer.cpp160
-rw-r--r--source/cServer.h44
4 files changed, 253 insertions, 150 deletions
diff --git a/source/cClientHandle.cpp b/source/cClientHandle.cpp
index 9eed31a3a..41d2fda6c 100644
--- a/source/cClientHandle.cpp
+++ b/source/cClientHandle.cpp
@@ -77,7 +77,8 @@
case 2: (z)-=(amount); break; case 3: (z)+=(amount); break;\
case 4: (x)-=(amount); break; case 5: (x)+=(amount); break; }
-#define MAX_SEMAPHORES (2000)
+/// If the number of queued outgoing packets reaches this, the client will be kicked
+#define MAX_OUTGOING_PACKETS 2000
@@ -89,9 +90,7 @@
cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance)
: m_ViewDistance(a_ViewDistance)
, m_ProtocolVersion(23)
- , m_pSendThread(NULL)
, m_Socket(a_Socket)
- , m_Semaphore(MAX_SEMAPHORES)
, m_bDestroyed(false)
, m_Player(NULL)
, m_bKicking(false)
@@ -135,11 +134,6 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance)
m_PacketMap[E_RESPAWN] = new cPacket_Respawn;
m_PacketMap[E_PING] = new cPacket_Ping;
- //////////////////////////////////////////////////////////////////////////
- m_pSendThread = new cThread(SendThread, this, "cClientHandle::SendThread");
- m_pSendThread->Start (true);
- //////////////////////////////////////////////////////////////////////////
-
LOG("New ClientHandle created at %p", this);
}
@@ -149,7 +143,7 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance)
cClientHandle::~cClientHandle()
{
- LOG("Deleting client \"%s\"", GetUsername().c_str());
+ LOG("Deleting client \"%s\" at %p", GetUsername().c_str(), this);
// Remove from cSocketThreads, we're not to be called anymore:
cRoot::Get()->GetServer()->ClientDestroying(this);
@@ -173,20 +167,13 @@ cClientHandle::~cClientHandle()
}
}
- // First stop sending thread
- m_bKeepThreadGoing = false;
-
if (m_Socket.IsValid())
{
cPacket_Disconnect Disconnect;
Disconnect.m_Reason = "Server shut down? Kthnxbai";
m_Socket.Send(&Disconnect);
- m_Socket.CloseSocket();
}
-
- m_Semaphore.Signal();
- delete m_pSendThread;
-
+
if (m_Player != NULL)
{
m_Player->SetClientHandle(NULL);
@@ -198,19 +185,31 @@ cClientHandle::~cClientHandle()
delete m_PacketMap[i];
}
+ // Queue all remaining outgoing packets to cSocketThreads:
{
- cCSLock Lock(m_SendCriticalSection);
+ cCSLock Lock(m_CSPackets);
for (PacketList::iterator itr = m_PendingNrmSendPackets.begin(); itr != m_PendingNrmSendPackets.end(); ++itr)
{
+ AString Data;
+ (*itr)->Serialize(Data);
+ cRoot::Get()->GetServer()->WriteToClient(&m_Socket, Data);
delete *itr;
}
+ m_PendingNrmSendPackets.clear();
for (PacketList::iterator itr = m_PendingLowSendPackets.begin(); itr != m_PendingLowSendPackets.end(); ++itr)
{
+ AString Data;
+ (*itr)->Serialize(Data);
+ cRoot::Get()->GetServer()->WriteToClient(&m_Socket, Data);
delete *itr;
}
+ m_PendingLowSendPackets.clear();
}
- LOG("ClientHandle at %p destroyed", this);
+ // Queue the socket to close as soon as it sends all outgoing data:
+ cRoot::Get()->GetServer()->QueueClientClose(&m_Socket);
+
+ LOG("ClientHandle at %p deleted", this);
}
@@ -295,8 +294,8 @@ void cClientHandle::Authenticate(void)
Send(Health);
m_Player->Initialize(World);
- m_State = csDownloadingWorld;
StreamChunks();
+ m_State = csDownloadingWorld;
}
@@ -305,7 +304,7 @@ void cClientHandle::Authenticate(void)
void cClientHandle::StreamChunks(void)
{
- if (m_State < csDownloadingWorld)
+ if (m_State < csAuthenticating)
{
return;
}
@@ -323,7 +322,7 @@ void cClientHandle::StreamChunks(void)
m_LastStreamedChunkZ = ChunkPosZ;
// DEBUG:
- LOGINFO("Streaming chunks centered on [%d, %d]", ChunkPosX, ChunkPosZ);
+ LOGINFO("Streaming chunks centered on [%d, %d], view distance %d", ChunkPosX, ChunkPosZ, m_ViewDistance);
cWorld * World = m_Player->GetWorld();
ASSERT(World != NULL);
@@ -1645,7 +1644,10 @@ void cClientHandle::Tick(float a_Dt)
if (m_State >= csDownloadingWorld)
{
cWorld * World = m_Player->GetWorld();
+
cCSLock Lock(m_CSChunkLists);
+
+ // Send the chunks:
int NumSent = 0;
for (cChunkCoordsList::iterator itr = m_ChunksToSend.begin(); itr != m_ChunksToSend.end();)
{
@@ -1662,7 +1664,8 @@ void cClientHandle::Tick(float a_Dt)
break;
}
} // for itr - m_ChunksToSend[]
-
+ Lock.Unlock();
+
// Check even if we didn't send anything - a chunk may have sent a notification that we'd miss otherwise
CheckIfWorldDownloaded();
}
@@ -1707,8 +1710,7 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* =
}
}
- bool bSignalSemaphore = true;
- cCSLock Lock(m_SendCriticalSection);
+ cCSLock Lock(m_CSPackets);
if (a_Priority == E_PRIORITY_NORMAL)
{
if (a_Packet->m_PacketID == E_REL_ENT_MOVE_LOOK)
@@ -1727,7 +1729,6 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* =
{
Packets.erase(itr);
bBreak = true;
- bSignalSemaphore = false; // Because 1 packet is removed, semaphore count is the same
delete PacketData;
break;
}
@@ -1747,10 +1748,9 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* =
m_PendingLowSendPackets.push_back(a_Packet->Clone());
}
Lock.Unlock();
- if (bSignalSemaphore)
- {
- m_Semaphore.Signal();
- }
+
+ // Notify SocketThreads that we have something to write:
+ cRoot::Get()->GetServer()->NotifyClientWrite(this);
}
@@ -1797,90 +1797,6 @@ void cClientHandle::SendConfirmPosition(void)
-void cClientHandle::SendThread(void *lpParam)
-{
- cClientHandle* self = (cClientHandle*)lpParam;
- PacketList & NrmSendPackets = self->m_PendingNrmSendPackets;
- PacketList & LowSendPackets = self->m_PendingLowSendPackets;
-
-
- while (self->m_bKeepThreadGoing && self->m_Socket.IsValid())
- {
- self->m_Semaphore.Wait();
- cCSLock Lock(self->m_SendCriticalSection);
- if (NrmSendPackets.size() + LowSendPackets.size() > MAX_SEMAPHORES)
- {
- LOGERROR("ERROR: Too many packets in queue for player %s !!", self->m_Username.c_str());
- cPacket_Disconnect DC("Too many packets in queue.");
- self->m_Socket.Send(DC);
-
- cSleep::MilliSleep(1000); // Give packet some time to be received
-
- Lock.Unlock();
- self->Destroy();
- break;
- }
-
- if (NrmSendPackets.size() == 0 && LowSendPackets.size() == 0)
- {
- ASSERT(!self->m_bKeepThreadGoing);
- if (self->m_bKeepThreadGoing)
- {
- LOGERROR("ERROR: Semaphore was signaled while no packets to send");
- }
- continue;
- }
- if (NrmSendPackets.size() > MAX_SEMAPHORES / 2)
- {
- LOGINFO("Pending packets: %i Last: 0x%02x", NrmSendPackets.size(), (*NrmSendPackets.rbegin())->m_PacketID);
- }
-
- cPacket * Packet = NULL;
- if (!NrmSendPackets.empty())
- {
- Packet = *NrmSendPackets.begin();
- NrmSendPackets.erase(NrmSendPackets.begin());
- }
- else if (!LowSendPackets.empty())
- {
- Packet = *LowSendPackets.begin();
- LowSendPackets.erase(LowSendPackets.begin());
- }
- Lock.Unlock();
-
- if (!self->m_Socket.IsValid())
- {
- break;
- }
-
- // LOG("Sending packet 0x%02x to \"%s\" (\"%s\")", Packet->m_PacketID, self->m_Socket.GetIPString().c_str(), self->m_Username.c_str());
-
- bool bSuccess = self->m_Socket.Send(Packet);
-
- if (!bSuccess)
- {
- LOGERROR("ERROR: While sending packet 0x%02x to client \"%s\"", Packet->m_PacketID, self->m_Username.c_str());
- delete Packet;
- self->Destroy();
- break;
- }
- delete Packet;
-
- if (self->m_bKicking && (NrmSendPackets.size() + LowSendPackets.size() == 0)) // Disconnect player after all packets have been sent
- {
- cSleep::MilliSleep(1000); // Give all packets some time to be received
- self->Destroy();
- break;
- }
- }
-
- return;
-}
-
-
-
-
-
const AString & cClientHandle::GetUsername(void) const
{
return m_Username;
@@ -1967,7 +1883,49 @@ void cClientHandle::GetOutgoingData(AString & a_Data)
{
// Data can be sent to client
- // TODO
+ cCSLock Lock(m_CSPackets);
+ if (m_PendingNrmSendPackets.size() + m_PendingLowSendPackets.size() > MAX_OUTGOING_PACKETS)
+ {
+ LOGERROR("ERROR: Too many packets in queue for player %s !!", m_Username.c_str());
+ cPacket_Disconnect DC("Too many packets in queue.");
+ m_Socket.Send(DC);
+ Lock.Unlock();
+ Destroy();
+ return;
+ }
+
+ if ((m_PendingNrmSendPackets.size() == 0) && (m_PendingLowSendPackets.size() == 0))
+ {
+ return;
+ }
+
+ if (m_PendingNrmSendPackets.size() > MAX_OUTGOING_PACKETS / 2)
+ {
+ LOGINFO("Suspiciously many pending packets: %i; client \"%s\", LastType: 0x%02x", m_PendingNrmSendPackets.size(), m_Username.c_str(), (*m_PendingNrmSendPackets.rbegin())->m_PacketID);
+ }
+
+ AString Data;
+ if (!m_PendingNrmSendPackets.empty())
+ {
+ m_PendingNrmSendPackets.front()->Serialize(Data);
+ delete m_PendingNrmSendPackets.front();
+ m_PendingNrmSendPackets.erase(m_PendingNrmSendPackets.begin());
+ }
+ else if (!m_PendingLowSendPackets.empty())
+ {
+ m_PendingLowSendPackets.front()->Serialize(Data);
+ delete m_PendingLowSendPackets.front();
+ m_PendingLowSendPackets.erase(m_PendingLowSendPackets.begin());
+ }
+ Lock.Unlock();
+
+ a_Data.append(Data);
+
+ // Disconnect player after all packets have been sent
+ if (m_bKicking && (m_PendingNrmSendPackets.size() + m_PendingLowSendPackets.size() == 0))
+ {
+ Destroy();
+ }
}
diff --git a/source/cClientHandle.h b/source/cClientHandle.h
index 6b567a88b..3fcfef716 100644
--- a/source/cClientHandle.h
+++ b/source/cClientHandle.h
@@ -98,8 +98,6 @@ public:
void Send(const cPacket & a_Packet, ENUM_PRIORITY a_Priority = E_PRIORITY_NORMAL) { Send(&a_Packet, a_Priority); }
void Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority = E_PRIORITY_NORMAL);
- static void SendThread( void *lpParam );
-
const AString & GetUsername(void) const;
inline short GetPing() { return m_Ping; }
@@ -118,20 +116,17 @@ private:
AString m_ReceivedData; // Accumulator for the data received from the socket, waiting to be parsed; accessed from the cSocketThreads' thread only!
- PacketList m_PendingNrmSendPackets;
- PacketList m_PendingLowSendPackets;
+ cCriticalSection m_CSPackets;
+ PacketList m_PendingNrmSendPackets;
+ PacketList m_PendingLowSendPackets;
cCriticalSection m_CSChunkLists;
cChunkCoordsList m_LoadedChunks; // Chunks that the player belongs to
cChunkCoordsList m_ChunksToSend; // Chunks that need to be sent to the player (queued because they weren't generated yet or there's not enough time to send them)
- cThread * m_pSendThread;
-
cSocket m_Socket;
cCriticalSection m_CriticalSection;
- cCriticalSection m_SendCriticalSection;
- cSemaphore m_Semaphore;
Vector3d m_ConfirmPosition;
diff --git a/source/cServer.cpp b/source/cServer.cpp
index 97bcffaae..b1dab21e5 100644
--- a/source/cServer.cpp
+++ b/source/cServer.cpp
@@ -65,8 +65,6 @@ struct cServer::sServerState
cThread* pListenThread; bool bStopListenThread;
cThread* pTickThread; bool bStopTickThread;
- ClientList Clients;
-
cEvent RestartEvent;
std::string ServerID;
};
@@ -109,6 +107,33 @@ void cServer::ClientDestroying(const cClientHandle * a_Client)
+void cServer::NotifyClientWrite(const cClientHandle * a_Client)
+{
+ m_NotifyWriteThread.NotifyClientWrite(a_Client);
+}
+
+
+
+
+
+void cServer::WriteToClient(const cSocket * a_Socket, const AString & a_Data)
+{
+ m_SocketThreads.Write(a_Socket, a_Data);
+}
+
+
+
+
+
+void cServer::QueueClientClose(const cSocket * a_Socket)
+{
+ m_SocketThreads.QueueClose(a_Socket);
+}
+
+
+
+
+
bool cServer::InitServer( int a_Port )
{
if( m_bIsConnected )
@@ -209,6 +234,9 @@ bool cServer::InitServer( int a_Port )
LOGINFO("Setting default viewdistance to the maximum of %d", m_ClientViewDistance);
}
}
+
+ m_NotifyWriteThread.Start(this);
+
return true;
}
@@ -250,9 +278,13 @@ cServer::~cServer()
// TODO - Need to modify this or something, so it broadcasts to all worlds? And move this to cWorld?
void cServer::Broadcast( const cPacket * a_Packet, cClientHandle* a_Exclude /* = 0 */ )
{
- for( ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end(); ++itr)
+ cCSLock Lock(m_CSClients);
+ for( ClientList::iterator itr = m_Clients.begin(); itr != m_Clients.end(); ++itr)
{
- if( *itr == a_Exclude || !(*itr)->IsLoggedIn() ) continue;
+ if ((*itr == a_Exclude) || !(*itr)->IsLoggedIn())
+ {
+ continue;
+ }
(*itr)->Send( a_Packet );
}
}
@@ -289,7 +321,9 @@ void cServer::StartListenClient()
delete NewHandle;
return;
}
- m_pState->Clients.push_back( NewHandle ); // TODO - lock list
+
+ cCSLock Lock(m_CSClients);
+ m_Clients.push_back( NewHandle );
}
@@ -310,21 +344,21 @@ bool cServer::Tick(float a_Dt)
cRoot::Get()->TickWorlds( a_Dt ); // TODO - Maybe give all worlds their own thread?
- //World->LockClientHandle(); // TODO - Lock client list
- for( ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end();)
{
- if( (*itr)->IsDestroyed() )
+ cCSLock Lock(m_CSClients);
+ for (cClientHandleList::iterator itr = m_Clients.begin(); itr != m_Clients.end();)
{
- cClientHandle* RemoveMe = *itr;
+ if ((*itr)->IsDestroyed())
+ {
+ cClientHandle* RemoveMe = *itr;
+ itr = m_Clients.erase(itr);
+ delete RemoveMe;
+ continue;
+ }
+ (*itr)->Tick(a_Dt);
++itr;
- m_pState->Clients.remove( RemoveMe );
- delete RemoveMe;
- continue;
}
- (*itr)->Tick(a_Dt);
- ++itr;
}
- //World->UnlockClientHandle();
cRoot::Get()->GetPluginManager()->Tick( a_Dt );
@@ -550,14 +584,12 @@ void cServer::Shutdown()
cRoot::Get()->GetWorld()->SaveAllChunks();
- //cWorld* World = cRoot::Get()->GetWorld();
- //World->LockClientHandle(); // TODO - Lock client list
- for( ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end(); ++itr )
+ cCSLock Lock(m_CSClients);
+ for( ClientList::iterator itr = m_Clients.begin(); itr != m_Clients.end(); ++itr )
{
delete *itr;
}
- m_pState->Clients.clear();
- //World->UnlockClientHandle();
+ m_Clients.clear();
}
@@ -575,13 +607,14 @@ const AString & cServer::GetServerID(void) const
void cServer::KickUser(const AString & iUserName, const AString & iReason)
{
- for (ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end(); ++itr)
+ cCSLock Lock(m_CSClients);
+ for (ClientList::iterator itr = m_Clients.begin(); itr != m_Clients.end(); ++itr)
{
if ((*itr)->GetUsername() == iUserName)
{
(*itr)->Kick(iReason);
}
- } // for itr - m_pState->Clients[]
+ } // for itr - m_Clients[]
}
@@ -590,13 +623,92 @@ void cServer::KickUser(const AString & iUserName, const AString & iReason)
void cServer::AuthenticateUser(const AString & iUserName)
{
- for (ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end(); ++itr)
+ cCSLock Lock(m_CSClients);
+ for (ClientList::iterator itr = m_Clients.begin(); itr != m_Clients.end(); ++itr)
{
if ((*itr)->GetUsername() == iUserName)
{
(*itr)->Authenticate();
}
- } // for itr - m_pState->Clients[]
+ } // for itr - m_Clients[]
+}
+
+
+
+
+
+///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// cServer::cClientPacketThread:
+
+cServer::cNotifyWriteThread::cNotifyWriteThread(void) :
+ super("ClientPacketThread"),
+ m_Server(NULL)
+{
+}
+
+
+
+
+
+cServer::cNotifyWriteThread::~cNotifyWriteThread()
+{
+ mShouldTerminate = true;
+ m_Event.Set();
+ Wait();
+}
+
+
+
+
+
+bool cServer::cNotifyWriteThread::Start(cServer * a_Server)
+{
+ m_Server = a_Server;
+ return super::Start();
+}
+
+
+
+
+
+void cServer::cNotifyWriteThread::Execute(void)
+{
+ cClientHandleList Clients;
+ while (!mShouldTerminate)
+ {
+ cCSLock Lock(m_CS);
+ while (m_Clients.size() == 0)
+ {
+ cCSUnlock Unlock(Lock);
+ m_Event.Wait();
+ if (mShouldTerminate)
+ {
+ return;
+ }
+ }
+
+ // Copy the clients to notify and unlock the CS:
+ Clients.splice(Clients.begin(), m_Clients);
+ Lock.Unlock();
+
+ for (cClientHandleList::iterator itr = Clients.begin(); itr != Clients.end(); ++itr)
+ {
+ m_Server->m_SocketThreads.NotifyWrite(*itr);
+ } // for itr - Clients[]
+ Clients.clear();
+ } // while (!mShouldTerminate)
+}
+
+
+
+
+
+void cServer::cNotifyWriteThread::NotifyClientWrite(const cClientHandle * a_Client)
+{
+ cCSLock Lock(m_CS);
+ m_Clients.remove(const_cast<cClientHandle *>(a_Client)); // Put it there only once
+ m_Clients.push_back(const_cast<cClientHandle *>(a_Client));
+ m_Event.Set();
}
diff --git a/source/cServer.h b/source/cServer.h
index 3e7fbf094..200f2bb1f 100644
--- a/source/cServer.h
+++ b/source/cServer.h
@@ -21,6 +21,8 @@ class cPlayer;
class cClientHandle;
class cPacket;
+typedef std::list<cClientHandle *> cClientHandleList;
+
@@ -58,16 +60,48 @@ public: //tolua_export
void ClientDestroying(const cClientHandle * a_Client); // Called by cClientHandle::Destroy(); removes the client from m_SocketThreads
+ void NotifyClientWrite(const cClientHandle * a_Client); // Notifies m_SocketThreads that client has something to be written
+
+ void WriteToClient(const cSocket * a_Socket, const AString & a_Data); // Queues outgoing data for the socket through m_SocketThreads
+
+ void QueueClientClose(const cSocket * a_Socket); // Queues the socket to close when all its outgoing data is sent
+
private:
friend class cRoot; // so cRoot can create and destroy cServer
- cServer();
- ~cServer();
-
+ /// When NotifyClientWrite() is called, it is queued for this thread to process (to avoid deadlocks between cSocketThreads, cClientHandle and cChunkMap)
+ class cNotifyWriteThread :
+ public cIsThread
+ {
+ typedef cIsThread super;
+
+ cEvent m_Event; // Set when m_Clients gets appended
+ cServer * m_Server;
+
+ cCriticalSection m_CS;
+ cClientHandleList m_Clients;
+
+ virtual void Execute(void);
+
+ public:
+
+ cNotifyWriteThread(void);
+ ~cNotifyWriteThread();
+
+ bool Start(cServer * a_Server);
+
+ void NotifyClientWrite(const cClientHandle * a_Client);
+ } ;
+
struct sServerState;
sServerState* m_pState;
+ cNotifyWriteThread m_NotifyWriteThread;
+
+ cCriticalSection m_CSClients; // Locks client list
+ cClientHandleList m_Clients; // Clients that are connected to the server
+
cSocketThreads m_SocketThreads;
int m_ClientViewDistance; // The default view distance for clients; settable in Settings.ini
@@ -80,6 +114,10 @@ private:
int m_iServerPort;
bool m_bRestarting;
+
+ cServer();
+ ~cServer();
+
}; //tolua_export