From 0e33c919dd5c954e0e9d266924c1650237bb95a1 Mon Sep 17 00:00:00 2001 From: "madmaxoft@gmail.com" Date: Sun, 26 Feb 2012 00:36:51 +0000 Subject: Using cSocketThreads for client outgoing packets. Unfortunately had to put in one intermediate thread (cServer::cNotifyWriteThread) to avoid deadlocks. Still, seems we have a proper multithreading for clients and no more per-client threads, yay :) git-svn-id: http://mc-server.googlecode.com/svn/trunk@328 0a769ca7-a7f5-676a-18bf-c427514a06d6 --- source/cClientHandle.cpp | 188 ++++++++++++++++++----------------------------- source/cClientHandle.h | 11 +-- source/cServer.cpp | 160 ++++++++++++++++++++++++++++++++++------ source/cServer.h | 44 ++++++++++- 4 files changed, 253 insertions(+), 150 deletions(-) (limited to 'source') diff --git a/source/cClientHandle.cpp b/source/cClientHandle.cpp index 9eed31a3a..41d2fda6c 100644 --- a/source/cClientHandle.cpp +++ b/source/cClientHandle.cpp @@ -77,7 +77,8 @@ case 2: (z)-=(amount); break; case 3: (z)+=(amount); break;\ case 4: (x)-=(amount); break; case 5: (x)+=(amount); break; } -#define MAX_SEMAPHORES (2000) +/// If the number of queued outgoing packets reaches this, the client will be kicked +#define MAX_OUTGOING_PACKETS 2000 @@ -89,9 +90,7 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance) : m_ViewDistance(a_ViewDistance) , m_ProtocolVersion(23) - , m_pSendThread(NULL) , m_Socket(a_Socket) - , m_Semaphore(MAX_SEMAPHORES) , m_bDestroyed(false) , m_Player(NULL) , m_bKicking(false) @@ -135,11 +134,6 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance) m_PacketMap[E_RESPAWN] = new cPacket_Respawn; m_PacketMap[E_PING] = new cPacket_Ping; - ////////////////////////////////////////////////////////////////////////// - m_pSendThread = new cThread(SendThread, this, "cClientHandle::SendThread"); - m_pSendThread->Start (true); - ////////////////////////////////////////////////////////////////////////// - LOG("New ClientHandle created at %p", this); } @@ -149,7 +143,7 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance) cClientHandle::~cClientHandle() { - LOG("Deleting client \"%s\"", GetUsername().c_str()); + LOG("Deleting client \"%s\" at %p", GetUsername().c_str(), this); // Remove from cSocketThreads, we're not to be called anymore: cRoot::Get()->GetServer()->ClientDestroying(this); @@ -173,20 +167,13 @@ cClientHandle::~cClientHandle() } } - // First stop sending thread - m_bKeepThreadGoing = false; - if (m_Socket.IsValid()) { cPacket_Disconnect Disconnect; Disconnect.m_Reason = "Server shut down? Kthnxbai"; m_Socket.Send(&Disconnect); - m_Socket.CloseSocket(); } - - m_Semaphore.Signal(); - delete m_pSendThread; - + if (m_Player != NULL) { m_Player->SetClientHandle(NULL); @@ -198,19 +185,31 @@ cClientHandle::~cClientHandle() delete m_PacketMap[i]; } + // Queue all remaining outgoing packets to cSocketThreads: { - cCSLock Lock(m_SendCriticalSection); + cCSLock Lock(m_CSPackets); for (PacketList::iterator itr = m_PendingNrmSendPackets.begin(); itr != m_PendingNrmSendPackets.end(); ++itr) { + AString Data; + (*itr)->Serialize(Data); + cRoot::Get()->GetServer()->WriteToClient(&m_Socket, Data); delete *itr; } + m_PendingNrmSendPackets.clear(); for (PacketList::iterator itr = m_PendingLowSendPackets.begin(); itr != m_PendingLowSendPackets.end(); ++itr) { + AString Data; + (*itr)->Serialize(Data); + cRoot::Get()->GetServer()->WriteToClient(&m_Socket, Data); delete *itr; } + m_PendingLowSendPackets.clear(); } - LOG("ClientHandle at %p destroyed", this); + // Queue the socket to close as soon as it sends all outgoing data: + cRoot::Get()->GetServer()->QueueClientClose(&m_Socket); + + LOG("ClientHandle at %p deleted", this); } @@ -295,8 +294,8 @@ void cClientHandle::Authenticate(void) Send(Health); m_Player->Initialize(World); - m_State = csDownloadingWorld; StreamChunks(); + m_State = csDownloadingWorld; } @@ -305,7 +304,7 @@ void cClientHandle::Authenticate(void) void cClientHandle::StreamChunks(void) { - if (m_State < csDownloadingWorld) + if (m_State < csAuthenticating) { return; } @@ -323,7 +322,7 @@ void cClientHandle::StreamChunks(void) m_LastStreamedChunkZ = ChunkPosZ; // DEBUG: - LOGINFO("Streaming chunks centered on [%d, %d]", ChunkPosX, ChunkPosZ); + LOGINFO("Streaming chunks centered on [%d, %d], view distance %d", ChunkPosX, ChunkPosZ, m_ViewDistance); cWorld * World = m_Player->GetWorld(); ASSERT(World != NULL); @@ -1645,7 +1644,10 @@ void cClientHandle::Tick(float a_Dt) if (m_State >= csDownloadingWorld) { cWorld * World = m_Player->GetWorld(); + cCSLock Lock(m_CSChunkLists); + + // Send the chunks: int NumSent = 0; for (cChunkCoordsList::iterator itr = m_ChunksToSend.begin(); itr != m_ChunksToSend.end();) { @@ -1662,7 +1664,8 @@ void cClientHandle::Tick(float a_Dt) break; } } // for itr - m_ChunksToSend[] - + Lock.Unlock(); + // Check even if we didn't send anything - a chunk may have sent a notification that we'd miss otherwise CheckIfWorldDownloaded(); } @@ -1707,8 +1710,7 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* = } } - bool bSignalSemaphore = true; - cCSLock Lock(m_SendCriticalSection); + cCSLock Lock(m_CSPackets); if (a_Priority == E_PRIORITY_NORMAL) { if (a_Packet->m_PacketID == E_REL_ENT_MOVE_LOOK) @@ -1727,7 +1729,6 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* = { Packets.erase(itr); bBreak = true; - bSignalSemaphore = false; // Because 1 packet is removed, semaphore count is the same delete PacketData; break; } @@ -1747,10 +1748,9 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* = m_PendingLowSendPackets.push_back(a_Packet->Clone()); } Lock.Unlock(); - if (bSignalSemaphore) - { - m_Semaphore.Signal(); - } + + // Notify SocketThreads that we have something to write: + cRoot::Get()->GetServer()->NotifyClientWrite(this); } @@ -1797,90 +1797,6 @@ void cClientHandle::SendConfirmPosition(void) -void cClientHandle::SendThread(void *lpParam) -{ - cClientHandle* self = (cClientHandle*)lpParam; - PacketList & NrmSendPackets = self->m_PendingNrmSendPackets; - PacketList & LowSendPackets = self->m_PendingLowSendPackets; - - - while (self->m_bKeepThreadGoing && self->m_Socket.IsValid()) - { - self->m_Semaphore.Wait(); - cCSLock Lock(self->m_SendCriticalSection); - if (NrmSendPackets.size() + LowSendPackets.size() > MAX_SEMAPHORES) - { - LOGERROR("ERROR: Too many packets in queue for player %s !!", self->m_Username.c_str()); - cPacket_Disconnect DC("Too many packets in queue."); - self->m_Socket.Send(DC); - - cSleep::MilliSleep(1000); // Give packet some time to be received - - Lock.Unlock(); - self->Destroy(); - break; - } - - if (NrmSendPackets.size() == 0 && LowSendPackets.size() == 0) - { - ASSERT(!self->m_bKeepThreadGoing); - if (self->m_bKeepThreadGoing) - { - LOGERROR("ERROR: Semaphore was signaled while no packets to send"); - } - continue; - } - if (NrmSendPackets.size() > MAX_SEMAPHORES / 2) - { - LOGINFO("Pending packets: %i Last: 0x%02x", NrmSendPackets.size(), (*NrmSendPackets.rbegin())->m_PacketID); - } - - cPacket * Packet = NULL; - if (!NrmSendPackets.empty()) - { - Packet = *NrmSendPackets.begin(); - NrmSendPackets.erase(NrmSendPackets.begin()); - } - else if (!LowSendPackets.empty()) - { - Packet = *LowSendPackets.begin(); - LowSendPackets.erase(LowSendPackets.begin()); - } - Lock.Unlock(); - - if (!self->m_Socket.IsValid()) - { - break; - } - - // LOG("Sending packet 0x%02x to \"%s\" (\"%s\")", Packet->m_PacketID, self->m_Socket.GetIPString().c_str(), self->m_Username.c_str()); - - bool bSuccess = self->m_Socket.Send(Packet); - - if (!bSuccess) - { - LOGERROR("ERROR: While sending packet 0x%02x to client \"%s\"", Packet->m_PacketID, self->m_Username.c_str()); - delete Packet; - self->Destroy(); - break; - } - delete Packet; - - if (self->m_bKicking && (NrmSendPackets.size() + LowSendPackets.size() == 0)) // Disconnect player after all packets have been sent - { - cSleep::MilliSleep(1000); // Give all packets some time to be received - self->Destroy(); - break; - } - } - - return; -} - - - - - const AString & cClientHandle::GetUsername(void) const { return m_Username; @@ -1967,7 +1883,49 @@ void cClientHandle::GetOutgoingData(AString & a_Data) { // Data can be sent to client - // TODO + cCSLock Lock(m_CSPackets); + if (m_PendingNrmSendPackets.size() + m_PendingLowSendPackets.size() > MAX_OUTGOING_PACKETS) + { + LOGERROR("ERROR: Too many packets in queue for player %s !!", m_Username.c_str()); + cPacket_Disconnect DC("Too many packets in queue."); + m_Socket.Send(DC); + Lock.Unlock(); + Destroy(); + return; + } + + if ((m_PendingNrmSendPackets.size() == 0) && (m_PendingLowSendPackets.size() == 0)) + { + return; + } + + if (m_PendingNrmSendPackets.size() > MAX_OUTGOING_PACKETS / 2) + { + LOGINFO("Suspiciously many pending packets: %i; client \"%s\", LastType: 0x%02x", m_PendingNrmSendPackets.size(), m_Username.c_str(), (*m_PendingNrmSendPackets.rbegin())->m_PacketID); + } + + AString Data; + if (!m_PendingNrmSendPackets.empty()) + { + m_PendingNrmSendPackets.front()->Serialize(Data); + delete m_PendingNrmSendPackets.front(); + m_PendingNrmSendPackets.erase(m_PendingNrmSendPackets.begin()); + } + else if (!m_PendingLowSendPackets.empty()) + { + m_PendingLowSendPackets.front()->Serialize(Data); + delete m_PendingLowSendPackets.front(); + m_PendingLowSendPackets.erase(m_PendingLowSendPackets.begin()); + } + Lock.Unlock(); + + a_Data.append(Data); + + // Disconnect player after all packets have been sent + if (m_bKicking && (m_PendingNrmSendPackets.size() + m_PendingLowSendPackets.size() == 0)) + { + Destroy(); + } } diff --git a/source/cClientHandle.h b/source/cClientHandle.h index 6b567a88b..3fcfef716 100644 --- a/source/cClientHandle.h +++ b/source/cClientHandle.h @@ -98,8 +98,6 @@ public: void Send(const cPacket & a_Packet, ENUM_PRIORITY a_Priority = E_PRIORITY_NORMAL) { Send(&a_Packet, a_Priority); } void Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority = E_PRIORITY_NORMAL); - static void SendThread( void *lpParam ); - const AString & GetUsername(void) const; inline short GetPing() { return m_Ping; } @@ -118,20 +116,17 @@ private: AString m_ReceivedData; // Accumulator for the data received from the socket, waiting to be parsed; accessed from the cSocketThreads' thread only! - PacketList m_PendingNrmSendPackets; - PacketList m_PendingLowSendPackets; + cCriticalSection m_CSPackets; + PacketList m_PendingNrmSendPackets; + PacketList m_PendingLowSendPackets; cCriticalSection m_CSChunkLists; cChunkCoordsList m_LoadedChunks; // Chunks that the player belongs to cChunkCoordsList m_ChunksToSend; // Chunks that need to be sent to the player (queued because they weren't generated yet or there's not enough time to send them) - cThread * m_pSendThread; - cSocket m_Socket; cCriticalSection m_CriticalSection; - cCriticalSection m_SendCriticalSection; - cSemaphore m_Semaphore; Vector3d m_ConfirmPosition; diff --git a/source/cServer.cpp b/source/cServer.cpp index 97bcffaae..b1dab21e5 100644 --- a/source/cServer.cpp +++ b/source/cServer.cpp @@ -65,8 +65,6 @@ struct cServer::sServerState cThread* pListenThread; bool bStopListenThread; cThread* pTickThread; bool bStopTickThread; - ClientList Clients; - cEvent RestartEvent; std::string ServerID; }; @@ -109,6 +107,33 @@ void cServer::ClientDestroying(const cClientHandle * a_Client) +void cServer::NotifyClientWrite(const cClientHandle * a_Client) +{ + m_NotifyWriteThread.NotifyClientWrite(a_Client); +} + + + + + +void cServer::WriteToClient(const cSocket * a_Socket, const AString & a_Data) +{ + m_SocketThreads.Write(a_Socket, a_Data); +} + + + + + +void cServer::QueueClientClose(const cSocket * a_Socket) +{ + m_SocketThreads.QueueClose(a_Socket); +} + + + + + bool cServer::InitServer( int a_Port ) { if( m_bIsConnected ) @@ -209,6 +234,9 @@ bool cServer::InitServer( int a_Port ) LOGINFO("Setting default viewdistance to the maximum of %d", m_ClientViewDistance); } } + + m_NotifyWriteThread.Start(this); + return true; } @@ -250,9 +278,13 @@ cServer::~cServer() // TODO - Need to modify this or something, so it broadcasts to all worlds? And move this to cWorld? void cServer::Broadcast( const cPacket * a_Packet, cClientHandle* a_Exclude /* = 0 */ ) { - for( ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end(); ++itr) + cCSLock Lock(m_CSClients); + for( ClientList::iterator itr = m_Clients.begin(); itr != m_Clients.end(); ++itr) { - if( *itr == a_Exclude || !(*itr)->IsLoggedIn() ) continue; + if ((*itr == a_Exclude) || !(*itr)->IsLoggedIn()) + { + continue; + } (*itr)->Send( a_Packet ); } } @@ -289,7 +321,9 @@ void cServer::StartListenClient() delete NewHandle; return; } - m_pState->Clients.push_back( NewHandle ); // TODO - lock list + + cCSLock Lock(m_CSClients); + m_Clients.push_back( NewHandle ); } @@ -310,21 +344,21 @@ bool cServer::Tick(float a_Dt) cRoot::Get()->TickWorlds( a_Dt ); // TODO - Maybe give all worlds their own thread? - //World->LockClientHandle(); // TODO - Lock client list - for( ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end();) { - if( (*itr)->IsDestroyed() ) + cCSLock Lock(m_CSClients); + for (cClientHandleList::iterator itr = m_Clients.begin(); itr != m_Clients.end();) { - cClientHandle* RemoveMe = *itr; + if ((*itr)->IsDestroyed()) + { + cClientHandle* RemoveMe = *itr; + itr = m_Clients.erase(itr); + delete RemoveMe; + continue; + } + (*itr)->Tick(a_Dt); ++itr; - m_pState->Clients.remove( RemoveMe ); - delete RemoveMe; - continue; } - (*itr)->Tick(a_Dt); - ++itr; } - //World->UnlockClientHandle(); cRoot::Get()->GetPluginManager()->Tick( a_Dt ); @@ -550,14 +584,12 @@ void cServer::Shutdown() cRoot::Get()->GetWorld()->SaveAllChunks(); - //cWorld* World = cRoot::Get()->GetWorld(); - //World->LockClientHandle(); // TODO - Lock client list - for( ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end(); ++itr ) + cCSLock Lock(m_CSClients); + for( ClientList::iterator itr = m_Clients.begin(); itr != m_Clients.end(); ++itr ) { delete *itr; } - m_pState->Clients.clear(); - //World->UnlockClientHandle(); + m_Clients.clear(); } @@ -575,13 +607,14 @@ const AString & cServer::GetServerID(void) const void cServer::KickUser(const AString & iUserName, const AString & iReason) { - for (ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end(); ++itr) + cCSLock Lock(m_CSClients); + for (ClientList::iterator itr = m_Clients.begin(); itr != m_Clients.end(); ++itr) { if ((*itr)->GetUsername() == iUserName) { (*itr)->Kick(iReason); } - } // for itr - m_pState->Clients[] + } // for itr - m_Clients[] } @@ -590,13 +623,92 @@ void cServer::KickUser(const AString & iUserName, const AString & iReason) void cServer::AuthenticateUser(const AString & iUserName) { - for (ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end(); ++itr) + cCSLock Lock(m_CSClients); + for (ClientList::iterator itr = m_Clients.begin(); itr != m_Clients.end(); ++itr) { if ((*itr)->GetUsername() == iUserName) { (*itr)->Authenticate(); } - } // for itr - m_pState->Clients[] + } // for itr - m_Clients[] +} + + + + + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// cServer::cClientPacketThread: + +cServer::cNotifyWriteThread::cNotifyWriteThread(void) : + super("ClientPacketThread"), + m_Server(NULL) +{ +} + + + + + +cServer::cNotifyWriteThread::~cNotifyWriteThread() +{ + mShouldTerminate = true; + m_Event.Set(); + Wait(); +} + + + + + +bool cServer::cNotifyWriteThread::Start(cServer * a_Server) +{ + m_Server = a_Server; + return super::Start(); +} + + + + + +void cServer::cNotifyWriteThread::Execute(void) +{ + cClientHandleList Clients; + while (!mShouldTerminate) + { + cCSLock Lock(m_CS); + while (m_Clients.size() == 0) + { + cCSUnlock Unlock(Lock); + m_Event.Wait(); + if (mShouldTerminate) + { + return; + } + } + + // Copy the clients to notify and unlock the CS: + Clients.splice(Clients.begin(), m_Clients); + Lock.Unlock(); + + for (cClientHandleList::iterator itr = Clients.begin(); itr != Clients.end(); ++itr) + { + m_Server->m_SocketThreads.NotifyWrite(*itr); + } // for itr - Clients[] + Clients.clear(); + } // while (!mShouldTerminate) +} + + + + + +void cServer::cNotifyWriteThread::NotifyClientWrite(const cClientHandle * a_Client) +{ + cCSLock Lock(m_CS); + m_Clients.remove(const_cast(a_Client)); // Put it there only once + m_Clients.push_back(const_cast(a_Client)); + m_Event.Set(); } diff --git a/source/cServer.h b/source/cServer.h index 3e7fbf094..200f2bb1f 100644 --- a/source/cServer.h +++ b/source/cServer.h @@ -21,6 +21,8 @@ class cPlayer; class cClientHandle; class cPacket; +typedef std::list cClientHandleList; + @@ -58,16 +60,48 @@ public: //tolua_export void ClientDestroying(const cClientHandle * a_Client); // Called by cClientHandle::Destroy(); removes the client from m_SocketThreads + void NotifyClientWrite(const cClientHandle * a_Client); // Notifies m_SocketThreads that client has something to be written + + void WriteToClient(const cSocket * a_Socket, const AString & a_Data); // Queues outgoing data for the socket through m_SocketThreads + + void QueueClientClose(const cSocket * a_Socket); // Queues the socket to close when all its outgoing data is sent + private: friend class cRoot; // so cRoot can create and destroy cServer - cServer(); - ~cServer(); - + /// When NotifyClientWrite() is called, it is queued for this thread to process (to avoid deadlocks between cSocketThreads, cClientHandle and cChunkMap) + class cNotifyWriteThread : + public cIsThread + { + typedef cIsThread super; + + cEvent m_Event; // Set when m_Clients gets appended + cServer * m_Server; + + cCriticalSection m_CS; + cClientHandleList m_Clients; + + virtual void Execute(void); + + public: + + cNotifyWriteThread(void); + ~cNotifyWriteThread(); + + bool Start(cServer * a_Server); + + void NotifyClientWrite(const cClientHandle * a_Client); + } ; + struct sServerState; sServerState* m_pState; + cNotifyWriteThread m_NotifyWriteThread; + + cCriticalSection m_CSClients; // Locks client list + cClientHandleList m_Clients; // Clients that are connected to the server + cSocketThreads m_SocketThreads; int m_ClientViewDistance; // The default view distance for clients; settable in Settings.ini @@ -80,6 +114,10 @@ private: int m_iServerPort; bool m_bRestarting; + + cServer(); + ~cServer(); + }; //tolua_export -- cgit v1.2.3