Store ranks in sqlite first to not loose them if server shuts down during stuck mysql transaction

This commit is contained in:
Zwelf 2022-10-13 13:52:19 +02:00 committed by Dennis Felsing
parent a7bc593725
commit a4b7c9d08a
11 changed files with 737 additions and 333 deletions

View file

@ -2,10 +2,10 @@
#include <engine/shared/protocol.h>
void IDbConnection::FormatCreateRace(char *aBuf, unsigned int BufferSize)
void IDbConnection::FormatCreateRace(char *aBuf, unsigned int BufferSize, bool Backup)
{
str_format(aBuf, BufferSize,
"CREATE TABLE IF NOT EXISTS %s_race ("
"CREATE TABLE IF NOT EXISTS %s_race%s ("
" Map VARCHAR(128) COLLATE %s NOT NULL, "
" Name VARCHAR(%d) COLLATE %s NOT NULL, "
" Timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
@ -24,13 +24,14 @@ void IDbConnection::FormatCreateRace(char *aBuf, unsigned int BufferSize)
" DDNet7 BOOL DEFAULT FALSE, "
" PRIMARY KEY (Map, Name, Time, Timestamp, Server)"
")",
GetPrefix(), BinaryCollate(), MAX_NAME_LENGTH, BinaryCollate());
GetPrefix(), Backup ? "_backup" : "",
BinaryCollate(), MAX_NAME_LENGTH, BinaryCollate());
}
void IDbConnection::FormatCreateTeamrace(char *aBuf, unsigned int BufferSize, const char *pIdType)
void IDbConnection::FormatCreateTeamrace(char *aBuf, unsigned int BufferSize, const char *pIdType, bool Backup)
{
str_format(aBuf, BufferSize,
"CREATE TABLE IF NOT EXISTS %s_teamrace ("
"CREATE TABLE IF NOT EXISTS %s_teamrace%s ("
" Map VARCHAR(128) COLLATE %s NOT NULL, "
" Name VARCHAR(%d) COLLATE %s NOT NULL, "
" Timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
@ -40,7 +41,8 @@ void IDbConnection::FormatCreateTeamrace(char *aBuf, unsigned int BufferSize, co
" DDNet7 BOOL DEFAULT FALSE, "
" PRIMARY KEY (ID, Name)"
")",
GetPrefix(), BinaryCollate(), MAX_NAME_LENGTH, BinaryCollate(), pIdType);
GetPrefix(), Backup ? "_backup" : "",
BinaryCollate(), MAX_NAME_LENGTH, BinaryCollate(), pIdType);
}
void IDbConnection::FormatCreateMaps(char *aBuf, unsigned int BufferSize)
@ -58,10 +60,10 @@ void IDbConnection::FormatCreateMaps(char *aBuf, unsigned int BufferSize)
GetPrefix(), BinaryCollate(), BinaryCollate(), BinaryCollate());
}
void IDbConnection::FormatCreateSaves(char *aBuf, unsigned int BufferSize)
void IDbConnection::FormatCreateSaves(char *aBuf, unsigned int BufferSize, bool Backup)
{
str_format(aBuf, BufferSize,
"CREATE TABLE IF NOT EXISTS %s_saves ("
"CREATE TABLE IF NOT EXISTS %s_saves%s ("
" Savegame TEXT COLLATE %s NOT NULL, "
" Map VARCHAR(128) COLLATE %s NOT NULL, "
" Code VARCHAR(128) COLLATE %s NOT NULL, "
@ -71,7 +73,8 @@ void IDbConnection::FormatCreateSaves(char *aBuf, unsigned int BufferSize)
" SaveID VARCHAR(36) DEFAULT NULL, "
" PRIMARY KEY (Map, Code)"
")",
GetPrefix(), BinaryCollate(), BinaryCollate(), BinaryCollate());
GetPrefix(), Backup ? "_backup" : "",
BinaryCollate(), BinaryCollate(), BinaryCollate());
}
void IDbConnection::FormatCreatePoints(char *aBuf, unsigned int BufferSize)

View file

@ -1,6 +1,7 @@
#ifndef ENGINE_SERVER_DATABASES_CONNECTION_H
#define ENGINE_SERVER_DATABASES_CONNECTION_H
#include "connection_pool.h"
#include <base/system.h>
#include <memory>
@ -90,26 +91,19 @@ private:
char m_aPrefix[64];
protected:
void FormatCreateRace(char *aBuf, unsigned int BufferSize);
void FormatCreateTeamrace(char *aBuf, unsigned int BufferSize, const char *pIdType);
void FormatCreateRace(char *aBuf, unsigned int BufferSize, bool Backup);
void FormatCreateTeamrace(char *aBuf, unsigned int BufferSize, const char *pIdType, bool Backup);
void FormatCreateMaps(char *aBuf, unsigned int BufferSize);
void FormatCreateSaves(char *aBuf, unsigned int BufferSize);
void FormatCreateSaves(char *aBuf, unsigned int BufferSize, bool Backup);
void FormatCreatePoints(char *aBuf, unsigned int BufferSize);
};
bool MysqlAvailable();
int MysqlInit();
void MysqlUninit();
std::unique_ptr<IDbConnection> CreateSqliteConnection(const char *pFilename, bool Setup);
// Returns nullptr if MySQL support is not compiled in.
std::unique_ptr<IDbConnection> CreateMysqlConnection(
const char *pDatabase,
const char *pPrefix,
const char *pUser,
const char *pPass,
const char *pIp,
const char *pBindaddr,
int Port,
bool Setup);
std::unique_ptr<IDbConnection> CreateMysqlConnection(CMysqlConfig Config);
#endif // ENGINE_SERVER_DATABASES_CONNECTION_H

View file

