From 6cdfa35727690635131991b3f366d3be7a75aa26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20M=C3=BCller?= Date: Fri, 16 Feb 2024 20:44:33 +0100 Subject: [PATCH] Allow background jobs to be aborted, refactoring Add `IJob::Abortable(bool)` function which jobs can call to specify whether they can be aborted. Jobs are not abortable per default. Abortable jobs may have their state set to `IJob::STATE_ABORTED` at any point if the job was aborted. The job state should be checked periodically in the `IJob::Run` function and the job should terminate at the earliest, safe opportunity when aborted. Scheduled jobs which are not abortable are guaranteed to fully complete before the job pool is shut down. However, if the job pool is already shutting down, no additional jobs will be enqueue anymore and abortable jobs will immediately be aborted. In particular, the sound loading, community icon loading, master chooser and host lookup jobs are specified as being abortable. Conversely, the jobs saving replay demos, editor maps and screenshots are expected to finish before the client is shut down. When the client is quitting/restarting, it will now disconnect from the current server first, before saving the config, to ensure that any actions that happen on disconnect (demo recorders being stopped etc.) happen first. The shutdown message is rendered before disconnecting and waiting for background jobs to finish. The HTTP client is now initialized later during server launch, after the network initialization. Error handling is added and the server stops if the HTTP client could not be initialized, same as the client. The `RunBlocking` functions are removed, as they are not used anymore after curl-multi was added. The function `IJob::Status` is renamed to `State` and `IJob::STATE_PENDING` is renamed to `STATE_QUEUED` for consistency with naming of the HTTP client. The member variables of the engine interface are encapsulated and the `jobs.h` include is removed from `engine.h`, which removes transitive includes of `system.h`. Documentation for all job and job pool API is added. --- src/engine/client.h | 2 + src/engine/client/client.cpp | 16 +- src/engine/client/serverbrowser_http.cpp | 30 ++- src/engine/engine.h | 9 +- src/engine/server/server.cpp | 25 +-- src/engine/server/server.h | 3 + src/engine/shared/engine.cpp | 18 +- src/engine/shared/host_lookup.cpp | 1 + src/engine/shared/jobs.cpp | 187 +++++++++++++++---- src/engine/shared/jobs.h | 143 ++++++++++++-- src/game/client/components/menus.cpp | 2 + src/game/client/components/menus_browser.cpp | 3 +- src/game/client/components/sounds.cpp | 6 +- src/game/client/gameclient.cpp | 2 - src/game/client/gameclient.h | 2 +- src/game/editor/editor.cpp | 2 +- src/test/jobs.cpp | 57 ++++-- src/test/serverbrowser.cpp | 2 + 18 files changed, 396 insertions(+), 114 deletions(-) diff --git a/src/engine/client.h b/src/engine/client.h index 440afd9a6..720d76cc1 100644 --- a/src/engine/client.h +++ b/src/engine/client.h @@ -333,7 +333,9 @@ public: virtual void OnDummyDisconnect() = 0; virtual void DummyResetInput() = 0; virtual void Echo(const char *pString) = 0; + virtual bool CanDisplayWarning() const = 0; + virtual void RenderShutdownMessage() = 0; virtual CNetObjHandler *GetNetObjHandler() = 0; }; diff --git a/src/engine/client/client.cpp b/src/engine/client/client.cpp index 2ed047512..824c64648 100644 --- a/src/engine/client/client.cpp +++ b/src/engine/client/client.cpp @@ -2607,7 +2607,7 @@ void CClient::Update() if(!m_EditJobs.empty()) { std::shared_ptr pJob = m_EditJobs.front(); - if(pJob->Status() == IJob::STATE_DONE) + if(pJob->State() == IJob::STATE_DONE) { char aBuf[IO_MAX_PATH_LENGTH + 64]; str_format(aBuf, sizeof(aBuf), "Successfully saved the replay to %s!", pJob->Destination()); @@ -3050,6 +3050,9 @@ void CClient::Run() m_GlobalTime = (time_get() - m_GlobalStartTime) / (float)time_freq(); } + GameClient()->RenderShutdownMessage(); + Disconnect(); + if(!m_pConfigManager->Save()) { char aError[128]; @@ -3059,15 +3062,16 @@ void CClient::Run() m_Fifo.Shutdown(); m_Http.Shutdown(); - GameClient()->OnShutdown(); - Disconnect(); + Engine()->ShutdownJobs(); - // close socket + GameClient()->RenderShutdownMessage(); + GameClient()->OnShutdown(); + delete m_pEditor; + + // close sockets for(unsigned int i = 0; i < std::size(m_aNetClient); i++) m_aNetClient[i].Close(); - delete m_pEditor; - // shutdown text render while graphics are still available m_pTextRender->Shutdown(); } diff --git a/src/engine/client/serverbrowser_http.cpp b/src/engine/client/serverbrowser_http.cpp index eb3f35277..c33fbc560 100644 --- a/src/engine/client/serverbrowser_http.cpp +++ b/src/engine/client/serverbrowser_http.cpp @@ -34,7 +34,7 @@ public: bool GetBestUrl(const char **pBestUrl) const; void Reset(); - bool IsRefreshing() const { return m_pJob && m_pJob->Status() != IJob::STATE_DONE; } + bool IsRefreshing() const { return m_pJob && !m_pJob->Done(); } void Refresh(); private: @@ -60,8 +60,12 @@ private: public: CJob(CChooseMaster *pParent, std::shared_ptr pData) : - m_pParent(pParent), m_pData(std::move(pData)) {} - void Abort() REQUIRES(!m_Lock); + m_pParent(pParent), + m_pData(std::move(pData)) + { + Abortable(true); + } + bool Abort() override REQUIRES(!m_Lock); }; IEngine *m_pEngine; @@ -130,12 +134,20 @@ void CChooseMaster::Reset() void CChooseMaster::Refresh() { - if(m_pJob == nullptr || m_pJob->Status() == IJob::STATE_DONE) - m_pEngine->AddJob(m_pJob = std::make_shared(this, m_pData)); + if(m_pJob == nullptr || m_pJob->State() == IJob::STATE_DONE) + { + m_pJob = std::make_shared(this, m_pData); + m_pEngine->AddJob(m_pJob); + } } -void CChooseMaster::CJob::Abort() +bool CChooseMaster::CJob::Abort() { + if(!IJob::Abort()) + { + return false; + } + CLockScope ls(m_Lock); if(m_pHead != nullptr) { @@ -146,6 +158,8 @@ void CChooseMaster::CJob::Abort() { m_pGet->Abort(); } + + return true; } void CChooseMaster::CJob::Run() @@ -184,7 +198,7 @@ void CChooseMaster::CJob::Run() m_pParent->m_pHttp->Run(pHead); pHead->Wait(); - if(pHead->State() == EHttpState::ABORTED) + if(pHead->State() == EHttpState::ABORTED || State() == IJob::STATE_ABORTED) { dbg_msg("serverbrowse_http", "master chooser aborted"); return; @@ -207,7 +221,7 @@ void CChooseMaster::CJob::Run() pGet->Wait(); auto Time = std::chrono::duration_cast(time_get_nanoseconds() - StartTime); - if(pGet->State() == EHttpState::ABORTED) + if(pGet->State() == EHttpState::ABORTED || State() == IJob::STATE_ABORTED) { dbg_msg("serverbrowse_http", "master chooser aborted"); return; diff --git a/src/engine/engine.h b/src/engine/engine.h index 937a2a272..c763e7f6e 100644 --- a/src/engine/engine.h +++ b/src/engine/engine.h @@ -4,25 +4,24 @@ #define ENGINE_ENGINE_H #include "kernel.h" -#include + +#include class CFutureLogger; +class IJob; class ILogger; class IEngine : public IInterface { MACRO_INTERFACE("engine") -protected: - class CJobPool m_JobPool; - public: virtual ~IEngine() = default; virtual void Init() = 0; virtual void AddJob(std::shared_ptr pJob) = 0; + virtual void ShutdownJobs() = 0; virtual void SetAdditionalLogger(std::shared_ptr &&pLogger) = 0; - static void RunJobBlocking(IJob *pJob); }; extern IEngine *CreateEngine(const char *pAppname, std::shared_ptr pFutureLogger, int Jobs); diff --git a/src/engine/server/server.cpp b/src/engine/server/server.cpp index cf37ae216..ff92a5dea 100644 --- a/src/engine/server/server.cpp +++ b/src/engine/server/server.cpp @@ -1117,8 +1117,8 @@ void CServer::InitDnsbl(int ClientID) str_format(aBuf, sizeof(aBuf), "%s.%d.%d.%d.%d.%s", Config()->m_SvDnsblKey, Addr.ip[3], Addr.ip[2], Addr.ip[1], Addr.ip[0], Config()->m_SvDnsblHost); } - IEngine *pEngine = Kernel()->RequestInterface(); - pEngine->AddJob(m_aClients[ClientID].m_pDnsblLookup = std::make_shared(aBuf, NETTYPE_IPV4)); + m_aClients[ClientID].m_pDnsblLookup = std::make_shared(aBuf, NETTYPE_IPV4); + Engine()->AddJob(m_aClients[ClientID].m_pDnsblLookup); m_aClients[ClientID].m_DnsblState = CClient::DNSBL_STATE_PENDING; } @@ -2706,9 +2706,14 @@ int CServer::Run() m_UPnP.Open(BindAddr); #endif - IEngine *pEngine = Kernel()->RequestInterface(); - IHttp *pHttp = Kernel()->RequestInterface(); - m_pRegister = CreateRegister(&g_Config, m_pConsole, pEngine, pHttp, this->Port(), m_NetServer.GetGlobalToken()); + if(!m_Http.Init(std::chrono::seconds{2})) + { + log_error("server", "Failed to initialize the HTTP client."); + return -1; + } + + m_pEngine = Kernel()->RequestInterface(); + m_pRegister = CreateRegister(&g_Config, m_pConsole, m_pEngine, &m_Http, this->Port(), m_NetServer.GetGlobalToken()); m_NetServer.SetCallbacks(NewClientCallback, NewClientNoAuthCallback, ClientRejoinCallback, DelClientCallback, this); @@ -2839,7 +2844,7 @@ int CServer::Run() InitDnsbl(ClientID); } else if(m_aClients[ClientID].m_DnsblState == CClient::DNSBL_STATE_PENDING && - m_aClients[ClientID].m_pDnsblLookup->Status() == IJob::STATE_DONE) + m_aClients[ClientID].m_pDnsblLookup->State() == IJob::STATE_DONE) { if(m_aClients[ClientID].m_pDnsblLookup->Result() != 0) { @@ -3009,23 +3014,20 @@ int CServer::Run() m_NetServer.Drop(i, pDisconnectReason); } + m_pRegister->OnShutdown(); m_Econ.Shutdown(); - m_Fifo.Shutdown(); + Engine()->ShutdownJobs(); GameServer()->OnShutdown(nullptr); m_pMap->Unload(); - DbPool()->OnShutdown(); #if defined(CONF_UPNP) m_UPnP.Shutdown(); #endif - m_NetServer.Close(); - m_pRegister->OnShutdown(); - return ErrorShutdown(); } @@ -3743,7 +3745,6 @@ void CServer::RegisterCommands() m_pStorage = Kernel()->RequestInterface(); m_pAntibot = Kernel()->RequestInterface(); - m_Http.Init(std::chrono::seconds{2}); Kernel()->RegisterInterface(static_cast(&m_Http), false); // register console commands diff --git a/src/engine/server/server.h b/src/engine/server/server.h index 1b0b8af7c..c4fca2094 100644 --- a/src/engine/server/server.h +++ b/src/engine/server/server.h @@ -37,6 +37,7 @@ class CHostLookup; class CLogMessage; class CMsgPacker; class CPacker; +class IEngine; class IEngineMap; class ILogger; @@ -70,6 +71,7 @@ class CServer : public IServer class IStorage *m_pStorage; class IEngineAntibot *m_pAntibot; class IRegister *m_pRegister; + IEngine *m_pEngine; #if defined(CONF_UPNP) CUPnP m_UPnP; @@ -96,6 +98,7 @@ public: class IStorage *Storage() { return m_pStorage; } class IEngineAntibot *Antibot() { return m_pAntibot; } class CDbConnectionPool *DbPool() { return m_pConnectionPool; } + IEngine *Engine() { return m_pEngine; } enum { diff --git a/src/engine/shared/engine.cpp b/src/engine/shared/engine.cpp index 9f0469e98..411eae5ab 100644 --- a/src/engine/shared/engine.cpp +++ b/src/engine/shared/engine.cpp @@ -7,20 +7,22 @@ #include #include #include +#include #include #include class CEngine : public IEngine { -public: IConsole *m_pConsole; IStorage *m_pStorage; - bool m_Logging; + bool m_Logging; std::shared_ptr m_pFutureLogger; char m_aAppName[256]; + CJobPool m_JobPool; + static void Con_DbgLognetwork(IConsole::IResult *pResult, void *pUserData) { CEngine *pEngine = static_cast(pUserData); @@ -43,6 +45,7 @@ public: } } +public: CEngine(bool Test, const char *pAppname, std::shared_ptr pFutureLogger, int Jobs) : m_pFutureLogger(std::move(pFutureLogger)) { @@ -70,7 +73,6 @@ public: ~CEngine() override { - m_JobPool.Destroy(); CNetBase::CloseLog(); } @@ -92,16 +94,16 @@ public: m_JobPool.Add(std::move(pJob)); } + void ShutdownJobs() override + { + m_JobPool.Shutdown(); + } + void SetAdditionalLogger(std::shared_ptr &&pLogger) override { m_pFutureLogger->Set(pLogger); } }; -void IEngine::RunJobBlocking(IJob *pJob) -{ - CJobPool::RunBlocking(pJob); -} - IEngine *CreateEngine(const char *pAppname, std::shared_ptr pFutureLogger, int Jobs) { return new CEngine(false, pAppname, std::move(pFutureLogger), Jobs); } IEngine *CreateTestEngine(const char *pAppname, int Jobs) { return new CEngine(true, pAppname, nullptr, Jobs); } diff --git a/src/engine/shared/host_lookup.cpp b/src/engine/shared/host_lookup.cpp index ea79b97f0..a6efe45de 100644 --- a/src/engine/shared/host_lookup.cpp +++ b/src/engine/shared/host_lookup.cpp @@ -11,6 +11,7 @@ CHostLookup::CHostLookup(const char *pHostname, int Nettype) { str_copy(m_aHostname, pHostname); m_Nettype = Nettype; + Abortable(true); } void CHostLookup::Run() diff --git a/src/engine/shared/jobs.cpp b/src/engine/shared/jobs.cpp index 852243367..fb1b142c6 100644 --- a/src/engine/shared/jobs.cpp +++ b/src/engine/shared/jobs.cpp @@ -3,93 +3,218 @@ #include "jobs.h" IJob::IJob() : - m_Status(STATE_PENDING) + m_pNext(nullptr), + m_State(STATE_QUEUED), + m_Abortable(false) { } IJob::~IJob() = default; -int IJob::Status() +IJob::EJobState IJob::State() const { - return m_Status.load(); + return m_State; +} + +bool IJob::Done() const +{ + EJobState State = m_State; + return State != STATE_QUEUED && State != STATE_RUNNING; +} + +bool IJob::Abort() +{ + if(!IsAbortable()) + return false; + + m_State = STATE_ABORTED; + return true; +} + +void IJob::Abortable(bool Abortable) +{ + m_Abortable = Abortable; +} + +bool IJob::IsAbortable() const +{ + return m_Abortable; } CJobPool::CJobPool() { - // empty the pool - m_Shutdown = false; - sphore_init(&m_Semaphore); - m_pFirstJob = 0; - m_pLastJob = 0; + m_Shutdown = true; } CJobPool::~CJobPool() { if(!m_Shutdown) { - Destroy(); + Shutdown(); } } void CJobPool::WorkerThread(void *pUser) { - CJobPool *pPool = (CJobPool *)pUser; + static_cast(pUser)->RunLoop(); +} - while(!pPool->m_Shutdown) +void CJobPool::RunLoop() +{ + while(true) { - std::shared_ptr pJob = 0; + // wait for job to become available + sphore_wait(&m_Semaphore); // fetch job from queue - sphore_wait(&pPool->m_Semaphore); + std::shared_ptr pJob = nullptr; { - CLockScope ls(pPool->m_Lock); - if(pPool->m_pFirstJob) + const CLockScope LockScope(m_Lock); + if(m_pFirstJob) { - pJob = pPool->m_pFirstJob; - pPool->m_pFirstJob = pPool->m_pFirstJob->m_pNext; + pJob = m_pFirstJob; + m_pFirstJob = m_pFirstJob->m_pNext; // allow remaining objects in list to destruct, even when current object stays alive pJob->m_pNext = nullptr; - if(!pPool->m_pFirstJob) - pPool->m_pLastJob = 0; + if(!m_pFirstJob) + m_pLastJob = nullptr; } } - // do the job if we have one if(pJob) { - RunBlocking(pJob.get()); + // do the job if we have one + const IJob::EJobState OldStateQueued = pJob->m_State.exchange(IJob::STATE_RUNNING); + if(OldStateQueued != IJob::STATE_QUEUED) + { + if(OldStateQueued == IJob::STATE_ABORTED) + { + // job was aborted before it was started + pJob->m_State = IJob::STATE_ABORTED; + continue; + } + dbg_assert(false, "Job state invalid. Job was reused or uninitialized."); + dbg_break(); + } + + // remember running jobs so we can abort them + { + const CLockScope LockScope(m_LockRunning); + m_RunningJobs.push_back(pJob); + } + pJob->Run(); + { + const CLockScope LockScope(m_LockRunning); + m_RunningJobs.erase(std::find(m_RunningJobs.begin(), m_RunningJobs.end(), pJob)); + } + + // do not change state to done if job was not completed successfully + const IJob::EJobState OldStateRunning = pJob->m_State.exchange(IJob::STATE_DONE); + if(OldStateRunning != IJob::STATE_RUNNING) + { + pJob->m_State = OldStateRunning; + } + } + else if(m_Shutdown) + { + // shut down worker thread when pool is shutting down and no more jobs are left + break; } } } void CJobPool::Init(int NumThreads) { - // start threads - char aName[32]; + dbg_assert(m_Shutdown, "Job pool already running"); + m_Shutdown = false; + + const CLockScope LockScope(m_Lock); + sphore_init(&m_Semaphore); + m_pFirstJob = nullptr; + m_pLastJob = nullptr; + + // start worker threads + char aName[16]; // unix kernel length limit m_vpThreads.reserve(NumThreads); for(int i = 0; i < NumThreads; i++) { - str_format(aName, sizeof(aName), "CJobPool worker %d", i); + str_format(aName, sizeof(aName), "CJobPool W%d", i); m_vpThreads.push_back(thread_init(WorkerThread, this, aName)); } } -void CJobPool::Destroy() +void CJobPool::Shutdown() { + dbg_assert(!m_Shutdown, "Job pool already shut down"); m_Shutdown = true; + + // abort queued jobs + { + const CLockScope LockScope(m_Lock); + std::shared_ptr pJob = m_pFirstJob; + std::shared_ptr pPrev = nullptr; + while(pJob != nullptr) + { + std::shared_ptr pNext = pJob->m_pNext; + if(pJob->Abort()) + { + // only remove abortable jobs from queue + pJob->m_pNext = nullptr; + if(pPrev) + { + pPrev->m_pNext = pNext; + } + else + { + m_pFirstJob = pNext; + } + } + else + { + pPrev = pJob; + } + pJob = pNext; + } + m_pLastJob = pPrev; + } + + // abort running jobs + { + const CLockScope LockScope(m_LockRunning); + for(const std::shared_ptr &pJob : m_RunningJobs) + { + pJob->Abort(); + } + } + + // wake up all worker threads for(size_t i = 0; i < m_vpThreads.size(); i++) + { sphore_signal(&m_Semaphore); + } + + // wait for all worker threads to finish for(void *pThread : m_vpThreads) + { thread_wait(pThread); + } + m_vpThreads.clear(); sphore_destroy(&m_Semaphore); } void CJobPool::Add(std::shared_ptr pJob) { + if(m_Shutdown) { - CLockScope ls(m_Lock); - // add job to queue + // no jobs are accepted when the job pool is already shutting down + pJob->Abort(); + return; + } + + // add job to queue + { + const CLockScope LockScope(m_Lock); if(m_pLastJob) m_pLastJob->m_pNext = pJob; m_pLastJob = std::move(pJob); @@ -97,12 +222,6 @@ void CJobPool::Add(std::shared_ptr pJob) m_pFirstJob = m_pLastJob; } + // signal a worker thread that a job is available sphore_signal(&m_Semaphore); } - -void CJobPool::RunBlocking(IJob *pJob) -{ - pJob->m_Status = IJob::STATE_RUNNING; - pJob->Run(); - pJob->m_Status = IJob::STATE_DONE; -} diff --git a/src/engine/shared/jobs.h b/src/engine/shared/jobs.h index 8f347b16c..fcc36d5ac 100644 --- a/src/engine/shared/jobs.h +++ b/src/engine/shared/jobs.h @@ -7,36 +7,123 @@ #include #include +#include #include #include -class CJobPool; - +/** + * A job which runs in a worker thread of a job pool. + * + * @see CJobPool + */ class IJob { - friend CJobPool; + friend class CJobPool; + +public: + /** + * The state of a job in the job pool. + */ + enum EJobState + { + /** + * Job has been created/queued but not started on a worker thread yet. + */ + STATE_QUEUED = 0, + + /** + * Job is currently running on a worker thread. + */ + STATE_RUNNING, + + /** + * Job was completed successfully. + */ + STATE_DONE, + + /** + * Job was aborted. Note the job may or may not still be running while + * in this state. + * + * @see IsAbortable + */ + STATE_ABORTED, + }; private: std::shared_ptr m_pNext; + std::atomic m_State; + std::atomic m_Abortable; - std::atomic m_Status; +protected: + /** + * Performs tasks in a worker thread. + */ virtual void Run() = 0; + /** + * Sets whether this job can be aborted. + * + * @remark Has no effect if the job has already been aborted. + * + * @see IsAbortable + */ + void Abortable(bool Abortable); + public: IJob(); + virtual ~IJob(); + IJob(const IJob &Other) = delete; IJob &operator=(const IJob &Other) = delete; - virtual ~IJob(); - int Status(); - enum - { - STATE_PENDING = 0, - STATE_RUNNING, - STATE_DONE - }; + /** + * Returns the state of the job. + * + * @remark Accessing jobs in any other way that with the base functions of `IJob` + * is generally not thread-safe unless the job is in @link STATE_DONE @endlink + * or has not been enqueued yet. + * + * @return State of the job. + */ + EJobState State() const; + + /** + * Returns whether the job was completed, i.e. whether it's not still queued + * or running. + * + * @return `true` if the job is done, `false` otherwise. + */ + bool Done() const; + + /** + * Aborts the job, if it can be aborted. + * + * @return `true` if abort was accepted, `false` otherwise. + * + * @remark May be overridden to delegate abort to other jobs. Note that this + * function may be called from any thread and should be thread-safe. + */ + virtual bool Abort(); + + /** + * Returns whether the job can be aborted. Jobs that are abortable may have + * their state set to `STATE_ABORTED` at any point if the job was aborted. + * The job state should be checked periodically in the `Run` function and the + * job should terminate at the earliest, safe opportunity when aborted. + * Scheduled jobs which are not abortable are guaranteed to fully complete + * before the job pool is shut down. + * + * @return `true` if the job can be aborted, `false` otherwise. + */ + bool IsAbortable() const; }; +/** + * A job pool which runs jobs in one or more worker threads. + * + * @see IJob + */ class CJobPool { std::vector m_vpThreads; @@ -47,15 +134,41 @@ class CJobPool std::shared_ptr m_pFirstJob GUARDED_BY(m_Lock); std::shared_ptr m_pLastJob GUARDED_BY(m_Lock); + CLock m_LockRunning; + std::deque> m_RunningJobs GUARDED_BY(m_LockRunning); + static void WorkerThread(void *pUser) NO_THREAD_SAFETY_ANALYSIS; + void RunLoop() NO_THREAD_SAFETY_ANALYSIS; public: CJobPool(); ~CJobPool(); - void Init(int NumThreads); - void Destroy(); + /** + * Initializes the job pool with the given number of worker threads. + * + * @param NumTheads The number of worker threads. + * + * @remark Must be called on the main thread. + */ + void Init(int NumThreads) REQUIRES(!m_Lock); + + /** + * Shuts down the job pool. Aborts all abortable jobs. Then waits for all + * worker threads to complete all remaining queued jobs and terminate. + * + * @remark Must be called on the main thread. + */ + void Shutdown() REQUIRES(!m_Lock) REQUIRES(!m_LockRunning); + + /** + * Adds a job to the queue of the job pool. + * + * @param pJob The job to enqueue. + * + * @remark If the job pool is already shutting down, no additional jobs + * will be enqueue anymore. Abortable jobs will immediately be aborted. + */ void Add(std::shared_ptr pJob) REQUIRES(!m_Lock); - static void RunBlocking(IJob *pJob); }; #endif diff --git a/src/game/client/components/menus.cpp b/src/game/client/components/menus.cpp index 1e2c9a473..b636634fe 100644 --- a/src/game/client/components/menus.cpp +++ b/src/game/client/components/menus.cpp @@ -1997,6 +1997,8 @@ void CMenus::OnReset() void CMenus::OnShutdown() { KillServer(); + m_CommunityIconLoadJobs.clear(); + m_CommunityIconDownloadJobs.clear(); } bool CMenus::OnCursorMove(float x, float y, IInput::ECursorType CursorType) diff --git a/src/game/client/components/menus_browser.cpp b/src/game/client/components/menus_browser.cpp index 923d915c7..a46dc28c2 100644 --- a/src/game/client/components/menus_browser.cpp +++ b/src/game/client/components/menus_browser.cpp @@ -1925,6 +1925,7 @@ void CMenus::CCommunityIconLoadJob::Run() CMenus::CCommunityIconLoadJob::CCommunityIconLoadJob(CMenus *pMenus, const char *pCommunityId, int StorageType) : CAbstractCommunityIconJob(pMenus, pCommunityId, StorageType) { + Abortable(true); } CMenus::CCommunityIconLoadJob::~CCommunityIconLoadJob() @@ -2040,7 +2041,7 @@ void CMenus::UpdateCommunityIcons() if(!m_CommunityIconLoadJobs.empty()) { std::shared_ptr pJob = m_CommunityIconLoadJobs.front(); - if(pJob->Status() == IJob::STATE_DONE) + if(pJob->Done()) { if(pJob->Success()) LoadCommunityIconFinish(pJob->CommunityId(), pJob->ImageInfo(), pJob->Sha256()); diff --git a/src/game/client/components/sounds.cpp b/src/game/client/components/sounds.cpp index 46578cd55..e924756a5 100644 --- a/src/game/client/components/sounds.cpp +++ b/src/game/client/components/sounds.cpp @@ -16,6 +16,7 @@ CSoundLoading::CSoundLoading(CGameClient *pGameClient, bool Render) : m_pGameClient(pGameClient), m_Render(Render) { + Abortable(true); } void CSoundLoading::Run() @@ -27,6 +28,9 @@ void CSoundLoading::Run() for(int i = 0; i < g_pData->m_aSounds[s].m_NumSounds; i++) { + if(State() == IJob::STATE_ABORTED) + return; + int Id = m_pGameClient->Sound()->LoadWV(g_pData->m_aSounds[s].m_aSounds[i].m_pFilename); g_pData->m_aSounds[s].m_aSounds[i].m_Id = Id; // try to render a frame @@ -114,7 +118,7 @@ void CSounds::OnRender() // check for sound initialisation if(m_WaitForSoundJob) { - if(m_pSoundJob->Status() == IJob::STATE_DONE) + if(m_pSoundJob->State() == IJob::STATE_DONE) m_WaitForSoundJob = false; else return; diff --git a/src/game/client/gameclient.cpp b/src/game/client/gameclient.cpp index c5d20a4cb..87e0fd7bc 100644 --- a/src/game/client/gameclient.cpp +++ b/src/game/client/gameclient.cpp @@ -979,8 +979,6 @@ void CGameClient::OnStateChange(int NewState, int OldState) void CGameClient::OnShutdown() { - RenderShutdownMessage(); - for(auto &pComponent : m_vpAll) pComponent->OnShutdown(); } diff --git a/src/game/client/gameclient.h b/src/game/client/gameclient.h index aaf18ae96..48b52eef5 100644 --- a/src/game/client/gameclient.h +++ b/src/game/client/gameclient.h @@ -502,7 +502,7 @@ public: void RefreshSkins(); - void RenderShutdownMessage(); + void RenderShutdownMessage() override; const char *GetItemName(int Type) const override; const char *Version() const override; diff --git a/src/game/editor/editor.cpp b/src/game/editor/editor.cpp index 1a1f64ac5..89ec0b081 100644 --- a/src/game/editor/editor.cpp +++ b/src/game/editor/editor.cpp @@ -8561,7 +8561,7 @@ void CEditor::HandleWriterFinishJobs() return; std::shared_ptr pJob = m_WriterFinishJobs.front(); - if(pJob->Status() != IJob::STATE_DONE) + if(!pJob->Done()) return; m_WriterFinishJobs.pop_front(); diff --git a/src/test/jobs.cpp b/src/test/jobs.cpp index 9c8b49fd0..3a3d3910d 100644 --- a/src/test/jobs.cpp +++ b/src/test/jobs.cpp @@ -15,19 +15,20 @@ class Jobs : public ::testing::Test protected: CJobPool m_Pool; - Jobs() + void SetUp() override { m_Pool.Init(TEST_NUM_THREADS); } + void TearDown() override + { + m_Pool.Shutdown(); + } + void Add(std::shared_ptr pJob) { m_Pool.Add(std::move(pJob)); } - void RunBlocking(IJob *pJob) - { - CJobPool::RunBlocking(pJob); - } }; class CJob : public IJob @@ -38,6 +39,11 @@ class CJob : public IJob public: CJob(std::function &&JobFunction) : m_JobFunction(JobFunction) {} + + void Abortable(bool Abortable) + { + IJob::Abortable(Abortable); + } }; TEST_F(Jobs, Constructor) @@ -49,15 +55,6 @@ TEST_F(Jobs, Simple) Add(std::make_shared([] {})); } -TEST_F(Jobs, RunBlocking) -{ - int Result = 0; - CJob Job([&] { Result = 1; }); - EXPECT_EQ(Result, 0); - RunBlocking(&Job); - EXPECT_EQ(Result, 1); -} - TEST_F(Jobs, Wait) { SEMAPHORE sphore; @@ -67,6 +64,26 @@ TEST_F(Jobs, Wait) sphore_destroy(&sphore); } +TEST_F(Jobs, AbortAbortable) +{ + auto pJob = std::make_shared([&] {}); + pJob->Abortable(true); + EXPECT_TRUE(pJob->IsAbortable()); + Add(pJob); + EXPECT_TRUE(pJob->Abort()); + EXPECT_EQ(pJob->State(), IJob::STATE_ABORTED); +} + +TEST_F(Jobs, AbortUnabortable) +{ + auto pJob = std::make_shared([&] {}); + pJob->Abortable(false); + EXPECT_FALSE(pJob->IsAbortable()); + Add(pJob); + EXPECT_FALSE(pJob->Abort()); + EXPECT_NE(pJob->State(), IJob::STATE_ABORTED); +} + TEST_F(Jobs, LookupHost) { static const char *HOST = "example.com"; @@ -77,7 +94,7 @@ TEST_F(Jobs, LookupHost) EXPECT_EQ(pJob->Nettype(), NETTYPE); Add(pJob); - while(pJob->Status() != IJob::STATE_DONE) + while(pJob->State() != IJob::STATE_DONE) { // yay, busy loop... thread_yield(); @@ -101,7 +118,7 @@ TEST_F(Jobs, LookupHostWebsocket) EXPECT_EQ(pJob->Nettype(), NETTYPE); Add(pJob); - while(pJob->Status() != IJob::STATE_DONE) + while(pJob->State() != IJob::STATE_DONE) { // yay, busy loop... thread_yield(); @@ -130,7 +147,7 @@ TEST_F(Jobs, Many) sphore_signal(&sphore); } }); - EXPECT_EQ(pJob->Status(), IJob::STATE_PENDING); + EXPECT_EQ(pJob->State(), IJob::STATE_QUEUED); vpJobs.push_back(pJob); } for(auto &pJob : vpJobs) @@ -139,10 +156,10 @@ TEST_F(Jobs, Many) } sphore_wait(&sphore); sphore_destroy(&sphore); - m_Pool.~CJobPool(); + TearDown(); for(auto &pJob : vpJobs) { - EXPECT_EQ(pJob->Status(), IJob::STATE_DONE); + EXPECT_EQ(pJob->State(), IJob::STATE_DONE); } - new(&m_Pool) CJobPool(); + SetUp(); } diff --git a/src/test/serverbrowser.cpp b/src/test/serverbrowser.cpp index 7c466f370..811e4c938 100644 --- a/src/test/serverbrowser.cpp +++ b/src/test/serverbrowser.cpp @@ -1,6 +1,8 @@ #include #include +#include + #include #include #include