From 59b8205f02eb7bf2954acba56fd63f485a30ece5 Mon Sep 17 00:00:00 2001 From: Tycho Date: Sat, 25 Jan 2014 05:51:03 -0800 Subject: Extracted cSocket::GetErrorString into GetOSErrorString --- src/OSSupport/SocketThreads.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/OSSupport/SocketThreads.cpp') diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index b8069cf00..74932daf8 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -7,6 +7,7 @@ #include "Globals.h" #include "SocketThreads.h" +#include "Errors.h" @@ -556,7 +557,7 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) if (Sent < 0) { int Err = cSocket::GetLastError(); - LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str()); + LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str()); m_Slots[i].m_Socket.CloseSocket(); if (m_Slots[i].m_Client != NULL) { -- cgit v1.2.3 From cc1284a753874aa7d351a6ea41aaec4662ca6e57 Mon Sep 17 00:00:00 2001 From: madmaxoft Date: Mon, 27 Jan 2014 21:27:13 +0100 Subject: Rewritten networking to use non-blocking sockets. This fixes #592. --- src/OSSupport/SocketThreads.cpp | 172 +++++++++++++++++++++++++++------------- 1 file changed, 118 insertions(+), 54 deletions(-) (limited to 'src/OSSupport/SocketThreads.cpp') diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index 74932daf8..3e2631733 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -175,6 +175,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac m_Slots[m_NumSlots].m_Client = a_Client; m_Slots[m_NumSlots].m_Socket = a_Socket; + m_Slots[m_NumSlots].m_Socket.SetNonBlocking(); m_Slots[m_NumSlots].m_Outgoing.clear(); m_Slots[m_NumSlots].m_State = sSlot::ssNormal; m_NumSlots++; @@ -213,7 +214,9 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) else { // Query and queue the last batch of outgoing data: - m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); + AString Data; + m_Slots[i].m_Client->GetOutgoingData(Data); + m_Slots[i].m_Outgoing.append(Data); if (m_Slots[i].m_Outgoing.empty()) { // No more outgoing data, shut the socket down immediately: @@ -386,38 +389,28 @@ void cSocketThreads::cSocketThread::Execute(void) // The main thread loop: while (!m_ShouldTerminate) { - // Put all sockets into the Read set: + // Read outgoing data from the clients: + QueueOutgoingData(); + + // Put sockets into the sets fd_set fdRead; + fd_set fdWrite; cSocket::xSocket Highest = m_ControlSocket1.GetSocket(); - - PrepareSet(&fdRead, Highest, false); + PrepareSets(&fdRead, &fdWrite, Highest); // Wait for the sockets: timeval Timeout; Timeout.tv_sec = 5; Timeout.tv_usec = 0; - if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1) + if (select(Highest + 1, &fdRead, &fdWrite, NULL, &Timeout) == -1) { - LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); + LOG("select() call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); continue; } + // Perform the IO: ReadFromSockets(&fdRead); - - // Test sockets for writing: - fd_set fdWrite; - Highest = m_ControlSocket1.GetSocket(); - PrepareSet(&fdWrite, Highest, true); - Timeout.tv_sec = 0; - Timeout.tv_usec = 0; - if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1) - { - LOG("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); - continue; - } - WriteToSockets(&fdWrite); - CleanUpShutSockets(); } // while (!mShouldTerminate) } @@ -426,10 +419,11 @@ void cSocketThreads::cSocketThread::Execute(void) -void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting) +void cSocketThreads::cSocketThread::PrepareSets(fd_set * a_Read, fd_set * a_Write, cSocket::xSocket & a_Highest) { - FD_ZERO(a_Set); - FD_SET(m_ControlSocket1.GetSocket(), a_Set); + FD_ZERO(a_Read); + FD_ZERO(a_Write); + FD_SET(m_ControlSocket1.GetSocket(), a_Read); cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) @@ -444,11 +438,16 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket continue; } cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); - FD_SET(s, a_Set); + FD_SET(s, a_Read); if (s > a_Highest) { a_Highest = s; } + if (!m_Slots[i].m_Outgoing.empty()) + { + // There's outgoing data for the socket, put it in the Write set + FD_SET(s, a_Write); + } } // for i - m_Slots[] } @@ -480,34 +479,37 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); if (Received <= 0) { - // The socket has been closed by the remote party - switch (m_Slots[i].m_State) + if (cSocket::GetLastError() != cSocket::ErrWouldBlock) { - case sSlot::ssNormal: - { - // Notify the callback that the remote has closed the socket; keep the slot - m_Slots[i].m_Client->SocketClosed(); - m_Slots[i].m_State = sSlot::ssRemoteClosed; - break; - } - case sSlot::ssWritingRestOut: - case sSlot::ssShuttingDown: - case sSlot::ssShuttingDown2: - { - // Force-close the socket and remove the slot: - m_Slots[i].m_Socket.CloseSocket(); - m_Slots[i] = m_Slots[--m_NumSlots]; - break; - } - default: + // The socket has been closed by the remote party + switch (m_Slots[i].m_State) { - LOG("%s: Unexpected socket state: %d (%s)", - __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() - ); - ASSERT(!"Unexpected socket state"); - break; - } - } // switch (m_Slots[i].m_State) + case sSlot::ssNormal: + { + // Notify the callback that the remote has closed the socket; keep the slot + m_Slots[i].m_Client->SocketClosed(); + m_Slots[i].m_State = sSlot::ssRemoteClosed; + break; + } + case sSlot::ssWritingRestOut: + case sSlot::ssShuttingDown: + case sSlot::ssShuttingDown2: + { + // Force-close the socket and remove the slot: + m_Slots[i].m_Socket.CloseSocket(); + m_Slots[i] = m_Slots[--m_NumSlots]; + break; + } + default: + { + LOG("%s: Unexpected socket state: %d (%s)", + __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() + ); + ASSERT(!"Unexpected socket state"); + break; + } + } // switch (m_Slots[i].m_State) + } } else { @@ -539,7 +541,9 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) // Request another chunk of outgoing data: if (m_Slots[i].m_Client != NULL) { - m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); + AString Data; + m_Slots[i].m_Client->GetOutgoingData(Data); + m_Slots[i].m_Outgoing.append(Data); } if (m_Slots[i].m_Outgoing.empty()) { @@ -553,8 +557,7 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) } } // if (outgoing data is empty) - int Sent = m_Slots[i].m_Socket.Send(m_Slots[i].m_Outgoing.data(), m_Slots[i].m_Outgoing.size()); - if (Sent < 0) + if (!SendDataThroughSocket(m_Slots[i].m_Socket, m_Slots[i].m_Outgoing)) { int Err = cSocket::GetLastError(); LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str()); @@ -565,7 +568,6 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) } return; } - m_Slots[i].m_Outgoing.erase(0, Sent); if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut)) { @@ -590,8 +592,41 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) +bool cSocketThreads::cSocketThread::SendDataThroughSocket(cSocket & a_Socket, AString & a_Data) +{ + // Send data in smaller chunks, so that the OS send buffers aren't overflown easily + while (!a_Data.empty()) + { + size_t NumToSend = std::min(a_Data.size(), (size_t)1024); + int Sent = a_Socket.Send(a_Data.data(), NumToSend); + if (Sent < 0) + { + int Err = cSocket::GetLastError(); + if (Err == cSocket::ErrWouldBlock) + { + // The OS send buffer is full, leave the outgoing data for the next time + return true; + } + // An error has occured + return false; + } + if (Sent == 0) + { + a_Socket.CloseSocket(); + return true; + } + a_Data.erase(0, Sent); + } + return true; +} + + + + + void cSocketThreads::cSocketThread::CleanUpShutSockets(void) { + cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; i--) { switch (m_Slots[i].m_State) @@ -617,3 +652,32 @@ void cSocketThreads::cSocketThread::CleanUpShutSockets(void) +void cSocketThreads::cSocketThread::QueueOutgoingData(void) +{ + cCSLock Lock(m_Parent->m_CS); + for (int i = 0; i < m_NumSlots; i++) + { + if (m_Slots[i].m_Client != NULL) + { + AString Data; + m_Slots[i].m_Client->GetOutgoingData(Data); + m_Slots[i].m_Outgoing.append(Data); + } + if (m_Slots[i].m_Outgoing.empty()) + { + // No outgoing data is ready + if (m_Slots[i].m_State == sSlot::ssWritingRestOut) + { + // The socket doesn't want to be kept alive anymore, and doesn't have any remaining data to send. + // Shut it down and then close it after a timeout, or when the other side agrees + m_Slots[i].m_State = sSlot::ssShuttingDown; + m_Slots[i].m_Socket.ShutdownReadWrite(); + } + continue; + } + } +} + + + + -- cgit v1.2.3 From 0b384198e535470ac4aaa7d8f38b3da62b12ea34 Mon Sep 17 00:00:00 2001 From: madmaxoft Date: Mon, 3 Feb 2014 10:38:25 +0100 Subject: SocketThreads: Fixed sending to closed socket. --- src/OSSupport/SocketThreads.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'src/OSSupport/SocketThreads.cpp') diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index 3e2631733..269f1d452 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -557,6 +557,11 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) } } // if (outgoing data is empty) + if (m_Slots[i].m_State == sSlot::ssRemoteClosed) + { + continue; + } + if (!SendDataThroughSocket(m_Slots[i].m_Socket, m_Slots[i].m_Outgoing)) { int Err = cSocket::GetLastError(); @@ -566,7 +571,7 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) { m_Slots[i].m_Client->SocketClosed(); } - return; + continue; } if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut)) -- cgit v1.2.3 From c9916cd8c2332b6d6992ab38c0339ced6c91bc92 Mon Sep 17 00:00:00 2001 From: madmaxoft Date: Mon, 3 Feb 2014 17:07:46 +0100 Subject: Fixed socket leaking. --- src/OSSupport/SocketThreads.cpp | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'src/OSSupport/SocketThreads.cpp') diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index 269f1d452..a02661d2c 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -209,6 +209,10 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) if (m_Slots[i].m_State == sSlot::ssRemoteClosed) { // The remote has already closed the socket, remove the slot altogether: + if (m_Slots[i].m_Socket.IsValid()) + { + m_Slots[i].m_Socket.CloseSocket(); + } m_Slots[i] = m_Slots[--m_NumSlots]; } else @@ -489,6 +493,7 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) // Notify the callback that the remote has closed the socket; keep the slot m_Slots[i].m_Client->SocketClosed(); m_Slots[i].m_State = sSlot::ssRemoteClosed; + m_Slots[i].m_Socket.CloseSocket(); break; } case sSlot::ssWritingRestOut: -- cgit v1.2.3 From 16b27c4b7ae2ccb03355148fa7fc7116190cc5fb Mon Sep 17 00:00:00 2001 From: Tycho Date: Tue, 11 Mar 2014 14:16:08 -0700 Subject: Fixed a load of format string errors --- src/OSSupport/SocketThreads.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/OSSupport/SocketThreads.cpp') diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index a02661d2c..f37b00202 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -54,7 +54,7 @@ bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client) } // No thread has free space, create a new one: - LOGD("Creating a new cSocketThread (currently have %d)", m_Threads.size()); + LOGD("Creating a new cSocketThread (currently have %zu)", m_Threads.size()); cSocketThread * Thread = new cSocketThread(this); if (!Thread->Start()) { -- cgit v1.2.3 From 862e2194433b5d47aaf88261091b35a1ee663482 Mon Sep 17 00:00:00 2001 From: Tycho Date: Wed, 12 Mar 2014 10:34:50 -0700 Subject: Added additional macros to support the MSVC size_t format and changed all formats to use the macros --- src/OSSupport/SocketThreads.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/OSSupport/SocketThreads.cpp') diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index f37b00202..0bc1d6b55 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -54,7 +54,7 @@ bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client) } // No thread has free space, create a new one: - LOGD("Creating a new cSocketThread (currently have %zu)", m_Threads.size()); + LOGD("Creating a new cSocketThread (currently have " SIZE_T_FMT ")", m_Threads.size()); cSocketThread * Thread = new cSocketThread(this); if (!Thread->Start()) { -- cgit v1.2.3