Merge pull request #8015 from Robyt3/Engine-Jobs-Abort

Allow background jobs to be aborted, refactoring
This commit is contained in:
Dennis Felsing 2024-02-25 22:01:03 +00:00 committed by GitHub
commit ac61d8f88a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 396 additions and 114 deletions

View file

@ -333,7 +333,9 @@ public:
virtual void OnDummyDisconnect() = 0; virtual void OnDummyDisconnect() = 0;
virtual void DummyResetInput() = 0; virtual void DummyResetInput() = 0;
virtual void Echo(const char *pString) = 0; virtual void Echo(const char *pString) = 0;
virtual bool CanDisplayWarning() const = 0; virtual bool CanDisplayWarning() const = 0;
virtual void RenderShutdownMessage() = 0;
virtual CNetObjHandler *GetNetObjHandler() = 0; virtual CNetObjHandler *GetNetObjHandler() = 0;
}; };

View file

@ -2607,7 +2607,7 @@ void CClient::Update()
if(!m_EditJobs.empty()) if(!m_EditJobs.empty())
{ {
std::shared_ptr<CDemoEdit> pJob = m_EditJobs.front(); std::shared_ptr<CDemoEdit> pJob = m_EditJobs.front();
if(pJob->Status() == IJob::STATE_DONE) if(pJob->State() == IJob::STATE_DONE)
{ {
char aBuf[IO_MAX_PATH_LENGTH + 64]; char aBuf[IO_MAX_PATH_LENGTH + 64];
str_format(aBuf, sizeof(aBuf), "Successfully saved the replay to %s!", pJob->Destination()); str_format(aBuf, sizeof(aBuf), "Successfully saved the replay to %s!", pJob->Destination());
@ -3050,6 +3050,9 @@ void CClient::Run()
m_GlobalTime = (time_get() - m_GlobalStartTime) / (float)time_freq(); m_GlobalTime = (time_get() - m_GlobalStartTime) / (float)time_freq();
} }
GameClient()->RenderShutdownMessage();
Disconnect();
if(!m_pConfigManager->Save()) if(!m_pConfigManager->Save())
{ {
char aError[128]; char aError[128];
@ -3059,15 +3062,16 @@ void CClient::Run()
m_Fifo.Shutdown(); m_Fifo.Shutdown();
m_Http.Shutdown(); m_Http.Shutdown();
GameClient()->OnShutdown(); Engine()->ShutdownJobs();
Disconnect();
// close socket GameClient()->RenderShutdownMessage();
GameClient()->OnShutdown();
delete m_pEditor;
// close sockets
for(unsigned int i = 0; i < std::size(m_aNetClient); i++) for(unsigned int i = 0; i < std::size(m_aNetClient); i++)
m_aNetClient[i].Close(); m_aNetClient[i].Close();
delete m_pEditor;
// shutdown text render while graphics are still available // shutdown text render while graphics are still available
m_pTextRender->Shutdown(); m_pTextRender->Shutdown();
} }

View file

@ -34,7 +34,7 @@ public:
bool GetBestUrl(const char **pBestUrl) const; bool GetBestUrl(const char **pBestUrl) const;
void Reset(); void Reset();
bool IsRefreshing() const { return m_pJob && m_pJob->Status() != IJob::STATE_DONE; } bool IsRefreshing() const { return m_pJob && !m_pJob->Done(); }
void Refresh(); void Refresh();
private: private:
@ -60,8 +60,12 @@ private:
public: public:
CJob(CChooseMaster *pParent, std::shared_ptr<CData> pData) : CJob(CChooseMaster *pParent, std::shared_ptr<CData> pData) :
m_pParent(pParent), m_pData(std::move(pData)) {} m_pParent(pParent),
void Abort() REQUIRES(!m_Lock); m_pData(std::move(pData))
{
Abortable(true);
}
bool Abort() override REQUIRES(!m_Lock);
}; };
IEngine *m_pEngine; IEngine *m_pEngine;
@ -130,12 +134,20 @@ void CChooseMaster::Reset()
void CChooseMaster::Refresh() void CChooseMaster::Refresh()
{ {
if(m_pJob == nullptr || m_pJob->Status() == IJob::STATE_DONE) if(m_pJob == nullptr || m_pJob->State() == IJob::STATE_DONE)
m_pEngine->AddJob(m_pJob = std::make_shared<CJob>(this, m_pData)); {
m_pJob = std::make_shared<CJob>(this, m_pData);
m_pEngine->AddJob(m_pJob);
}
} }
void CChooseMaster::CJob::Abort() bool CChooseMaster::CJob::Abort()
{ {
if(!IJob::Abort())
{
return false;
}
CLockScope ls(m_Lock); CLockScope ls(m_Lock);
if(m_pHead != nullptr) if(m_pHead != nullptr)
{ {
@ -146,6 +158,8 @@ void CChooseMaster::CJob::Abort()
{ {
m_pGet->Abort(); m_pGet->Abort();
} }
return true;
} }
void CChooseMaster::CJob::Run() void CChooseMaster::CJob::Run()
@ -184,7 +198,7 @@ void CChooseMaster::CJob::Run()
m_pParent->m_pHttp->Run(pHead); m_pParent->m_pHttp->Run(pHead);
pHead->Wait(); pHead->Wait();
if(pHead->State() == EHttpState::ABORTED) if(pHead->State() == EHttpState::ABORTED || State() == IJob::STATE_ABORTED)
{ {
dbg_msg("serverbrowse_http", "master chooser aborted"); dbg_msg("serverbrowse_http", "master chooser aborted");
return; return;
@ -207,7 +221,7 @@ void CChooseMaster::CJob::Run()
pGet->Wait(); pGet->Wait();
auto Time = std::chrono::duration_cast<std::chrono::milliseconds>(time_get_nanoseconds() - StartTime); auto Time = std::chrono::duration_cast<std::chrono::milliseconds>(time_get_nanoseconds() - StartTime);
if(pGet->State() == EHttpState::ABORTED) if(pGet->State() == EHttpState::ABORTED || State() == IJob::STATE_ABORTED)
{ {
dbg_msg("serverbrowse_http", "master chooser aborted"); dbg_msg("serverbrowse_http", "master chooser aborted");
return; return;

View file

@ -4,25 +4,24 @@
#define ENGINE_ENGINE_H #define ENGINE_ENGINE_H
#include "kernel.h" #include "kernel.h"
#include <engine/shared/jobs.h>
#include <memory>
class CFutureLogger; class CFutureLogger;
class IJob;
class ILogger; class ILogger;
class IEngine : public IInterface class IEngine : public IInterface
{ {
MACRO_INTERFACE("engine") MACRO_INTERFACE("engine")
protected:
class CJobPool m_JobPool;
public: public:
virtual ~IEngine() = default; virtual ~IEngine() = default;
virtual void Init() = 0; virtual void Init() = 0;
virtual void AddJob(std::shared_ptr<IJob> pJob) = 0; virtual void AddJob(std::shared_ptr<IJob> pJob) = 0;
virtual void ShutdownJobs() = 0;
virtual void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) = 0; virtual void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) = 0;
static void RunJobBlocking(IJob *pJob);
}; };
extern IEngine *CreateEngine(const char *pAppname, std::shared_ptr<CFutureLogger> pFutureLogger, int Jobs); extern IEngine *CreateEngine(const char *pAppname, std::shared_ptr<CFutureLogger> pFutureLogger, int Jobs);

View file

@ -1117,8 +1117,8 @@ void CServer::InitDnsbl(int ClientID)
str_format(aBuf, sizeof(aBuf), "%s.%d.%d.%d.%d.%s", Config()->m_SvDnsblKey, Addr.ip[3], Addr.ip[2], Addr.ip[1], Addr.ip[0], Config()->m_SvDnsblHost); str_format(aBuf, sizeof(aBuf), "%s.%d.%d.%d.%d.%s", Config()->m_SvDnsblKey, Addr.ip[3], Addr.ip[2], Addr.ip[1], Addr.ip[0], Config()->m_SvDnsblHost);
} }
IEngine *pEngine = Kernel()->RequestInterface<IEngine>(); m_aClients[ClientID].m_pDnsblLookup = std::make_shared<CHostLookup>(aBuf, NETTYPE_IPV4);
pEngine->AddJob(m_aClients[ClientID].m_pDnsblLookup = std::make_shared<CHostLookup>(aBuf, NETTYPE_IPV4)); Engine()->AddJob(m_aClients[ClientID].m_pDnsblLookup);
m_aClients[ClientID].m_DnsblState = CClient::DNSBL_STATE_PENDING; m_aClients[ClientID].m_DnsblState = CClient::DNSBL_STATE_PENDING;
} }
@ -2706,9 +2706,14 @@ int CServer::Run()
m_UPnP.Open(BindAddr); m_UPnP.Open(BindAddr);
#endif #endif
IEngine *pEngine = Kernel()->RequestInterface<IEngine>(); if(!m_Http.Init(std::chrono::seconds{2}))
IHttp *pHttp = Kernel()->RequestInterface<IHttp>(); {
m_pRegister = CreateRegister(&g_Config, m_pConsole, pEngine, pHttp, this->Port(), m_NetServer.GetGlobalToken()); log_error("server", "Failed to initialize the HTTP client.");
return -1;
}
m_pEngine = Kernel()->RequestInterface<IEngine>();
m_pRegister = CreateRegister(&g_Config, m_pConsole, m_pEngine, &m_Http, this->Port(), m_NetServer.GetGlobalToken());
m_NetServer.SetCallbacks(NewClientCallback, NewClientNoAuthCallback, ClientRejoinCallback, DelClientCallback, this); m_NetServer.SetCallbacks(NewClientCallback, NewClientNoAuthCallback, ClientRejoinCallback, DelClientCallback, this);
@ -2839,7 +2844,7 @@ int CServer::Run()
InitDnsbl(ClientID); InitDnsbl(ClientID);
} }
else if(m_aClients[ClientID].m_DnsblState == CClient::DNSBL_STATE_PENDING && else if(m_aClients[ClientID].m_DnsblState == CClient::DNSBL_STATE_PENDING &&
m_aClients[ClientID].m_pDnsblLookup->Status() == IJob::STATE_DONE) m_aClients[ClientID].m_pDnsblLookup->State() == IJob::STATE_DONE)
{ {
if(m_aClients[ClientID].m_pDnsblLookup->Result() != 0) if(m_aClients[ClientID].m_pDnsblLookup->Result() != 0)
{ {
@ -3009,23 +3014,20 @@ int CServer::Run()
m_NetServer.Drop(i, pDisconnectReason); m_NetServer.Drop(i, pDisconnectReason);
} }
m_pRegister->OnShutdown();
m_Econ.Shutdown(); m_Econ.Shutdown();
m_Fifo.Shutdown(); m_Fifo.Shutdown();
Engine()->ShutdownJobs();
GameServer()->OnShutdown(nullptr); GameServer()->OnShutdown(nullptr);
m_pMap->Unload(); m_pMap->Unload();
DbPool()->OnShutdown(); DbPool()->OnShutdown();
#if defined(CONF_UPNP) #if defined(CONF_UPNP)
m_UPnP.Shutdown(); m_UPnP.Shutdown();
#endif #endif
m_NetServer.Close(); m_NetServer.Close();
m_pRegister->OnShutdown();
return ErrorShutdown(); return ErrorShutdown();
} }
@ -3743,7 +3745,6 @@ void CServer::RegisterCommands()
m_pStorage = Kernel()->RequestInterface<IStorage>(); m_pStorage = Kernel()->RequestInterface<IStorage>();
m_pAntibot = Kernel()->RequestInterface<IEngineAntibot>(); m_pAntibot = Kernel()->RequestInterface<IEngineAntibot>();
m_Http.Init(std::chrono::seconds{2});
Kernel()->RegisterInterface(static_cast<IHttp *>(&m_Http), false); Kernel()->RegisterInterface(static_cast<IHttp *>(&m_Http), false);
// register console commands // register console commands

