diff options
Diffstat (limited to '')
-rw-r--r-- | source/cSocketThreads.cpp | 133 |
1 files changed, 95 insertions, 38 deletions
diff --git a/source/cSocketThreads.cpp b/source/cSocketThreads.cpp index 277036e46..796316878 100644 --- a/source/cSocketThreads.cpp +++ b/source/cSocketThreads.cpp @@ -39,7 +39,7 @@ cSocketThreads::~cSocketThreads() -void cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client)
+bool cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client)
{
// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client
@@ -47,25 +47,32 @@ void cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client) cCSLock Lock(m_CS);
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
{
- if ((*itr)->HasEmptySlot())
+ if ((*itr)->IsValid() && (*itr)->HasEmptySlot())
{
(*itr)->AddClient(a_Socket, a_Client);
- return;
+ return true;
}
}
// No thread has free space, create a new one:
+ LOG("Creating a new cSocketThread (currently have %d)", m_Threads.size());
cSocketThread * Thread = new cSocketThread(this);
- Thread->Start();
+ if (!Thread->Start())
+ {
+ // There was an error launching the thread (but it was already logged along with the reason)
+ delete Thread;
+ return false;
+ }
Thread->AddClient(a_Socket, a_Client);
m_Threads.push_back(Thread);
+ return true;
}
-void cSocketThreads::RemoveClient(cSocket * a_Socket)
+void cSocketThreads::RemoveClient(const cSocket * a_Socket)
{
// Remove the socket (and associated client) from processing
@@ -83,7 +90,7 @@ void cSocketThreads::RemoveClient(cSocket * a_Socket) -void cSocketThreads::RemoveClient(cCallback * a_Client)
+void cSocketThreads::RemoveClient(const cCallback * a_Client)
{
// Remove the associated socket and the client from processing
@@ -101,7 +108,7 @@ void cSocketThreads::RemoveClient(cCallback * a_Client) -void cSocketThreads::NotifyWrite(cCallback * a_Client)
+void cSocketThreads::NotifyWrite(const cCallback * a_Client)
{
// Notifies the thread responsible for a_Client that the client has something to write
@@ -152,7 +159,7 @@ void cSocketThreads::cSocketThread::AddClient(cSocket * a_Socket, cCallback * a_ -bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client)
+bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
{
// Returns true if removed, false if not found
@@ -161,7 +168,7 @@ bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client) return false;
}
- for (int i = m_NumSlots - 1; i > 0 ; --i)
+ for (int i = m_NumSlots - 1; i >= 0 ; --i)
{
if (m_Slots[i].m_Client != a_Client)
{
@@ -186,7 +193,7 @@ bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client) -bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket)
+bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket)
{
// Returns true if removed, false if not found
@@ -195,7 +202,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket) return false;
}
- for (int i = m_NumSlots - 1; i > 0 ; --i)
+ for (int i = m_NumSlots - 1; i >= 0 ; --i)
{
if (m_Slots[i].m_Socket != a_Socket)
{
@@ -220,7 +227,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket) -bool cSocketThreads::cSocketThread::NotifyWrite(cCallback * a_Client)
+bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
{
if (HasClient(a_Client))
{
@@ -236,7 +243,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(cCallback * a_Client) -bool cSocketThreads::cSocketThread::HasClient(cCallback * a_Client) const
+bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const
{
for (int i = m_NumSlots - 1; i >= 0; --i)
{
@@ -252,11 +259,11 @@ bool cSocketThreads::cSocketThread::HasClient(cCallback * a_Client) const -bool cSocketThreads::cSocketThread::HasSocket(cSocket * a_Socket) const
+bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const
{
for (int i = m_NumSlots - 1; i >= 0; --i)
{
- if (m_Slots[i].m_Socket == a_Socket)
+ if (m_Slots[i].m_Socket->GetSocket() == a_Socket->GetSocket())
{
return true;
}
@@ -271,8 +278,8 @@ bool cSocketThreads::cSocketThread::HasSocket(cSocket * a_Socket) const bool cSocketThreads::cSocketThread::Start(void)
{
// Create the control socket listener
- m_ControlSocket1 = cSocket::CreateSocket();
- if (!m_ControlSocket1.IsValid())
+ m_ControlSocket2 = cSocket::CreateSocket();
+ if (!m_ControlSocket2.IsValid())
{
LOGERROR("Cannot create a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
return false;
@@ -281,36 +288,42 @@ bool cSocketThreads::cSocketThread::Start(void) Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET;
Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST();
Addr.Port = 0; // Any free port is okay
- if (m_ControlSocket1.Bind(Addr) != 0)
+ if (m_ControlSocket2.Bind(Addr) != 0)
{
LOGERROR("Cannot bind a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return false;
}
- if (m_ControlSocket1.GetPort() == 0)
+ if (m_ControlSocket2.Listen(1) != 0)
+ {
+ LOGERROR("Cannot listen on a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
+ m_ControlSocket2.CloseSocket();
+ return false;
+ }
+ if (m_ControlSocket2.GetPort() == 0)
{
LOGERROR("Cannot determine Control socket port (\"%s\"); conitnuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return false;
}
// Start the thread
if (!super::Start())
{
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return false;
}
// Finish connecting the control socket by accepting connection from the thread's socket
- cSocket tmp = m_ControlSocket1.Accept();
+ cSocket tmp = m_ControlSocket2.Accept();
if (!tmp.IsValid())
{
LOGERROR("Cannot link Control sockets for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return false;
}
- m_ControlSocket1.CloseSocket();
- m_ControlSocket1 = tmp;
+ m_ControlSocket2.CloseSocket();
+ m_ControlSocket2 = tmp;
return true;
}
@@ -322,15 +335,16 @@ bool cSocketThreads::cSocketThread::Start(void) void cSocketThreads::cSocketThread::Execute(void)
{
// Connect the "client" part of the Control socket:
+ m_ControlSocket1 = cSocket::CreateSocket();
cSocket::SockAddr_In Addr;
Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET;
Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST();
- Addr.Port = m_ControlSocket1.GetPort();
+ Addr.Port = m_ControlSocket2.GetPort();
assert(Addr.Port != 0); // We checked in the Start() method, but let's be sure
- if (m_ControlSocket2.Connect(Addr) != 0)
+ if (m_ControlSocket1.Connect(Addr) != 0)
{
LOGERROR("Cannot connect Control sockets for a cSocketThread (\"%s\"); continuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
- m_ControlSocket1.CloseSocket();
+ m_ControlSocket2.CloseSocket();
return;
}
@@ -339,33 +353,35 @@ void cSocketThreads::cSocketThread::Execute(void) {
// Put all sockets into the Read set:
fd_set fdRead;
- cSocket::xSocket Highest = 0;
+ cSocket::xSocket Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdRead, Highest);
// Wait for the sockets:
if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1)
{
- LOGWARNING("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
- break;
+ LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
+ continue;
}
ReadFromSockets(&fdRead);
// Test sockets for writing:
fd_set fdWrite;
- Highest = 0;
+ Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdWrite, Highest);
timeval Timeout;
Timeout.tv_sec = 0;
Timeout.tv_usec = 0;
if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1)
{
- LOGWARNING("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
- break;
+ LOG("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
+ continue;
}
WriteToSockets(&fdWrite);
+
+ RemoveClosedSockets();
} // while (!mShouldTerminate)
LOG("cSocketThread %p is terminating", this);
@@ -381,7 +397,7 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket FD_SET(m_ControlSocket1.GetSocket(), a_Set);
cCSLock Lock(m_Parent->m_CS);
- for (int i = m_NumSlots - 1; i > 0; --i)
+ for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (!m_Slots[i].m_Socket->IsValid())
{
@@ -403,8 +419,17 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
{
// Read on available sockets:
+
+ // Reset Control socket state:
+ if (FD_ISSET(m_ControlSocket1.GetSocket(), a_Read))
+ {
+ char Dummy[128];
+ m_ControlSocket1.Receive(Dummy, sizeof(Dummy), 0);
+ }
+
+ // Read from clients:
cCSLock Lock(m_Parent->m_CS);
- for (int i = m_NumSlots - 1; i > 0; --i)
+ for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Read))
{
@@ -437,8 +462,9 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
{
+ // Write to available client sockets:
cCSLock Lock(m_Parent->m_CS);
- for (int i = m_NumSlots - 1; i > 0; --i)
+ for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Write))
{
@@ -464,9 +490,40 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) return;
}
m_Slots[i].m_Outgoing.erase(0, Sent);
+
+ // _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled
+ // This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread)
+ /*
+ // If there's any data left, signalize the Control socket:
+ if (!m_Slots[i].m_Outgoing.empty())
+ {
+ assert(m_ControlSocket2.IsValid());
+ m_ControlSocket2.Send("q", 1);
+ }
+ */
} // for i - m_Slots[i]
}
+
+void cSocketThreads::cSocketThread::RemoveClosedSockets(void)
+{
+ // Removes sockets that have closed from m_Slots[]
+
+ cCSLock Lock(m_Parent->m_CS);
+ for (int i = m_NumSlots - 1; i >= 0; --i)
+ {
+ if (m_Slots[i].m_Socket->IsValid())
+ {
+ continue;
+ }
+ m_Slots[i] = m_Slots[m_NumSlots - 1];
+ m_NumSlots--;
+ } // for i - m_Slots[]
+}
+
+
+
+
|