summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--source/cClientHandle.cpp175
-rw-r--r--source/cClientHandle.h24
-rw-r--r--source/cServer.cpp65
-rw-r--r--source/cServer.h38
-rw-r--r--source/cSocket.cpp7
-rw-r--r--source/cSocket.h3
-rw-r--r--source/cSocketThreads.cpp133
-rw-r--r--source/cSocketThreads.h27
8 files changed, 253 insertions, 219 deletions
diff --git a/source/cClientHandle.cpp b/source/cClientHandle.cpp
index bbf196ccb..898e04e72 100644
--- a/source/cClientHandle.cpp
+++ b/source/cClientHandle.cpp
@@ -91,7 +91,6 @@ extern std::string GetWSAError();
cClientHandle::cClientHandle(const cSocket & a_Socket)
: m_ProtocolVersion(23)
- , m_pReceiveThread(NULL)
, m_pSendThread(NULL)
, m_Socket(a_Socket)
, m_Semaphore(MAX_SEMAPHORES)
@@ -105,8 +104,6 @@ cClientHandle::cClientHandle(const cSocket & a_Socket)
, m_Ping(1000)
, m_bPositionConfirmed(false)
{
- LOG("cClientHandle::cClientHandle");
-
cTimer t1;
m_LastPingTime = t1.GetNowTime();
@@ -143,13 +140,11 @@ cClientHandle::cClientHandle(const cSocket & a_Socket)
memset(m_LoadedChunks, 0x00, sizeof(m_LoadedChunks));
//////////////////////////////////////////////////////////////////////////
- m_pReceiveThread = new cThread(ReceiveThread, this, "cClientHandle::ReceiveThread");
m_pSendThread = new cThread(SendThread, this, "cClientHandle::SendThread");
- m_pReceiveThread->Start(true);
m_pSendThread->Start (true);
//////////////////////////////////////////////////////////////////////////
- LOG("New ClientHandle");
+ LOG("New ClientHandle created at %p", this);
}
@@ -158,7 +153,7 @@ cClientHandle::cClientHandle(const cSocket & a_Socket)
cClientHandle::~cClientHandle()
{
- LOG("Deleting client %s", GetUsername().c_str());
+ LOG("Deleting client \"%s\"", GetUsername().c_str());
for(unsigned int i = 0; i < VIEWDISTANCE*VIEWDISTANCE; i++)
{
@@ -185,7 +180,6 @@ cClientHandle::~cClientHandle()
// First stop sending thread
m_bKeepThreadGoing = false;
- cCSLock Lock(m_SocketCriticalSection);
if (m_Socket.IsValid())
{
cPacket_Disconnect Disconnect;
@@ -193,17 +187,10 @@ cClientHandle::~cClientHandle()
m_Socket.Send(&Disconnect);
m_Socket.CloseSocket();
}
- Lock.Unlock();
m_Semaphore.Signal();
- delete m_pReceiveThread;
delete m_pSendThread;
- while (!m_PendingParsePackets.empty())
- {
- delete *m_PendingParsePackets.begin();
- m_PendingParsePackets.erase(m_PendingParsePackets.begin());
- }
while (!m_PendingNrmSendPackets.empty())
{
delete *m_PendingNrmSendPackets.begin();
@@ -224,6 +211,8 @@ cClientHandle::~cClientHandle()
{
delete m_PacketMap[i];
}
+
+ LOG("ClientHandle at %p destroyed", this);
}
@@ -233,11 +222,13 @@ cClientHandle::~cClientHandle()
void cClientHandle::Destroy()
{
m_bDestroyed = true;
- cCSLock Lock(m_SocketCriticalSection);
if (m_Socket.IsValid())
{
m_Socket.CloseSocket();
}
+
+ // Synchronize with the cSocketThreads (so that they don't call us anymore)
+ cRoot::Get()->GetServer()->ClientDestroying(this);
}
@@ -419,31 +410,6 @@ void cClientHandle::RemoveFromAllChunks()
-void cClientHandle::AddPacket(cPacket * a_Packet)
-{
- cCSLock Lock(m_CriticalSection);
- m_PendingParsePackets.push_back(a_Packet->Clone());
-}
-
-
-
-
-
-void cClientHandle::HandlePendingPackets()
-{
- cCSLock Lock(m_CriticalSection);
- for (PacketList::iterator itr = m_PendingParsePackets.begin(); itr != m_PendingParsePackets.end(); ++itr)
- {
- HandlePacket(*itr);
- delete *itr;
- }
- m_PendingParsePackets.clear();
-}
-
-
-
-
-
void cClientHandle::HandlePacket(cPacket * a_Packet)
{
m_TimeLastPacket = cWorld::GetTime();
@@ -1766,14 +1732,11 @@ void cClientHandle::SendThread(void *lpParam)
}
Lock.Unlock();
- cCSLock SocketLock(self->m_SocketCriticalSection);
if (!self->m_Socket.IsValid())
{
break;
}
-
bool bSuccess = self->m_Socket.Send(Packet);
- SocketLock.Unlock();
if (!bSuccess)
{
@@ -1799,96 +1762,88 @@ void cClientHandle::SendThread(void *lpParam)
-void cClientHandle::ReceiveThread(void *lpParam)
+const AString & cClientHandle::GetUsername(void) const
{
- LOG("ReceiveThread");
+ return m_Username;
+}
+
- cClientHandle* self = (cClientHandle*)lpParam;
- char temp = 0;
- int iStat = 0;
- cSocket socket = self->GetSocket();
- AString Received;
- while (self->m_bKeepThreadGoing)
+void cClientHandle::DataReceived(const char * a_Data, int a_Size)
+{
+ // Data is received from the client
+
+ m_ReceivedData.append(a_Data, a_Size);
+
+ // Parse and handle all complete packets in m_ReceivedData:
+ while (!m_ReceivedData.empty())
{
- char Buffer[1024];
- iStat = socket.Receive(Buffer, sizeof(Buffer), 0);
- if (cSocket::IsSocketError(iStat) || (iStat == 0))
+ cPacket* pPacket = m_PacketMap[(unsigned char)m_ReceivedData[0]];
+ if (pPacket == NULL)
+ {
+ LOGERROR("Unknown packet type 0x%02x from client \"%s\"", (unsigned char)m_ReceivedData[0], m_Username.c_str());
+
+ AString Reason;
+ Printf(Reason, "[C->S] Unknown PacketID: 0x%02x", m_ReceivedData[0]);
+ cPacket_Disconnect DC(Reason);
+ m_Socket.Send(&DC);
+ cSleep::MilliSleep(1000); // Give packet some time to be received
+ Destroy();
+ return;
+ }
+
+ int NumBytes = pPacket->Parse(m_ReceivedData.data() + 1, m_ReceivedData.size() - 1);
+ if (NumBytes == PACKET_ERROR)
{
- LOG("CLIENT DISCONNECTED (%i bytes):%s", iStat, cSocket::GetLastErrorString().c_str());
+ LOGERROR("Protocol error while parsing packet type 0x%02x; disconnecting client \"%s\"", (unsigned char)m_ReceivedData[0], m_Username.c_str());
+ cPacket_Disconnect DC("Protocol error");
+ m_Socket.Send(&DC);
+ cSleep::MilliSleep(1000); // Give packet some time to be received
+ Destroy();
+ return;
+ }
+ else if (NumBytes == PACKET_INCOMPLETE)
+ {
+ // Not a complete packet
break;
}
- Received.append(Buffer, iStat);
-
- // Parse all complete packets in Received:
- while (!Received.empty())
+ else
{
- cPacket* pPacket = self->m_PacketMap[(unsigned char)Received[0]];
- if (pPacket)
- {
- int NumBytes = pPacket->Parse(Received.data() + 1, Received.size() - 1);
- if (NumBytes == PACKET_ERROR)
- {
- LOGERROR("Protocol error while parsing packet type 0x%x; disconnecting client \"%s\"", Received[0], self->m_Username.c_str());
- cPacket_Disconnect DC("Protocol error");
- socket.Send(&DC);
-
- cSleep::MilliSleep(1000); // Give packet some time to be received
- return;
- }
- else if (NumBytes == PACKET_INCOMPLETE)
- {
- // Not a complete packet
- break;
- }
- else
- {
- // Packet parsed successfully, add it to internal queue:
- self->AddPacket(pPacket);
- // Erase the packet from the buffer:
- assert(Received.size() > (size_t)NumBytes);
- Received.erase(0, NumBytes + 1);
- }
- }
- else
- {
- LOGERROR("Unknown packet type: 0x%2x", Received[0]);
-
- AString Reason;
- Printf(Reason, "[C->S] Unknown PacketID: 0x%02x", Received[0]);
- cPacket_Disconnect DC(Reason);
- socket.Send(&DC);
-
- cSleep::MilliSleep(1000); // Give packet some time to be received
- break;
- }
- } // while (!Received.empty())
- } // while (self->m_bKeepThreadGoing)
-
- self->Destroy();
-
- LOG("ReceiveThread STOPPED");
- return;
+ // Packet parsed successfully, add it to internal queue:
+ HandlePacket(pPacket);
+ // Erase the packet from the buffer:
+ assert(m_ReceivedData.size() > (size_t)NumBytes);
+ m_ReceivedData.erase(0, NumBytes + 1);
+ }
+ } // while (!Received.empty())
}
-const AString & cClientHandle::GetUsername(void) const
+void cClientHandle::GetOutgoingData(AString & a_Data)
{
- return m_Username;
+ // Data can be sent to client
+
+ // TODO
}
-const cSocket & cClientHandle::GetSocket()
+void cClientHandle::SocketClosed(void)
{
- return m_Socket;
+ // The socket has been closed for any reason
+
+ // TODO
+ /*
+ self->Destroy();
+ LOG("Client \"%s\" disconnected", GetLogName().c_str());
+ */
}
diff --git a/source/cClientHandle.h b/source/cClientHandle.h
index 56b9382e8..48a51c2df 100644
--- a/source/cClientHandle.h
+++ b/source/cClientHandle.h
@@ -13,6 +13,7 @@
#include "packets/cPacket.h"
#include "Vector3d.h"
+#include "cSocketThreads.h"
#include "packets/cPacket_KeepAlive.h"
#include "packets/cPacket_PlayerPosition.h"
@@ -54,7 +55,8 @@ class cRedstone;
-class cClientHandle // tolua_export
+class cClientHandle : // tolua_export
+ public cSocketThreads::cCallback
{ // tolua_export
public:
enum ENUM_PRIORITY
@@ -71,15 +73,14 @@ public:
static const int VIEWDISTANCE = 17; // MUST be odd number or CRASH!
static const int GENERATEDISTANCE = 2; // Server generates this many chunks AHEAD of player sight.
- const cSocket & GetSocket();
+ const cSocket & GetSocket(void) const {return m_Socket; }
+ cSocket & GetSocket(void) {return m_Socket; }
+
cPlayer* GetPlayer() { return m_Player; } // tolua_export
void Kick(const AString & a_Reason); //tolua_export
void Authenticate(void); // Called by cAuthenticator when the user passes authentication
- void AddPacket( cPacket * a_Packet );
- void HandlePendingPackets();
-
void StreamChunks();
void StreamChunksSmart( cChunk** a_Chunks, unsigned int a_NumChunks );
void RemoveFromAllChunks();
@@ -97,7 +98,6 @@ public:
void Send( const cPacket & a_Packet, ENUM_PRIORITY a_Priority = E_PRIORITY_NORMAL );
static void SendThread( void *lpParam );
- static void ReceiveThread( void *lpParam );
const AString & GetUsername(void) const;
@@ -108,19 +108,19 @@ private:
int m_ProtocolVersion;
AString m_Username;
AString m_Password;
+
+ AString m_ReceivedData; // Accumulator for the data received from the socket, waiting to be parsed; accessed from the cSocketThreads' thread only!
- PacketList m_PendingParsePackets;
PacketList m_PendingNrmSendPackets;
PacketList m_PendingLowSendPackets;
- cThread * m_pReceiveThread;
cThread * m_pSendThread;
cSocket m_Socket;
cCriticalSection m_CriticalSection;
cCriticalSection m_SendCriticalSection;
- cCriticalSection m_SocketCriticalSection;
+ // cCriticalSection m_SocketCriticalSection;
cSemaphore m_Semaphore;
Vector3d m_ConfirmPosition;
@@ -180,6 +180,12 @@ private:
bool CheckBlockInteractionsRate(void);
void SendLoginResponse();
+
+ // cSocketThreads::cCallback overrides:
+ virtual void DataReceived (const char * a_Data, int a_Size) override; // Data is received from the client
+ virtual void GetOutgoingData(AString & a_Data) override; // Data can be sent to client
+ virtual void SocketClosed (void) override; // The socket has been closed for any reason
+
}; // tolua_export
diff --git a/source/cServer.cpp b/source/cServer.cpp
index 36fb71875..e3fdb2f86 100644
--- a/source/cServer.cpp
+++ b/source/cServer.cpp
@@ -100,46 +100,9 @@ void cServer::ServerListenThread( void *a_Args )
-std::string GetWSAError()
+void cServer::ClientDestroying(const cClientHandle * a_Client)
{
-#ifdef _WIN32
- switch( WSAGetLastError() )
- {
- case WSANOTINITIALISED:
- return "WSANOTINITIALISED";
- case WSAENETDOWN:
- return "WSAENETDOWN";
- case WSAEFAULT:
- return "WSAEFAULT";
- case WSAENOTCONN:
- return "WSAENOTCONN";
- case WSAEINTR:
- return "WSAEINTR";
- case WSAEINPROGRESS:
- return "WSAEINPROGRESS";
- case WSAENETRESET:
- return "WSAENETRESET";
- case WSAENOTSOCK:
- return "WSAENOTSOCK";
- case WSAEOPNOTSUPP:
- return "WSAEOPNOTSUPP";
- case WSAESHUTDOWN:
- return "WSAESHUTDOWN";
- case WSAEWOULDBLOCK:
- return "WSAEWOULDBLOCK";
- case WSAEMSGSIZE:
- return "WSAEMSGSIZE";
- case WSAEINVAL:
- return "WSAEINVAL";
- case WSAECONNABORTED:
- return "WSAECONNABORTED";
- case WSAETIMEDOUT:
- return "WSAETIMEDOUT";
- case WSAECONNRESET:
- return "WSAECONNRESET";
- }
-#endif
- return "No Error";
+ m_SocketThreads.RemoveClient(a_Client);
}
@@ -183,11 +146,11 @@ bool cServer::InitServer( int a_Port )
return false;
}
- if( m_pState->SListenClient.SetReuseAddress() == -1 )
+ if( m_pState->SListenClient.SetReuseAddress() == -1 )
{
- LOGERROR("setsockopt == -1");
- return false;
- }
+ LOGERROR("setsockopt == -1");
+ return false;
+ }
cSocket::SockAddr_In local;
local.Family = cSocket::ADDRESS_FAMILY_INTERNET;
@@ -308,9 +271,16 @@ void cServer::StartListenClient()
return;
}
- LOG("%s connected!", ClientIP.c_str());
+ LOG("Client \"%s\" connected!", ClientIP.c_str());
- cClientHandle *NewHandle = new cClientHandle( SClient );
+ cClientHandle *NewHandle = new cClientHandle(SClient);
+ if (!m_SocketThreads.AddClient(&(NewHandle->GetSocket()), NewHandle))
+ {
+ // For some reason SocketThreads have rejected the handle, clean it up
+ SClient.CloseSocket();
+ delete NewHandle;
+ return;
+ }
m_pState->Clients.push_back( NewHandle ); // TODO - lock list
}
@@ -335,11 +305,8 @@ bool cServer::Tick(float a_Dt)
//World->LockClientHandle(); // TODO - Lock client list
for( ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end();)
{
- (*itr)->HandlePendingPackets();
-
if( (*itr)->IsDestroyed() )
{
-
cClientHandle* RemoveMe = *itr;
++itr;
m_pState->Clients.remove( RemoveMe );
@@ -464,7 +431,7 @@ void cServer::ServerCommand( const char* a_Cmd )
{
if( split[0].compare( "help" ) == 0 )
{
- printf("===================ALL COMMANDS====================\n");
+ printf("================== ALL COMMANDS ===================\n");
printf("help - Shows this message\n");
printf("save-all - Saves all loaded chunks to disk\n");
printf("list - Lists all players currently in server\n");
diff --git a/source/cServer.h b/source/cServer.h
index db60a2742..b56047487 100644
--- a/source/cServer.h
+++ b/source/cServer.h
@@ -1,9 +1,30 @@
+// cServer.h
+
+// Interfaces to the cServer object representing the network server
+
+
+
+
+
#pragma once
+#ifndef CSERVER_H_INCLUDED
+#define CSERVER_H_INCLUDED
+
+#include "cSocketThreads.h"
+
+
+
+
class cPlayer;
class cClientHandle;
class cPacket;
+
+
+
+
+
class cServer //tolua_export
{ //tolua_export
public: //tolua_export
@@ -34,13 +55,20 @@ public: //tolua_export
static void ServerListenThread( void* a_Args );
const AString & GetServerID(void) const;
+
+ void ClientDestroying(const cClientHandle * a_Client); // Called by cClientHandle::Destroy(); removes the client from m_SocketThreads
+
private:
+
friend class cRoot; // so cRoot can create and destroy cServer
+
cServer();
~cServer();
struct sServerState;
sServerState* m_pState;
+
+ cSocketThreads m_SocketThreads;
// Time since server was started
float m_Millisecondsf;
@@ -51,3 +79,13 @@ private:
bool m_bRestarting;
}; //tolua_export
+
+
+
+
+
+#endif // CSERVER_H_INCLUDED
+
+
+
+
diff --git a/source/cSocket.cpp b/source/cSocket.cpp
index 42cc298a7..00f10154b 100644
--- a/source/cSocket.cpp
+++ b/source/cSocket.cpp
@@ -27,6 +27,7 @@ cSocket::cSocket(xSocket a_Socket)
cSocket::~cSocket()
{
+ // Do NOT close the socket; this class is an API wrapper, not a RAII!
}
@@ -100,6 +101,10 @@ AString cSocket::GetErrorString( int a_ErrNo )
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, a_ErrNo, 0, buffer, ARRAYCOUNT(buffer), NULL);
Printf(Out, "%d: %s", a_ErrNo, buffer);
+ if (!Out.empty() && (Out[Out.length() - 1] == '\n'))
+ {
+ Out.erase(Out.length() - 2);
+ }
return Out;
#else // _WIN32
@@ -312,7 +317,7 @@ unsigned short cSocket::GetPort(void) const
{
return 0;
}
- return Addr.sin_port;
+ return ntohs(Addr.sin_port);
}
diff --git a/source/cSocket.h b/source/cSocket.h
index b98741c4b..81749048b 100644
--- a/source/cSocket.h
+++ b/source/cSocket.h
@@ -29,6 +29,9 @@ public:
operator const xSocket() const;
xSocket GetSocket() const;
+
+ bool operator == (const cSocket & a_Other) {return m_Socket == a_Other.m_Socket; }
+
void SetSocket( xSocket a_Socket );
int SetReuseAddress();
diff --git a/source/cSocketThreads.cpp b/source/cSocketThreads.cpp
index 277036e46..796316878 100644
--- a/source/cSocketThreads.cpp
+++ b/source/cSocketThreads.cpp
@@ -39,7 +39,7 @@ cSocketThreads::~cSocketThreads()
-void cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client)
+bool cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client)
{
// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client
@@ -47,25 +47,32 @@ void cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client)
cCSLock Lock(m_CS);
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
{
- if ((*itr)->HasEmptySlot())
+ if ((*itr)->IsValid() && (*itr)->HasEmptySlot())
{
(*itr)->AddClient(a_Socket, a_Client);
- return;
+ return true;
}
}
// No thread has free space, create a new one:
+ LOG("Creating a new cSocketThread (currently have %d)", m_Threads.size());
cSocketThread * Thread = new cSocketThread(this);
- Thread->Start();
+ if (!Thread->Start())
+ {
+ // There was an error launching the thread (but it was already logged along with the reason)
+ delete Thread;
+ return false;
+ }
Thread->AddClient(a_Socket, a_Client);
m_Threads.push_back(Thread);
+ return true;
}
-void cSocketThreads::RemoveClient(cSocket * a_Socket)
+void cSocketThreads::RemoveClient(const cSocket * a_Socket)
{
// Remove the socket (and associated client) from processing
@@ -83,7 +90,7 @@ void cSocketThreads::RemoveClient(cSocket * a_Socket)
-void cSocketThreads::RemoveClient(cCallback * a_Client)
+void cSocketThreads::RemoveClient(const cCallback * a_Client)
{
// Remove the associated socket and the client from processing
@@ -101,7 +108,7 @@ void cSocketThreads::RemoveClient(cCallback * a_Client)
-void cSocketThreads::NotifyWrite(cCallback * a_Client)
+void cSocketThreads::NotifyWrite(const cCallback * a_Client)
{
// Notifies the thread responsible for a_Client that the client has something to write
@@ -152,7 +159,7 @@ void cSocketThreads::cSocketThread::AddClient(cSocket * a_Socket, cCallback * a_
-bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client)
+bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
{
// Returns true if removed, false if not found
@@ -161,7 +168,7 @@ bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client)
return false;
}
- for (int i = m_NumSlots - 1; i > 0 ; --i)
+ for (int i = m_NumSlots - 1; i >= 0 ; --i)
{
if (m_Slots[i].m_Client != a_Client)
{
@@ -186,7 +193,7 @@ bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client)
-bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket)
+bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket)
{
// Returns true if removed, false if not found
@@ -195,7 +202,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket)
return false;
}
- for (int i = m_NumSlots - 1; i > 0 ; --i)
+ for (int i = m_NumSlots - 1; i >= 0 ; --i)
{
if (m_Slots[i].m_Socket != a_Socket)
{
@@ -220,7 +227,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket)
-bool cSocketThreads::cSocketThread::NotifyWrite(cCallback * a_Client)
+bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
{
if (HasClient(a_Client))
{
@@ -236,7 +243,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(cCallback * a_Client)
-bool cSocketThreads::cSocketThread::HasClient(cCallback * a_Client) const
+bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const
{
for (int i = m_NumSlots - 1; i >= 0; --i)
{
@@ -252,11 +259,11 @@ bool cSocketThreads::cSocketThread::HasClient(cCallback * a_Client) const
-bool cSocketThreads::cSocketThread::HasSocket(cSocket * a_Socket) const
+bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const
{
for (int i = m_NumSlots - 1; i >= 0; --i)
{
- if (m_Slots[i].m_Socket == a_Socket)
+ if (m_Slots[i].m_Socket->GetSocket() == a_Socket->GetSocket())
{
return true;
}
@@ -271,8 +278,8 @@ bool cSocketThreads::cSocketThread::HasSocket(cSocket * a_Socket) const
bool cSocketThreads::cSocketThread::Start(void)
{
// Create the control socket listener
- m_ControlSocket1 = cSocket::CreateSocket();
- if (!m_ControlSocket1.IsValid())
+ m_ControlSocket2 = cSocket::CreateSocket();
+ if (!m_ControlSocket2.IsValid())
{
LOGERROR("Cannot create a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
return false;
@@ -281,36 +288,42 @@ bool cSocketThreads::cSocketThread::Start(void)
Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET;
Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST();
Addr.Port = 0; // Any free port is okay
- if (m_ControlSocket1.Bind(Addr) != 0)
+ if (m_ControlSocket2.Bind(Addr) != 0)
{
LOGERROR("Cannot bind a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return false;
}
- if (m_ControlSocket1.GetPort() == 0)
+ if (m_ControlSocket2.Listen(1) != 0)
+ {
+ LOGERROR("Cannot listen on a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
+ m_ControlSocket2.CloseSocket();
+ return false;
+ }
+ if (m_ControlSocket2.GetPort() == 0)
{
LOGERROR("Cannot determine Control socket port (\"%s\"); conitnuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return false;
}
// Start the thread
if (!super::Start())
{
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return false;
}
// Finish connecting the control socket by accepting connection from the thread's socket
- cSocket tmp = m_ControlSocket1.Accept();
+ cSocket tmp = m_ControlSocket2.Accept();
if (!tmp.IsValid())
{
LOGERROR("Cannot link Control sockets for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return false;
}
- m_ControlSocket1.CloseSocket();
- m_ControlSocket1 = tmp;
+ m_ControlSocket2.CloseSocket();
+ m_ControlSocket2 = tmp;
return true;
}
@@ -322,15 +335,16 @@ bool cSocketThreads::cSocketThread::Start(void)
void cSocketThreads::cSocketThread::Execute(void)
{
// Connect the "client" part of the Control socket:
+ m_ControlSocket1 = cSocket::CreateSocket();
cSocket::SockAddr_In Addr;
Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET;
Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST();
- Addr.Port = m_ControlSocket1.GetPort();
+ Addr.Port = m_ControlSocket2.GetPort();
assert(Addr.Port != 0); // We checked in the Start() method, but let's be sure
- if (m_ControlSocket2.Connect(Addr) != 0)
+ if (m_ControlSocket1.Connect(Addr) != 0)
{
LOGERROR("Cannot connect Control sockets for a cSocketThread (\"%s\"); continuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return;
}
@@ -339,33 +353,35 @@ void cSocketThreads::cSocketThread::Execute(void)
{
// Put all sockets into the Read set:
fd_set fdRead;
- cSocket::xSocket Highest = 0;
+ cSocket::xSocket Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdRead, Highest);
// Wait for the sockets:
if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1)
{
- LOGWARNING("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
- break;
+ LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
+ continue;
}
ReadFromSockets(&fdRead);
// Test sockets for writing:
fd_set fdWrite;
- Highest = 0;
+ Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdWrite, Highest);
timeval Timeout;
Timeout.tv_sec = 0;
Timeout.tv_usec = 0;
if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1)
{
- LOGWARNING("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
- break;
+ LOG("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
+ continue;
}
WriteToSockets(&fdWrite);
+
+ RemoveClosedSockets();
} // while (!mShouldTerminate)
LOG("cSocketThread %p is terminating", this);
@@ -381,7 +397,7 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket
FD_SET(m_ControlSocket1.GetSocket(), a_Set);
cCSLock Lock(m_Parent->m_CS);
- for (int i = m_NumSlots - 1; i > 0; --i)
+ for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (!m_Slots[i].m_Socket->IsValid())
{
@@ -403,8 +419,17 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket
void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
{
// Read on available sockets:
+
+ // Reset Control socket state:
+ if (FD_ISSET(m_ControlSocket1.GetSocket(), a_Read))
+ {
+ char Dummy[128];
+ m_ControlSocket1.Receive(Dummy, sizeof(Dummy), 0);
+ }
+
+ // Read from clients:
cCSLock Lock(m_Parent->m_CS);
- for (int i = m_NumSlots - 1; i > 0; --i)
+ for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Read))
{
@@ -437,8 +462,9 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
{
+ // Write to available client sockets:
cCSLock Lock(m_Parent->m_CS);
- for (int i = m_NumSlots - 1; i > 0; --i)
+ for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Write))
{
@@ -464,9 +490,40 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
return;
}
m_Slots[i].m_Outgoing.erase(0, Sent);
+
+ // _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled
+ // This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread)
+ /*
+ // If there's any data left, signalize the Control socket:
+ if (!m_Slots[i].m_Outgoing.empty())
+ {
+ assert(m_ControlSocket2.IsValid());
+ m_ControlSocket2.Send("q", 1);
+ }
+ */
} // for i - m_Slots[i]
}
+
+void cSocketThreads::cSocketThread::RemoveClosedSockets(void)
+{
+ // Removes sockets that have closed from m_Slots[]
+
+ cCSLock Lock(m_Parent->m_CS);
+ for (int i = m_NumSlots - 1; i >= 0; --i)
+ {
+ if (m_Slots[i].m_Socket->IsValid())
+ {
+ continue;
+ }
+ m_Slots[i] = m_Slots[m_NumSlots - 1];
+ m_NumSlots--;
+ } // for i - m_Slots[]
+}
+
+
+
+
diff --git a/source/cSocketThreads.h b/source/cSocketThreads.h
index b43d693ba..cbf73a27e 100644
--- a/source/cSocketThreads.h
+++ b/source/cSocketThreads.h
@@ -10,7 +10,7 @@
/// How many clients should one thread handle? (must be less than FD_SETSIZE - 1 for your platform)
-#define MAX_SLOTS 1
+#define MAX_SLOTS 63
@@ -65,17 +65,17 @@ public:
cSocketThreads(void);
~cSocketThreads();
- /// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client
- void AddClient(cSocket * a_Socket, cCallback * a_Client);
+ /// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful
+ bool AddClient(cSocket * a_Socket, cCallback * a_Client);
/// Remove the socket (and associated client) from processing
- void RemoveClient(cSocket * a_Socket);
+ void RemoveClient(const cSocket * a_Socket);
/// Remove the associated socket and the client from processing
- void RemoveClient(cCallback * a_Client);
+ void RemoveClient(const cCallback * a_Client);
/// Notify the thread responsible for a_Client that the client has something to write
- void NotifyWrite(cCallback * a_Client);
+ void NotifyWrite(const cCallback * a_Client);
private:
@@ -92,15 +92,17 @@ private:
bool HasEmptySlot(void) const {return m_NumSlots < MAX_SLOTS; }
bool IsEmpty (void) const {return m_NumSlots == 0; }
- void AddClient (cSocket * a_Socket, cCallback * a_Client);
- bool RemoveClient(cCallback * a_Client); // Returns true if removed, false if not found
- bool RemoveSocket(cSocket * a_Socket); // Returns true if removed, false if not found
- bool HasClient (cCallback * a_Client) const;
- bool HasSocket (cSocket * a_Socket) const;
- bool NotifyWrite (cCallback * a_Client); // Returns true if client handled by this thread
+ void AddClient (cSocket * a_Socket, cCallback * a_Client);
+ bool RemoveClient(const cCallback * a_Client); // Returns true if removed, false if not found
+ bool RemoveSocket(const cSocket * a_Socket); // Returns true if removed, false if not found
+ bool HasClient (const cCallback * a_Client) const;
+ bool HasSocket (const cSocket * a_Socket) const;
+ bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread
bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket
+ bool IsValid(void) const {return m_ControlSocket2.IsValid(); } // If the Control socket dies, the thread is not valid anymore
+
private:
cSocketThreads * m_Parent;
@@ -127,6 +129,7 @@ private:
void PrepareSet (fd_set * a_Set, cSocket::xSocket & a_Highest); // Puts all sockets into the set, along with m_ControlSocket1
void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read
void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write
+ void RemoveClosedSockets(void); // Removes sockets that have closed from m_Slots[]
} ;
typedef std::list<cSocketThread *> cSocketThreadList;