View file

@ -37,6 +37,7 @@ class CHostLookup;
class CLogMessage; class CLogMessage;
class CMsgPacker; class CMsgPacker;
class CPacker; class CPacker;
class IEngine;
class IEngineMap; class IEngineMap;
class ILogger; class ILogger;
@ -70,6 +71,7 @@ class CServer : public IServer
class IStorage *m_pStorage; class IStorage *m_pStorage;
class IEngineAntibot *m_pAntibot; class IEngineAntibot *m_pAntibot;
class IRegister *m_pRegister; class IRegister *m_pRegister;
IEngine *m_pEngine;
#if defined(CONF_UPNP) #if defined(CONF_UPNP)
CUPnP m_UPnP; CUPnP m_UPnP;
@ -96,6 +98,7 @@ public:
class IStorage *Storage() { return m_pStorage; } class IStorage *Storage() { return m_pStorage; }
class IEngineAntibot *Antibot() { return m_pAntibot; } class IEngineAntibot *Antibot() { return m_pAntibot; }
class CDbConnectionPool *DbPool() { return m_pConnectionPool; } class CDbConnectionPool *DbPool() { return m_pConnectionPool; }
IEngine *Engine() { return m_pEngine; }
enum enum
{ {

View file

@ -7,20 +7,22 @@
#include <engine/console.h> #include <engine/console.h>
#include <engine/engine.h> #include <engine/engine.h>
#include <engine/shared/config.h> #include <engine/shared/config.h>
#include <engine/shared/jobs.h>
#include <engine/shared/network.h> #include <engine/shared/network.h>
#include <engine/storage.h> #include <engine/storage.h>
class CEngine : public IEngine class CEngine : public IEngine
{ {
public:
IConsole *m_pConsole; IConsole *m_pConsole;
IStorage *m_pStorage; IStorage *m_pStorage;
bool m_Logging;
bool m_Logging;
std::shared_ptr<CFutureLogger> m_pFutureLogger; std::shared_ptr<CFutureLogger> m_pFutureLogger;
char m_aAppName[256]; char m_aAppName[256];
CJobPool m_JobPool;
static void Con_DbgLognetwork(IConsole::IResult *pResult, void *pUserData) static void Con_DbgLognetwork(IConsole::IResult *pResult, void *pUserData)
{ {
CEngine *pEngine = static_cast<CEngine *>(pUserData); CEngine *pEngine = static_cast<CEngine *>(pUserData);
@ -43,6 +45,7 @@ public:
} }
} }
public:
CEngine(bool Test, const char *pAppname, std::shared_ptr<CFutureLogger> pFutureLogger, int Jobs) : CEngine(bool Test, const char *pAppname, std::shared_ptr<CFutureLogger> pFutureLogger, int Jobs) :
m_pFutureLogger(std::move(pFutureLogger)) m_pFutureLogger(std::move(pFutureLogger))
{ {
@ -70,7 +73,6 @@ public:
~CEngine() override ~CEngine() override
{ {
m_JobPool.Destroy();
CNetBase::CloseLog(); CNetBase::CloseLog();
} }
@ -92,16 +94,16 @@ public:
m_JobPool.Add(std::move(pJob)); m_JobPool.Add(std::move(pJob));
} }
void ShutdownJobs() override
{
m_JobPool.Shutdown();
}
void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) override void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) override
{ {
m_pFutureLogger->Set(pLogger); m_pFutureLogger->Set(pLogger);
} }
}; };
void IEngine::RunJobBlocking(IJob *pJob)
{
CJobPool::RunBlocking(pJob);
}
IEngine *CreateEngine(const char *pAppname, std::shared_ptr<CFutureLogger> pFutureLogger, int Jobs) { return new CEngine(false, pAppname, std::move(pFutureLogger), Jobs); } IEngine *CreateEngine(const char *pAppname, std::shared_ptr<CFutureLogger> pFutureLogger, int Jobs) { return new CEngine(false, pAppname, std::move(pFutureLogger), Jobs); }
IEngine *CreateTestEngine(const char *pAppname, int Jobs) { return new CEngine(true, pAppname, nullptr, Jobs); } IEngine *CreateTestEngine(const char *pAppname, int Jobs) { return new CEngine(true, pAppname, nullptr, Jobs); }