@ -2,11 +2,14 @@
#include "connection.h"
#include <base/system.h>
#include <cstring>
#include <engine/console.h>
#include <chrono>
#include <iterator>
#include <memory>
#include <thread>
#include <vector>
using namespace std::chrono_literals;
@ -21,17 +24,42 @@ struct CSqlExecData
CDbConnectionPool::FWrite pFunc,
std::unique_ptr<const ISqlData> pThreadData,
const char *pName);
CSqlExecData(
CDbConnectionPool::Mode m,
const char aFileName[64]);
CSqlExecData(
CDbConnectionPool::Mode m,
const CMysqlConfig *pMysqlConfig);
CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m);
~CSqlExecData() = default;
enum
{
READ_ACCESS,
WRITE_ACCESS,
ADD_MYSQL,
ADD_SQLITE,
PRINT,
} m_Mode;
union
{
CDbConnectionPool::FRead m_pReadFunc;
CDbConnectionPool::FWrite m_pWriteFunc;
struct
{
CDbConnectionPool::Mode m_Mode;
CMysqlConfig m_Config;
} m_MySql;
struct
{
CDbConnectionPool::Mode m_Mode;
char m_FileName[64];
} m_Sqlite;
struct
{
IConsole *m_pConsole;
CDbConnectionPool::Mode m_Mode;
} m_Print;
} m_Ptr;
std::unique_ptr<const ISqlData> m_pThreadData;
@ -60,30 +88,56 @@ CSqlExecData::CSqlExecData(
m_Ptr.m_pWriteFunc = pFunc;
}
CDbConnectionPool::CDbConnectionPool() :
m_FirstElem(0),
m_LastElem(0)
CSqlExecData::CSqlExecData(
CDbConnectionPool::Mode m,
const char aFileName[64]) :
m_Mode(ADD_SQLITE),
m_pThreadData(nullptr),
m_pName("add sqlite server")
{
thread_init_and_detach(CDbConnectionPool::Worker, this, "database worker thread");
m_Ptr.m_Sqlite.m_Mode = m;
mem_copy(m_Ptr.m_Sqlite.m_FileName, aFileName, sizeof(m_Ptr.m_Sqlite.m_FileName));
}
CSqlExecData::CSqlExecData(CDbConnectionPool::Mode m,
const CMysqlConfig *pMysqlConfig) :
m_Mode(ADD_MYSQL),
m_pThreadData(nullptr),
m_pName("add mysql server")
{
m_Ptr.m_MySql.m_Mode = m;
mem_copy(&m_Ptr.m_MySql.m_Config, pMysqlConfig, sizeof(m_Ptr.m_MySql.m_Config));
}
CSqlExecData::CSqlExecData(IConsole *pConsole, CDbConnectionPool::Mode m) :
m_Mode(PRINT),
m_pThreadData(nullptr),
m_pName("print database server")
{
m_Ptr.m_Print.m_pConsole = pConsole;
m_Ptr.m_Print.m_Mode = m;
}
CDbConnectionPool::~CDbConnectionPool() = default;
void CDbConnectionPool::Print(IConsole *pConsole, Mode DatabaseMode)
{
static const char *s_apModeDesc[] = {"Read", "Write", "WriteBackup"};
for(unsigned int i = 0; i < m_vvpDbConnections[DatabaseMode].size(); i++)
{
m_vvpDbConnections[DatabaseMode][i]->Print(pConsole, s_apModeDesc[DatabaseMode]);
}
m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(pConsole, DatabaseMode);
m_InsertIdx %= std::size(m_pShared->m_aQueries);
m_pShared->m_NumBackup.Signal();
}
void CDbConnectionPool::RegisterDatabase(std::unique_ptr<IDbConnection> pDatabase, Mode DatabaseMode)
void CDbConnectionPool::RegisterSqliteDatabase(Mode DatabaseMode, const char aFileName[64])
{
if(DatabaseMode < 0 || NUM_MODES <= DatabaseMode)
return;
m_vvpDbConnections[DatabaseMode].push_back(std::move(pDatabase));
m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(DatabaseMode, aFileName);
m_InsertIdx %= std::size(m_pShared->m_aQueries);
m_pShared->m_NumBackup.Signal();
}
void CDbConnectionPool::RegisterMysqlDatabase(Mode DatabaseMode, const CMysqlConfig *pMysqlConfig)
{
m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(DatabaseMode, pMysqlConfig);
m_InsertIdx %= std::size(m_pShared->m_aQueries);
m_pShared->m_NumBackup.Signal();
}
void CDbConnectionPool::Execute(
@ -91,9 +145,9 @@ void CDbConnectionPool::Execute(
std::unique_ptr<const ISqlData> pSqlRequestData,
const char *pName)
{
m_aTasks[m_FirstElem++] = std::make_unique<CSqlExecData>(pFunc, std::move(pSqlRequestData), pName);
m_FirstElem %= std::size(m_aTasks);
m_NumElem.Signal();
m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(pFunc, std::move(pSqlRequestData), pName);
m_InsertIdx %= std::size(m_pShared->m_aQueries);
m_pShared->m_NumBackup.Signal();
}
void CDbConnectionPool::ExecuteWrite(
@ -101,17 +155,17 @@ void CDbConnectionPool::ExecuteWrite(
std::unique_ptr<const ISqlData> pSqlRequestData,
const char *pName)
{
m_aTasks[m_FirstElem++] = std::make_unique<CSqlExecData>(pFunc, std::move(pSqlRequestData), pName);
m_FirstElem %= std::size(m_aTasks);
m_NumElem.Signal();
m_pShared->m_aQueries[m_InsertIdx++] = std::make_unique<CSqlExecData>(pFunc, std::move(pSqlRequestData), pName);
m_InsertIdx %= std::size(m_pShared->m_aQueries);
m_pShared->m_NumBackup.Signal();
}
void CDbConnectionPool::OnShutdown()
{
m_Shutdown.store(true);
m_NumElem.Signal();
m_pShared->m_Shutdown.store(true);
m_pShared->m_NumBackup.Signal();
int i = 0;
while(m_Shutdown.load())
while(m_pShared->m_Shutdown.load())
{
// print a log about every two seconds
if(i % 20 == 0 && i > 0)
@ -123,57 +177,143 @@ void CDbConnectionPool::OnShutdown()
}
}
void CDbConnectionPool::Worker(void *pUser)
// The backup worker thread looks at write queries and stores them
// in the sqilte database (WRITE_BACKUP). It skips over read queries.
// After processing the query, it gets passed on to the Worker thread.
// This is done to not loose ranks when the server shuts down before all
// queries are executed on the mysql server
class CBackup
{
CDbConnectionPool *pThis = (CDbConnectionPool *)pUser;
pThis->Worker();
public:
CBackup(std::shared_ptr<CDbConnectionPool::CSharedData> pShared) :
m_pShared(std::move(pShared)) {}
static void Start(void *pUser);
private:
void ProcessQueries();
std::unique_ptr<IDbConnection> m_pWriteBackup;
std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
};
/* static */
void CBackup::Start(void *pUser)
{
CBackup *pThis = (CBackup *)pUser;
pThis->ProcessQueries();
delete pThis;
}
void CDbConnectionPool::Worker()
void CBackup::ProcessQueries()
{
// remember last working server and try to connect to it first
int ReadServer = 0;
int WriteServer = 0;
// enter fail mode when a sql request fails, skip read request during it and
// write to the backup database until all requests are handled
bool FailMode = false;
while(true)
for(int JobNum = 0;; JobNum++)
{
if(FailMode && m_NumElem.GetApproximateValue() == 0)
{
FailMode = false;
}
m_NumElem.Wait();
auto pThreadData = std::move(m_aTasks[m_LastElem++]);
m_pShared->m_NumBackup.Wait();
CSqlExecData *pThreadData = m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)].get();
// work through all database jobs after OnShutdown is called before exiting the thread
if(pThreadData == nullptr)
{
m_Shutdown.store(false);
m_pShared->m_NumWorker.Signal();
return;
}
if(pThreadData->m_Mode == CSqlExecData::ADD_SQLITE &&
pThreadData->m_Ptr.m_Sqlite.m_Mode == CDbConnectionPool::Mode::WRITE_BACKUP)
{
m_pWriteBackup = CreateSqliteConnection(pThreadData->m_Ptr.m_Sqlite.m_FileName, true);
}
else if(pThreadData->m_Mode == CSqlExecData::WRITE_ACCESS && m_pWriteBackup.get())
{
bool Success = CDbConnectionPool::ExecSqlFunc(m_pWriteBackup.get(), pThreadData, Write::BACKUP_FIRST);
dbg_msg("sql", "[%i] %s done on write backup database, Success=%i", JobNum, pThreadData->m_pName, Success);
}
m_pShared->m_NumWorker.Signal();
}
}
// the worker threads executes queries on mysql or sqlite. If we write on
// a mysql server and have a backup server configured, we'll remove the
// entry from the backup server after completing it on the write server.
// static void Worker(void *pUser);
class CWorker
{
public:
CWorker(std::shared_ptr<CDbConnectionPool::CSharedData> pShared) :
m_pShared(std::move(pShared)) {}
static void Start(void *pUser);
void ProcessQueries();
private:
void Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode);
// There are two possible configurations
// * sqlite mode: There exists exactly one READ and the same WRITE server
// with no WRITE_BACKUP server
// * mysql mode: there can exist multiple READ server, there must be at
// most one WRITE server. The WRITE server for all DDNet
// Servers must be the same (to counteract double loads).
// There may be one WRITE_BACKUP sqlite server.
// This variable should only change, before the worker threads
std::vector<std::unique_ptr<IDbConnection>> m_vpReadConnections;
std::unique_ptr<IDbConnection> m_pWriteConnection;
std::unique_ptr<IDbConnection> m_pWriteBackup;
std::shared_ptr<CDbConnectionPool::CSharedData> m_pShared;
};
/* static */
void CWorker::Start(void *pUser)
{
CWorker *pThis = (CWorker *)pUser;
pThis->ProcessQueries();
delete pThis;
}
void CWorker::ProcessQueries()
{
// remember last working server and try to connect to it first
int ReadServer = 0;
// enter fail mode when a sql request fails, skip read request during it and
// write to the backup database until all requests are handled
bool FailMode = false;
for(int JobNum = 0;; JobNum++)
{
if(FailMode && m_pShared->m_NumWorker.GetApproximateValue() == 0)
{
FailMode = false;
}
m_pShared->m_NumWorker.Wait();
auto pThreadData = std::move(m_pShared->m_aQueries[JobNum % std::size(m_pShared->m_aQueries)]);
// work through all database jobs after OnShutdown is called before exiting the thread
if(pThreadData == nullptr)
{
m_pShared->m_Shutdown.store(false);
return;
}
m_LastElem %= std::size(m_aTasks);
bool Success = false;
switch(pThreadData->m_Mode)
{
case CSqlExecData::READ_ACCESS:
{
for(int i = 0; i < (int)m_vvpDbConnections[Mode::READ].size(); i++)
for(size_t i = 0; i < m_vpReadConnections.size(); i++)
{
if(m_Shutdown)
if(m_pShared->m_Shutdown)
{
dbg_msg("sql", "%s dismissed read request during shutdown", pThreadData->m_pName);
dbg_msg("sql", "[%i] %s dismissed read request during shutdown", JobNum, pThreadData->m_pName);
break;
}
if(FailMode)
{
dbg_msg("sql", "%s dismissed read request during FailMode", pThreadData->m_pName);
dbg_msg("sql", "[%i] %s dismissed read request during FailMode", JobNum, pThreadData->m_pName);
break;
}
int CurServer = (ReadServer + i) % (int)m_vvpDbConnections[Mode::READ].size();
if(ExecSqlFunc(m_vvpDbConnections[Mode::READ][CurServer].get(), pThreadData.get(), false))
int CurServer = (ReadServer + i) % (int)m_vpReadConnections.size();
if(CDbConnectionPool::ExecSqlFunc(m_vpReadConnections[CurServer].get(), pThreadData.get(), Write::NORMAL))
{
ReadServer = CurServer;
dbg_msg("sql", "%s done on read database %d", pThreadData->m_pName, CurServer);
dbg_msg("sql", "[%i] %s done on read database %d", JobNum, pThreadData->m_pName, CurServer);
Success = true;
break;
}
@ -186,46 +326,81 @@ void CDbConnectionPool::Worker()
break;
case CSqlExecData::WRITE_ACCESS:
{
for(int i = 0; i < (int)m_vvpDbConnections[Mode::WRITE].size(); i++)
if(m_pShared->m_Shutdown && m_pWriteBackup != nullptr)
{
if(m_Shutdown && !m_vvpDbConnections[Mode::WRITE_BACKUP].empty())
{
dbg_msg("sql", "%s skipped to backup database during shutdown", pThreadData->m_pName);
break;
}
if(FailMode && !m_vvpDbConnections[Mode::WRITE_BACKUP].empty())
{
dbg_msg("sql", "%s skipped to backup database during FailMode", pThreadData->m_pName);
break;
}
int CurServer = (WriteServer + i) % (int)m_vvpDbConnections[Mode::WRITE].size();
if(ExecSqlFunc(m_vvpDbConnections[Mode::WRITE][i].get(), pThreadData.get(), false))
{
WriteServer = CurServer;
dbg_msg("sql", "%s done on write database %d", pThreadData->m_pName, CurServer);
Success = true;
break;
}
dbg_msg("sql", "[%i] %s skipped to backup database during shutdown", JobNum, pThreadData->m_pName);
break;
}
if(!Success)
else if(FailMode && m_pWriteBackup != nullptr)
{
FailMode = true;
for(int i = 0; i < (int)m_vvpDbConnections[Mode::WRITE_BACKUP].size(); i++)
{
if(ExecSqlFunc(m_vvpDbConnections[Mode::WRITE_BACKUP][i].get(), pThreadData.get(), true))
{
dbg_msg("sql", "%s done on write backup database %d", pThreadData->m_pName, i);
Success = true;
break;
}
}
dbg_msg("sql", "[%i] %s skipped to backup database during FailMode", JobNum, pThreadData->m_pName);
break;
}
else if(CDbConnectionPool::ExecSqlFunc(m_pWriteConnection.get(), pThreadData.get(), Write::NORMAL))
{
dbg_msg("sql", "[%i] %s done on write database", JobNum, pThreadData->m_pName);
Success = true;
break;
}
// enter fail mode if not successful
FailMode = FailMode || !Success;
const Write w = Success ? Write::NORMAL_SUCCEEDED : Write::NORMAL_FAILED;
if(CDbConnectionPool::ExecSqlFunc(m_pWriteBackup.get(), pThreadData.get(), w))
{
dbg_msg("sql", "[%i] %s done move write on backup database to non-backup table", JobNum, pThreadData->m_pName);
Success = true;
break;
}
}
break;
case CSqlExecData::ADD_MYSQL:
{
auto pMysql = CreateMysqlConnection(pThreadData->m_Ptr.m_MySql.m_Config);
switch(pThreadData->m_Ptr.m_MySql.m_Mode)
{
case CDbConnectionPool::Mode::READ:
m_vpReadConnections.push_back(std::move(pMysql));
break;
case CDbConnectionPool::Mode::WRITE:
m_pWriteConnection = std::move(pMysql);
break;
case CDbConnectionPool::Mode::WRITE_BACKUP:
m_pWriteBackup = std::move(pMysql);
break;
case CDbConnectionPool::Mode::NUM_MODES:
break;
}
Success = true;
break;
}
case CSqlExecData::ADD_SQLITE:
{
auto pSqlite = CreateSqliteConnection(pThreadData->m_Ptr.m_Sqlite.m_FileName, true);
switch(pThreadData->m_Ptr.m_Sqlite.m_Mode)
{
case CDbConnectionPool::Mode::READ:
m_vpReadConnections.push_back(std::move(pSqlite));
break;
case CDbConnectionPool::Mode::WRITE:
m_pWriteConnection = std::move(pSqlite);
break;
case CDbConnectionPool::Mode::WRITE_BACKUP:
m_pWriteBackup = std::move(pSqlite);
break;
case CDbConnectionPool::Mode::NUM_MODES:
break;
}
Success = true;
break;
}
case CSqlExecData::PRINT:
Print(pThreadData->m_Ptr.m_Print.m_pConsole, pThreadData->m_Ptr.m_Print.m_Mode);
Success = true;
break;
}
if(!Success)
dbg_msg("sql", "%s failed on all databases", pThreadData->m_pName);
if(pThreadData->m_pThreadData->m_pResult != nullptr)
dbg_msg("sql", "[%i] %s failed on all databases", JobNum, pThreadData->m_pName);
if(pThreadData->m_pThreadData != nullptr && pThreadData->m_pThreadData->m_pResult != nullptr)
{
pThreadData->m_pThreadData->m_pResult->m_Success = Success;
pThreadData->m_pThreadData->m_pResult->m_Completed.store(true);
@ -233,9 +408,40 @@ void CDbConnectionPool::Worker()
}
}
bool CDbConnectionPool::ExecSqlFunc(IDbConnection *pConnection, CSqlExecData *pData, bool Failure)
void CWorker::Print(IConsole *pConsole, CDbConnectionPool::Mode DatabaseMode)
{
if(DatabaseMode == CDbConnectionPool::Mode::READ)
{
for(auto &pReadConnection : m_vpReadConnections)
pReadConnection->Print(pConsole, "Read");
if(m_vpReadConnections.empty())
pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", "There are no read databases");
}
else if(DatabaseMode == CDbConnectionPool::Mode::WRITE)
{
if(m_pWriteConnection)
m_pWriteConnection->Print(pConsole, "Write");
else
pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", "There are no write databases");
}
else if(DatabaseMode == CDbConnectionPool::Mode::WRITE_BACKUP)
{
if(m_pWriteBackup)
m_pWriteBackup->Print(pConsole, "WriteBackup");
else
pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", "There are no write backup databases");
}
}
/* static */
bool CDbConnectionPool::ExecSqlFunc(IDbConnection *pConnection, CSqlExecData *pData, Write w)
{
char aError[256] = "error message not initialized";
if(pConnection == nullptr)
{
str_format(aError, sizeof(aError), "No database given");
return false;
}
if(pConnection->Connect(aError, sizeof(aError)))
{
dbg_msg("sql", "failed connecting to db: %s", aError);
@ -248,8 +454,10 @@ bool CDbConnectionPool::ExecSqlFunc(IDbConnection *pConnection, CSqlExecData *pD
Success = !pData->m_Ptr.m_pReadFunc(pConnection, pData->m_pThreadData.get(), aError, sizeof(aError));
break;
case CSqlExecData::WRITE_ACCESS:
Success = !pData->m_Ptr.m_pWriteFunc(pConnection, pData->m_pThreadData.get(), Failure, aError, sizeof(aError));
Success = !pData->m_Ptr.m_pWriteFunc(pConnection, pData->m_pThreadData.get(), w, aError, sizeof(aError));
break;
default:
dbg_assert(false, "unreachable");
}
pConnection->Disconnect();
if(!Success)
@ -258,3 +466,11 @@ bool CDbConnectionPool::ExecSqlFunc(IDbConnection *pConnection, CSqlExecData *pD
}
return Success;
}
CDbConnectionPool::CDbConnectionPool()
{
m_pShared = std::make_shared<CSharedData>();
thread_init_and_detach(CWorker::Start, new CWorker(m_pShared), "database worker thread");
thread_init_and_detach(CBackup::Start, new CBackup(m_pShared), "database backup worker thread");
}

View file

@ -26,13 +26,37 @@ struct ISqlData
m_pResult(std::move(pResult))
{
}
virtual ~ISqlData(){};
virtual ~ISqlData() = default;
mutable std::shared_ptr<ISqlResult> m_pResult;
};
enum Write
{
// write everything into the backup db first
BACKUP_FIRST,
// now try to write it into remote db
NORMAL,
// succeeded writing -> remove copy from backup
NORMAL_SUCCEEDED,
// failed writing -> notify about failure
NORMAL_FAILED,
};
class IConsole;
struct CMysqlConfig
{
char m_aDatabase[64];
char m_aPrefix[64];
char m_aUser[64];
char m_aPass[64];
char m_aIp[64];
char m_aBindaddr[128];
int m_Port;
bool m_Setup;
};
class CDbConnectionPool
{
public:
@ -42,7 +66,7 @@ public:
// Returns false on success.
typedef bool (*FRead)(IDbConnection *, const ISqlData *, char *pError, int ErrorSize);
typedef bool (*FWrite)(IDbConnection *, const ISqlData *, bool, char *pError, int ErrorSize);
typedef bool (*FWrite)(IDbConnection *, const ISqlData *, Write, char *pError, int ErrorSize);
enum Mode
{
@ -54,13 +78,15 @@ public:
void Print(IConsole *pConsole, Mode DatabaseMode);
void RegisterDatabase(std::unique_ptr<IDbConnection> pDatabase, Mode DatabaseMode);
void RegisterSqliteDatabase(Mode DatabaseMode, const char FileName[64]);
void RegisterMysqlDatabase(Mode DatabaseMode, const CMysqlConfig *pMysqlConfig);
void Execute(
FRead pFunc,
std::unique_ptr<const ISqlData> pSqlRequestData,
const char *pName);
// writes to WRITE_BACKUP server in case of failure
// writes to WRITE_BACKUP first and removes it from there when successfully
// executed on WRITE server
void ExecuteWrite(
FWrite pFunc,
std::unique_ptr<const ISqlData> pSqlRequestData,
@ -68,18 +94,36 @@ public:
void OnShutdown();
friend class CWorker;
friend class CBackup;
private:
std::vector<std::unique_ptr<IDbConnection>> m_vvpDbConnections[NUM_MODES];
static bool ExecSqlFunc(IDbConnection *pConnection, struct CSqlExecData *pData, Write w);
static void Worker(void *pUser);
void Worker();
bool ExecSqlFunc(IDbConnection *pConnection, struct CSqlExecData *pData, bool Failure);
// Only the main thread accesses this variable. It points to the index,
// where the next query is added to the queue.
int m_InsertIdx = 0;
std::atomic_bool m_Shutdown{false};
CSemaphore m_NumElem;
int m_FirstElem;
int m_LastElem;
std::unique_ptr<struct CSqlExecData> m_aTasks[512];
struct CSharedData
{
// Used as signal that shutdown is in progress from main thread to
// speed up the queries by discarding read queries and writing to
// the sqlite file instead of the remote mysql server.
// The worker thread signals the main thread that all queries are
// processed by setting this variable to false again.
std::atomic_bool m_Shutdown{false};
// Queries go first to the backup thread. This semaphore signals about
// new queries.
CSemaphore m_NumBackup;
// When the backup thread processed the query, it signals the main
// thread with this semaphore about the new query
CSemaphore m_NumWorker;
// spsc queue with additional backup worker to look at queries first.
std::unique_ptr<struct CSqlExecData> m_aQueries[512];
};
std::shared_ptr<CSharedData> m_pShared;
};
#endif // ENGINE_SERVER_DATABASES_CONNECTION_POOL_H

View file

@ -1,4 +1,5 @@
#include "connection.h"
#include "engine/server/databases/connection_pool.h"
#if defined(CONF_MYSQL)
#include <mysql.h>
@ -25,6 +26,11 @@ enum
std::atomic_int g_MysqlState = {MYSQLSTATE_UNINITIALIZED};
std::atomic_int g_MysqlNumConnections;
bool MysqlAvailable()
{
return true;
}
int MysqlInit()
{
dbg_assert(mysql_thread_safe(), "MySQL library without thread safety");
@ -58,15 +64,7 @@ void MysqlUninit()
class CMysqlConnection : public IDbConnection
{
public:
CMysqlConnection(
const char *pDatabase,
const char *pPrefix,
const char *pUser,
const char *pPass,
const char *pIp,
const char *pBindaddr,
int Port,
bool Setup);
explicit CMysqlConnection(CMysqlConfig m_Config);
~CMysqlConnection();
void Print(IConsole *pConsole, const char *pMode) override;
@ -134,14 +132,8 @@ private:
std::vector<MYSQL_BIND> m_vStmtParameters;
std::vector<UParameterExtra> m_vStmtParameterExtras;
// copy of config vars
char m_aDatabase[64];
char m_aUser[64];
char m_aPass[64];
char m_aIp[64];
char m_aBindaddr[128];
int m_Port;
bool m_Setup;
// copy of m_Config vars
CMysqlConfig m_Config;
std::atomic_bool m_InUse;
};
@ -151,18 +143,9 @@ void CMysqlConnection::CStmtDeleter::operator()(MYSQL_STMT *pStmt) const
mysql_stmt_close(pStmt);
}
CMysqlConnection::CMysqlConnection(
const char *pDatabase,
const char *pPrefix,
const char *pUser,
const char *pPass,
const char *pIp,
const char *pBindaddr,
int Port,
bool Setup) :
IDbConnection(pPrefix),
m_Port(Port),
m_Setup(Setup),
CMysqlConnection::CMysqlConnection(CMysqlConfig Config) :
IDbConnection(Config.m_aPrefix),
m_Config(Config),
m_InUse(false)
{
g_MysqlNumConnections += 1;
@ -171,12 +154,6 @@ CMysqlConnection::CMysqlConnection(
mem_zero(m_aErrorDetail, sizeof(m_aErrorDetail));
mem_zero(&m_Mysql, sizeof(m_Mysql));
mysql_init(&m_Mysql);
str_copy(m_aDatabase, pDatabase, sizeof(m_aDatabase));
str_copy(m_aUser, pUser, sizeof(m_aUser));
str_copy(m_aPass, pPass, sizeof(m_aPass));
str_copy(m_aIp, pIp, sizeof(m_aIp));
str_copy(m_aBindaddr, pBindaddr, sizeof(m_aBindaddr));
}
CMysqlConnection::~CMysqlConnection()
@ -215,13 +192,13 @@ void CMysqlConnection::Print(IConsole *pConsole, const char *pMode)
char aBuf[512];
str_format(aBuf, sizeof(aBuf),
"MySQL-%s: DB: '%s' Prefix: '%s' User: '%s' IP: <{'%s'}> Port: %d",
pMode, m_aDatabase, GetPrefix(), m_aUser, m_aIp, m_Port);
pMode, m_Config.m_aDatabase, GetPrefix(), m_Config.m_aUser, m_Config.m_aIp, m_Config.m_Port);
pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", aBuf);
}
CMysqlConnection *CMysqlConnection::Copy()
{
return new CMysqlConnection(m_aDatabase, GetPrefix(), m_aUser, m_aPass, m_aIp, m_aBindaddr, m_Port, m_Setup);
return new CMysqlConnection(m_Config);
}
void CMysqlConnection::ToUnixTimestamp(const char *pTimestamp, char *aBuf, unsigned int BufferSize)
@ -255,7 +232,7 @@ bool CMysqlConnection::ConnectImpl()
StoreErrorStmt("free_result");
dbg_msg("mysql", "can't free last result %s", m_aErrorDetail);
}
if(!mysql_select_db(&m_Mysql, m_aDatabase))
if(!mysql_select_db(&m_Mysql, m_Config.m_aDatabase))
{
// Success.
return false;
@ -277,12 +254,12 @@ bool CMysqlConnection::ConnectImpl()
mysql_options(&m_Mysql, MYSQL_OPT_WRITE_TIMEOUT, &OptWriteTimeout);
mysql_options(&m_Mysql, MYSQL_OPT_RECONNECT, &OptReconnect);
mysql_options(&m_Mysql, MYSQL_SET_CHARSET_NAME, "utf8mb4");
if(m_aBindaddr[0] != '\0')
if(m_Config.m_aBindaddr[0] != '\0')
{
mysql_options(&m_Mysql, MYSQL_OPT_BIND, m_aBindaddr);
mysql_options(&m_Mysql, MYSQL_OPT_BIND, m_Config.m_aBindaddr);
}
if(!mysql_real_connect(&m_Mysql, m_aIp, m_aUser, m_aPass, nullptr, m_Port, nullptr, CLIENT_IGNORE_SIGPIPE))
if(!mysql_real_connect(&m_Mysql, m_Config.m_aIp, m_Config.m_aUser, m_Config.m_aPass, nullptr, m_Config.m_Port, nullptr, CLIENT_IGNORE_SIGPIPE))
{
StoreErrorMysql("real_connect");
return true;
@ -297,11 +274,11 @@ bool CMysqlConnection::ConnectImpl()
return true;
}
if(m_Setup)
if(m_Config.m_Setup)
{
char aCreateDatabase[1024];
// create database
str_format(aCreateDatabase, sizeof(aCreateDatabase), "CREATE DATABASE IF NOT EXISTS %s CHARACTER SET utf8mb4", m_aDatabase);
str_format(aCreateDatabase, sizeof(aCreateDatabase), "CREATE DATABASE IF NOT EXISTS %s CHARACTER SET utf8mb4", m_Config.m_aDatabase);
if(PrepareAndExecuteStatement(aCreateDatabase))
{
return true;
@ -309,23 +286,23 @@ bool CMysqlConnection::ConnectImpl()
}
// Connect to specific database
if(mysql_select_db(&m_Mysql, m_aDatabase))
if(mysql_select_db(&m_Mysql, m_Config.m_aDatabase))
{
StoreErrorMysql("select_db");
return true;
}
if(m_Setup)
if(m_Config.m_Setup)
{
char aCreateRace[1024];
char aCreateTeamrace[1024];
char aCreateMaps[1024];
char aCreateSaves[1024];
char aCreatePoints[1024];
FormatCreateRace(aCreateRace, sizeof(aCreateRace));
FormatCreateTeamrace(aCreateTeamrace, sizeof(aCreateTeamrace), "VARBINARY(16)");
FormatCreateRace(aCreateRace, sizeof(aCreateRace), /* Backup */ false);
FormatCreateTeamrace(aCreateTeamrace, sizeof(aCreateTeamrace), "VARBINARY(16)", /* Backup */ false);
FormatCreateMaps(aCreateMaps, sizeof(aCreateMaps));
FormatCreateSaves(aCreateSaves, sizeof(aCreateSaves));
FormatCreateSaves(aCreateSaves, sizeof(aCreateSaves), /* Backup */ false);
FormatCreatePoints(aCreatePoints, sizeof(aCreatePoints));
if(PrepareAndExecuteStatement(aCreateRace) ||
@ -336,7 +313,7 @@ bool CMysqlConnection::ConnectImpl()
{
return true;
}
m_Setup = false;
m_Config.m_Setup = false;
}
dbg_msg("mysql", "connection established");
return false;
@ -713,19 +690,15 @@ bool CMysqlConnection::AddPoints(const char *pPlayer, int Points, char *pError,
return ExecuteUpdate(&NumUpdated, pError, ErrorSize);
}
std::unique_ptr<IDbConnection> CreateMysqlConnection(
const char *pDatabase,
const char *pPrefix,
const char *pUser,
const char *pPass,
const char *pIp,
const char *pBindaddr,
int Port,
bool Setup)
std::unique_ptr<IDbConnection> CreateMysqlConnection(CMysqlConfig Config)
{
return std::make_unique<CMysqlConnection>(pDatabase, pPrefix, pUser, pPass, pIp, pBindaddr, Port, Setup);
return std::make_unique<CMysqlConnection>(Config);
}
#else
bool MysqlAvailable()
{
return false;
}
int MysqlInit()
{
return 0;
@ -733,15 +706,7 @@ int MysqlInit()
void MysqlUninit()
{
}
std::unique_ptr<IDbConnection> CreateMysqlConnection(
const char *pDatabase,
const char *pPrefix,
const char *pUser,
const char *pPass,
const char *pIp,
const char *pBindaddr,
int Port,
bool Setup)
std::unique_ptr<IDbConnection> CreateMysqlConnection(CMysqlConfig Config)
{
return nullptr;
}

View file

@ -54,6 +54,9 @@ public:
bool AddPoints(const char *pPlayer, int Points, char *pError, int ErrorSize) override;
// fail safe
bool CreateFailsafeTables();
private:
// copy of config vars
char m_aFilename[IO_MAX_PATH_LENGTH];
@ -139,20 +142,32 @@ bool CSqliteConnection::Connect(char *pError, int ErrorSize)
if(m_Setup)
{
if(Execute("PRAGMA journal_mode=WAL", pError, ErrorSize))
return true;
char aBuf[1024];
FormatCreateRace(aBuf, sizeof(aBuf));
FormatCreateRace(aBuf, sizeof(aBuf), /* Backup */ false);
if(Execute(aBuf, pError, ErrorSize))
return true;
FormatCreateTeamrace(aBuf, sizeof(aBuf), "BLOB");
FormatCreateTeamrace(aBuf, sizeof(aBuf), "BLOB", /* Backup */ false);
if(Execute(aBuf, pError, ErrorSize))
return true;
FormatCreateMaps(aBuf, sizeof(aBuf));
if(Execute(aBuf, pError, ErrorSize))
return true;
FormatCreateSaves(aBuf, sizeof(aBuf));
FormatCreateSaves(aBuf, sizeof(aBuf), /* Backup */ false);
if(Execute(aBuf, pError, ErrorSize))
return true;
FormatCreatePoints(aBuf, sizeof(aBuf));
if(Execute(aBuf, pError, ErrorSize))
return true;
FormatCreateRace(aBuf, sizeof(aBuf), /* Backup */ true);
if(Execute(aBuf, pError, ErrorSize))
return true;
FormatCreateTeamrace(aBuf, sizeof(aBuf), "BLOB", /* Backup */ true);
if(Execute(aBuf, pError, ErrorSize))
return true;
FormatCreateSaves(aBuf, sizeof(aBuf), /* Backup */ true);
if(Execute(aBuf, pError, ErrorSize))
return true;
m_Setup = false;

View file

@ -2543,17 +2543,15 @@ int CServer::Run()
{
char aFullPath[IO_MAX_PATH_LENGTH];
Storage()->GetCompletePath(IStorage::TYPE_SAVE_OR_ABSOLUTE, Config()->m_SvSqliteFile, aFullPath, sizeof(aFullPath));
auto pSqliteConn = CreateSqliteConnection(aFullPath, true);
if(Config()->m_SvUseSQL)
{
DbPool()->RegisterDatabase(std::move(pSqliteConn), CDbConnectionPool::WRITE_BACKUP);
DbPool()->RegisterSqliteDatabase(CDbConnectionPool::WRITE_BACKUP, Config()->m_SvSqliteFile);
}
else
{
auto pCopy = std::unique_ptr<IDbConnection>(pSqliteConn->Copy());
DbPool()->RegisterDatabase(std::move(pSqliteConn), CDbConnectionPool::READ);
DbPool()->RegisterDatabase(std::move(pCopy), CDbConnectionPool::WRITE);
DbPool()->RegisterSqliteDatabase(CDbConnectionPool::READ, Config()->m_SvSqliteFile);
DbPool()->RegisterSqliteDatabase(CDbConnectionPool::WRITE, Config()->m_SvSqliteFile);
}
}
@ -3377,6 +3375,12 @@ void CServer::ConAddSqlServer(IConsole::IResult *pResult, void *pUserData)
{
CServer *pSelf = (CServer *)pUserData;
if(!MysqlAvailable())
{
pSelf->Console()->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", "can't add MySQL server: compiled without MySQL support");
return;
}
if(!pSelf->Config()->m_SvUseSQL)
return;
@ -3386,38 +3390,34 @@ void CServer::ConAddSqlServer(IConsole::IResult *pResult, void *pUserData)
return;
}
bool ReadOnly;
CMysqlConfig Config;
bool Write;
if(str_comp_nocase(pResult->GetString(0), "w") == 0)
ReadOnly = false;
Write = false;
else if(str_comp_nocase(pResult->GetString(0), "r") == 0)
ReadOnly = true;
Write = true;
else
{
pSelf->Console()->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", "choose either 'r' for SqlReadServer or 'w' for SqlWriteServer");
return;
}
bool SetUpDb = pResult->NumArguments() == 8 ? pResult->GetInteger(7) : true;
auto pMysqlConn = CreateMysqlConnection(
pResult->GetString(1), pResult->GetString(2), pResult->GetString(3),
pResult->GetString(4), pResult->GetString(5), g_Config.m_SvSqlBindaddr,
pResult->GetInteger(6), SetUpDb);
if(!pMysqlConn)
{
pSelf->Console()->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", "can't add MySQL server: compiled without MySQL support");
return;
}
str_copy(Config.m_aDatabase, pResult->GetString(1), sizeof(Config.m_aDatabase));
str_copy(Config.m_aPrefix, pResult->GetString(2), sizeof(Config.m_aPrefix));
str_copy(Config.m_aUser, pResult->GetString(3), sizeof(Config.m_aUser));
str_copy(Config.m_aPass, pResult->GetString(4), sizeof(Config.m_aPass));
str_copy(Config.m_aIp, pResult->GetString(5), sizeof(Config.m_aIp));
str_copy(Config.m_aBindaddr, Config.m_aBindaddr, sizeof(Config.m_aBindaddr));
Config.m_Port = pResult->GetInteger(6);
Config.m_Setup = pResult->NumArguments() == 8 ? pResult->GetInteger(7) : true;
char aBuf[512];
str_format(aBuf, sizeof(aBuf),
"Added new Sql%sServer: DB: '%s' Prefix: '%s' User: '%s' IP: <{%s}> Port: %d",
ReadOnly ? "Read" : "Write",
pResult->GetString(1), pResult->GetString(2), pResult->GetString(3),
pResult->GetString(5), pResult->GetInteger(6));
"Adding new Sql%sServer: DB: '%s' Prefix: '%s' User: '%s' IP: <{%s}> Port: %d",
Write ? "Write" : "Read",
Config.m_aDatabase, Config.m_aPrefix, Config.m_aUser, Config.m_aIp, Config.m_Port);
pSelf->Console()->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", aBuf);
pSelf->DbPool()->RegisterDatabase(std::move(pMysqlConn), ReadOnly ? CDbConnectionPool::READ : CDbConnectionPool::WRITE);
pSelf->DbPool()->RegisterMysqlDatabase(Write ? CDbConnectionPool::WRITE : CDbConnectionPool::READ, &Config);
}
void CServer::ConDumpSqlServers(IConsole::IResult *pResult, void *pUserData)

View file

@ -173,6 +173,7 @@ void CScore::SaveTeamScore(int *pClientIDs, unsigned int Size, float Time, const
str_copy(Tmp->m_aTimestamp, pTimestamp, sizeof(Tmp->m_aTimestamp));
FormatUuid(GameServer()->GameUuid(), Tmp->m_aGameUuid, sizeof(Tmp->m_aGameUuid));
str_copy(Tmp->m_aMap, g_Config.m_SvMap, sizeof(Tmp->m_aMap));
Tmp->m_TeamrankUuid = RandomUuid();
m_pPool->ExecuteWrite(CScoreWorker::SaveTeamScore, std::move(Tmp), "save team score");
}

View file

@ -392,7 +392,7 @@ bool CScoreWorker::MapInfo(IDbConnection *pSqlServer, const ISqlData *pGameData,
return false;
}
bool CScoreWorker::SaveScore(IDbConnection *pSqlServer, const ISqlData *pGameData, bool Failure, char *pError, int ErrorSize)
bool CScoreWorker::SaveScore(IDbConnection *pSqlServer, const ISqlData *pGameData, Write w, char *pError, int ErrorSize)
{
const CSqlScoreData *pData = dynamic_cast<const CSqlScoreData *>(pGameData);
CScorePlayerResult *pResult = dynamic_cast<CScorePlayerResult *>(pGameData->m_pResult.get());
@ -400,52 +400,105 @@ bool CScoreWorker::SaveScore(IDbConnection *pSqlServer, const ISqlData *pGameDat
char aBuf[1024];
str_format(aBuf, sizeof(aBuf),
"SELECT COUNT(*) AS NumFinished FROM %s_race WHERE Map=? AND Name=? ORDER BY time ASC LIMIT 1",
pSqlServer->GetPrefix());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
if(w == Write::NORMAL_SUCCEEDED)
{
return true;
str_format(aBuf, sizeof(aBuf),
"DELETE FROM %s_race_backup WHERE GameId=? AND Name=? AND Timestamp=%s",
pSqlServer->GetPrefix(), pSqlServer->InsertTimestampAsUtc());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aGameUuid);
pSqlServer->BindString(2, pData->m_aName);
pSqlServer->BindString(3, pData->m_aTimestamp);
pSqlServer->Print();
int NumInserted;
pSqlServer->ExecuteUpdate(&NumInserted, pError, ErrorSize);
return false;
}
pSqlServer->BindString(1, pData->m_aMap);
pSqlServer->BindString(2, pData->m_aName);
if(w == Write::NORMAL_FAILED)
{
int NumInserted;
// move to non-tmp table succeded. delete from backup again
str_format(aBuf, sizeof(aBuf),
"INSERT INTO %s_race SELECT * FROM %s_race_backup WHERE GameId=? AND Name=? AND Timestamp=%s",
pSqlServer->GetPrefix(), pSqlServer->GetPrefix(), pSqlServer->InsertTimestampAsUtc());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aGameUuid);
pSqlServer->BindString(2, pData->m_aName);
pSqlServer->BindString(3, pData->m_aTimestamp);
pSqlServer->Print();
pSqlServer->ExecuteUpdate(&NumInserted, pError, ErrorSize);
bool End;
if(pSqlServer->Step(&End, pError, ErrorSize))
{
return true;
// move to non-tmp table succeded. delete from backup again
str_format(aBuf, sizeof(aBuf),
"DELETE FROM %s_race_backup WHERE GameId=? AND Name=? AND Timestamp=%s",
pSqlServer->GetPrefix(), pSqlServer->InsertTimestampAsUtc());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aGameUuid);
pSqlServer->BindString(2, pData->m_aName);
pSqlServer->BindString(3, pData->m_aTimestamp);
pSqlServer->Print();
pSqlServer->ExecuteUpdate(&NumInserted, pError, ErrorSize);
return false;
}
int NumFinished = pSqlServer->GetInt(1);
if(NumFinished == 0)
if(w == Write::NORMAL)
{
str_format(aBuf, sizeof(aBuf), "SELECT Points FROM %s_maps WHERE Map=?", pSqlServer->GetPrefix());
str_format(aBuf, sizeof(aBuf),
"SELECT COUNT(*) AS NumFinished FROM %s_race WHERE Map=? AND Name=? ORDER BY time ASC LIMIT 1",
pSqlServer->GetPrefix());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aMap);
pSqlServer->BindString(2, pData->m_aName);
bool End2;
if(pSqlServer->Step(&End2, pError, ErrorSize))
bool End;
if(pSqlServer->Step(&End, pError, ErrorSize))
{
return true;
}
if(!End2)
int NumFinished = pSqlServer->GetInt(1);
if(NumFinished == 0)
{
int Points = pSqlServer->GetInt(1);
if(pSqlServer->AddPoints(pData->m_aName, Points, pError, ErrorSize))
str_format(aBuf, sizeof(aBuf), "SELECT Points FROM %s_maps WHERE Map=?", pSqlServer->GetPrefix());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
str_format(paMessages[0], sizeof(paMessages[0]),
"You earned %d point%s for finishing this map!",
Points, Points == 1 ? "" : "s");
pSqlServer->BindString(1, pData->m_aMap);
bool End2;
if(pSqlServer->Step(&End2, pError, ErrorSize))
{
return true;
}
if(!End2)
{
int Points = pSqlServer->GetInt(1);
if(pSqlServer->AddPoints(pData->m_aName, Points, pError, ErrorSize))
{
return true;
}
str_format(paMessages[0], sizeof(paMessages[0]),
"You earned %d point%s for finishing this map!",
Points, Points == 1 ? "" : "s");
}
}
}
// save score. Can't fail, because no UNIQUE/PRIMARY KEY constrain is defined.
str_format(aBuf, sizeof(aBuf),
"%s INTO %s_race("
"%s INTO %s_race%s("
" Map, Name, Timestamp, Time, Server, "
" cp1, cp2, cp3, cp4, cp5, cp6, cp7, cp8, cp9, cp10, cp11, cp12, cp13, "
" cp14, cp15, cp16, cp17, cp18, cp19, cp20, cp21, cp22, cp23, cp24, cp25, "
@ -456,6 +509,7 @@ bool CScoreWorker::SaveScore(IDbConnection *pSqlServer, const ISqlData *pGameDat
" %.2f, %.2f, %.2f, %.2f, %.2f, %.2f, %.2f, "
" ?, %s)",
pSqlServer->InsertIgnore(), pSqlServer->GetPrefix(),
w == Write::NORMAL ? "" : "_backup",
pSqlServer->InsertTimestampAsUtc(), pData->m_Time,
pData->m_aCurrentTimeCp[0], pData->m_aCurrentTimeCp[1], pData->m_aCurrentTimeCp[2],
pData->m_aCurrentTimeCp[3], pData->m_aCurrentTimeCp[4], pData->m_aCurrentTimeCp[5],
@ -480,108 +534,159 @@ bool CScoreWorker::SaveScore(IDbConnection *pSqlServer, const ISqlData *pGameDat
return pSqlServer->ExecuteUpdate(&NumInserted, pError, ErrorSize);
}
bool CScoreWorker::SaveTeamScore(IDbConnection *pSqlServer, const ISqlData *pGameData, bool Failure, char *pError, int ErrorSize)
bool CScoreWorker::SaveTeamScore(IDbConnection *pSqlServer, const ISqlData *pGameData, Write w, char *pError, int ErrorSize)
{
const CSqlTeamScoreData *pData = dynamic_cast<const CSqlTeamScoreData *>(pGameData);
char aBuf[512];
// get the names sorted in a tab separated string
std::vector<std::string> vNames;
for(unsigned int i = 0; i < pData->m_Size; i++)
vNames.emplace_back(pData->m_aaNames[i]);
std::sort(vNames.begin(), vNames.end());
str_format(aBuf, sizeof(aBuf),
"SELECT l.ID, Name, Time "
"FROM (" // preselect teams with first name in team
" SELECT ID "
" FROM %s_teamrace "
" WHERE Map = ? AND Name = ? AND DDNet7 = %s"
") as l INNER JOIN %s_teamrace AS r ON l.ID = r.ID "
"ORDER BY l.ID, Name COLLATE %s",
pSqlServer->GetPrefix(), pSqlServer->False(), pSqlServer->GetPrefix(), pSqlServer->BinaryCollate());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
if(w == Write::NORMAL_SUCCEEDED)
{
return true;
}
pSqlServer->BindString(1, pData->m_aMap);
pSqlServer->BindString(2, pData->m_aaNames[0]);
bool FoundTeam = false;
float Time;
CTeamrank Teamrank;
bool End;
if(pSqlServer->Step(&End, pError, ErrorSize))
{
return true;
}
if(!End)
{
bool SearchTeamEnd = false;
while(!SearchTeamEnd)
str_format(aBuf, sizeof(aBuf),
"DELETE FROM %s_teamrace_backup WHERE GameId=?",
pSqlServer->GetPrefix());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
Time = pSqlServer->GetFloat(3);
if(Teamrank.NextSqlResult(pSqlServer, &SearchTeamEnd, pError, ErrorSize))
{
return true;
}
if(Teamrank.SamePlayers(&vNames))
{
FoundTeam = true;
break;
}
return true;
}
// copy uuid, because mysql BindBlob doesn't support const buffers
CUuid TeamrankId = pData->m_TeamrankUuid;
pSqlServer->BindBlob(1, TeamrankId.m_aData, sizeof(TeamrankId.m_aData));
int NumInserted;
return pSqlServer->ExecuteUpdate(&NumInserted, pError, ErrorSize);
}
if(FoundTeam)
if(w == Write::NORMAL_FAILED)
{
dbg_msg("sql", "found team rank from same team (old time: %f, new time: %f)", Time, pData->m_Time);
if(pData->m_Time < Time)
int NumInserted;
CUuid TeamrankId = pData->m_TeamrankUuid;
str_format(aBuf, sizeof(aBuf),
"INSERT INTO %s_teamrace SELECT * FROM %s_teamrace_backup WHERE GameId=?",
pSqlServer->GetPrefix(), pSqlServer->GetPrefix());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
str_format(aBuf, sizeof(aBuf),
"UPDATE %s_teamrace SET Time=%.2f, Timestamp=?, DDNet7=%s, GameID=? WHERE ID = ?",
pSqlServer->GetPrefix(), pData->m_Time, pSqlServer->False());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aTimestamp);
pSqlServer->BindString(2, pData->m_aGameUuid);
pSqlServer->BindBlob(3, Teamrank.m_TeamID.m_aData, sizeof(Teamrank.m_TeamID.m_aData));
pSqlServer->Print();
int NumUpdated;
if(pSqlServer->ExecuteUpdate(&NumUpdated, pError, ErrorSize))
{
return true;
}
return true;
}
pSqlServer->BindBlob(1, TeamrankId.m_aData, sizeof(TeamrankId.m_aData));
if(pSqlServer->ExecuteUpdate(&NumInserted, pError, ErrorSize))
{
return true;
}
str_format(aBuf, sizeof(aBuf),
"DELETE FROM %s_teamrace_backup WHERE GameId=?",
pSqlServer->GetPrefix());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindBlob(1, TeamrankId.m_aData, sizeof(TeamrankId.m_aData));
return pSqlServer->ExecuteUpdate(&NumInserted, pError, ErrorSize);
}
else
if(w == Write::NORMAL)
{
CUuid GameID = RandomUuid();
// get the names sorted in a tab separated string
std::vector<std::string> vNames;
for(unsigned int i = 0; i < pData->m_Size; i++)
vNames.emplace_back(pData->m_aaNames[i]);
std::sort(vNames.begin(), vNames.end());
str_format(aBuf, sizeof(aBuf),
"SELECT l.ID, Name, Time "
"FROM (" // preselect teams with first name in team
" SELECT ID "
" FROM %s_teamrace "
" WHERE Map = ? AND Name = ? AND DDNet7 = %s"
") as l INNER JOIN %s_teamrace AS r ON l.ID = r.ID "
"ORDER BY l.ID, Name COLLATE %s",
pSqlServer->GetPrefix(), pSqlServer->False(), pSqlServer->GetPrefix(), pSqlServer->BinaryCollate());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
// if no entry found... create a new one
str_format(aBuf, sizeof(aBuf),
"%s INTO %s_teamrace(Map, Name, Timestamp, Time, ID, GameID, DDNet7) "
"VALUES (?, ?, %s, %.2f, ?, ?, %s)",
pSqlServer->InsertIgnore(), pSqlServer->GetPrefix(),
pSqlServer->InsertTimestampAsUtc(), pData->m_Time, pSqlServer->False());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
return true;
}
pSqlServer->BindString(1, pData->m_aMap);
pSqlServer->BindString(2, pData->m_aaNames[0]);
bool FoundTeam = false;
float Time;
CTeamrank Teamrank;
bool End;
if(pSqlServer->Step(&End, pError, ErrorSize))
{
return true;
}
if(!End)
{
bool SearchTeamEnd = false;
while(!SearchTeamEnd)
{
return true;
Time = pSqlServer->GetFloat(3);
if(Teamrank.NextSqlResult(pSqlServer, &SearchTeamEnd, pError, ErrorSize))
{
return true;
}
if(Teamrank.SamePlayers(&vNames))
{
FoundTeam = true;
break;
}
}
pSqlServer->BindString(1, pData->m_aMap);
pSqlServer->BindString(2, pData->m_aaNames[i]);
pSqlServer->BindString(3, pData->m_aTimestamp);
pSqlServer->BindBlob(4, GameID.m_aData, sizeof(GameID.m_aData));
pSqlServer->BindString(5, pData->m_aGameUuid);
pSqlServer->Print();
int NumInserted;
if(pSqlServer->ExecuteUpdate(&NumInserted, pError, ErrorSize))
}
if(FoundTeam)
{
dbg_msg("sql", "found team rank from same team (old time: %f, new time: %f)", Time, pData->m_Time);
if(pData->m_Time < Time)
{
return true;
str_format(aBuf, sizeof(aBuf),
"UPDATE %s_teamrace SET Time=%.2f, Timestamp=?, DDNet7=%s, GameID=? WHERE ID = ?",
pSqlServer->GetPrefix(), pData->m_Time, pSqlServer->False());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aTimestamp);
pSqlServer->BindString(2, pData->m_aGameUuid);
// copy uuid, because mysql BindBlob doesn't support const buffers
CUuid TeamrankId = pData->m_TeamrankUuid;
pSqlServer->BindBlob(3, TeamrankId.m_aData, sizeof(TeamrankId.m_aData));
pSqlServer->Print();
int NumUpdated;
if(pSqlServer->ExecuteUpdate(&NumUpdated, pError, ErrorSize))
{
return true;
}
}
return false;
}
}
for(unsigned int i = 0; i < pData->m_Size; i++)
{
// if no entry found... create a new one
str_format(aBuf, sizeof(aBuf),
"%s INTO %s_teamrace%s(Map, Name, Timestamp, Time, ID, GameID, DDNet7) "
"VALUES (?, ?, %s, %.2f, ?, ?, %s)",
pSqlServer->InsertIgnore(), pSqlServer->GetPrefix(),
w == Write::NORMAL ? "" : "_backup",
pSqlServer->InsertTimestampAsUtc(), pData->m_Time, pSqlServer->False());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aMap);
pSqlServer->BindString(2, pData->m_aaNames[i]);
pSqlServer->BindString(3, pData->m_aTimestamp);
// copy uuid, because mysql BindBlob doesn't support const buffers
CUuid TeamrankId = pData->m_TeamrankUuid;
pSqlServer->BindBlob(4, TeamrankId.m_aData, sizeof(TeamrankId.m_aData));
pSqlServer->BindString(5, pData->m_aGameUuid);
pSqlServer->Print();
int NumInserted;
if(pSqlServer->ExecuteUpdate(&NumInserted, pError, ErrorSize))
{
return true;
}
}
return false;
@ -1363,19 +1468,64 @@ bool CScoreWorker::RandomUnfinishedMap(IDbConnection *pSqlServer, const ISqlData
return false;
}
bool CScoreWorker::SaveTeam(IDbConnection *pSqlServer, const ISqlData *pGameData, bool Failure, char *pError, int ErrorSize)
bool CScoreWorker::SaveTeam(IDbConnection *pSqlServer, const ISqlData *pGameData, Write w, char *pError, int ErrorSize)
{
const CSqlTeamSave *pData = dynamic_cast<const CSqlTeamSave *>(pGameData);
CScoreSaveResult *pResult = dynamic_cast<CScoreSaveResult *>(pGameData->m_pResult.get());
if(w == Write::NORMAL_SUCCEEDED)
{
// write succeded on mysql server. delete from sqlite again
char aBuf[128] = {0};
str_format(aBuf, sizeof(aBuf),
"DELETE FROM %s_saves_backup WHERE Code = ?",
pSqlServer->GetPrefix());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aGeneratedCode);
bool End;
return pSqlServer->Step(&End, pError, ErrorSize);
}
if(w == Write::NORMAL_FAILED)
{
char aBuf[128] = {0};
bool End;
// move to non-tmp table succeded. delete from backup again
str_format(aBuf, sizeof(aBuf),
"INSERT INTO %s_saves SELECT * FROM %s_saves_backup WHERE Code = ?",
pSqlServer->GetPrefix(), pSqlServer->GetPrefix());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aGeneratedCode);
if(pSqlServer->Step(&End, pError, ErrorSize))
{
return true;
}
// move to non-tmp table succeded. delete from backup again
str_format(aBuf, sizeof(aBuf),
"DELETE FROM %s_saves_backup WHERE Code = ?",
pSqlServer->GetPrefix());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
}
pSqlServer->BindString(1, pData->m_aGeneratedCode);
return pSqlServer->Step(&End, pError, ErrorSize);
}
char aSaveID[UUID_MAXSTRSIZE];
FormatUuid(pResult->m_SaveID, aSaveID, UUID_MAXSTRSIZE);
char *pSaveState = pResult->m_SavedTeam.GetString();
char aBuf[65536];
dbg_msg("score/dbg", "code=%s failure=%d", pData->m_aCode, (int)Failure);
bool UseGeneratedCode = pData->m_aCode[0] == '\0' || Failure;
dbg_msg("score/dbg", "code=%s failure=%d", pData->m_aCode, (int)w);
bool UseGeneratedCode = pData->m_aCode[0] == '\0' || w != Write::NORMAL;
bool Retry = false;
// two tries, first use the user provided code, then the autogenerated
@ -1389,9 +1539,10 @@ bool CScoreWorker::SaveTeam(IDbConnection *pSqlServer, const ISqlData *pGameData
str_copy(aCode, pData->m_aCode, sizeof(aCode));
str_format(aBuf, sizeof(aBuf),
"%s INTO %s_saves(Savegame, Map, Code, Timestamp, Server, SaveID, DDNet7) "
"%s INTO %s_saves%s(Savegame, Map, Code, Timestamp, Server, SaveID, DDNet7) "
"VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, %s)",
pSqlServer->InsertIgnore(), pSqlServer->GetPrefix(), pSqlServer->False());
pSqlServer->InsertIgnore(), pSqlServer->GetPrefix(),
w == Write::NORMAL ? "" : "_backup", pSqlServer->False());
if(pSqlServer->PrepareStatement(aBuf, pError, ErrorSize))
{
return true;
@ -1409,8 +1560,9 @@ bool CScoreWorker::SaveTeam(IDbConnection *pSqlServer, const ISqlData *pGameData
}
if(NumInserted == 1)
{
if(!Failure)
if(w == Write::NORMAL)
{
pResult->m_aBroadcast[0] = '\0';
if(str_comp(pData->m_aServer, g_Config.m_SvSqlServerName) == 0)
{
str_format(pResult->m_aMessage, sizeof(pResult->m_aMessage),
@ -1461,8 +1613,10 @@ bool CScoreWorker::SaveTeam(IDbConnection *pSqlServer, const ISqlData *pGameData
return false;
}
bool CScoreWorker::LoadTeam(IDbConnection *pSqlServer, const ISqlData *pGameData, bool Failure, char *pError, int ErrorSize)
bool CScoreWorker::LoadTeam(IDbConnection *pSqlServer, const ISqlData *pGameData, Write w, char *pError, int ErrorSize)
{
if(w == Write::NORMAL_SUCCEEDED || Write::BACKUP_FIRST)
return false;
const CSqlTeamLoad *pData = dynamic_cast<const CSqlTeamLoad *>(pGameData);
CScoreSaveResult *pResult = dynamic_cast<CScoreSaveResult *>(pGameData->m_pResult.get());
pResult->m_Status = CScoreSaveResult::LOAD_FAILED;

View file

@ -185,6 +185,7 @@ struct CSqlTeamScoreData : ISqlData
char m_aTimestamp[TIMESTAMP_STR_LENGTH];
unsigned int m_Size;
char m_aaNames[MAX_CLIENTS][MAX_NAME_LENGTH];
CUuid m_TeamrankUuid;
};
struct CSqlTeamSave : ISqlData
@ -291,11 +292,11 @@ struct CScoreWorker
static bool ShowTopPoints(IDbConnection *pSqlServer, const ISqlData *pGameData, char *pError, int ErrorSize);
static bool GetSaves(IDbConnection *pSqlServer, const ISqlData *pGameData, char *pError, int ErrorSize);
static bool SaveTeam(IDbConnection *pSqlServer, const ISqlData *pGameData, bool Failure, char *pError, int ErrorSize);
static bool LoadTeam(IDbConnection *pSqlServer, const ISqlData *pGameData, bool Failure, char *pError, int ErrorSize);
static bool SaveTeam(IDbConnection *pSqlServer, const ISqlData *pGameData, Write w, char *pError, int ErrorSize);
static bool LoadTeam(IDbConnection *pSqlServer, const ISqlData *pGameData, Write w, char *pError, int ErrorSize);
static bool SaveScore(IDbConnection *pSqlServer, const ISqlData *pGameData, bool Failure, char *pError, int ErrorSize);
static bool SaveTeamScore(IDbConnection *pSqlServer, const ISqlData *pGameData, bool Failure, char *pError, int ErrorSize);
static bool SaveScore(IDbConnection *pSqlServer, const ISqlData *pGameData, Write w, char *pError, int ErrorSize);
static bool SaveTeamScore(IDbConnection *pSqlServer, const ISqlData *pGameData, Write w, char *pError, int ErrorSize);
};
#endif // GAME_SERVER_SCOREWORKER_H

View file

@ -1,3 +1,4 @@
#include "engine/server/databases/connection_pool.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@ -103,7 +104,7 @@ struct Score : public testing::TestWithParam<IDbConnection *>
for(int i = 0; i < NUM_CHECKPOINTS; i++)
ScoreData.m_aCurrentTimeCp[i] = WithTimeCheckPoints ? i : 0;
str_copy(ScoreData.m_aRequestingPlayer, "deen", sizeof(ScoreData.m_aRequestingPlayer));
ASSERT_FALSE(CScoreWorker::SaveScore(m_pConn, &ScoreData, false, m_aError, sizeof(m_aError))) << m_aError;
ASSERT_FALSE(CScoreWorker::SaveScore(m_pConn, &ScoreData, Write::NORMAL, m_aError, sizeof(m_aError))) << m_aError;
}
void ExpectLines(const std::shared_ptr<CScorePlayerResult> &pPlayerResult, std::initializer_list<const char *> Lines, bool All = false)
@ -248,7 +249,7 @@ struct TeamScore : public Score
str_copy(teamScoreData.m_aaNames[1], "brainless tee", sizeof(teamScoreData.m_aaNames[1]));
teamScoreData.m_Time = 100.0;
str_copy(teamScoreData.m_aTimestamp, "2021-11-24 19:24:08", sizeof(teamScoreData.m_aTimestamp));
ASSERT_FALSE(CScoreWorker::SaveTeamScore(m_pConn, &teamScoreData, false, m_aError, sizeof(m_aError))) << m_aError;
ASSERT_FALSE(CScoreWorker::SaveTeamScore(m_pConn, &teamScoreData, Write::NORMAL, m_aError, sizeof(m_aError))) << m_aError;
str_copy(m_PlayerRequest.m_aMap, "Kobra 3", sizeof(m_PlayerRequest.m_aMap));
str_copy(m_PlayerRequest.m_aRequestingPlayer, "brainless tee", sizeof(m_PlayerRequest.m_aRequestingPlayer));
@ -515,7 +516,17 @@ TEST_P(RandomMap, UnfinishedDoesntExist)
auto g_pSqliteConn = CreateSqliteConnection(":memory:", true);
#if defined(CONF_TEST_MYSQL)
auto g_pMysqlConn = CreateMysqlConnection("ddnet", "record", "ddnet", "thebestpassword", "localhost", "", 3306, true);
CMysqlConfig gMysqlConfig{
"ddnet", // database
"record", // prefix
"ddnet", // user
"thebestpassword", // password
"localhost", // ip
"", // bindaddr
3306, // port
true, // setup
};
auto g_pMysqlConn = CreateMysqlConnection(gMysqlConfig);
#endif
auto g_TestValues