diff options
Diffstat (limited to 'src/OSSupport/SocketThreads.cpp')
-rw-r--r-- | src/OSSupport/SocketThreads.cpp | 236 |
1 files changed, 115 insertions, 121 deletions
diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index b222a2e4e..b8069cf00 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -132,47 +132,6 @@ void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data) -/// Stops reading from the socket - when this call returns, no more calls to the callbacks are made -void cSocketThreads::StopReading(const cCallback * a_Client) -{ - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->StopReading(a_Client)) - { - return; - } - } // for itr - m_Threads[] - - // Cannot assert, this normally happens if the socket is closed before the client deinitializes - // ASSERT(!"Stopping reading on an unknown client"); -} - - - - - -/// Queues the socket for closing, as soon as its outgoing data is sent -void cSocketThreads::QueueClose(const cCallback * a_Client) -{ - LOGD("QueueClose(client %p)", a_Client); - - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->QueueClose(a_Client)) - { - return; - } - } // for itr - m_Threads[] - - ASSERT(!"Queueing close of an unknown client"); -} - - - - - //////////////////////////////////////////////////////////////////////////////// // cSocketThreads::cSocketThread: @@ -210,13 +169,13 @@ cSocketThreads::cSocketThread::~cSocketThread() void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client) { + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding m_Slots[m_NumSlots].m_Client = a_Client; m_Slots[m_NumSlots].m_Socket = a_Socket; m_Slots[m_NumSlots].m_Outgoing.clear(); - m_Slots[m_NumSlots].m_ShouldClose = false; - m_Slots[m_NumSlots].m_ShouldCallClient = true; + m_Slots[m_NumSlots].m_State = sSlot::ssNormal; m_NumSlots++; // Notify the thread of the change: @@ -230,7 +189,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) { - // Returns true if removed, false if not found + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); if (m_NumSlots == 0) { @@ -244,8 +203,29 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) continue; } - // Found, remove it: - m_Slots[i] = m_Slots[--m_NumSlots]; + // Found the slot: + if (m_Slots[i].m_State == sSlot::ssRemoteClosed) + { + // The remote has already closed the socket, remove the slot altogether: + m_Slots[i] = m_Slots[--m_NumSlots]; + } + else + { + // Query and queue the last batch of outgoing data: + m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); + if (m_Slots[i].m_Outgoing.empty()) + { + // No more outgoing data, shut the socket down immediately: + m_Slots[i].m_Socket.ShutdownReadWrite(); + m_Slots[i].m_State = sSlot::ssShuttingDown; + } + else + { + // More data to send, shut down reading and wait for the rest to get sent: + m_Slots[i].m_State = sSlot::ssWritingRestOut; + } + m_Slots[i].m_Client = NULL; + } // Notify the thread of the change: ASSERT(m_ControlSocket2.IsValid()); @@ -263,6 +243,8 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const { + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); + for (int i = m_NumSlots - 1; i >= 0; --i) { if (m_Slots[i].m_Client == a_Client) @@ -295,6 +277,8 @@ bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) { + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); + if (HasClient(a_Client)) { // Notify the thread that there's another packet in the queue: @@ -311,7 +295,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data) { - // Returns true if socket handled by this thread + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); for (int i = m_NumSlots - 1; i >= 0; --i) { if (m_Slots[i].m_Client == a_Client) @@ -332,47 +316,6 @@ bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AStr -bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client) -{ - // Returns true if client handled by this thread - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (m_Slots[i].m_Client == a_Client) - { - m_Slots[i].m_ShouldCallClient = false; - return true; - } - } // for i - m_Slots[] - return false; -} - - - - - -bool cSocketThreads::cSocketThread::QueueClose(const cCallback * a_Client) -{ - // Returns true if socket handled by this thread - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (m_Slots[i].m_Client == a_Client) - { - m_Slots[i].m_ShouldClose = true; - - // Notify the thread that there's a close queued (in case its conditions are already met): - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("c", 1); - - return true; - } - } // for i - m_Slots[] - return false; -} - - - - - bool cSocketThreads::cSocketThread::Start(void) { // Create the control socket listener @@ -446,10 +389,13 @@ void cSocketThreads::cSocketThread::Execute(void) fd_set fdRead; cSocket::xSocket Highest = m_ControlSocket1.GetSocket(); - PrepareSet(&fdRead, Highest); + PrepareSet(&fdRead, Highest, false); // Wait for the sockets: - if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1) + timeval Timeout; + Timeout.tv_sec = 5; + Timeout.tv_usec = 0; + if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1) { LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); continue; @@ -460,8 +406,7 @@ void cSocketThreads::cSocketThread::Execute(void) // Test sockets for writing: fd_set fdWrite; Highest = m_ControlSocket1.GetSocket(); - PrepareSet(&fdWrite, Highest); - timeval Timeout; + PrepareSet(&fdWrite, Highest, true); Timeout.tv_sec = 0; Timeout.tv_usec = 0; if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1) @@ -471,6 +416,8 @@ void cSocketThreads::cSocketThread::Execute(void) } WriteToSockets(&fdWrite); + + CleanUpShutSockets(); } // while (!mShouldTerminate) } @@ -478,7 +425,7 @@ void cSocketThreads::cSocketThread::Execute(void) -void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest) +void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting) { FD_ZERO(a_Set); FD_SET(m_ControlSocket1.GetSocket(), a_Set); @@ -490,6 +437,11 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket { continue; } + if (m_Slots[i].m_State == sSlot::ssRemoteClosed) + { + // This socket won't provide nor consume any data anymore, don't put it in the Set + continue; + } cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); FD_SET(s, a_Set); if (s > a_Highest) @@ -525,29 +477,42 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) } char Buffer[1024]; int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); - if (Received == 0) + if (Received <= 0) { - // The socket has been closed by the remote party, close our socket and let it be removed after we process all reading - m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_ShouldCallClient) + // The socket has been closed by the remote party + switch (m_Slots[i].m_State) { - m_Slots[i].m_Client->SocketClosed(); - } - } - else if (Received > 0) - { - if (m_Slots[i].m_ShouldCallClient) - { - m_Slots[i].m_Client->DataReceived(Buffer, Received); - } + case sSlot::ssNormal: + { + // Notify the callback that the remote has closed the socket; keep the slot + m_Slots[i].m_Client->SocketClosed(); + m_Slots[i].m_State = sSlot::ssRemoteClosed; + break; + } + case sSlot::ssWritingRestOut: + case sSlot::ssShuttingDown: + case sSlot::ssShuttingDown2: + { + // Force-close the socket and remove the slot: + m_Slots[i].m_Socket.CloseSocket(); + m_Slots[i] = m_Slots[--m_NumSlots]; + break; + } + default: + { + LOG("%s: Unexpected socket state: %d (%s)", + __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() + ); + ASSERT(!"Unexpected socket state"); + break; + } + } // switch (m_Slots[i].m_State) } else { - // The socket has encountered an error, close it and let it be removed after we process all reading - m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_ShouldCallClient) + if (m_Slots[i].m_Client != NULL) { - m_Slots[i].m_Client->SocketClosed(); + m_Slots[i].m_Client->DataReceived(Buffer, Received); } } } // for i - m_Slots[] @@ -571,22 +536,17 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) if (m_Slots[i].m_Outgoing.empty()) { // Request another chunk of outgoing data: - if (m_Slots[i].m_ShouldCallClient) + if (m_Slots[i].m_Client != NULL) { m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); } if (m_Slots[i].m_Outgoing.empty()) { - // Nothing ready - if (m_Slots[i].m_ShouldClose) + // No outgoing data is ready + if (m_Slots[i].m_State == sSlot::ssWritingRestOut) { - // Socket was queued for closing and there's no more data to send, close it now: - - // DEBUG - LOGD("Socket was queued for closing, closing now. Slot %d, client %p, socket %d", i, m_Slots[i].m_Client, m_Slots[i].m_Socket.GetSocket()); - - m_Slots[i].m_Socket.CloseSocket(); - // The slot must be freed actively by the client, using RemoveClient() + m_Slots[i].m_State = sSlot::ssShuttingDown; + m_Slots[i].m_Socket.ShutdownReadWrite(); } continue; } @@ -598,7 +558,7 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) int Err = cSocket::GetLastError(); LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str()); m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_ShouldCallClient) + if (m_Slots[i].m_Client != NULL) { m_Slots[i].m_Client->SocketClosed(); } @@ -606,6 +566,12 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) } m_Slots[i].m_Outgoing.erase(0, Sent); + if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut)) + { + m_Slots[i].m_State = sSlot::ssShuttingDown; + m_Slots[i].m_Socket.ShutdownReadWrite(); + } + // _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled // This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread) /* @@ -622,3 +588,31 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) + +void cSocketThreads::cSocketThread::CleanUpShutSockets(void) +{ + for (int i = m_NumSlots - 1; i >= 0; i--) + { + switch (m_Slots[i].m_State) + { + case sSlot::ssShuttingDown2: + { + // The socket has reached the shutdown timeout, close it and clear its slot: + m_Slots[i].m_Socket.CloseSocket(); + m_Slots[i] = m_Slots[--m_NumSlots]; + break; + } + case sSlot::ssShuttingDown: + { + // The socket has been shut down for a single thread loop, let it loop once more before closing: + m_Slots[i].m_State = sSlot::ssShuttingDown2; + break; + } + default: break; + } + } // for i - m_Slots[] +} + + + + |