From dbf7f13bd414daea5e787da2543df186dc465c34 Mon Sep 17 00:00:00 2001 From: Mattes D Date: Thu, 22 Jan 2015 13:00:32 +0100 Subject: cNetwork: Added link creation callback. This allows the callback classes to store the link inside them and use it internally later on, mainly for sending data. --- src/OSSupport/Network.h | 18 +++++++++++++++--- src/OSSupport/ServerHandleImpl.cpp | 2 ++ src/OSSupport/TCPLinkImpl.cpp | 26 ++++++++++++++++---------- src/OSSupport/TCPLinkImpl.h | 11 +++++++++-- tests/Network/EchoServer.cpp | 38 ++++++++++++++++++++++++++++++-------- tests/Network/Google.cpp | 21 ++++++++++++++++++--- 6 files changed, 90 insertions(+), 26 deletions(-) diff --git a/src/OSSupport/Network.h b/src/OSSupport/Network.h index b9ca377cb..85c7c5dcb 100644 --- a/src/OSSupport/Network.h +++ b/src/OSSupport/Network.h @@ -13,6 +13,14 @@ +// fwd: +class cTCPLink; +typedef SharedPtr cTCPLinkPtr; + + + + + /** Interface that provides the methods available on a single TCP connection. */ class cTCPLink { @@ -25,16 +33,20 @@ public: // Force a virtual destructor for all descendants: virtual ~cCallbacks() {} + /** Called when the cTCPLink for the connection is created. + The callback may store the cTCPLink instance for later use, but it should remove it in OnError(), OnRemoteClosed() or right after Close(). */ + virtual void OnLinkCreated(cTCPLinkPtr a_Link) = 0; + /** Called when there's data incoming from the remote peer. */ - virtual void OnReceivedData(cTCPLink & a_Link, const char * a_Data, size_t a_Length) = 0; + virtual void OnReceivedData(const char * a_Data, size_t a_Length) = 0; /** Called when the remote end closes the connection. The link is still available for connection information query (IP / port). Sending data on the link is not an error, but the data won't be delivered. */ - virtual void OnRemoteClosed(cTCPLink & a_Link) = 0; + virtual void OnRemoteClosed(void) = 0; /** Called when an error is detected on the connection. */ - virtual void OnError(cTCPLink & a_Link, int a_ErrorCode, const AString & a_ErrorMsg) = 0; + virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) = 0; }; typedef SharedPtr cCallbacksPtr; diff --git a/src/OSSupport/ServerHandleImpl.cpp b/src/OSSupport/ServerHandleImpl.cpp index 82cbecef2..026c0fdc1 100644 --- a/src/OSSupport/ServerHandleImpl.cpp +++ b/src/OSSupport/ServerHandleImpl.cpp @@ -283,6 +283,8 @@ void cServerHandleImpl::Callback(evconnlistener * a_Listener, evutil_socket_t a_ cCSLock Lock(Self->m_CS); Self->m_Connections.push_back(Link); } // Lock(m_CS) + LinkCallbacks->OnLinkCreated(Link); + Link->Enable(); // Call the OnAccepted callback: Self->m_ListenCallbacks->OnAccepted(*Link); diff --git a/src/OSSupport/TCPLinkImpl.cpp b/src/OSSupport/TCPLinkImpl.cpp index 6f937646f..5d31da22e 100644 --- a/src/OSSupport/TCPLinkImpl.cpp +++ b/src/OSSupport/TCPLinkImpl.cpp @@ -20,9 +20,6 @@ cTCPLinkImpl::cTCPLinkImpl(cTCPLink::cCallbacksPtr a_LinkCallbacks): m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), -1, BEV_OPT_CLOSE_ON_FREE)), m_Server(nullptr) { - // Create the LibEvent handle, but don't assign a socket to it yet (will be assigned within Connect() method): - bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this); - bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE); } @@ -37,10 +34,6 @@ cTCPLinkImpl::cTCPLinkImpl(evutil_socket_t a_Socket, cTCPLink::cCallbacksPtr a_L // Update the endpoint addresses: UpdateLocalAddress(); UpdateAddress(a_Address, a_AddrLen, m_RemoteIP, m_RemotePort); - - // Create the LibEvent handle: - bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this); - bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE); } @@ -65,6 +58,8 @@ cTCPLinkImplPtr cTCPLinkImpl::Connect(const AString & a_Host, UInt16 a_Port, cTC cTCPLinkImplPtr res{new cTCPLinkImpl(a_LinkCallbacks)}; // Cannot use std::make_shared here, constructor is not accessible res->m_ConnectCallbacks = a_ConnectCallbacks; cNetworkSingleton::Get().AddLink(res); + res->m_Callbacks->OnLinkCreated(res); + res->Enable(); // If a_Host is an IP address, schedule a connection immediately: sockaddr_storage sa; @@ -107,6 +102,17 @@ cTCPLinkImplPtr cTCPLinkImpl::Connect(const AString & a_Host, UInt16 a_Port, cTC +void cTCPLinkImpl::Enable(void) +{ + // Set the LibEvent callbacks and enable processing: + bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this); + bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE); +} + + + + + bool cTCPLinkImpl::Send(const void * a_Data, size_t a_Length) { return (bufferevent_write(m_BufferEvent, a_Data, a_Length) == 0); @@ -160,7 +166,7 @@ void cTCPLinkImpl::ReadCallback(bufferevent * a_BufferEvent, void * a_Self) size_t length; while ((length = bufferevent_read(a_BufferEvent, data, sizeof(data))) > 0) { - Self->m_Callbacks->OnReceivedData(*Self, data, length); + Self->m_Callbacks->OnReceivedData(data, length); } } @@ -189,7 +195,7 @@ void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void } else { - Self->m_Callbacks->OnError(*Self, err, evutil_socket_error_to_string(err)); + Self->m_Callbacks->OnError(err, evutil_socket_error_to_string(err)); if (Self->m_Server == nullptr) { cNetworkSingleton::Get().RemoveLink(Self); @@ -219,7 +225,7 @@ void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void // If the connection has been closed, call the link callback and remove the connection: if (a_What & BEV_EVENT_EOF) { - Self->m_Callbacks->OnRemoteClosed(*Self); + Self->m_Callbacks->OnRemoteClosed(); if (Self->m_Server != nullptr) { Self->m_Server->RemoveLink(Self); diff --git a/src/OSSupport/TCPLinkImpl.h b/src/OSSupport/TCPLinkImpl.h index edd295fe2..66347afe0 100644 --- a/src/OSSupport/TCPLinkImpl.h +++ b/src/OSSupport/TCPLinkImpl.h @@ -37,7 +37,8 @@ class cTCPLinkImpl: public: /** Creates a new link based on the given socket. Used for connections accepted in a server using cNetwork::Listen(). - a_Address and a_AddrLen describe the remote peer that has connected. */ + a_Address and a_AddrLen describe the remote peer that has connected. + The link is created disabled, you need to call Enable() to start the regular communication. */ cTCPLinkImpl(evutil_socket_t a_Socket, cCallbacksPtr a_LinkCallbacks, cServerHandleImpl * a_Server, const sockaddr * a_Address, socklen_t a_AddrLen); /** Destroys the LibEvent handle representing the link. */ @@ -48,6 +49,11 @@ public: Returns a link that has the connection request queued, or NULL for failure. */ static cTCPLinkImplPtr Connect(const AString & a_Host, UInt16 a_Port, cTCPLink::cCallbacksPtr a_LinkCallbacks, cNetwork::cConnectCallbacksPtr a_ConnectCallbacks); + /** Enables communication over the link. + Links are created with communication disabled, so that creation callbacks can be called first. + This function then enables the regular communication to be reported. */ + void Enable(void); + // cTCPLink overrides: virtual bool Send(const void * a_Data, size_t a_Length) override; virtual AString GetLocalIP(void) const override { return m_LocalIP; } @@ -85,7 +91,8 @@ protected: /** Creates a new link to be queued to connect to a specified host:port. Used for outgoing connections created using cNetwork::Connect(). - To be used only by the Connect() factory function. */ + To be used only by the Connect() factory function. + The link is created disabled, you need to call Enable() to start the regular communication. */ cTCPLinkImpl(const cCallbacksPtr a_LinkCallbacks); /** Callback that LibEvent calls when there's data available from the remote peer. */ diff --git a/tests/Network/EchoServer.cpp b/tests/Network/EchoServer.cpp index 061310c82..728db0b7c 100644 --- a/tests/Network/EchoServer.cpp +++ b/tests/Network/EchoServer.cpp @@ -16,11 +16,21 @@ class cEchoLinkCallbacks: public cTCPLink::cCallbacks { - virtual void OnReceivedData(cTCPLink & a_Link, const char * a_Data, size_t a_Size) override + // cTCPLink::cCallbacks overrides: + virtual void OnLinkCreated(cTCPLinkPtr a_Link) override { + ASSERT(m_Link == nullptr); + m_Link = a_Link; + } + + + virtual void OnReceivedData(const char * a_Data, size_t a_Size) override + { + ASSERT(m_Link != nullptr); + // Echo the incoming data back to outgoing data: - LOGD("%p (%s:%d): Data received (%u bytes), echoing back.", &a_Link, a_Link.GetRemoteIP().c_str(), a_Link.GetRemotePort(), static_cast(a_Size)); - a_Link.Send(a_Data, a_Size); + LOGD("%p (%s:%d): Data received (%u bytes), echoing back.", m_Link.get(), m_Link->GetRemoteIP().c_str(), m_Link->GetRemotePort(), static_cast(a_Size)); + m_Link->Send(a_Data, a_Size); LOGD("Echo queued"); // Search for a Ctrl+Z, if found, drop the connection: @@ -28,21 +38,33 @@ class cEchoLinkCallbacks: { if (a_Data[i] == '\x1a') { - a_Link.Close(); + m_Link->Close(); + m_Link.reset(); return; } } } - virtual void OnRemoteClosed(cTCPLink & a_Link) override + + virtual void OnRemoteClosed(void) override { - LOGD("%p (%s:%d): Remote has closed the connection.", &a_Link, a_Link.GetRemoteIP().c_str(), a_Link.GetRemotePort()); + ASSERT(m_Link != nullptr); + + LOGD("%p (%s:%d): Remote has closed the connection.", m_Link.get(), m_Link->GetRemoteIP().c_str(), m_Link->GetRemotePort()); + m_Link.reset(); } - virtual void OnError(cTCPLink & a_Link, int a_ErrorCode, const AString & a_ErrorMsg) override + + virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) override { - LOGD("%p (%s:%d): Error %d in the cEchoLinkCallbacks: %s", &a_Link, a_Link.GetRemoteIP().c_str(), a_Link.GetRemotePort(), a_ErrorCode, a_ErrorMsg.c_str()); + ASSERT(m_Link != nullptr); + + LOGD("%p (%s:%d): Error %d in the cEchoLinkCallbacks: %s", m_Link.get(), m_Link->GetRemoteIP().c_str(), m_Link->GetRemotePort(), a_ErrorCode, a_ErrorMsg.c_str()); + m_Link.reset(); } + + /** The link attached to this callbacks instance. */ + cTCPLinkPtr m_Link; }; diff --git a/tests/Network/Google.cpp b/tests/Network/Google.cpp index be08f179c..8985eef17 100644 --- a/tests/Network/Google.cpp +++ b/tests/Network/Google.cpp @@ -49,24 +49,39 @@ class cDumpCallbacks: public cTCPLink::cCallbacks { cEvent & m_Event; + cTCPLinkPtr m_Link; - virtual void OnReceivedData(cTCPLink & a_Link, const char * a_Data, size_t a_Size) override + virtual void OnLinkCreated(cTCPLinkPtr a_Link) override { + ASSERT(m_Link == nullptr); + m_Link = a_Link; + } + + virtual void OnReceivedData(const char * a_Data, size_t a_Size) override + { + ASSERT(m_Link != nullptr); + // Log the incoming data size: AString Hex; CreateHexDump(Hex, a_Data, a_Size, 16); LOGD("Incoming data: %u bytes:\n%s", static_cast(a_Size), Hex.c_str()); } - virtual void OnRemoteClosed(cTCPLink & a_Link) override + virtual void OnRemoteClosed(void) override { + ASSERT(m_Link != nullptr); + LOGD("Remote has closed the connection."); + m_Link.reset(); m_Event.Set(); } - virtual void OnError(cTCPLink & a_Link, int a_ErrorCode, const AString & a_ErrorMsg) override + virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) override { + ASSERT(m_Link != nullptr); + LOGD("Error %d (%s) in the cDumpCallbacks.", a_ErrorCode, a_ErrorMsg.c_str()); + m_Link.reset(); m_Event.Set(); } -- cgit v1.2.3