summaryrefslogtreecommitdiffstats
path: root/src/OSSupport
diff options
context:
space:
mode:
Diffstat (limited to 'src/OSSupport')
-rw-r--r--src/OSSupport/BlockingTCPLink.cpp4
-rw-r--r--src/OSSupport/Errors.cpp53
-rw-r--r--src/OSSupport/Errors.h5
-rw-r--r--src/OSSupport/Event.cpp17
-rw-r--r--src/OSSupport/File.cpp9
-rw-r--r--src/OSSupport/File.h45
-rw-r--r--src/OSSupport/Socket.cpp51
-rw-r--r--src/OSSupport/Socket.h13
-rw-r--r--src/OSSupport/SocketThreads.cpp286
-rw-r--r--src/OSSupport/SocketThreads.h82
10 files changed, 282 insertions, 283 deletions
diff --git a/src/OSSupport/BlockingTCPLink.cpp b/src/OSSupport/BlockingTCPLink.cpp
index 08aec0c65..af50eda5d 100644
--- a/src/OSSupport/BlockingTCPLink.cpp
+++ b/src/OSSupport/BlockingTCPLink.cpp
@@ -2,7 +2,7 @@
#include "Globals.h" // NOTE: MSVC stupidness requires this to be the same across all modules
#include "BlockingTCPLink.h"
-
+#include "Errors.h"
@@ -75,7 +75,7 @@ bool cBlockingTCPLink::Connect(const char * iAddress, unsigned int iPort)
server.sin_port = htons( (unsigned short)iPort);
if (connect(m_Socket, (struct sockaddr *)&server, sizeof(server)))
{
- LOGWARN("cTCPLink: Connection to \"%s:%d\" failed (%s)", iAddress, iPort, cSocket::GetErrorString( cSocket::GetLastError() ).c_str() );
+ LOGWARN("cTCPLink: Connection to \"%s:%d\" failed (%s)", iAddress, iPort,GetOSErrorString( cSocket::GetLastError() ).c_str() );
CloseSocket();
return false;
}
diff --git a/src/OSSupport/Errors.cpp b/src/OSSupport/Errors.cpp
new file mode 100644
index 000000000..2e05f1df1
--- /dev/null
+++ b/src/OSSupport/Errors.cpp
@@ -0,0 +1,53 @@
+
+#include "Globals.h"
+
+#include "Errors.h"
+
+AString GetOSErrorString( int a_ErrNo )
+{
+ char buffer[ 1024 ];
+ AString Out;
+
+ #ifdef _WIN32
+
+ FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, a_ErrNo, 0, buffer, ARRAYCOUNT(buffer), NULL);
+ Printf(Out, "%d: %s", a_ErrNo, buffer);
+ if (!Out.empty() && (Out[Out.length() - 1] == '\n'))
+ {
+ Out.erase(Out.length() - 2);
+ }
+ return Out;
+
+ #else // _WIN32
+
+ // According to http://linux.die.net/man/3/strerror_r there are two versions of strerror_r():
+
+ #if ( _GNU_SOURCE ) && !defined(ANDROID_NDK) // GNU version of strerror_r()
+
+ char * res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) );
+ if( res != NULL )
+ {
+ Printf(Out, "%d: %s", a_ErrNo, res);
+ return Out;
+ }
+
+ #else // XSI version of strerror_r():
+
+ int res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) );
+ if( res == 0 )
+ {
+ Printf(Out, "%d: %s", a_ErrNo, buffer);
+ return Out;
+ }
+
+ #endif // strerror_r() version
+
+ else
+ {
+ Printf(Out, "Error %d while getting error string for error #%d!", errno, a_ErrNo);
+ return Out;
+ }
+
+ #endif // else _WIN32
+}
+
diff --git a/src/OSSupport/Errors.h b/src/OSSupport/Errors.h
new file mode 100644
index 000000000..8ce9deb10
--- /dev/null
+++ b/src/OSSupport/Errors.h
@@ -0,0 +1,5 @@
+
+#pragma once
+
+AString GetOSErrorString(int a_ErrNo);
+
diff --git a/src/OSSupport/Event.cpp b/src/OSSupport/Event.cpp
index cbacbba17..649a0a3cf 100644
--- a/src/OSSupport/Event.cpp
+++ b/src/OSSupport/Event.cpp
@@ -7,7 +7,7 @@
#include "Globals.h" // NOTE: MSVC stupidness requires this to be the same across all modules
#include "Event.h"
-
+#include "Errors.h"
@@ -35,14 +35,16 @@ cEvent::cEvent(void)
m_Event = sem_open(EventName.c_str(), O_CREAT, 777, 0 );
if (m_Event == SEM_FAILED)
{
- LOGERROR("cEvent: Cannot create event, errno = %i. Aborting server.", errno);
+ AString error = GetOSErrorString(errno);
+ LOGERROR("cEvent: Cannot create event, err = %s. Aborting server.", error.c_str());
abort();
}
// Unlink the semaphore immediately - it will continue to function but will not pollute the namespace
// We don't store the name, so can't call this in the destructor
if (sem_unlink(EventName.c_str()) != 0)
{
- LOGWARN("ERROR: Could not unlink cEvent. (%i)", errno);
+ AString error = GetOSErrorString(errno);
+ LOGWARN("ERROR: Could not unlink cEvent. (%s)", error.c_str());
}
}
#endif // *nix
@@ -61,7 +63,8 @@ cEvent::~cEvent()
{
if (sem_close(m_Event) != 0)
{
- LOGERROR("ERROR: Could not close cEvent. (%i)", errno);
+ AString error = GetOSErrorString(errno);
+ LOGERROR("ERROR: Could not close cEvent. (%s)", error.c_str());
}
}
else
@@ -88,7 +91,8 @@ void cEvent::Wait(void)
int res = sem_wait(m_Event);
if (res != 0 )
{
- LOGWARN("cEvent: waiting for the event failed: %i, errno = %i. Continuing, but server may be unstable.", res, errno);
+ AString error = GetOSErrorString(errno);
+ LOGWARN("cEvent: waiting for the event failed: %i, err = %s. Continuing, but server may be unstable.", res, error.c_str());
}
#endif
}
@@ -108,7 +112,8 @@ void cEvent::Set(void)
int res = sem_post(m_Event);
if (res != 0)
{
- LOGWARN("cEvent: Could not set cEvent: %i, errno = %d", res, errno);
+ AString error = GetOSErrorString(errno);
+ LOGWARN("cEvent: Could not set cEvent: %i, err = %s", res, error.c_str());
}
#endif
}
diff --git a/src/OSSupport/File.cpp b/src/OSSupport/File.cpp
index 9f7c0d439..0ebd04915 100644
--- a/src/OSSupport/File.cpp
+++ b/src/OSSupport/File.cpp
@@ -450,3 +450,12 @@ int cFile::Printf(const char * a_Fmt, ...)
+
+void cFile::Flush(void)
+{
+ fflush(m_File);
+}
+
+
+
+
diff --git a/src/OSSupport/File.h b/src/OSSupport/File.h
index 01663a229..07fce6661 100644
--- a/src/OSSupport/File.h
+++ b/src/OSSupport/File.h
@@ -18,6 +18,8 @@ Usage:
2, Check if the file was opened using IsOpen()
3, Read / write
4, Destroy the instance
+
+For reading entire files into memory, just use the static cFile::ReadWholeFile()
*/
@@ -55,7 +57,7 @@ public:
static const char PathSeparator = '/';
#endif
- /// The mode in which to open the file
+ /** The mode in which to open the file */
enum eMode
{
fmRead, // Read-only. If the file doesn't exist, object will not be valid
@@ -63,13 +65,13 @@ public:
fmReadWrite // Read/write. If the file already exists, it will be left intact; writing will overwrite the data from the beginning
} ;
- /// Simple constructor - creates an unopened file object, use Open() to open / create a real file
+ /** Simple constructor - creates an unopened file object, use Open() to open / create a real file */
cFile(void);
- /// Constructs and opens / creates the file specified, use IsOpen() to check for success
+ /** Constructs and opens / creates the file specified, use IsOpen() to check for success */
cFile(const AString & iFileName, eMode iMode);
- /// Auto-closes the file, if open
+ /** Auto-closes the file, if open */
~cFile();
bool Open(const AString & iFileName, eMode iMode);
@@ -77,60 +79,63 @@ public:
bool IsOpen(void) const;
bool IsEOF(void) const;
- /// Reads up to iNumBytes bytes into iBuffer, returns the number of bytes actually read, or -1 on failure; asserts if not open
+ /** Reads up to iNumBytes bytes into iBuffer, returns the number of bytes actually read, or -1 on failure; asserts if not open */
int Read (void * iBuffer, int iNumBytes);
- /// Writes up to iNumBytes bytes from iBuffer, returns the number of bytes actually written, or -1 on failure; asserts if not open
+ /** Writes up to iNumBytes bytes from iBuffer, returns the number of bytes actually written, or -1 on failure; asserts if not open */
int Write(const void * iBuffer, int iNumBytes);
- /// Seeks to iPosition bytes from file start, returns old position or -1 for failure; asserts if not open
+ /** Seeks to iPosition bytes from file start, returns old position or -1 for failure; asserts if not open */
int Seek (int iPosition);
- /// Returns the current position (bytes from file start) or -1 for failure; asserts if not open
+ /** Returns the current position (bytes from file start) or -1 for failure; asserts if not open */
int Tell (void) const;
- /// Returns the size of file, in bytes, or -1 for failure; asserts if not open
+ /** Returns the size of file, in bytes, or -1 for failure; asserts if not open */
int GetSize(void) const;
- /// Reads the file from current position till EOF into an AString; returns the number of bytes read or -1 for error
+ /** Reads the file from current position till EOF into an AString; returns the number of bytes read or -1 for error */
int ReadRestOfFile(AString & a_Contents);
// tolua_begin
- /// Returns true if the file specified exists
+ /** Returns true if the file specified exists */
static bool Exists(const AString & a_FileName);
- /// Deletes a file, returns true if successful
+ /** Deletes a file, returns true if successful */
static bool Delete(const AString & a_FileName);
- /// Renames a file or folder, returns true if successful. May fail if dest already exists (libc-dependant)!
+ /** Renames a file or folder, returns true if successful. May fail if dest already exists (libc-dependant)! */
static bool Rename(const AString & a_OrigPath, const AString & a_NewPath);
- /// Copies a file, returns true if successful.
+ /** Copies a file, returns true if successful. */
static bool Copy(const AString & a_SrcFileName, const AString & a_DstFileName);
- /// Returns true if the specified path is a folder
+ /** Returns true if the specified path is a folder */
static bool IsFolder(const AString & a_Path);
- /// Returns true if the specified path is a regular file
+ /** Returns true if the specified path is a regular file */
static bool IsFile(const AString & a_Path);
- /// Returns the size of the file, or a negative number on error
+ /** Returns the size of the file, or a negative number on error */
static int GetSize(const AString & a_FileName);
- /// Creates a new folder with the specified name. Returns true if successful. Path may be relative or absolute
+ /** Creates a new folder with the specified name. Returns true if successful. Path may be relative or absolute */
static bool CreateFolder(const AString & a_FolderPath);
- /// Returns the entire contents of the specified file as a string. Returns empty string on error.
+ /** Returns the entire contents of the specified file as a string. Returns empty string on error. */
static AString ReadWholeFile(const AString & a_FileName);
// tolua_end
- /// Returns the list of all items in the specified folder (files, folders, nix pipes, whatever's there).
+ /** Returns the list of all items in the specified folder (files, folders, nix pipes, whatever's there). */
static AStringVector GetFolderContents(const AString & a_Folder); // Exported in ManualBindings.cpp
int Printf(const char * a_Fmt, ...);
+ /** Flushes all the bufferef output into the file (only when writing) */
+ void Flush(void);
+
private:
#ifdef USE_STDIO_FILE
FILE * m_File;
diff --git a/src/OSSupport/Socket.cpp b/src/OSSupport/Socket.cpp
index 8ea5d8320..4226a7535 100644
--- a/src/OSSupport/Socket.cpp
+++ b/src/OSSupport/Socket.cpp
@@ -87,52 +87,19 @@ void cSocket::CloseSocket()
-AString cSocket::GetErrorString( int a_ErrNo )
+void cSocket::ShutdownReadWrite(void)
{
- char buffer[ 1024 ];
- AString Out;
-
#ifdef _WIN32
-
- FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, a_ErrNo, 0, buffer, ARRAYCOUNT(buffer), NULL);
- Printf(Out, "%d: %s", a_ErrNo, buffer);
- if (!Out.empty() && (Out[Out.length() - 1] == '\n'))
- {
- Out.erase(Out.length() - 2);
- }
- return Out;
-
- #else // _WIN32
-
- // According to http://linux.die.net/man/3/strerror_r there are two versions of strerror_r():
-
- #if ( _GNU_SOURCE ) && !defined(ANDROID_NDK) // GNU version of strerror_r()
-
- char * res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) );
- if( res != NULL )
- {
- Printf(Out, "%d: %s", a_ErrNo, res);
- return Out;
- }
-
- #else // XSI version of strerror_r():
-
- int res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) );
- if( res == 0 )
- {
- Printf(Out, "%d: %s", a_ErrNo, buffer);
- return Out;
- }
-
- #endif // strerror_r() version
-
- else
+ int res = shutdown(m_Socket, SD_BOTH);
+ #else
+ int res = shutdown(m_Socket, SHUT_RDWR);
+ #endif
+ if (res != 0)
{
- Printf(Out, "Error %d while getting error string for error #%d!", errno, a_ErrNo);
- return Out;
+ LOGWARN("%s: Error shutting down socket %d (%s): %d (%s)",
+ __FUNCTION__, m_Socket, m_IPString.c_str(), this->GetLastError(), GetLastErrorString().c_str()
+ );
}
-
- #endif // else _WIN32
}
diff --git a/src/OSSupport/Socket.h b/src/OSSupport/Socket.h
index b86560de8..4ca3d61f4 100644
--- a/src/OSSupport/Socket.h
+++ b/src/OSSupport/Socket.h
@@ -14,7 +14,7 @@
#endif
-
+#include "Errors.h"
class cSocket
@@ -39,7 +39,11 @@ public:
bool IsValid(void) const { return IsValidSocket(m_Socket); }
void CloseSocket(void);
-
+
+ /** Notifies the socket that we don't expect any more reads nor writes on it.
+ Most TCPIP implementations use this to send the FIN flag in a packet */
+ void ShutdownReadWrite(void);
+
operator xSocket(void) const;
xSocket GetSocket(void) const;
@@ -53,11 +57,10 @@ public:
/// Initializes the network stack. Returns 0 on success, or another number as an error code.
static int WSAStartup(void);
- static AString GetErrorString(int a_ErrNo);
static int GetLastError();
static AString GetLastErrorString(void)
{
- return GetErrorString(GetLastError());
+ return GetOSErrorString(GetLastError());
}
/// Creates a new socket of the specified address family
@@ -111,4 +114,4 @@ public:
private:
xSocket m_Socket;
AString m_IPString;
-}; \ No newline at end of file
+};
diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp
index 3e505616c..74932daf8 100644
--- a/src/OSSupport/SocketThreads.cpp
+++ b/src/OSSupport/SocketThreads.cpp
@@ -7,6 +7,7 @@
#include "Globals.h"
#include "SocketThreads.h"
+#include "Errors.h"
@@ -71,29 +72,6 @@ bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client)
-/*
-void cSocketThreads::RemoveClient(const cSocket * a_Socket)
-{
- // Remove the socket (and associated client) from processing
-
- cCSLock Lock(m_CS);
- for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
- {
- if ((*itr)->RemoveSocket(a_Socket))
- {
- return;
- }
- } // for itr - m_Threads[]
-
- // Cannot assert here, this may actually happen legally, since cClientHandle has to clean up the socket and it may have already closed in the meantime
- // ASSERT(!"Removing an unknown socket");
-}
-*/
-
-
-
-
-
void cSocketThreads::RemoveClient(const cCallback * a_Client)
{
// Remove the associated socket and the client from processing
@@ -155,47 +133,6 @@ void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data)
-/// Stops reading from the socket - when this call returns, no more calls to the callbacks are made
-void cSocketThreads::StopReading(const cCallback * a_Client)
-{
- cCSLock Lock(m_CS);
- for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
- {
- if ((*itr)->StopReading(a_Client))
- {
- return;
- }
- } // for itr - m_Threads[]
-
- // Cannot assert, this normally happens if the socket is closed before the client deinitializes
- // ASSERT(!"Stopping reading on an unknown client");
-}
-
-
-
-
-
-/// Queues the socket for closing, as soon as its outgoing data is sent
-void cSocketThreads::QueueClose(const cCallback * a_Client)
-{
- LOGD("QueueClose(client %p)", a_Client);
-
- cCSLock Lock(m_CS);
- for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
- {
- if ((*itr)->QueueClose(a_Client))
- {
- return;
- }
- } // for itr - m_Threads[]
-
- ASSERT(!"Queueing close of an unknown client");
-}
-
-
-
-
-
////////////////////////////////////////////////////////////////////////////////
// cSocketThreads::cSocketThread:
@@ -233,13 +170,13 @@ cSocketThreads::cSocketThread::~cSocketThread()
void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client)
{
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding
m_Slots[m_NumSlots].m_Client = a_Client;
m_Slots[m_NumSlots].m_Socket = a_Socket;
m_Slots[m_NumSlots].m_Outgoing.clear();
- m_Slots[m_NumSlots].m_ShouldClose = false;
- m_Slots[m_NumSlots].m_ShouldCallClient = true;
+ m_Slots[m_NumSlots].m_State = sSlot::ssNormal;
m_NumSlots++;
// Notify the thread of the change:
@@ -253,7 +190,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac
bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
{
- // Returns true if removed, false if not found
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
if (m_NumSlots == 0)
{
@@ -267,36 +204,29 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
continue;
}
- // Found, remove it:
- m_Slots[i] = m_Slots[--m_NumSlots];
-
- // Notify the thread of the change:
- ASSERT(m_ControlSocket2.IsValid());
- m_ControlSocket2.Send("r", 1);
- return true;
- } // for i - m_Slots[]
-
- // Not found
- return false;
-}
-
-
-
-
-
-bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket)
-{
- // Returns true if removed, false if not found
-
- for (int i = m_NumSlots - 1; i >= 0 ; --i)
- {
- if (m_Slots[i].m_Socket != *a_Socket)
+ // Found the slot:
+ if (m_Slots[i].m_State == sSlot::ssRemoteClosed)
{
- continue;
+ // The remote has already closed the socket, remove the slot altogether:
+ m_Slots[i] = m_Slots[--m_NumSlots];
+ }
+ else
+ {
+ // Query and queue the last batch of outgoing data:
+ m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing);
+ if (m_Slots[i].m_Outgoing.empty())
+ {
+ // No more outgoing data, shut the socket down immediately:
+ m_Slots[i].m_Socket.ShutdownReadWrite();
+ m_Slots[i].m_State = sSlot::ssShuttingDown;
+ }
+ else
+ {
+ // More data to send, shut down reading and wait for the rest to get sent:
+ m_Slots[i].m_State = sSlot::ssWritingRestOut;
+ }
+ m_Slots[i].m_Client = NULL;
}
-
- // Found, remove it:
- m_Slots[i] = m_Slots[--m_NumSlots];
// Notify the thread of the change:
ASSERT(m_ControlSocket2.IsValid());
@@ -314,6 +244,8 @@ bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket)
bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const
{
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
+
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Client == a_Client)
@@ -346,6 +278,8 @@ bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const
bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
{
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
+
if (HasClient(a_Client))
{
// Notify the thread that there's another packet in the queue:
@@ -362,7 +296,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data)
{
- // Returns true if socket handled by this thread
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Client == a_Client)
@@ -383,47 +317,6 @@ bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AStr
-bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client)
-{
- // Returns true if client handled by this thread
- for (int i = m_NumSlots - 1; i >= 0; --i)
- {
- if (m_Slots[i].m_Client == a_Client)
- {
- m_Slots[i].m_ShouldCallClient = false;
- return true;
- }
- } // for i - m_Slots[]
- return false;
-}
-
-
-
-
-
-bool cSocketThreads::cSocketThread::QueueClose(const cCallback * a_Client)
-{
- // Returns true if socket handled by this thread
- for (int i = m_NumSlots - 1; i >= 0; --i)
- {
- if (m_Slots[i].m_Client == a_Client)
- {
- m_Slots[i].m_ShouldClose = true;
-
- // Notify the thread that there's a close queued (in case its conditions are already met):
- ASSERT(m_ControlSocket2.IsValid());
- m_ControlSocket2.Send("c", 1);
-
- return true;
- }
- } // for i - m_Slots[]
- return false;
-}
-
-
-
-
-
bool cSocketThreads::cSocketThread::Start(void)
{
// Create the control socket listener
@@ -497,10 +390,13 @@ void cSocketThreads::cSocketThread::Execute(void)
fd_set fdRead;
cSocket::xSocket Highest = m_ControlSocket1.GetSocket();
- PrepareSet(&fdRead, Highest);
+ PrepareSet(&fdRead, Highest, false);
// Wait for the sockets:
- if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1)
+ timeval Timeout;
+ Timeout.tv_sec = 5;
+ Timeout.tv_usec = 0;
+ if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1)
{
LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
continue;
@@ -511,8 +407,7 @@ void cSocketThreads::cSocketThread::Execute(void)
// Test sockets for writing:
fd_set fdWrite;
Highest = m_ControlSocket1.GetSocket();
- PrepareSet(&fdWrite, Highest);
- timeval Timeout;
+ PrepareSet(&fdWrite, Highest, true);
Timeout.tv_sec = 0;
Timeout.tv_usec = 0;
if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1)
@@ -522,6 +417,8 @@ void cSocketThreads::cSocketThread::Execute(void)
}
WriteToSockets(&fdWrite);
+
+ CleanUpShutSockets();
} // while (!mShouldTerminate)
}
@@ -529,7 +426,7 @@ void cSocketThreads::cSocketThread::Execute(void)
-void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest)
+void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting)
{
FD_ZERO(a_Set);
FD_SET(m_ControlSocket1.GetSocket(), a_Set);
@@ -541,6 +438,11 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket
{
continue;
}
+ if (m_Slots[i].m_State == sSlot::ssRemoteClosed)
+ {
+ // This socket won't provide nor consume any data anymore, don't put it in the Set
+ continue;
+ }
cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket();
FD_SET(s, a_Set);
if (s > a_Highest)
@@ -576,29 +478,42 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
}
char Buffer[1024];
int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0);
- if (Received == 0)
- {
- // The socket has been closed by the remote party, close our socket and let it be removed after we process all reading
- m_Slots[i].m_Socket.CloseSocket();
- if (m_Slots[i].m_ShouldCallClient)
- {
- m_Slots[i].m_Client->SocketClosed();
- }
- }
- else if (Received > 0)
+ if (Received <= 0)
{
- if (m_Slots[i].m_ShouldCallClient)
+ // The socket has been closed by the remote party
+ switch (m_Slots[i].m_State)
{
- m_Slots[i].m_Client->DataReceived(Buffer, Received);
- }
+ case sSlot::ssNormal:
+ {
+ // Notify the callback that the remote has closed the socket; keep the slot
+ m_Slots[i].m_Client->SocketClosed();
+ m_Slots[i].m_State = sSlot::ssRemoteClosed;
+ break;
+ }
+ case sSlot::ssWritingRestOut:
+ case sSlot::ssShuttingDown:
+ case sSlot::ssShuttingDown2:
+ {
+ // Force-close the socket and remove the slot:
+ m_Slots[i].m_Socket.CloseSocket();
+ m_Slots[i] = m_Slots[--m_NumSlots];
+ break;
+ }
+ default:
+ {
+ LOG("%s: Unexpected socket state: %d (%s)",
+ __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str()
+ );
+ ASSERT(!"Unexpected socket state");
+ break;
+ }
+ } // switch (m_Slots[i].m_State)
}
else
{
- // The socket has encountered an error, close it and let it be removed after we process all reading
- m_Slots[i].m_Socket.CloseSocket();
- if (m_Slots[i].m_ShouldCallClient)
+ if (m_Slots[i].m_Client != NULL)
{
- m_Slots[i].m_Client->SocketClosed();
+ m_Slots[i].m_Client->DataReceived(Buffer, Received);
}
}
} // for i - m_Slots[]
@@ -622,22 +537,17 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
if (m_Slots[i].m_Outgoing.empty())
{
// Request another chunk of outgoing data:
- if (m_Slots[i].m_ShouldCallClient)
+ if (m_Slots[i].m_Client != NULL)
{
m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing);
}
if (m_Slots[i].m_Outgoing.empty())
{
- // Nothing ready
- if (m_Slots[i].m_ShouldClose)
+ // No outgoing data is ready
+ if (m_Slots[i].m_State == sSlot::ssWritingRestOut)
{
- // Socket was queued for closing and there's no more data to send, close it now:
-
- // DEBUG
- LOGD("Socket was queued for closing, closing now. Slot %d, client %p, socket %d", i, m_Slots[i].m_Client, m_Slots[i].m_Socket.GetSocket());
-
- m_Slots[i].m_Socket.CloseSocket();
- // The slot must be freed actively by the client, using RemoveClient()
+ m_Slots[i].m_State = sSlot::ssShuttingDown;
+ m_Slots[i].m_Socket.ShutdownReadWrite();
}
continue;
}
@@ -647,9 +557,9 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
if (Sent < 0)
{
int Err = cSocket::GetLastError();
- LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str());
+ LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str());
m_Slots[i].m_Socket.CloseSocket();
- if (m_Slots[i].m_ShouldCallClient)
+ if (m_Slots[i].m_Client != NULL)
{
m_Slots[i].m_Client->SocketClosed();
}
@@ -657,6 +567,12 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
}
m_Slots[i].m_Outgoing.erase(0, Sent);
+ if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut))
+ {
+ m_Slots[i].m_State = sSlot::ssShuttingDown;
+ m_Slots[i].m_Socket.ShutdownReadWrite();
+ }
+
// _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled
// This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread)
/*
@@ -673,3 +589,31 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
+
+void cSocketThreads::cSocketThread::CleanUpShutSockets(void)
+{
+ for (int i = m_NumSlots - 1; i >= 0; i--)
+ {
+ switch (m_Slots[i].m_State)
+ {
+ case sSlot::ssShuttingDown2:
+ {
+ // The socket has reached the shutdown timeout, close it and clear its slot:
+ m_Slots[i].m_Socket.CloseSocket();
+ m_Slots[i] = m_Slots[--m_NumSlots];
+ break;
+ }
+ case sSlot::ssShuttingDown:
+ {
+ // The socket has been shut down for a single thread loop, let it loop once more before closing:
+ m_Slots[i].m_State = sSlot::ssShuttingDown2;
+ break;
+ }
+ default: break;
+ }
+ } // for i - m_Slots[]
+}
+
+
+
+
diff --git a/src/OSSupport/SocketThreads.h b/src/OSSupport/SocketThreads.h
index 858729c49..9e1947ab6 100644
--- a/src/OSSupport/SocketThreads.h
+++ b/src/OSSupport/SocketThreads.h
@@ -7,19 +7,20 @@
/*
Additional details:
-When a client is terminating a connection:
-- they call the StopReading() method to disable callbacks for the incoming data
-- they call the Write() method to queue any outstanding outgoing data
-- they call the QueueClose() method to queue the socket to close after outgoing data has been sent.
-When a socket slot is marked as having no callback, it is kept alive until its outgoing data queue is empty and its m_ShouldClose flag is set.
-This means that the socket can be written to several times before finally closing it via QueueClose()
+When a client wants to terminate the connection, they call the RemoveClient() function. This calls the
+callback one last time to read all the available outgoing data, putting it in the slot's m_OutgoingData
+buffer. Then it marks the slot as having no callback. The socket is kept alive until its outgoing data
+queue is empty, then shutdown is called on it and finally the socket is closed after a timeout.
+If at any time within this the remote end closes the socket, then the socket is closed directly.
+As soon as the socket is closed, the slot is finally removed from the SocketThread.
+The graph in $/docs/SocketThreads States.gv shows the state-machine transitions of the slot.
*/
-/// How many clients should one thread handle? (must be less than FD_SETSIZE for your platform)
+/** How many clients should one thread handle? (must be less than FD_SETSIZE for your platform) */
#define MAX_SLOTS 63
@@ -27,8 +28,6 @@ This means that the socket can be written to several times before finally closin
#pragma once
-#ifndef CSOCKETTHREADS_H_INCLUDED
-#define CSOCKETTHREADS_H_INCLUDED
#include "Socket.h"
#include "IsThread.h"
@@ -64,13 +63,13 @@ public:
// Force a virtual destructor in all subclasses:
virtual ~cCallback() {}
- /// Called when data is received from the remote party
+ /** Called when data is received from the remote party */
virtual void DataReceived(const char * a_Data, int a_Size) = 0;
- /// Called when data can be sent to remote party; the function is supposed to append outgoing data to a_Data
+ /** Called when data can be sent to remote party; the function is supposed to *append* outgoing data to a_Data */
virtual void GetOutgoingData(AString & a_Data) = 0;
- /// Called when the socket has been closed for any reason
+ /** Called when the socket has been closed for any reason */
virtual void SocketClosed(void) = 0;
} ;
@@ -78,24 +77,21 @@ public:
cSocketThreads(void);
~cSocketThreads();
- /// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful
+ /** Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful */
bool AddClient(const cSocket & a_Socket, cCallback * a_Client);
- /// Remove the associated socket and the client from processing. The socket is left to send its data and is removed only after all its m_OutgoingData is sent
+ /** Remove the associated socket and the client from processing.
+ The socket is left to send its last outgoing data and is removed only after all its m_Outgoing is sent
+ and after the socket is properly shutdown (unless the remote disconnects before that)
+ */
void RemoveClient(const cCallback * a_Client);
- /// Notify the thread responsible for a_Client that the client has something to write
+ /** Notify the thread responsible for a_Client that the client has something to write */
void NotifyWrite(const cCallback * a_Client);
- /// Puts a_Data into outgoing data queue for a_Client
+ /** Puts a_Data into outgoing data queue for a_Client */
void Write(const cCallback * a_Client, const AString & a_Data);
- /// Stops reading from the client - when this call returns, no more calls to the callbacks are made
- void StopReading(const cCallback * a_Client);
-
- /// Queues the client for closing, as soon as its outgoing data is sent
- void QueueClose(const cCallback * a_Client);
-
private:
class cSocketThread :
@@ -114,13 +110,10 @@ private:
void AddClient (const cSocket & a_Socket, cCallback * a_Client); // Takes ownership of the socket
bool RemoveClient(const cCallback * a_Client); // Returns true if removed, false if not found
- bool RemoveSocket(const cSocket * a_Socket); // Returns true if removed, false if not found
bool HasClient (const cCallback * a_Client) const;
bool HasSocket (const cSocket * a_Socket) const;
bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread
bool Write (const cCallback * a_Client, const AString & a_Data); // Returns true if client handled by this thread
- bool StopReading (const cCallback * a_Client); // Returns true if client handled by this thread
- bool QueueClose (const cCallback * a_Client); // Returns true if client handled by this thread
bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket
@@ -134,24 +127,45 @@ private:
cSocket m_ControlSocket1;
cSocket m_ControlSocket2;
- // Socket-client-packetqueues triplets.
+ // Socket-client-dataqueues-state quadruplets.
// Manipulation with these assumes that the parent's m_CS is locked
struct sSlot
{
- cSocket m_Socket; // The socket is primarily owned by this
+ /** The socket is primarily owned by this object */
+ cSocket m_Socket;
+
+ /** The callback to call for events. May be NULL */
cCallback * m_Client;
- AString m_Outgoing; // If sending writes only partial data, the rest is stored here for another send
- bool m_ShouldClose; // If true, the socket is to be closed after sending all outgoing data
- bool m_ShouldCallClient; // If true, the client callbacks are called. Set to false in StopReading()
+
+ /** If sending writes only partial data, the rest is stored here for another send.
+ Also used when the slot is being removed to store the last batch of outgoing data. */
+ AString m_Outgoing;
+
+ enum eState
+ {
+ ssNormal, ///< Normal read / write operations
+ ssWritingRestOut, ///< The client callback was removed, continue to send outgoing data
+ ssShuttingDown, ///< The last outgoing data has been sent, the socket has called shutdown()
+ ssShuttingDown2, ///< The shutdown has been done at least 1 thread loop ago (timeout detection)
+ ssRemoteClosed, ///< The remote end has closed the connection (and we still have a client callback)
+ } m_State;
} ;
+
sSlot m_Slots[MAX_SLOTS];
int m_NumSlots; // Number of slots actually used
virtual void Execute(void) override;
- void PrepareSet (fd_set * a_Set, cSocket::xSocket & a_Highest); // Puts all sockets into the set, along with m_ControlSocket1
+ /** Puts all sockets into the set, along with m_ControlSocket1.
+ Only sockets that are able to send and receive data are put in the Set.
+ Is a_IsForWriting is true, the ssWritingRestOut sockets are added as well. */
+ void PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting);
+
void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read
void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write
+
+ /** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */
+ void CleanUpShutSockets(void);
} ;
typedef std::list<cSocketThread *> cSocketThreadList;
@@ -164,9 +178,3 @@ private:
-
-#endif // CSOCKETTHREADS_H_INCLUDED
-
-
-
-