diff options
Diffstat (limited to 'src/OSSupport')
-rw-r--r-- | src/OSSupport/BlockingTCPLink.cpp | 10 | ||||
-rw-r--r-- | src/OSSupport/Errors.cpp | 53 | ||||
-rw-r--r-- | src/OSSupport/Errors.h | 5 | ||||
-rw-r--r-- | src/OSSupport/Event.cpp | 17 | ||||
-rw-r--r-- | src/OSSupport/File.cpp | 45 | ||||
-rw-r--r-- | src/OSSupport/File.h | 47 | ||||
-rw-r--r-- | src/OSSupport/GZipFile.cpp | 9 | ||||
-rw-r--r-- | src/OSSupport/IsThread.h | 2 | ||||
-rw-r--r-- | src/OSSupport/ListenThread.h | 20 | ||||
-rw-r--r-- | src/OSSupport/Queue.h | 6 | ||||
-rw-r--r-- | src/OSSupport/Sleep.h | 2 | ||||
-rw-r--r-- | src/OSSupport/Socket.cpp | 82 | ||||
-rw-r--r-- | src/OSSupport/Socket.h | 16 | ||||
-rw-r--r-- | src/OSSupport/SocketThreads.cpp | 189 | ||||
-rw-r--r-- | src/OSSupport/SocketThreads.h | 30 | ||||
-rw-r--r-- | src/OSSupport/Thread.h | 2 | ||||
-rw-r--r-- | src/OSSupport/Timer.cpp | 2 |
17 files changed, 353 insertions, 184 deletions
diff --git a/src/OSSupport/BlockingTCPLink.cpp b/src/OSSupport/BlockingTCPLink.cpp index 08aec0c65..07f48b955 100644 --- a/src/OSSupport/BlockingTCPLink.cpp +++ b/src/OSSupport/BlockingTCPLink.cpp @@ -2,7 +2,7 @@ #include "Globals.h" // NOTE: MSVC stupidness requires this to be the same across all modules #include "BlockingTCPLink.h" - +#include "Errors.h" @@ -70,12 +70,12 @@ bool cBlockingTCPLink::Connect(const char * iAddress, unsigned int iPort) } } - server.sin_addr.s_addr = *((unsigned long *)hp->h_addr); + memcpy(&server.sin_addr.s_addr,hp->h_addr, hp->h_length); server.sin_family = AF_INET; server.sin_port = htons( (unsigned short)iPort); if (connect(m_Socket, (struct sockaddr *)&server, sizeof(server))) { - LOGWARN("cTCPLink: Connection to \"%s:%d\" failed (%s)", iAddress, iPort, cSocket::GetErrorString( cSocket::GetLastError() ).c_str() ); + LOGWARN("cTCPLink: Connection to \"%s:%d\" failed (%s)", iAddress, iPort,GetOSErrorString( cSocket::GetLastError() ).c_str() ); CloseSocket(); return false; } @@ -89,6 +89,8 @@ bool cBlockingTCPLink::Connect(const char * iAddress, unsigned int iPort) int cBlockingTCPLink::Send(char * a_Data, unsigned int a_Size, int a_Flags /* = 0 */ ) { + UNUSED(a_Flags); + ASSERT(m_Socket.IsValid()); if (!m_Socket.IsValid()) { @@ -104,6 +106,8 @@ int cBlockingTCPLink::Send(char * a_Data, unsigned int a_Size, int a_Flags /* = int cBlockingTCPLink::SendMessage( const char* a_Message, int a_Flags /* = 0 */ ) { + UNUSED(a_Flags); + ASSERT(m_Socket.IsValid()); if (!m_Socket.IsValid()) { diff --git a/src/OSSupport/Errors.cpp b/src/OSSupport/Errors.cpp new file mode 100644 index 000000000..6072b6ac6 --- /dev/null +++ b/src/OSSupport/Errors.cpp @@ -0,0 +1,53 @@ + +#include "Globals.h" + +#include "Errors.h" + +AString GetOSErrorString( int a_ErrNo ) +{ + char buffer[ 1024 ]; + AString Out; + + #ifdef _WIN32 + + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, a_ErrNo, 0, buffer, ARRAYCOUNT(buffer), NULL); + Printf(Out, "%d: %s", a_ErrNo, buffer); + if (!Out.empty() && (Out[Out.length() - 1] == '\n')) + { + Out.erase(Out.length() - 2); + } + return Out; + + #else // _WIN32 + + // According to http://linux.die.net/man/3/strerror_r there are two versions of strerror_r(): + + #if !defined(__APPLE__) && ( _GNU_SOURCE ) && !defined(ANDROID_NDK) // GNU version of strerror_r() + + char * res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) ); + if( res != NULL ) + { + Printf(Out, "%d: %s", a_ErrNo, res); + return Out; + } + + #else // XSI version of strerror_r(): + + int res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) ); + if( res == 0 ) + { + Printf(Out, "%d: %s", a_ErrNo, buffer); + return Out; + } + + #endif // strerror_r() version + + else + { + Printf(Out, "Error %d while getting error string for error #%d!", errno, a_ErrNo); + return Out; + } + + #endif // else _WIN32 +} + diff --git a/src/OSSupport/Errors.h b/src/OSSupport/Errors.h new file mode 100644 index 000000000..8ce9deb10 --- /dev/null +++ b/src/OSSupport/Errors.h @@ -0,0 +1,5 @@ + +#pragma once + +AString GetOSErrorString(int a_ErrNo); + diff --git a/src/OSSupport/Event.cpp b/src/OSSupport/Event.cpp index cbacbba17..649a0a3cf 100644 --- a/src/OSSupport/Event.cpp +++ b/src/OSSupport/Event.cpp @@ -7,7 +7,7 @@ #include "Globals.h" // NOTE: MSVC stupidness requires this to be the same across all modules #include "Event.h" - +#include "Errors.h" @@ -35,14 +35,16 @@ cEvent::cEvent(void) m_Event = sem_open(EventName.c_str(), O_CREAT, 777, 0 ); if (m_Event == SEM_FAILED) { - LOGERROR("cEvent: Cannot create event, errno = %i. Aborting server.", errno); + AString error = GetOSErrorString(errno); + LOGERROR("cEvent: Cannot create event, err = %s. Aborting server.", error.c_str()); abort(); } // Unlink the semaphore immediately - it will continue to function but will not pollute the namespace // We don't store the name, so can't call this in the destructor if (sem_unlink(EventName.c_str()) != 0) { - LOGWARN("ERROR: Could not unlink cEvent. (%i)", errno); + AString error = GetOSErrorString(errno); + LOGWARN("ERROR: Could not unlink cEvent. (%s)", error.c_str()); } } #endif // *nix @@ -61,7 +63,8 @@ cEvent::~cEvent() { if (sem_close(m_Event) != 0) { - LOGERROR("ERROR: Could not close cEvent. (%i)", errno); + AString error = GetOSErrorString(errno); + LOGERROR("ERROR: Could not close cEvent. (%s)", error.c_str()); } } else @@ -88,7 +91,8 @@ void cEvent::Wait(void) int res = sem_wait(m_Event); if (res != 0 ) { - LOGWARN("cEvent: waiting for the event failed: %i, errno = %i. Continuing, but server may be unstable.", res, errno); + AString error = GetOSErrorString(errno); + LOGWARN("cEvent: waiting for the event failed: %i, err = %s. Continuing, but server may be unstable.", res, error.c_str()); } #endif } @@ -108,7 +112,8 @@ void cEvent::Set(void) int res = sem_post(m_Event); if (res != 0) { - LOGWARN("cEvent: Could not set cEvent: %i, errno = %d", res, errno); + AString error = GetOSErrorString(errno); + LOGWARN("cEvent: Could not set cEvent: %i, err = %s", res, error.c_str()); } #endif } diff --git a/src/OSSupport/File.cpp b/src/OSSupport/File.cpp index 9f7c0d439..7f0f0ad2f 100644 --- a/src/OSSupport/File.cpp +++ b/src/OSSupport/File.cpp @@ -73,14 +73,26 @@ bool cFile::Open(const AString & iFileName, eMode iMode) return false; } } - m_File = fopen( (FILE_IO_PREFIX + iFileName).c_str(), Mode); + +#ifdef _WIN32 + fopen_s(&m_File, (FILE_IO_PREFIX + iFileName).c_str(), Mode); +#else + m_File = fopen((FILE_IO_PREFIX + iFileName).c_str(), Mode); +#endif // _WIN32 + if ((m_File == NULL) && (iMode == fmReadWrite)) { // Fix for MS not following C spec, opening "a" mode files for writing at the end only // The file open operation has been tried with "read update", fails if file not found // So now we know either the file doesn't exist or we don't have rights, no need to worry about file contents. // Simply re-open for read-writing, erasing existing contents: - m_File = fopen( (FILE_IO_PREFIX + iFileName).c_str(), "wb+"); + +#ifdef _WIN32 + fopen_s(&m_File, (FILE_IO_PREFIX + iFileName).c_str(), "wb+"); +#else + m_File = fopen((FILE_IO_PREFIX + iFileName).c_str(), "wb+"); +#endif // _WIN32 + } return (m_File != NULL); } @@ -140,7 +152,7 @@ int cFile::Read (void * iBuffer, int iNumBytes) return -1; } - return fread(iBuffer, 1, iNumBytes, m_File); // fread() returns the portion of Count parameter actually read, so we need to send iNumBytes as Count + return (int)fread(iBuffer, 1, (size_t)iNumBytes, m_File); // fread() returns the portion of Count parameter actually read, so we need to send iNumBytes as Count } @@ -156,7 +168,7 @@ int cFile::Write(const void * iBuffer, int iNumBytes) return -1; } - int res = fwrite(iBuffer, 1, iNumBytes, m_File); // fwrite() returns the portion of Count parameter actually written, so we need to send iNumBytes as Count + int res = (int)fwrite(iBuffer, 1, (size_t)iNumBytes, m_File); // fwrite() returns the portion of Count parameter actually written, so we need to send iNumBytes as Count return res; } @@ -177,7 +189,7 @@ int cFile::Seek (int iPosition) { return -1; } - return ftell(m_File); + return (int)ftell(m_File); } @@ -194,7 +206,7 @@ int cFile::Tell (void) const return -1; } - return ftell(m_File); + return (int)ftell(m_File); } @@ -210,7 +222,7 @@ int cFile::GetSize(void) const return -1; } - int CurPos = ftell(m_File); + int CurPos = Tell(); if (CurPos < 0) { return -1; @@ -219,8 +231,8 @@ int cFile::GetSize(void) const { return -1; } - int res = ftell(m_File); - if (fseek(m_File, CurPos, SEEK_SET) != 0) + int res = Tell(); + if (fseek(m_File, (long)CurPos, SEEK_SET) != 0) { return -1; } @@ -243,7 +255,7 @@ int cFile::ReadRestOfFile(AString & a_Contents) int DataSize = GetSize() - Tell(); // HACK: This depends on the internal knowledge that AString's data() function returns the internal buffer directly - a_Contents.assign(DataSize, '\0'); + a_Contents.assign((size_t)DataSize, '\0'); return Read((void *)a_Contents.data(), DataSize); } @@ -338,7 +350,7 @@ int cFile::GetSize(const AString & a_FileName) struct stat st; if (stat(a_FileName.c_str(), &st) == 0) { - return st.st_size; + return (int)st.st_size; } return -1; } @@ -444,7 +456,16 @@ int cFile::Printf(const char * a_Fmt, ...) va_start(args, a_Fmt); AppendVPrintf(buf, a_Fmt, args); va_end(args); - return Write(buf.c_str(), buf.length()); + return Write(buf.c_str(), (int)buf.length()); +} + + + + + +void cFile::Flush(void) +{ + fflush(m_File); } diff --git a/src/OSSupport/File.h b/src/OSSupport/File.h index 01663a229..b394c5cb9 100644 --- a/src/OSSupport/File.h +++ b/src/OSSupport/File.h @@ -18,6 +18,8 @@ Usage: 2, Check if the file was opened using IsOpen() 3, Read / write 4, Destroy the instance + +For reading entire files into memory, just use the static cFile::ReadWholeFile() */ @@ -55,7 +57,7 @@ public: static const char PathSeparator = '/'; #endif - /// The mode in which to open the file + /** The mode in which to open the file */ enum eMode { fmRead, // Read-only. If the file doesn't exist, object will not be valid @@ -63,13 +65,13 @@ public: fmReadWrite // Read/write. If the file already exists, it will be left intact; writing will overwrite the data from the beginning } ; - /// Simple constructor - creates an unopened file object, use Open() to open / create a real file + /** Simple constructor - creates an unopened file object, use Open() to open / create a real file */ cFile(void); - /// Constructs and opens / creates the file specified, use IsOpen() to check for success + /** Constructs and opens / creates the file specified, use IsOpen() to check for success */ cFile(const AString & iFileName, eMode iMode); - /// Auto-closes the file, if open + /** Auto-closes the file, if open */ ~cFile(); bool Open(const AString & iFileName, eMode iMode); @@ -77,59 +79,62 @@ public: bool IsOpen(void) const; bool IsEOF(void) const; - /// Reads up to iNumBytes bytes into iBuffer, returns the number of bytes actually read, or -1 on failure; asserts if not open + /** Reads up to iNumBytes bytes into iBuffer, returns the number of bytes actually read, or -1 on failure; asserts if not open */ int Read (void * iBuffer, int iNumBytes); - /// Writes up to iNumBytes bytes from iBuffer, returns the number of bytes actually written, or -1 on failure; asserts if not open + /** Writes up to iNumBytes bytes from iBuffer, returns the number of bytes actually written, or -1 on failure; asserts if not open */ int Write(const void * iBuffer, int iNumBytes); - /// Seeks to iPosition bytes from file start, returns old position or -1 for failure; asserts if not open + /** Seeks to iPosition bytes from file start, returns old position or -1 for failure; asserts if not open */ int Seek (int iPosition); - /// Returns the current position (bytes from file start) or -1 for failure; asserts if not open + /** Returns the current position (bytes from file start) or -1 for failure; asserts if not open */ int Tell (void) const; - /// Returns the size of file, in bytes, or -1 for failure; asserts if not open + /** Returns the size of file, in bytes, or -1 for failure; asserts if not open */ int GetSize(void) const; - /// Reads the file from current position till EOF into an AString; returns the number of bytes read or -1 for error + /** Reads the file from current position till EOF into an AString; returns the number of bytes read or -1 for error */ int ReadRestOfFile(AString & a_Contents); // tolua_begin - /// Returns true if the file specified exists + /** Returns true if the file specified exists */ static bool Exists(const AString & a_FileName); - /// Deletes a file, returns true if successful + /** Deletes a file, returns true if successful */ static bool Delete(const AString & a_FileName); - /// Renames a file or folder, returns true if successful. May fail if dest already exists (libc-dependant)! + /** Renames a file or folder, returns true if successful. May fail if dest already exists (libc-dependant)! */ static bool Rename(const AString & a_OrigPath, const AString & a_NewPath); - /// Copies a file, returns true if successful. + /** Copies a file, returns true if successful. */ static bool Copy(const AString & a_SrcFileName, const AString & a_DstFileName); - /// Returns true if the specified path is a folder + /** Returns true if the specified path is a folder */ static bool IsFolder(const AString & a_Path); - /// Returns true if the specified path is a regular file + /** Returns true if the specified path is a regular file */ static bool IsFile(const AString & a_Path); - /// Returns the size of the file, or a negative number on error + /** Returns the size of the file, or a negative number on error */ static int GetSize(const AString & a_FileName); - /// Creates a new folder with the specified name. Returns true if successful. Path may be relative or absolute + /** Creates a new folder with the specified name. Returns true if successful. Path may be relative or absolute */ static bool CreateFolder(const AString & a_FolderPath); - /// Returns the entire contents of the specified file as a string. Returns empty string on error. + /** Returns the entire contents of the specified file as a string. Returns empty string on error. */ static AString ReadWholeFile(const AString & a_FileName); // tolua_end - /// Returns the list of all items in the specified folder (files, folders, nix pipes, whatever's there). + /** Returns the list of all items in the specified folder (files, folders, nix pipes, whatever's there). */ static AStringVector GetFolderContents(const AString & a_Folder); // Exported in ManualBindings.cpp - int Printf(const char * a_Fmt, ...); + int Printf(const char * a_Fmt, ...) FORMATSTRING(2, 3); + + /** Flushes all the bufferef output into the file (only when writing) */ + void Flush(void); private: #ifdef USE_STDIO_FILE diff --git a/src/OSSupport/GZipFile.cpp b/src/OSSupport/GZipFile.cpp index cbf6be6c4..7a8433f4f 100644 --- a/src/OSSupport/GZipFile.cpp +++ b/src/OSSupport/GZipFile.cpp @@ -73,12 +73,15 @@ int cGZipFile::ReadRestOfFile(AString & a_Contents) // Since the gzip format doesn't really support getting the uncompressed length, we need to read incrementally. Yuck! int NumBytesRead = 0; + int TotalBytes = 0; char Buffer[64 KiB]; while ((NumBytesRead = gzread(m_File, Buffer, sizeof(Buffer))) > 0) { - a_Contents.append(Buffer, NumBytesRead); + TotalBytes += NumBytesRead; + a_Contents.append(Buffer, (size_t)NumBytesRead); } - return NumBytesRead; + // NumBytesRead is < 0 on error + return (NumBytesRead >= 0) ? TotalBytes : NumBytesRead; } @@ -99,7 +102,7 @@ bool cGZipFile::Write(const char * a_Contents, int a_Size) return false; } - return (gzwrite(m_File, a_Contents, a_Size) != 0); + return (gzwrite(m_File, a_Contents, (unsigned int)a_Size) != 0); } diff --git a/src/OSSupport/IsThread.h b/src/OSSupport/IsThread.h index b8784ea33..42b8bfdda 100644 --- a/src/OSSupport/IsThread.h +++ b/src/OSSupport/IsThread.h @@ -34,7 +34,7 @@ protected: public: cIsThread(const AString & iThreadName); - ~cIsThread(); + virtual ~cIsThread(); /// Starts the thread; returns without waiting for the actual start bool Start(void); diff --git a/src/OSSupport/ListenThread.h b/src/OSSupport/ListenThread.h index 4e337d814..b2d806c82 100644 --- a/src/OSSupport/ListenThread.h +++ b/src/OSSupport/ListenThread.h @@ -29,43 +29,45 @@ class cListenThread : typedef cIsThread super; public: - /// Used as the callback for connection events + /** Used as the callback for connection events */ class cCallback { public: - /// This callback is called whenever a socket connection is accepted + virtual ~cCallback() {} + + /** This callback is called whenever a socket connection is accepted */ virtual void OnConnectionAccepted(cSocket & a_Socket) = 0; } ; cListenThread(cCallback & a_Callback, cSocket::eFamily a_Family, const AString & a_ServiceName = ""); ~cListenThread(); - /// Creates all the sockets, returns trus if successful, false if not. + /** Creates all the sockets, returns trus if successful, false if not. */ bool Initialize(const AString & a_PortsString); bool Start(void); void Stop(void); - /// Call before Initialize() to set the "reuse" flag on the sockets + /** Call before Initialize() to set the "reuse" flag on the sockets */ void SetReuseAddr(bool a_Reuse = true); protected: typedef std::vector<cSocket> cSockets; - /// The callback which to notify of incoming connections + /** The callback which to notify of incoming connections */ cCallback & m_Callback; - /// Socket address family to use + /** Socket address family to use */ cSocket::eFamily m_Family; - /// Sockets that are being monitored + /** Sockets that are being monitored */ cSockets m_Sockets; - /// If set to true, the SO_REUSEADDR socket option is set to true + /** If set to true, the SO_REUSEADDR socket option is set to true */ bool m_ShouldReuseAddr; - /// Name of the service that's listening on the ports; for logging purposes only + /** Name of the service that's listening on the ports; for logging purposes only */ AString m_ServiceName; diff --git a/src/OSSupport/Queue.h b/src/OSSupport/Queue.h index 6c3d58295..beb6a63f1 100644 --- a/src/OSSupport/Queue.h +++ b/src/OSSupport/Queue.h @@ -29,7 +29,11 @@ public: static void Delete(T) {}; /// Called when an Item is inserted with EnqueueItemIfNotPresent and there is another equal value already inserted - static void Combine(T & a_existing, const T & a_new) {}; + static void Combine(T & a_existing, const T & a_new) + { + UNUSED(a_existing); + UNUSED(a_new); + }; }; diff --git a/src/OSSupport/Sleep.h b/src/OSSupport/Sleep.h index 5298c15da..0ec0adf9d 100644 --- a/src/OSSupport/Sleep.h +++ b/src/OSSupport/Sleep.h @@ -4,4 +4,4 @@ class cSleep { public: static void MilliSleep( unsigned int a_MilliSeconds ); -};
\ No newline at end of file +}; diff --git a/src/OSSupport/Socket.cpp b/src/OSSupport/Socket.cpp index d80c9bb3d..c29e495c3 100644 --- a/src/OSSupport/Socket.cpp +++ b/src/OSSupport/Socket.cpp @@ -6,7 +6,8 @@ #ifndef _WIN32 #include <netdb.h> #include <unistd.h> - #include <arpa/inet.h> //inet_ntoa() + #include <arpa/inet.h> // inet_ntoa() + #include <sys/ioctl.h> // ioctl() #else #define socklen_t int #endif @@ -105,58 +106,6 @@ void cSocket::ShutdownReadWrite(void) - -AString cSocket::GetErrorString( int a_ErrNo ) -{ - char buffer[ 1024 ]; - AString Out; - - #ifdef _WIN32 - - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, a_ErrNo, 0, buffer, ARRAYCOUNT(buffer), NULL); - Printf(Out, "%d: %s", a_ErrNo, buffer); - if (!Out.empty() && (Out[Out.length() - 1] == '\n')) - { - Out.erase(Out.length() - 2); - } - return Out; - - #else // _WIN32 - - // According to http://linux.die.net/man/3/strerror_r there are two versions of strerror_r(): - - #if ( _GNU_SOURCE ) && !defined(ANDROID_NDK) // GNU version of strerror_r() - - char * res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) ); - if( res != NULL ) - { - Printf(Out, "%d: %s", a_ErrNo, res); - return Out; - } - - #else // XSI version of strerror_r(): - - int res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) ); - if( res == 0 ) - { - Printf(Out, "%d: %s", a_ErrNo, buffer); - return Out; - } - - #endif // strerror_r() version - - else - { - Printf(Out, "Error %d while getting error string for error #%d!", errno, a_ErrNo); - return Out; - } - - #endif // else _WIN32 -} - - - - int cSocket::GetLastError() { #ifdef _WIN32 @@ -358,7 +307,8 @@ bool cSocket::ConnectIPv4(const AString & a_HostNameOrAddr, unsigned short a_Por CloseSocket(); return false; } - addr = *((unsigned long*)hp->h_addr); + // Should be optimised to a single word copy + memcpy(&addr, hp->h_addr, hp->h_length); } sockaddr_in server; @@ -372,7 +322,7 @@ bool cSocket::ConnectIPv4(const AString & a_HostNameOrAddr, unsigned short a_Por -int cSocket::Receive(char* a_Buffer, unsigned int a_Length, unsigned int a_Flags) +int cSocket::Receive(char * a_Buffer, unsigned int a_Length, unsigned int a_Flags) { return recv(m_Socket, a_Buffer, a_Length, a_Flags); } @@ -406,3 +356,25 @@ unsigned short cSocket::GetPort(void) const + +void cSocket::SetNonBlocking(void) +{ + #ifdef _WIN32 + u_long NonBlocking = 1; + int res = ioctlsocket(m_Socket, FIONBIO, &NonBlocking); + #else + int NonBlocking = 1; + int res = ioctl(m_Socket, FIONBIO, (char *)&NonBlocking); + #endif + if (res != 0) + { + LOGERROR("Cannot set socket to non-blocking. This would make the server deadlock later on, aborting.\nErr: %d, %d, %s", + res, GetLastError(), GetLastErrorString().c_str() + ); + abort(); + } +} + + + + diff --git a/src/OSSupport/Socket.h b/src/OSSupport/Socket.h index 91c9ca5fd..bdc2babf5 100644 --- a/src/OSSupport/Socket.h +++ b/src/OSSupport/Socket.h @@ -14,7 +14,7 @@ #endif - +#include "Errors.h" class cSocket @@ -24,6 +24,12 @@ public: { IPv4 = AF_INET, IPv6 = AF_INET6, + + #ifdef _WIN32 + ErrWouldBlock = WSAEWOULDBLOCK, + #else + ErrWouldBlock = EWOULDBLOCK, + #endif } ; #ifdef _WIN32 @@ -57,11 +63,10 @@ public: /// Initializes the network stack. Returns 0 on success, or another number as an error code. static int WSAStartup(void); - static AString GetErrorString(int a_ErrNo); static int GetLastError(); static AString GetLastErrorString(void) { - return GetErrorString(GetLastError()); + return GetOSErrorString(GetLastError()); } /// Creates a new socket of the specified address family @@ -111,8 +116,11 @@ public: unsigned short GetPort(void) const; // Returns 0 on failure const AString & GetIPString(void) const { return m_IPString; } + + /** Sets the socket into non-blocking mode */ + void SetNonBlocking(void); private: xSocket m_Socket; AString m_IPString; -};
\ No newline at end of file +}; diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index b8069cf00..0bc1d6b55 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -7,6 +7,7 @@ #include "Globals.h" #include "SocketThreads.h" +#include "Errors.h" @@ -53,7 +54,7 @@ bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client) } // No thread has free space, create a new one: - LOGD("Creating a new cSocketThread (currently have %d)", m_Threads.size()); + LOGD("Creating a new cSocketThread (currently have " SIZE_T_FMT ")", m_Threads.size()); cSocketThread * Thread = new cSocketThread(this); if (!Thread->Start()) { @@ -174,6 +175,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac m_Slots[m_NumSlots].m_Client = a_Client; m_Slots[m_NumSlots].m_Socket = a_Socket; + m_Slots[m_NumSlots].m_Socket.SetNonBlocking(); m_Slots[m_NumSlots].m_Outgoing.clear(); m_Slots[m_NumSlots].m_State = sSlot::ssNormal; m_NumSlots++; @@ -207,12 +209,18 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) if (m_Slots[i].m_State == sSlot::ssRemoteClosed) { // The remote has already closed the socket, remove the slot altogether: + if (m_Slots[i].m_Socket.IsValid()) + { + m_Slots[i].m_Socket.CloseSocket(); + } m_Slots[i] = m_Slots[--m_NumSlots]; } else { // Query and queue the last batch of outgoing data: - m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); + AString Data; + m_Slots[i].m_Client->GetOutgoingData(Data); + m_Slots[i].m_Outgoing.append(Data); if (m_Slots[i].m_Outgoing.empty()) { // No more outgoing data, shut the socket down immediately: @@ -385,38 +393,28 @@ void cSocketThreads::cSocketThread::Execute(void) // The main thread loop: while (!m_ShouldTerminate) { - // Put all sockets into the Read set: + // Read outgoing data from the clients: + QueueOutgoingData(); + + // Put sockets into the sets fd_set fdRead; + fd_set fdWrite; cSocket::xSocket Highest = m_ControlSocket1.GetSocket(); - - PrepareSet(&fdRead, Highest, false); + PrepareSets(&fdRead, &fdWrite, Highest); // Wait for the sockets: timeval Timeout; Timeout.tv_sec = 5; Timeout.tv_usec = 0; - if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1) + if (select(Highest + 1, &fdRead, &fdWrite, NULL, &Timeout) == -1) { - LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); + LOG("select() call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); continue; } + // Perform the IO: ReadFromSockets(&fdRead); - - // Test sockets for writing: - fd_set fdWrite; - Highest = m_ControlSocket1.GetSocket(); - PrepareSet(&fdWrite, Highest, true); - Timeout.tv_sec = 0; - Timeout.tv_usec = 0; - if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1) - { - LOG("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); - continue; - } - WriteToSockets(&fdWrite); - CleanUpShutSockets(); } // while (!mShouldTerminate) } @@ -425,10 +423,11 @@ void cSocketThreads::cSocketThread::Execute(void) -void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting) +void cSocketThreads::cSocketThread::PrepareSets(fd_set * a_Read, fd_set * a_Write, cSocket::xSocket & a_Highest) { - FD_ZERO(a_Set); - FD_SET(m_ControlSocket1.GetSocket(), a_Set); + FD_ZERO(a_Read); + FD_ZERO(a_Write); + FD_SET(m_ControlSocket1.GetSocket(), a_Read); cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) @@ -443,11 +442,16 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket continue; } cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); - FD_SET(s, a_Set); + FD_SET(s, a_Read); if (s > a_Highest) { a_Highest = s; } + if (!m_Slots[i].m_Outgoing.empty()) + { + // There's outgoing data for the socket, put it in the Write set + FD_SET(s, a_Write); + } } // for i - m_Slots[] } @@ -479,34 +483,38 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); if (Received <= 0) { - // The socket has been closed by the remote party - switch (m_Slots[i].m_State) + if (cSocket::GetLastError() != cSocket::ErrWouldBlock) { - case sSlot::ssNormal: + // The socket has been closed by the remote party + switch (m_Slots[i].m_State) { - // Notify the callback that the remote has closed the socket; keep the slot - m_Slots[i].m_Client->SocketClosed(); - m_Slots[i].m_State = sSlot::ssRemoteClosed; - break; - } - case sSlot::ssWritingRestOut: - case sSlot::ssShuttingDown: - case sSlot::ssShuttingDown2: - { - // Force-close the socket and remove the slot: - m_Slots[i].m_Socket.CloseSocket(); - m_Slots[i] = m_Slots[--m_NumSlots]; - break; - } - default: - { - LOG("%s: Unexpected socket state: %d (%s)", - __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() - ); - ASSERT(!"Unexpected socket state"); - break; - } - } // switch (m_Slots[i].m_State) + case sSlot::ssNormal: + { + // Notify the callback that the remote has closed the socket; keep the slot + m_Slots[i].m_Client->SocketClosed(); + m_Slots[i].m_State = sSlot::ssRemoteClosed; + m_Slots[i].m_Socket.CloseSocket(); + break; + } + case sSlot::ssWritingRestOut: + case sSlot::ssShuttingDown: + case sSlot::ssShuttingDown2: + { + // Force-close the socket and remove the slot: + m_Slots[i].m_Socket.CloseSocket(); + m_Slots[i] = m_Slots[--m_NumSlots]; + break; + } + default: + { + LOG("%s: Unexpected socket state: %d (%s)", + __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() + ); + ASSERT(!"Unexpected socket state"); + break; + } + } // switch (m_Slots[i].m_State) + } } else { @@ -538,7 +546,9 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) // Request another chunk of outgoing data: if (m_Slots[i].m_Client != NULL) { - m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); + AString Data; + m_Slots[i].m_Client->GetOutgoingData(Data); + m_Slots[i].m_Outgoing.append(Data); } if (m_Slots[i].m_Outgoing.empty()) { @@ -552,19 +562,22 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) } } // if (outgoing data is empty) - int Sent = m_Slots[i].m_Socket.Send(m_Slots[i].m_Outgoing.data(), m_Slots[i].m_Outgoing.size()); - if (Sent < 0) + if (m_Slots[i].m_State == sSlot::ssRemoteClosed) + { + continue; + } + + if (!SendDataThroughSocket(m_Slots[i].m_Socket, m_Slots[i].m_Outgoing)) { int Err = cSocket::GetLastError(); - LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str()); + LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str()); m_Slots[i].m_Socket.CloseSocket(); if (m_Slots[i].m_Client != NULL) { m_Slots[i].m_Client->SocketClosed(); } - return; + continue; } - m_Slots[i].m_Outgoing.erase(0, Sent); if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut)) { @@ -589,8 +602,41 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) +bool cSocketThreads::cSocketThread::SendDataThroughSocket(cSocket & a_Socket, AString & a_Data) +{ + // Send data in smaller chunks, so that the OS send buffers aren't overflown easily + while (!a_Data.empty()) + { + size_t NumToSend = std::min(a_Data.size(), (size_t)1024); + int Sent = a_Socket.Send(a_Data.data(), NumToSend); + if (Sent < 0) + { + int Err = cSocket::GetLastError(); + if (Err == cSocket::ErrWouldBlock) + { + // The OS send buffer is full, leave the outgoing data for the next time + return true; + } + // An error has occured + return false; + } + if (Sent == 0) + { + a_Socket.CloseSocket(); + return true; + } + a_Data.erase(0, Sent); + } + return true; +} + + + + + void cSocketThreads::cSocketThread::CleanUpShutSockets(void) { + cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; i--) { switch (m_Slots[i].m_State) @@ -616,3 +662,32 @@ void cSocketThreads::cSocketThread::CleanUpShutSockets(void) +void cSocketThreads::cSocketThread::QueueOutgoingData(void) +{ + cCSLock Lock(m_Parent->m_CS); + for (int i = 0; i < m_NumSlots; i++) + { + if (m_Slots[i].m_Client != NULL) + { + AString Data; + m_Slots[i].m_Client->GetOutgoingData(Data); + m_Slots[i].m_Outgoing.append(Data); + } + if (m_Slots[i].m_Outgoing.empty()) + { + // No outgoing data is ready + if (m_Slots[i].m_State == sSlot::ssWritingRestOut) + { + // The socket doesn't want to be kept alive anymore, and doesn't have any remaining data to send. + // Shut it down and then close it after a timeout, or when the other side agrees + m_Slots[i].m_State = sSlot::ssShuttingDown; + m_Slots[i].m_Socket.ShutdownReadWrite(); + } + continue; + } + } +} + + + + diff --git a/src/OSSupport/SocketThreads.h b/src/OSSupport/SocketThreads.h index 9e1947ab6..679e374e1 100644 --- a/src/OSSupport/SocketThreads.h +++ b/src/OSSupport/SocketThreads.h @@ -64,9 +64,10 @@ public: virtual ~cCallback() {} /** Called when data is received from the remote party */ - virtual void DataReceived(const char * a_Data, int a_Size) = 0; + virtual void DataReceived(const char * a_Data, size_t a_Size) = 0; - /** Called when data can be sent to remote party; the function is supposed to *append* outgoing data to a_Data */ + /** Called when data can be sent to remote party + The function is supposed to *set* outgoing data to a_Data (overwrite) */ virtual void GetOutgoingData(AString & a_Data) = 0; /** Called when the socket has been closed for any reason */ @@ -102,7 +103,7 @@ private: public: cSocketThread(cSocketThreads * a_Parent); - ~cSocketThread(); + virtual ~cSocketThread(); // All these methods assume parent's m_CS is locked bool HasEmptySlot(void) const {return m_NumSlots < MAX_SLOTS; } @@ -156,16 +157,27 @@ private: virtual void Execute(void) override; - /** Puts all sockets into the set, along with m_ControlSocket1. - Only sockets that are able to send and receive data are put in the Set. - Is a_IsForWriting is true, the ssWritingRestOut sockets are added as well. */ - void PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting); + /** Prepares the Read and Write socket sets for select() + Puts all sockets into the read set, along with m_ControlSocket1. + Only sockets that have outgoing data queued on them are put in the write set.*/ + void PrepareSets(fd_set * a_ReadSet, fd_set * a_WriteSet, cSocket::xSocket & a_Highest); - void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read - void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write + /** Reads from sockets indicated in a_Read */ + void ReadFromSockets(fd_set * a_Read); + /** Writes to sockets indicated in a_Write */ + void WriteToSockets (fd_set * a_Write); + + /** Sends data through the specified socket, trying to fill the OS send buffer in chunks. + Returns true if there was no error while sending, false if an error has occured. + Modifies a_Data to contain only the unsent data. */ + bool SendDataThroughSocket(cSocket & a_Socket, AString & a_Data); + /** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */ void CleanUpShutSockets(void); + + /** Calls each client's callback to retrieve outgoing data for that client. */ + void QueueOutgoingData(void); } ; typedef std::list<cSocketThread *> cSocketThreadList; diff --git a/src/OSSupport/Thread.h b/src/OSSupport/Thread.h index 3c9316424..4153b2427 100644 --- a/src/OSSupport/Thread.h +++ b/src/OSSupport/Thread.h @@ -23,4 +23,4 @@ private: cEvent* m_StopEvent; AString m_ThreadName; -};
\ No newline at end of file +}; diff --git a/src/OSSupport/Timer.cpp b/src/OSSupport/Timer.cpp index ed16f9e3a..fd838dd0d 100644 --- a/src/OSSupport/Timer.cpp +++ b/src/OSSupport/Timer.cpp @@ -28,7 +28,7 @@ long long cTimer::GetNowTime(void) #else struct timeval now; gettimeofday(&now, NULL); - return (long long)(now.tv_sec * 1000 + now.tv_usec / 1000); + return (long long)now.tv_sec * 1000 + (long long)now.tv_usec / 1000; #endif } |