View file

@ -11,6 +11,7 @@ CHostLookup::CHostLookup(const char *pHostname, int Nettype)
{ {
str_copy(m_aHostname, pHostname); str_copy(m_aHostname, pHostname);
m_Nettype = Nettype; m_Nettype = Nettype;
Abortable(true);
} }
void CHostLookup::Run() void CHostLookup::Run()

View file

@ -3,93 +3,218 @@
#include "jobs.h" #include "jobs.h"
IJob::IJob() : IJob::IJob() :
m_Status(STATE_PENDING) m_pNext(nullptr),
m_State(STATE_QUEUED),
m_Abortable(false)
{ {
} }
IJob::~IJob() = default; IJob::~IJob() = default;
int IJob::Status() IJob::EJobState IJob::State() const
{ {
return m_Status.load(); return m_State;
}
bool IJob::Done() const
{
EJobState State = m_State;
return State != STATE_QUEUED && State != STATE_RUNNING;
}
bool IJob::Abort()
{
if(!IsAbortable())
return false;
m_State = STATE_ABORTED;
return true;
}
void IJob::Abortable(bool Abortable)
{
m_Abortable = Abortable;
}
bool IJob::IsAbortable() const
{
return m_Abortable;
} }
CJobPool::CJobPool() CJobPool::CJobPool()
{ {
// empty the pool m_Shutdown = true;
m_Shutdown = false;
sphore_init(&m_Semaphore);
m_pFirstJob = 0;
m_pLastJob = 0;
} }
CJobPool::~CJobPool() CJobPool::~CJobPool()
{ {
if(!m_Shutdown) if(!m_Shutdown)
{ {
Destroy(); Shutdown();
} }
} }
void CJobPool::WorkerThread(void *pUser) void CJobPool::WorkerThread(void *pUser)
{ {
CJobPool *pPool = (CJobPool *)pUser; static_cast<CJobPool *>(pUser)->RunLoop();
}
while(!pPool->m_Shutdown) void CJobPool::RunLoop()
{ {
std::shared_ptr<IJob> pJob = 0; while(true)
{
// wait for job to become available
sphore_wait(&m_Semaphore);
// fetch job from queue // fetch job from queue
sphore_wait(&pPool->m_Semaphore); std::shared_ptr<IJob> pJob = nullptr;
{ {
CLockScope ls(pPool->m_Lock); const CLockScope LockScope(m_Lock);
if(pPool->m_pFirstJob) if(m_pFirstJob)
{ {
pJob = pPool->m_pFirstJob; pJob = m_pFirstJob;
pPool->m_pFirstJob = pPool->m_pFirstJob->m_pNext; m_pFirstJob = m_pFirstJob->m_pNext;
// allow remaining objects in list to destruct, even when current object stays alive // allow remaining objects in list to destruct, even when current object stays alive
pJob->m_pNext = nullptr; pJob->m_pNext = nullptr;
if(!pPool->m_pFirstJob) if(!m_pFirstJob)
pPool->m_pLastJob = 0; m_pLastJob = nullptr;
} }
} }
// do the job if we have one
if(pJob) if(pJob)
{ {
RunBlocking(pJob.get()); // do the job if we have one
const IJob::EJobState OldStateQueued = pJob->m_State.exchange(IJob::STATE_RUNNING);
if(OldStateQueued != IJob::STATE_QUEUED)
{
if(OldStateQueued == IJob::STATE_ABORTED)
{
// job was aborted before it was started
pJob->m_State = IJob::STATE_ABORTED;
continue;
}
dbg_assert(false, "Job state invalid. Job was reused or uninitialized.");
dbg_break();
}
// remember running jobs so we can abort them
{
const CLockScope LockScope(m_LockRunning);
m_RunningJobs.push_back(pJob);
}
pJob->Run();
{
const CLockScope LockScope(m_LockRunning);
m_RunningJobs.erase(std::find(m_RunningJobs.begin(), m_RunningJobs.end(), pJob));
}
// do not change state to done if job was not completed successfully
const IJob::EJobState OldStateRunning = pJob->m_State.exchange(IJob::STATE_DONE);
if(OldStateRunning != IJob::STATE_RUNNING)
{
pJob->m_State = OldStateRunning;
}
}
else if(m_Shutdown)
{
// shut down worker thread when pool is shutting down and no more jobs are left
break;
} }
} }
} }
void CJobPool::Init(int NumThreads) void CJobPool::Init(int NumThreads)
{ {
// start threads dbg_assert(m_Shutdown, "Job pool already running");
char aName[32]; m_Shutdown = false;
const CLockScope LockScope(m_Lock);
sphore_init(&m_Semaphore);
m_pFirstJob = nullptr;
m_pLastJob = nullptr;
// start worker threads
char aName[16]; // unix kernel length limit
m_vpThreads.reserve(NumThreads); m_vpThreads.reserve(NumThreads);
for(int i = 0; i < NumThreads; i++) for(int i = 0; i < NumThreads; i++)
{ {
str_format(aName, sizeof(aName), "CJobPool worker %d", i); str_format(aName, sizeof(aName), "CJobPool W%d", i);
m_vpThreads.push_back(thread_init(WorkerThread, this, aName)); m_vpThreads.push_back(thread_init(WorkerThread, this, aName));
} }
} }
void CJobPool::Destroy() void CJobPool::Shutdown()
{ {
dbg_assert(!m_Shutdown, "Job pool already shut down");
m_Shutdown = true; m_Shutdown = true;
// abort queued jobs
{
const CLockScope LockScope(m_Lock);
std::shared_ptr<IJob> pJob = m_pFirstJob;
std::shared_ptr<IJob> pPrev = nullptr;
while(pJob != nullptr)
{
std::shared_ptr<IJob> pNext = pJob->m_pNext;
if(pJob->Abort())
{
// only remove abortable jobs from queue
pJob->m_pNext = nullptr;
if(pPrev)
{
pPrev->m_pNext = pNext;
}
else
{
m_pFirstJob = pNext;
}
}
else
{
pPrev = pJob;
}
pJob = pNext;
}
m_pLastJob = pPrev;
}
// abort running jobs
{
const CLockScope LockScope(m_LockRunning);
for(const std::shared_ptr<IJob> &pJob : m_RunningJobs)
{
pJob->Abort();
}
}
// wake up all worker threads
for(size_t i = 0; i < m_vpThreads.size(); i++) for(size_t i = 0; i < m_vpThreads.size(); i++)
{
sphore_signal(&m_Semaphore); sphore_signal(&m_Semaphore);
}
// wait for all worker threads to finish
for(void *pThread : m_vpThreads) for(void *pThread : m_vpThreads)
{
thread_wait(pThread); thread_wait(pThread);
}
m_vpThreads.clear(); m_vpThreads.clear();
sphore_destroy(&m_Semaphore); sphore_destroy(&m_Semaphore);
} }
void CJobPool::Add(std::shared_ptr<IJob> pJob) void CJobPool::Add(std::shared_ptr<IJob> pJob)
{ {
if(m_Shutdown)
{ {
CLockScope ls(m_Lock); // no jobs are accepted when the job pool is already shutting down
pJob->Abort();
return;
}
// add job to queue // add job to queue
{
const CLockScope LockScope(m_Lock);
if(m_pLastJob) if(m_pLastJob)
m_pLastJob->m_pNext = pJob; m_pLastJob->m_pNext = pJob;
m_pLastJob = std::move(pJob); m_pLastJob = std::move(pJob);
@ -97,12 +222,6 @@ void CJobPool::Add(std::shared_ptr<IJob> pJob)
m_pFirstJob = m_pLastJob; m_pFirstJob = m_pLastJob;
} }
// signal a worker thread that a job is available
sphore_signal(&m_Semaphore); sphore_signal(&m_Semaphore);
} }
void CJobPool::RunBlocking(IJob *pJob)
{
pJob->m_Status = IJob::STATE_RUNNING;
pJob->Run();
pJob->m_Status = IJob::STATE_DONE;
}

