diff --git a/src/base/tl/threading.h b/src/base/tl/threading.h index 275b28d29..7c3b3cc5e 100644 --- a/src/base/tl/threading.h +++ b/src/base/tl/threading.h @@ -9,21 +9,21 @@ class CSemaphore SEMAPHORE m_Sem; // implement the counter seperatly, because the `sem_getvalue`-API is // deprecated on macOS: https://stackoverflow.com/a/16655541 - std::atomic_int count; + std::atomic_int m_Count{0}; public: CSemaphore() { sphore_init(&m_Sem); } ~CSemaphore() { sphore_destroy(&m_Sem); } CSemaphore(const CSemaphore &) = delete; - int GetApproximateValue() { return count.load(); } + int GetApproximateValue() { return m_Count.load(); } void Wait() { sphore_wait(&m_Sem); - count.fetch_sub(1); + m_Count.fetch_sub(1); } void Signal() { - count.fetch_add(1); + m_Count.fetch_add(1); sphore_signal(&m_Sem); } }; diff --git a/src/engine/server/databases/connection_pool.cpp b/src/engine/server/databases/connection_pool.cpp index 04b338eca..6b0a0dab8 100644 --- a/src/engine/server/databases/connection_pool.cpp +++ b/src/engine/server/databases/connection_pool.cpp @@ -127,8 +127,15 @@ void CDbConnectionPool::Worker() // remember last working server and try to connect to it first int ReadServer = 0; int WriteServer = 0; + // enter fail mode when a sql request fails, skip read request during it and + // write to the backup database until all requests are handled + bool FailMode = false; while(1) { + if(FailMode && m_NumElem.GetApproximateValue() == 0) + { + FailMode = false; + } m_NumElem.Wait(); auto pThreadData = std::move(m_aTasks[LastElem++]); // work through all database jobs after OnShutdown is called before exiting the thread @@ -145,6 +152,16 @@ void CDbConnectionPool::Worker() { for(int i = 0; i < (int)m_aapDbConnections[Mode::READ].size(); i++) { + if(m_Shutdown) + { + dbg_msg("sql", "%s dismissed read request during shutdown", pThreadData->m_pName); + break; + } + if(FailMode) + { + dbg_msg("sql", "%s dismissed read request during FailMode", pThreadData->m_pName); + break; + } int CurServer = (ReadServer + i) % (int)m_aapDbConnections[Mode::READ].size(); if(ExecSqlFunc(m_aapDbConnections[Mode::READ][CurServer].get(), pThreadData.get(), false)) { @@ -154,12 +171,26 @@ void CDbConnectionPool::Worker() break; } } + if(!Success) + { + FailMode = true; + } } break; case CSqlExecData::WRITE_ACCESS: { for(int i = 0; i < (int)m_aapDbConnections[Mode::WRITE].size(); i++) { + if(m_Shutdown && !m_aapDbConnections[Mode::WRITE_BACKUP].empty()) + { + dbg_msg("sql", "%s skipped to backup database during shutdown", pThreadData->m_pName); + break; + } + if(FailMode && !m_aapDbConnections[Mode::WRITE_BACKUP].empty()) + { + dbg_msg("sql", "%s skipped to backup database during FailMode", pThreadData->m_pName); + break; + } int CurServer = (WriteServer + i) % (int)m_aapDbConnections[Mode::WRITE].size(); if(ExecSqlFunc(m_aapDbConnections[Mode::WRITE][i].get(), pThreadData.get(), false)) { @@ -171,6 +202,7 @@ void CDbConnectionPool::Worker() } if(!Success) { + FailMode = true; for(int i = 0; i < (int)m_aapDbConnections[Mode::WRITE_BACKUP].size(); i++) { if(ExecSqlFunc(m_aapDbConnections[Mode::WRITE_BACKUP][i].get(), pThreadData.get(), true)) diff --git a/src/engine/server/databases/connection_pool.h b/src/engine/server/databases/connection_pool.h index 0971c23eb..dd56d113e 100644 --- a/src/engine/server/databases/connection_pool.h +++ b/src/engine/server/databases/connection_pool.h @@ -75,7 +75,7 @@ private: void Worker(); bool ExecSqlFunc(IDbConnection *pConnection, struct CSqlExecData *pData, bool Failure); - std::atomic_bool m_Shutdown; + std::atomic_bool m_Shutdown{false}; CSemaphore m_NumElem; int FirstElem; int LastElem;