summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTiger Wang <ziwei.tiger@hotmail.co.uk>2015-06-22 22:27:13 +0200
committerTiger Wang <ziwei.tiger@hotmail.co.uk>2015-06-22 22:27:13 +0200
commit33fc1474d90ea68df862e5a5c15980a11961bf16 (patch)
tree0db0968d8078cc0fc1fbd164f080f9e0d32f553e
parentReinstate "Chunk queue collapsing" (diff)
downloadcuberite-33fc1474d90ea68df862e5a5c15980a11961bf16.tar
cuberite-33fc1474d90ea68df862e5a5c15980a11961bf16.tar.gz
cuberite-33fc1474d90ea68df862e5a5c15980a11961bf16.tar.bz2
cuberite-33fc1474d90ea68df862e5a5c15980a11961bf16.tar.lz
cuberite-33fc1474d90ea68df862e5a5c15980a11961bf16.tar.xz
cuberite-33fc1474d90ea68df862e5a5c15980a11961bf16.tar.zst
cuberite-33fc1474d90ea68df862e5a5c15980a11961bf16.zip
-rw-r--r--src/ChunkSender.cpp64
-rw-r--r--src/ChunkSender.h23
-rw-r--r--src/OSSupport/Event.cpp56
-rw-r--r--src/OSSupport/Event.h11
-rw-r--r--src/OSSupport/IsThread.cpp4
-rw-r--r--src/OSSupport/IsThread.h15
-rw-r--r--src/World.cpp2
7 files changed, 83 insertions, 92 deletions
diff --git a/src/ChunkSender.cpp b/src/ChunkSender.cpp
index 877aacfc5..de886b497 100644
--- a/src/ChunkSender.cpp
+++ b/src/ChunkSender.cpp
@@ -34,7 +34,7 @@ class cNotifyChunkSender :
a_ChunkX, a_ChunkZ,
[&ChunkSender] (cChunk & a_Chunk) -> bool
{
- ChunkSender.QueueSendChunkTo(a_Chunk.GetPosX(), a_Chunk.GetPosZ(), cChunkSender::PRIORITY_BROADCAST, a_Chunk.GetAllClients());
+ ChunkSender.QueueSendChunkTo(a_Chunk.GetPosX(), a_Chunk.GetPosZ(), cChunkSender::E_CHUNK_PRIORITY_MIDHIGH, a_Chunk.GetAllClients());
return true;
}
);
@@ -51,8 +51,7 @@ public:
cChunkSender::cChunkSender(cWorld & a_World) :
super("ChunkSender"),
- m_World(a_World),
- m_RemoveCount(0)
+ m_World(a_World)
{
}
@@ -163,11 +162,9 @@ void cChunkSender::RemoveClient(cClientHandle * a_Client)
auto && clients = pair.second.m_Clients;
clients.erase(a_Client); // nop for sets that do not contain a_Client
}
-
- m_RemoveCount++;
}
m_evtQueue.Set();
- m_evtRemoved.Wait(); // Wait for removal confirmation
+ m_evtRemoved.Wait(); // Wait for all remaining instances of a_Client to be processed (Execute() makes a copy of m_ChunkInfo)
}
@@ -178,40 +175,32 @@ void cChunkSender::Execute(void)
{
while (!m_ShouldTerminate)
{
- cCSLock Lock(m_CS);
- do
+ m_evtQueue.Wait();
+
{
- int RemoveCount = m_RemoveCount;
- m_RemoveCount = 0;
- cCSUnlock Unlock(Lock);
- for (int i = 0; i < RemoveCount; i++)
- {
- m_evtRemoved.Set(); // Notify that the removed clients are safe to be deleted
- }
- m_evtQueue.Wait();
- if (m_ShouldTerminate)
+ cCSLock Lock(m_CS);
+ while (!m_SendChunks.empty())
{
- return;
+ // Take one from the queue:
+ auto Chunk = m_SendChunks.top().m_Chunk;
+ m_SendChunks.pop();
+ auto itr = m_ChunkInfo.find(Chunk);
+ if (itr == m_ChunkInfo.end())
+ {
+ continue;
+ }
+
+ std::unordered_set<cClientHandle *> clients;
+ std::swap(itr->second.m_Clients, clients);
+ m_ChunkInfo.erase(itr);
+
+ cCSUnlock Unlock(Lock);
+ SendChunk(Chunk.m_ChunkX, Chunk.m_ChunkZ, clients);
}
- } while (m_SendChunks.empty());
-
- // Take one from the queue:
- auto Chunk = m_SendChunks.top().m_Chunk;
- m_SendChunks.pop();
- auto itr = m_ChunkInfo.find(Chunk);
- if (itr == m_ChunkInfo.end())
- {
- continue;
}
-
- std::unordered_set<cClientHandle *> clients;
- std::swap(itr->second.m_Clients, clients);
- m_ChunkInfo.erase(itr);
-
- Lock.Unlock();
- SendChunk(Chunk.m_ChunkX, Chunk.m_ChunkZ, clients);
- } // while (!mShouldTerminate)
+ m_evtRemoved.SetAll(); // Notify all waiting threads that all clients are processed and thus safe to destroy
+ } // while (!m_ShouldTerminate)
}
@@ -220,7 +209,6 @@ void cChunkSender::Execute(void)
void cChunkSender::SendChunk(int a_ChunkX, int a_ChunkZ, std::unordered_set<cClientHandle *> a_Clients)
{
-
// Ask the client if it still wants the chunk:
for (auto itr = a_Clients.begin(); itr != a_Clients.end();)
{
@@ -260,13 +248,13 @@ void cChunkSender::SendChunk(int a_ChunkX, int a_ChunkZ, std::unordered_set<cCli
}
cChunkDataSerializer Data(m_BlockTypes, m_BlockMetas, m_BlockLight, m_BlockSkyLight, m_BiomeMap);
- for (auto client : a_Clients)
+ for (const auto client : a_Clients)
{
// Send:
client->SendChunkData(a_ChunkX, a_ChunkZ, Data);
// Send block-entity packets:
- for (auto Pos : m_BlockEntities)
+ for (const auto & Pos : m_BlockEntities)
{
m_World.SendBlockEntity(Pos.x, Pos.y, Pos.z, *client);
} // for itr - m_Packets[]
diff --git a/src/ChunkSender.h b/src/ChunkSender.h
index b0c48b92b..fd9275794 100644
--- a/src/ChunkSender.h
+++ b/src/ChunkSender.h
@@ -1,4 +1,4 @@
-
+
// ChunkSender.h
// Interfaces to the cChunkSender class representing the thread that waits for chunks becoming ready (loaded / generated) and sends them to clients
@@ -61,8 +61,8 @@ public:
enum eChunkPriority
{
- E_CHUNK_PRIORITY_HIGH = 0,
- PRIORITY_BROADCAST,
+ E_CHUNK_PRIORITY_HIGH = 0,
+ E_CHUNK_PRIORITY_MIDHIGH,
E_CHUNK_PRIORITY_MEDIUM,
E_CHUNK_PRIORITY_LOW,
@@ -86,7 +86,16 @@ protected:
eChunkPriority m_Priority;
cChunkCoords m_Chunk;
- bool operator <(const sChunkQueue & a_Other) const { return this->m_Priority < a_Other.m_Priority; }
+ bool operator <(const sChunkQueue & a_Other) const
+ {
+ /* The Standard Priority Queue sorts from biggest to smallest
+ return true here means you are smaller than the other object, and you get pushed down.
+
+ The priorities go from HIGH (0) to LOW (2), so a smaller priority should mean further up the list
+ therefore, return true (affirm we're "smaller", and get pushed down) only if our priority is bigger than theirs (they're more urgent)
+ */
+ return this->m_Priority > a_Other.m_Priority;
+ }
};
/// Used for sending chunks to specific clients
@@ -107,9 +116,9 @@ protected:
cCriticalSection m_CS;
std::priority_queue<sChunkQueue> m_SendChunks;
std::unordered_map<cChunkCoords, sSendChunk, cChunkCoordsHash> m_ChunkInfo;
- cEvent m_evtQueue; // Set when anything is added to m_ChunksReady
- cEvent m_evtRemoved; // Set when removed clients are safe to be deleted
- int m_RemoveCount; // Number of threads waiting for a client removal (m_evtRemoved needs to be set this many times)
+ cEvent m_evtQueue; // Set when anything is added to m_ChunksReady
+ cEvent m_evtRemoved; // Set when removed clients are safe to be deleted
+
// Data about the chunk that is being sent:
// NOTE that m_BlockData[] is inherited from the cChunkDataCollector
unsigned char m_BiomeMap[cChunkDef::Width * cChunkDef::Width];
diff --git a/src/OSSupport/Event.cpp b/src/OSSupport/Event.cpp
index 38144ead3..4c2adea3c 100644
--- a/src/OSSupport/Event.cpp
+++ b/src/OSSupport/Event.cpp
@@ -13,7 +13,7 @@
cEvent::cEvent(void) :
- m_ShouldWait(true)
+ m_ShouldContinue(false)
{
}
@@ -23,12 +23,11 @@ cEvent::cEvent(void) :
void cEvent::Wait(void)
{
- std::unique_lock<std::mutex> Lock(m_Mutex);
- while (m_ShouldWait)
{
- m_CondVar.wait(Lock);
+ std::unique_lock<std::mutex> Lock(m_Mutex);
+ m_CondVar.wait(Lock, [this](){ return m_ShouldContinue.load(); });
}
- m_ShouldWait = true;
+ m_ShouldContinue = false;
}
@@ -38,33 +37,13 @@ void cEvent::Wait(void)
bool cEvent::Wait(unsigned a_TimeoutMSec)
{
auto dst = std::chrono::system_clock::now() + std::chrono::milliseconds(a_TimeoutMSec);
- std::unique_lock<std::mutex> Lock(m_Mutex); // We assume that this lock is acquired without much delay - we are the only user of the mutex
- while (m_ShouldWait && (std::chrono::system_clock::now() <= dst))
+ bool Result;
{
- switch (m_CondVar.wait_until(Lock, dst))
- {
- case std::cv_status::no_timeout:
- {
- // The wait was successful, check for spurious wakeup:
- if (!m_ShouldWait)
- {
- m_ShouldWait = true;
- return true;
- }
- // This was a spurious wakeup, wait again:
- continue;
- }
-
- case std::cv_status::timeout:
- {
- // The wait timed out, return failure:
- return false;
- }
- } // switch (wait_until())
- } // while (m_ShouldWait && not timeout)
-
- // The wait timed out in the while condition:
- return false;
+ std::unique_lock<std::mutex> Lock(m_Mutex); // We assume that this lock is acquired without much delay - we are the only user of the mutex
+ Result = m_CondVar.wait_until(Lock, dst, [this](){ return m_ShouldContinue.load(); });
+ }
+ m_ShouldContinue = false;
+ return Result;
}
@@ -73,13 +52,20 @@ bool cEvent::Wait(unsigned a_TimeoutMSec)
void cEvent::Set(void)
{
- {
- std::unique_lock<std::mutex> Lock(m_Mutex);
- m_ShouldWait = false;
- }
+ m_ShouldContinue = true;
m_CondVar.notify_one();
}
+void cEvent::SetAll(void)
+{
+ m_ShouldContinue = true;
+ m_CondVar.notify_all();
+}
+
+
+
+
+
diff --git a/src/OSSupport/Event.h b/src/OSSupport/Event.h
index 572388a3f..2c58ba485 100644
--- a/src/OSSupport/Event.h
+++ b/src/OSSupport/Event.h
@@ -12,6 +12,7 @@
#include <mutex>
#include <condition_variable>
+#include <atomic>
@@ -28,7 +29,11 @@ public:
/** Sets the event - releases one thread that has been waiting in Wait().
If there was no thread waiting, the next call to Wait() will not block. */
- void Set (void);
+ void Set(void);
+
+ /** Sets the event - releases all threads that have been waiting in Wait().
+ If there was no thread waiting, the next call to Wait() will not block. */
+ void SetAll(void);
/** Waits for the event until either it is signalled, or the (relative) timeout is passed.
Returns true if the event was signalled, false if the timeout was hit or there was an error. */
@@ -37,9 +42,9 @@ public:
private:
/** Used for checking for spurious wakeups. */
- bool m_ShouldWait;
+ std::atomic<bool> m_ShouldContinue;
- /** Mutex protecting m_ShouldWait from multithreaded access. */
+ /** Mutex protecting m_ShouldContinue from multithreaded access. */
std::mutex m_Mutex;
/** The condition variable used as the Event. */
diff --git a/src/OSSupport/IsThread.cpp b/src/OSSupport/IsThread.cpp
index 55e96b622..e295d5f25 100644
--- a/src/OSSupport/IsThread.cpp
+++ b/src/OSSupport/IsThread.cpp
@@ -134,9 +134,9 @@ bool cIsThread::Wait(void)
m_Thread.join();
return true;
}
- catch (std::system_error & a_Exception)
+ catch (const std::system_error & a_Exception)
{
- LOGERROR("cIsThread::Wait error %i: could not join thread %s; %s", a_Exception.code().value(), m_ThreadName.c_str(), a_Exception.code().message().c_str());
+ LOGERROR("%s error %i: could not join thread %s; %s", __FUNCTION__, a_Exception.code().value(), m_ThreadName.c_str(), a_Exception.code().message().c_str());
return false;
}
}
diff --git a/src/OSSupport/IsThread.h b/src/OSSupport/IsThread.h
index f642c8724..fa6813cd7 100644
--- a/src/OSSupport/IsThread.h
+++ b/src/OSSupport/IsThread.h
@@ -32,10 +32,6 @@ protected:
/** The overriden Execute() method should check this value periodically and terminate if this is true. */
volatile bool m_ShouldTerminate;
-private:
- /** Wrapper for Execute() that waits for the initialization event, to prevent race conditions in thread initialization. */
- void DoExecute(void);
-
public:
cIsThread(const AString & a_ThreadName);
virtual ~cIsThread();
@@ -51,14 +47,21 @@ public:
/** Returns true if the thread calling this function is the thread contained within this object. */
bool IsCurrentThread(void) const { return std::this_thread::get_id() == m_Thread.get_id(); }
+
+private:
-protected:
+ /** The name of the thread, used to aid debugging in IDEs which support named threads */
AString m_ThreadName;
+
+ /** The thread object which holds the created thread for later manipulation */
std::thread m_Thread;
/** The event that is used to wait with the thread's execution until the thread object is fully initialized.
- This prevents the IsCurrentThread() call to fail because of a race-condition. */
+ This prevents the IsCurrentThread() call to fail because of a race-condition where the thread starts before m_Thread has been fully assigned. */
cEvent m_evtStart;
+
+ /** Wrapper for Execute() that waits for the initialization event, to prevent race conditions in thread initialization. */
+ void DoExecute(void);
} ;
diff --git a/src/World.cpp b/src/World.cpp
index 13b01c9e7..0ae115001 100644
--- a/src/World.cpp
+++ b/src/World.cpp
@@ -2487,7 +2487,7 @@ void cWorld::SetChunkData(cSetChunkData & a_SetChunkData)
ChunkSender.QueueSendChunkTo(
a_Chunk.GetPosX(),
a_Chunk.GetPosZ(),
- cChunkSender::PRIORITY_BROADCAST,
+ cChunkSender::E_CHUNK_PRIORITY_MEDIUM,
a_Chunk.GetAllClients()
);
}