View file

@ -7,36 +7,123 @@
#include <base/system.h> #include <base/system.h>
#include <atomic> #include <atomic>
#include <deque>
#include <memory> #include <memory>
#include <vector> #include <vector>
class CJobPool; /**
* A job which runs in a worker thread of a job pool.
*
* @see CJobPool
*/
class IJob class IJob
{ {
friend CJobPool; friend class CJobPool;
public:
/**
* The state of a job in the job pool.
*/
enum EJobState
{
/**
* Job has been created/queued but not started on a worker thread yet.
*/
STATE_QUEUED = 0,
/**
* Job is currently running on a worker thread.
*/
STATE_RUNNING,
/**
* Job was completed successfully.
*/
STATE_DONE,
/**
* Job was aborted. Note the job may or may not still be running while
* in this state.
*
* @see IsAbortable
*/
STATE_ABORTED,
};
private: private:
std::shared_ptr<IJob> m_pNext; std::shared_ptr<IJob> m_pNext;
std::atomic<EJobState> m_State;
std::atomic<bool> m_Abortable;
std::atomic<int> m_Status; protected:
/**
* Performs tasks in a worker thread.
*/
virtual void Run() = 0; virtual void Run() = 0;
/**
* Sets whether this job can be aborted.
*
* @remark Has no effect if the job has already been aborted.
*
* @see IsAbortable
*/
void Abortable(bool Abortable);
public: public:
IJob(); IJob();
virtual ~IJob();
IJob(const IJob &Other) = delete; IJob(const IJob &Other) = delete;
IJob &operator=(const IJob &Other) = delete; IJob &operator=(const IJob &Other) = delete;
virtual ~IJob();
int Status();
enum /**
{ * Returns the state of the job.
STATE_PENDING = 0, *
STATE_RUNNING, * @remark Accessing jobs in any other way that with the base functions of `IJob`
STATE_DONE * is generally not thread-safe unless the job is in @link STATE_DONE @endlink
}; * or has not been enqueued yet.
*
* @return State of the job.
*/
EJobState State() const;
/**
* Returns whether the job was completed, i.e. whether it's not still queued
* or running.
*
* @return `true` if the job is done, `false` otherwise.
*/
bool Done() const;
/**
* Aborts the job, if it can be aborted.
*
* @return `true` if abort was accepted, `false` otherwise.
*
* @remark May be overridden to delegate abort to other jobs. Note that this
* function may be called from any thread and should be thread-safe.
*/
virtual bool Abort();
/**
* Returns whether the job can be aborted. Jobs that are abortable may have
* their state set to `STATE_ABORTED` at any point if the job was aborted.
* The job state should be checked periodically in the `Run` function and the
* job should terminate at the earliest, safe opportunity when aborted.
* Scheduled jobs which are not abortable are guaranteed to fully complete
* before the job pool is shut down.
*
* @return `true` if the job can be aborted, `false` otherwise.
*/
bool IsAbortable() const;
}; };
/**
* A job pool which runs jobs in one or more worker threads.
*
* @see IJob
*/
class CJobPool class CJobPool
{ {
std::vector<void *> m_vpThreads; std::vector<void *> m_vpThreads;
@ -47,15 +134,41 @@ class CJobPool
std::shared_ptr<IJob> m_pFirstJob GUARDED_BY(m_Lock); std::shared_ptr<IJob> m_pFirstJob GUARDED_BY(m_Lock);
std::shared_ptr<IJob> m_pLastJob GUARDED_BY(m_Lock); std::shared_ptr<IJob> m_pLastJob GUARDED_BY(m_Lock);
CLock m_LockRunning;
std::deque<std::shared_ptr<IJob>> m_RunningJobs GUARDED_BY(m_LockRunning);
static void WorkerThread(void *pUser) NO_THREAD_SAFETY_ANALYSIS; static void WorkerThread(void *pUser) NO_THREAD_SAFETY_ANALYSIS;
void RunLoop() NO_THREAD_SAFETY_ANALYSIS;
public: public:
CJobPool(); CJobPool();
~CJobPool(); ~CJobPool();
void Init(int NumThreads); /**
void Destroy(); * Initializes the job pool with the given number of worker threads.
*
* @param NumTheads The number of worker threads.
*
* @remark Must be called on the main thread.
*/
void Init(int NumThreads) REQUIRES(!m_Lock);
/**
* Shuts down the job pool. Aborts all abortable jobs. Then waits for all
* worker threads to complete all remaining queued jobs and terminate.
*
* @remark Must be called on the main thread.
*/
void Shutdown() REQUIRES(!m_Lock) REQUIRES(!m_LockRunning);
/**
* Adds a job to the queue of the job pool.
*
* @param pJob The job to enqueue.
*
* @remark If the job pool is already shutting down, no additional jobs
* will be enqueue anymore. Abortable jobs will immediately be aborted.
*/
void Add(std::shared_ptr<IJob> pJob) REQUIRES(!m_Lock); void Add(std::shared_ptr<IJob> pJob) REQUIRES(!m_Lock);
static void RunBlocking(IJob *pJob);
}; };
#endif #endif

