// ListenThread.cpp
// Implements the cListenThread class representing the thread that listens for client connections
#include "Globals.h"
#include "ListenThread.h"
cListenThread::cListenThread(cCallback & a_Callback, cSocket::eFamily a_Family) :
super("ListenThread"),
m_Callback(a_Callback),
m_Family(a_Family)
{
}
cListenThread::~cListenThread()
{
// TODO
}
bool cListenThread::Initialize(const AString & a_PortsString)
{
ASSERT(m_Sockets.empty()); // Not yet started
if (!CreateSockets(a_PortsString))
{
return false;
}
return true;
}
bool cListenThread::Start(void)
{
if (m_Sockets.empty())
{
// There are no sockets listening, either forgotten to initialize or the user specified no listening ports
// Report as successful, though
return true;
}
return super::Start();
}
void cListenThread::Stop(void)
{
if (m_Sockets.empty())
{
// No sockets means no thread was running in the first place
return;
}
m_ShouldTerminate = true;
// Close one socket to wake the thread up from the select() call
m_Sockets[0].CloseSocket();
// Wait for the thread to finish
super::Wait();
// Clean up all sockets
m_Sockets.clear();
}
void cListenThread::SetReuseAddr(bool a_Reuse)
{
ASSERT(m_Sockets.empty()); // Must not have been Initialize()d yet
m_ShouldReuseAddr = a_Reuse;
}
bool cListenThread::CreateSockets(const AString & a_PortsString)
{
AStringVector Ports = StringSplit(a_PortsString, ",");
if (Ports.empty())
{
return false;
}
const char * FamilyStr = "";
switch (m_Family)
{
case cSocket::IPv4: FamilyStr = "IPv4: "; break;
case cSocket::IPv6: FamilyStr = "IPv6: "; break;
default:
{
ASSERT(!"Unknown address family");
break;
}
}
for (AStringVector::const_iterator itr = Ports.begin(), end = Ports.end(); itr != end; ++itr)
{
int Port = atoi(Trim(*itr).c_str());
if ((Port <= 0) || (Port > 65535))
{
LOGWARNING("Invalid port specified: \"%s\".", Trim(*itr).c_str());
continue;
}
m_Sockets.push_back(cSocket::CreateSocket(m_Family));
if (!m_Sockets.back().IsValid())
{
LOGERROR("Cannot create listening socket for port %d: \"%s\"", Port, cSocket::GetLastErrorString().c_str());
m_Sockets.pop_back();
continue;
}
if (m_ShouldReuseAddr)
{
if (!m_Sockets.back().SetReuseAddress())
{
LOG("Port %d cannot reuse addr, syscall failed: \"%s\".", Port, cSocket::GetLastErrorString().c_str());
}
}
// Bind to port:
bool res = false;
switch (m_Family)
{
case cSocket::IPv4: res = m_Sockets.back().BindToAnyIPv4(Port); break;
case cSocket::IPv6: res = m_Sockets.back().BindToAnyIPv6(Port); break;
default:
{
ASSERT(!"Unknown address family");
res = false;
}
}
if (!res)
{
LOGWARNING("Cannot bind port %d: \"%s\".", Port, cSocket::GetLastErrorString().c_str());
m_Sockets.pop_back();
continue;
}
if (!m_Sockets.back().Listen())
{
LOGWARNING("Cannot listen on port %d: \"%s\".", Port, cSocket::GetLastErrorString().c_str());
m_Sockets.pop_back();
continue;
}
LOGINFO("%sPort %d is open for connections", FamilyStr, Port);
} // for itr - Ports[]
return !(m_Sockets.empty());
}
void cListenThread::Execute(void)
{
if (m_Sockets.empty())
{
LOGD("Empty cListenThread, ending thread now.");
return;
}
// Find the highest socket number:
cSocket::xSocket Highest = m_Sockets[0].GetSocket();
for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr)
{
if (itr->GetSocket() > Highest)
{
Highest = itr->GetSocket();
}
} // for itr - m_Sockets[]
while (!m_ShouldTerminate)
{
// Put all sockets into a FD set:
fd_set fdRead;
FD_ZERO(&fdRead);
for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr)
{
FD_SET(itr->GetSocket(), &fdRead);
} // for itr - m_Sockets[]
if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1)
{
LOG("select(R) call failed in cListenThread: \"%s\"", cSocket::GetLastErrorString().c_str());
continue;
}
for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr)
{
if (FD_ISSET(itr->GetSocket(), &fdRead))
{
cSocket Client = itr->Accept();
m_Callback.OnConnectionAccepted(Client);
}
} // for itr - m_Sockets[]
} // while (!m_ShouldTerminate)
}