View file

@ -1997,6 +1997,8 @@ void CMenus::OnReset()
void CMenus::OnShutdown() void CMenus::OnShutdown()
{ {
KillServer(); KillServer();
m_CommunityIconLoadJobs.clear();
m_CommunityIconDownloadJobs.clear();
} }
bool CMenus::OnCursorMove(float x, float y, IInput::ECursorType CursorType) bool CMenus::OnCursorMove(float x, float y, IInput::ECursorType CursorType)

View file

@ -1925,6 +1925,7 @@ void CMenus::CCommunityIconLoadJob::Run()
CMenus::CCommunityIconLoadJob::CCommunityIconLoadJob(CMenus *pMenus, const char *pCommunityId, int StorageType) : CMenus::CCommunityIconLoadJob::CCommunityIconLoadJob(CMenus *pMenus, const char *pCommunityId, int StorageType) :
CAbstractCommunityIconJob(pMenus, pCommunityId, StorageType) CAbstractCommunityIconJob(pMenus, pCommunityId, StorageType)
{ {
Abortable(true);
} }
CMenus::CCommunityIconLoadJob::~CCommunityIconLoadJob() CMenus::CCommunityIconLoadJob::~CCommunityIconLoadJob()
@ -2040,7 +2041,7 @@ void CMenus::UpdateCommunityIcons()
if(!m_CommunityIconLoadJobs.empty()) if(!m_CommunityIconLoadJobs.empty())
{ {
std::shared_ptr<CCommunityIconLoadJob> pJob = m_CommunityIconLoadJobs.front(); std::shared_ptr<CCommunityIconLoadJob> pJob = m_CommunityIconLoadJobs.front();
if(pJob->Status() == IJob::STATE_DONE) if(pJob->Done())
{ {
if(pJob->Success()) if(pJob->Success())
LoadCommunityIconFinish(pJob->CommunityId(), pJob->ImageInfo(), pJob->Sha256()); LoadCommunityIconFinish(pJob->CommunityId(), pJob->ImageInfo(), pJob->Sha256());

View file

@ -16,6 +16,7 @@ CSoundLoading::CSoundLoading(CGameClient *pGameClient, bool Render) :
m_pGameClient(pGameClient), m_pGameClient(pGameClient),
m_Render(Render) m_Render(Render)
{ {
Abortable(true);
} }
void CSoundLoading::Run() void CSoundLoading::Run()
@ -27,6 +28,9 @@ void CSoundLoading::Run()
for(int i = 0; i < g_pData->m_aSounds[s].m_NumSounds; i++) for(int i = 0; i < g_pData->m_aSounds[s].m_NumSounds; i++)
{ {
if(State() == IJob::STATE_ABORTED)
return;
int Id = m_pGameClient->Sound()->LoadWV(g_pData->m_aSounds[s].m_aSounds[i].m_pFilename); int Id = m_pGameClient->Sound()->LoadWV(g_pData->m_aSounds[s].m_aSounds[i].m_pFilename);
g_pData->m_aSounds[s].m_aSounds[i].m_Id = Id; g_pData->m_aSounds[s].m_aSounds[i].m_Id = Id;
// try to render a frame // try to render a frame
@ -114,7 +118,7 @@ void CSounds::OnRender()
// check for sound initialisation // check for sound initialisation
if(m_WaitForSoundJob) if(m_WaitForSoundJob)
{ {
if(m_pSoundJob->Status() == IJob::STATE_DONE) if(m_pSoundJob->State() == IJob::STATE_DONE)
m_WaitForSoundJob = false; m_WaitForSoundJob = false;
else else
return; return;

View file

@ -979,8 +979,6 @@ void CGameClient::OnStateChange(int NewState, int OldState)
void CGameClient::OnShutdown() void CGameClient::OnShutdown()
{ {
RenderShutdownMessage();
for(auto &pComponent : m_vpAll) for(auto &pComponent : m_vpAll)
pComponent->OnShutdown(); pComponent->OnShutdown();
} }

View file

@ -502,7 +502,7 @@ public:
void RefreshSkins(); void RefreshSkins();
void RenderShutdownMessage(); void RenderShutdownMessage() override;
const char *GetItemName(int Type) const override; const char *GetItemName(int Type) const override;
const char *Version() const override; const char *Version() const override;

View file

@ -8561,7 +8561,7 @@ void CEditor::HandleWriterFinishJobs()
return; return;
std::shared_ptr<CDataFileWriterFinishJob> pJob = m_WriterFinishJobs.front(); std::shared_ptr<CDataFileWriterFinishJob> pJob = m_WriterFinishJobs.front();
if(pJob->Status() != IJob::STATE_DONE) if(!pJob->Done())
return; return;
m_WriterFinishJobs.pop_front(); m_WriterFinishJobs.pop_front();

View file

@ -15,19 +15,20 @@ class Jobs : public ::testing::Test
protected: protected:
CJobPool m_Pool; CJobPool m_Pool;
Jobs() void SetUp() override
{ {
m_Pool.Init(TEST_NUM_THREADS); m_Pool.Init(TEST_NUM_THREADS);
} }
void TearDown() override
{
m_Pool.Shutdown();
}
void Add(std::shared_ptr<IJob> pJob) void Add(std::shared_ptr<IJob> pJob)
{ {
m_Pool.Add(std::move(pJob)); m_Pool.Add(std::move(pJob));
} }
void RunBlocking(IJob *pJob)
{
CJobPool::RunBlocking(pJob);
}
}; };
class CJob : public IJob class CJob : public IJob
@ -38,6 +39,11 @@ class CJob : public IJob
public: public:
CJob(std::function<void()> &&JobFunction) : CJob(std::function<void()> &&JobFunction) :
m_JobFunction(JobFunction) {} m_JobFunction(JobFunction) {}
void Abortable(bool Abortable)
{
IJob::Abortable(Abortable);
}
}; };
TEST_F(Jobs, Constructor) TEST_F(Jobs, Constructor)
@ -49,15 +55,6 @@ TEST_F(Jobs, Simple)
Add(std::make_shared<CJob>([] {})); Add(std::make_shared<CJob>([] {}));
} }
TEST_F(Jobs, RunBlocking)
{
int Result = 0;
CJob Job([&] { Result = 1; });
EXPECT_EQ(Result, 0);
RunBlocking(&Job);
EXPECT_EQ(Result, 1);
}
TEST_F(Jobs, Wait) TEST_F(Jobs, Wait)
{ {
SEMAPHORE sphore; SEMAPHORE sphore;
@ -67,6 +64,26 @@ TEST_F(Jobs, Wait)
sphore_destroy(&sphore); sphore_destroy(&sphore);
} }
TEST_F(Jobs, AbortAbortable)
{
auto pJob = std::make_shared<CJob>([&] {});
pJob->Abortable(true);
EXPECT_TRUE(pJob->IsAbortable());
Add(pJob);
EXPECT_TRUE(pJob->Abort());
EXPECT_EQ(pJob->State(), IJob::STATE_ABORTED);
}
TEST_F(Jobs, AbortUnabortable)
{
auto pJob = std::make_shared<CJob>([&] {});
pJob->Abortable(false);
EXPECT_FALSE(pJob->IsAbortable());
Add(pJob);
EXPECT_FALSE(pJob->Abort());
EXPECT_NE(pJob->State(), IJob::STATE_ABORTED);
}
TEST_F(Jobs, LookupHost) TEST_F(Jobs, LookupHost)
{ {
static const char *HOST = "example.com"; static const char *HOST = "example.com";
@ -77,7 +94,7 @@ TEST_F(Jobs, LookupHost)
EXPECT_EQ(pJob->Nettype(), NETTYPE); EXPECT_EQ(pJob->Nettype(), NETTYPE);
Add(pJob); Add(pJob);
while(pJob->Status() != IJob::STATE_DONE) while(pJob->State() != IJob::STATE_DONE)
{ {
// yay, busy loop... // yay, busy loop...
thread_yield(); thread_yield();
@ -101,7 +118,7 @@ TEST_F(Jobs, LookupHostWebsocket)
EXPECT_EQ(pJob->Nettype(), NETTYPE); EXPECT_EQ(pJob->Nettype(), NETTYPE);
Add(pJob); Add(pJob);
while(pJob->Status() != IJob::STATE_DONE) while(pJob->State() != IJob::STATE_DONE)
{ {
// yay, busy loop... // yay, busy loop...
thread_yield(); thread_yield();
@ -130,7 +147,7 @@ TEST_F(Jobs, Many)
sphore_signal(&sphore); sphore_signal(&sphore);
} }
}); });
EXPECT_EQ(pJob->Status(), IJob::STATE_PENDING); EXPECT_EQ(pJob->State(), IJob::STATE_QUEUED);
vpJobs.push_back(pJob); vpJobs.push_back(pJob);
} }
for(auto &pJob : vpJobs) for(auto &pJob : vpJobs)
@ -139,10 +156,10 @@ TEST_F(Jobs, Many)
} }
sphore_wait(&sphore); sphore_wait(&sphore);
sphore_destroy(&sphore); sphore_destroy(&sphore);
m_Pool.~CJobPool(); TearDown();
for(auto &pJob : vpJobs) for(auto &pJob : vpJobs)
{ {
EXPECT_EQ(pJob->Status(), IJob::STATE_DONE); EXPECT_EQ(pJob->State(), IJob::STATE_DONE);
} }
new(&m_Pool) CJobPool(); SetUp();
} }

View file

@ -1,6 +1,8 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <memory> #include <memory>
#include <base/system.h>
#include <engine/client/serverbrowser_ping_cache.h> #include <engine/client/serverbrowser_ping_cache.h>
#include <engine/console.h> #include <engine/console.h>
#include <engine/engine.h> #include <engine/engine.h>