Merge pull request #887 from heinrich5991/pr_ddnet_async

Add asynchronous output system and use it for teehistorian and `dbg_msg()`s
This commit is contained in:
Learath2 2017-10-13 15:30:24 +02:00 committed by GitHub
commit 4394933d6c
12 changed files with 781 additions and 190 deletions

View file

@ -891,8 +891,10 @@ add_custom_target(everything DEPENDS ${TARGETS_OWN})
if(GTEST_FOUND)
set_glob(TESTS GLOB src/test
aio.cpp
strip_path_and_extension.cpp
teehistorian.cpp
thread.cpp
)
set(TESTS_EXTRA
src/game/server/teehistorian.cpp

View file

@ -82,7 +82,14 @@ IOHANDLE io_stdin() { return (IOHANDLE)stdin; }
IOHANDLE io_stdout() { return (IOHANDLE)stdout; }
IOHANDLE io_stderr() { return (IOHANDLE)stderr; }
static DBG_LOGGER loggers[16];
typedef struct
{
DBG_LOGGER logger;
DBG_LOGGER_FINISH finish;
void *user;
} DBG_LOGGER_DATA;
static DBG_LOGGER_DATA loggers[16];
static int num_loggers = 0;
static NETSTATS network_stats = {0};
@ -103,73 +110,11 @@ void dbg_assert_imp(const char *filename, int line, int test, const char *msg)
void dbg_break_imp()
{
#ifdef __GNUC__
__builtin_trap();
#else
*((volatile unsigned*)0) = 0x0;
}
#define QUEUE_SIZE 64
typedef struct
{
char q[QUEUE_SIZE][1024*4];
int begin;
int end;
LOCK mutex;
SEMAPHORE notempty;
SEMAPHORE notfull;
} Queue;
static int dbg_msg_threaded = 0;
static Queue log_queue;
int queue_empty(Queue *q)
{
return q->begin == q->end;
}
int queue_full(Queue *q)
{
return ((q->end+1) % QUEUE_SIZE) == q->begin;
}
void dbg_msg_thread(void *v)
{
char str[1024*4];
int i;
int num;
while(1)
{
sphore_wait(&log_queue.notempty);
lock_wait(log_queue.mutex);
str_copy(str, log_queue.q[log_queue.begin], sizeof(str));
log_queue.begin = (log_queue.begin + 1) % QUEUE_SIZE;
sphore_signal(&log_queue.notfull);
num = num_loggers;
lock_unlock(log_queue.mutex);
for(i = 0; i < num; i++)
loggers[i](str);
}
}
void dbg_enable_threaded()
{
Queue *q;
void *Thread;
q = &log_queue;
q->begin = 0;
q->end = 0;
q->mutex = lock_create();
sphore_init(&q->notempty);
sphore_init(&q->notfull);
dbg_msg_threaded = 1;
Thread = thread_init(dbg_msg_thread, 0);
thread_detach(Thread);
#endif
}
void dbg_msg(const char *sys, const char *fmt, ...)
@ -178,104 +123,108 @@ void dbg_msg(const char *sys, const char *fmt, ...)
char *msg;
int len;
char str[1024*4];
int i;
//str_format(str, sizeof(str), "[%08x][%s]: ", (int)time(0), sys);
char timestr[80];
str_timestamp_format(timestr, sizeof(timestr), FORMAT_SPACE);
if(dbg_msg_threaded)
{
while(queue_full(&log_queue))
sphore_wait(&log_queue.notfull);
lock_wait(log_queue.mutex);
str_format(str, sizeof(str), "[%s][%s]: ", timestr, sys);
str_format(log_queue.q[log_queue.end], sizeof(log_queue.q[log_queue.end]), "[%s][%s]: ", timestr, sys);
len = strlen(str);
msg = (char *)str + len;
len = strlen(log_queue.q[log_queue.end]);
msg = (char *)log_queue.q[log_queue.end] + len;
va_start(args, fmt);
va_start(args, fmt);
#if defined(CONF_FAMILY_WINDOWS)
_vsnprintf(msg, sizeof(log_queue.q[log_queue.end])-len, fmt, args);
_vsnprintf(msg, sizeof(str)-len, fmt, args);
#else
vsnprintf(msg, sizeof(log_queue.q[log_queue.end])-len, fmt, args);
vsnprintf(msg, sizeof(str)-len, fmt, args);
#endif
va_end(args);
va_end(args);
log_queue.end = (log_queue.end + 1) % QUEUE_SIZE;
sphore_signal(&log_queue.notempty);
lock_unlock(log_queue.mutex);
}
else
{
char str[1024*4];
int i;
str_format(str, sizeof(str), "[%s][%s]: ", timestr, sys);
len = strlen(str);
msg = (char *)str + len;
va_start(args, fmt);
#if defined(CONF_FAMILY_WINDOWS)
_vsnprintf(msg, sizeof(str)-len, fmt, args);
#else
vsnprintf(msg, sizeof(str)-len, fmt, args);
#endif
va_end(args);
for(i = 0; i < num_loggers; i++)
loggers[i](str);
}
for(i = 0; i < num_loggers; i++)
loggers[i].logger(str, loggers[i].user);
}
static void logger_stdout(const char *line)
{
printf("%s\n", line);
fflush(stdout);
#if defined(__ANDROID__)
__android_log_print(ANDROID_LOG_INFO, "DDNet", "%s", line);
#endif
}
static void logger_debugger(const char *line)
#if defined(CONF_FAMILY_WINDOWS) || defined(__ANDROID__)
static void logger_debugger(const char *line, void *user)
{
(void)user;
#if defined(CONF_FAMILY_WINDOWS)
OutputDebugString(line);
OutputDebugString("\n");
#elif defined(__ANDROID__)
__android_log_print(ANDROID_LOG_INFO, "DDNet", "%s", line);
#endif
}
#endif
static void logger_file(const char *line, void *user)
{
ASYNCIO *logfile = (ASYNCIO *)user;
aio_write(logfile, line, strlen(line));
aio_write_newline(logfile);
}
static void logger_stdout_finish(void *user)
{
ASYNCIO *logfile = (ASYNCIO *)user;
aio_wait(logfile);
aio_free(logfile);
}
static void logger_file_finish(void *user)
{
ASYNCIO *logfile = (ASYNCIO *)user;
aio_close(logfile);
logger_stdout_finish(user);
}
static void dbg_logger_finish(void)
{
int i;
for(i = 0; i < num_loggers; i++)
{
if(loggers[i].finish)
{
loggers[i].finish(loggers[i].user);
}
}
}
void dbg_logger(DBG_LOGGER logger, DBG_LOGGER_FINISH finish, void *user)
{
DBG_LOGGER_DATA data;
if(num_loggers == 0)
{
atexit(dbg_logger_finish);
}
data.logger = logger;
data.finish = finish;
data.user = user;
loggers[num_loggers] = data;
num_loggers++;
}
void dbg_logger_stdout()
{
dbg_logger(logger_file, logger_stdout_finish, aio_new(io_stdout()));
}
void dbg_logger_debugger()
{
#if defined(CONF_FAMILY_WINDOWS) || defined(__ANDROID__)
dbg_logger(logger_debugger, 0, 0);
#endif
}
static IOHANDLE logfile = 0;
static void logger_file(const char *line)
{
io_write(logfile, line, strlen(line));
io_write_newline(logfile);
io_flush(logfile);
}
void dbg_logger(DBG_LOGGER logger)
{
if(dbg_msg_threaded)
lock_wait(log_queue.mutex);
loggers[num_loggers] = logger;
num_loggers++;
if(dbg_msg_threaded)
lock_unlock(log_queue.mutex);
}
void dbg_logger_stdout() { dbg_logger(logger_stdout); }
void dbg_logger_debugger() { dbg_logger(logger_debugger); }
void dbg_logger_file(const char *filename)
{
logfile = io_open(filename, IOFLAG_WRITE);
IOHANDLE logfile = io_open(filename, IOFLAG_WRITE);
if(logfile)
dbg_logger(logger_file);
dbg_logger(logger_file, logger_file_finish, aio_new(logfile));
else
dbg_msg("dbg/logger", "failed to open '%s' for logging", filename);
@ -465,6 +414,11 @@ long int io_length(IOHANDLE io)
return length;
}
int io_error(IOHANDLE io)
{
return ferror((FILE*)io);
}
unsigned io_write(IOHANDLE io, const void *buffer, unsigned size)
{
return fwrite(buffer, 1, size, (FILE*)io);
@ -491,11 +445,306 @@ int io_flush(IOHANDLE io)
return 0;
}
#define ASYNC_BUFSIZE 8 * 1024
typedef struct ASYNCIO
{
LOCK lock;
IOHANDLE io;
SEMAPHORE sphore;
void *thread;
unsigned char *old_buffer;
unsigned char *buffer;
unsigned int buffer_size;
unsigned int read_pos;
unsigned int write_pos;
int error;
unsigned char finish;
unsigned char refcount;
} ASYNCIO;
enum
{
ASYNCIO_RUNNING,
ASYNCIO_CLOSE,
ASYNCIO_EXIT,
};
struct BUFFERS
{
unsigned char *buf1;
unsigned int len1;
unsigned char *buf2;
unsigned int len2;
};
static void buffer_ptrs(ASYNCIO *aio, struct BUFFERS *buffers)
{
mem_zero(buffers, sizeof(*buffers));
if(aio->read_pos < aio->write_pos)
{
buffers->buf1 = aio->buffer + aio->read_pos;
buffers->len1 = aio->write_pos - aio->read_pos;
}
else if(aio->read_pos > aio->write_pos)
{
buffers->buf1 = aio->buffer + aio->read_pos;
buffers->len1 = aio->buffer_size - aio->read_pos;
buffers->buf2 = aio->buffer;
buffers->len2 = aio->write_pos;
}
}
static void aio_handle_free_and_unlock(ASYNCIO *aio)
{
int do_free;
aio->refcount--;
do_free = aio->refcount == 0;
lock_unlock(aio->lock);
if(do_free)
{
mem_free(aio->buffer);
sphore_destroy(&aio->sphore);
lock_destroy(aio->lock);
mem_free(aio);
}
}
static void aio_thread(void *user)
{
ASYNCIO *aio = user;
lock_wait(aio->lock);
while(1)
{
struct BUFFERS buffers;
int result_io_error;
if(aio->read_pos == aio->write_pos)
{
if(aio->finish != ASYNCIO_RUNNING)
{
if(aio->finish == ASYNCIO_CLOSE)
{
io_close(aio->io);
}
aio_handle_free_and_unlock(aio);
break;
}
lock_unlock(aio->lock);
sphore_wait(&aio->sphore);
lock_wait(aio->lock);
continue;
}
buffer_ptrs(aio, &buffers);
lock_unlock(aio->lock);
io_write(aio->io, buffers.buf1, buffers.len1);
if(buffers.buf2)
{
io_write(aio->io, buffers.buf2, buffers.len2);
}
io_flush(aio->io);
result_io_error = io_error(aio->io);
lock_wait(aio->lock);
aio->error = result_io_error;
aio->read_pos = (aio->read_pos + buffers.len1 + buffers.len2) % aio->buffer_size;
if(aio->old_buffer)
{
mem_free(aio->old_buffer);
aio->old_buffer = 0;
}
}
}
ASYNCIO *aio_new(IOHANDLE io)
{
ASYNCIO *aio = mem_alloc(sizeof(*aio), sizeof(void *));
if(!aio)
{
return 0;
}
aio->io = io;
aio->lock = lock_create();
sphore_init(&aio->sphore);
aio->thread = 0;
aio->old_buffer = 0;
aio->buffer = mem_alloc(ASYNC_BUFSIZE, 1);
if(!aio->buffer)
{
sphore_destroy(&aio->sphore);
lock_destroy(aio->lock);
mem_free(aio);
return 0;
}
aio->buffer_size = ASYNC_BUFSIZE;
aio->read_pos = 0;
aio->write_pos = 0;
aio->error = 0;
aio->finish = ASYNCIO_RUNNING;
aio->refcount = 2;
aio->thread = thread_init(aio_thread, aio);
if(!aio->thread)
{
mem_free(aio->buffer);
sphore_destroy(&aio->sphore);
lock_destroy(aio->lock);
mem_free(aio);
return 0;
}
return aio;
}
static unsigned int buffer_len(ASYNCIO *aio)
{
if(aio->write_pos >= aio->read_pos)
{
return aio->write_pos - aio->read_pos;
}
else
{
return aio->buffer_size + aio->write_pos - aio->read_pos;
}
}
static unsigned int next_buffer_size(unsigned int cur_size, unsigned int need_size)
{
while(cur_size < need_size)
{
cur_size *= 2;
}
return cur_size;
}
void aio_write(ASYNCIO *aio, const void *buffer, unsigned size)
{
unsigned int remaining;
lock_wait(aio->lock);
remaining = aio->buffer_size - buffer_len(aio);
// Don't allow full queue to distinguish between empty and full queue.
if(size < remaining)
{
unsigned int remaining_contiguous = aio->buffer_size - aio->write_pos;
if(size > remaining_contiguous)
{
mem_copy(aio->buffer + aio->write_pos, buffer, remaining_contiguous);
size -= remaining_contiguous;
buffer = ((unsigned char *)buffer) + remaining_contiguous;
aio->write_pos = 0;
}
mem_copy(aio->buffer + aio->write_pos, buffer, size);
aio->write_pos = (aio->write_pos + size) % aio->buffer_size;
}
else
{
// Add 1 so the new buffer isn't completely filled.
unsigned int new_written = buffer_len(aio) + size + 1;
unsigned int next_size = next_buffer_size(aio->buffer_size, new_written);
unsigned int next_len = 0;
unsigned char *next_buffer = mem_alloc(next_size, 1);
struct BUFFERS buffers;
buffer_ptrs(aio, &buffers);
if(buffers.buf1)
{
mem_copy(next_buffer + next_len, buffers.buf1, buffers.len1);
next_len += buffers.len1;
if(buffers.buf2)
{
mem_copy(next_buffer + next_len, buffers.buf2, buffers.len2);
next_len += buffers.len2;
}
}
mem_copy(next_buffer + next_len, buffer, size);
next_len += size;
if(!aio->old_buffer)
{
aio->old_buffer = aio->buffer;
}
else
{
mem_free(aio->buffer);
}
aio->buffer = next_buffer;
aio->buffer_size = next_size;
aio->read_pos = 0;
aio->write_pos = next_len;
}
lock_unlock(aio->lock);
sphore_signal(&aio->sphore);
}
void aio_write_newline(ASYNCIO *aio)
{
#if defined(CONF_FAMILY_WINDOWS)
aio_write(aio, "\r\n", 2);
#else
aio_write(aio, "\n", 1);
#endif
}
int aio_error(ASYNCIO *aio)
{
int result;
lock_wait(aio->lock);
result = aio->error;
lock_unlock(aio->lock);
return result;
}
void aio_free(ASYNCIO *aio)
{
lock_wait(aio->lock);
if(aio->thread)
{
thread_detach(aio->thread);
aio->thread = 0;
}
aio_handle_free_and_unlock(aio);
}
void aio_close(ASYNCIO *aio)
{
lock_wait(aio->lock);
aio->finish = ASYNCIO_CLOSE;
lock_unlock(aio->lock);
sphore_signal(&aio->sphore);
}
void aio_wait(ASYNCIO *aio)
{
void *thread;
lock_wait(aio->lock);
thread = aio->thread;
aio->thread = 0;
if(aio->finish == ASYNCIO_RUNNING)
{
aio->finish = ASYNCIO_EXIT;
}
lock_unlock(aio->lock);
sphore_signal(&aio->sphore);
thread_wait(thread);
}
void *thread_init(void (*threadfunc)(void *), void *u)
{
#if defined(CONF_FAMILY_UNIX)
pthread_t id;
pthread_create(&id, NULL, (void *(*)(void*))threadfunc, u);
if(pthread_create(&id, NULL, (void *(*)(void*))threadfunc, u) != 0)
{
return 0;
}
return (void*)id;
#elif defined(CONF_FAMILY_WINDOWS)
return CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)threadfunc, u, 0, NULL);
@ -512,6 +761,7 @@ void thread_wait(void *thread)
dbg_msg("thread", "!! %d", result);
#elif defined(CONF_FAMILY_WINDOWS)
WaitForSingleObject((HANDLE)thread, INFINITE);
CloseHandle(thread);
#else
#error not implemented
#endif
@ -629,7 +879,7 @@ void sphore_destroy(SEMAPHORE *sem) { CloseHandle((HANDLE)*sem); }
void sphore_init(SEMAPHORE *sem)
{
char aBuf[64];
str_format(aBuf, sizeof(aBuf), "/%d-ddphore-%p", pid(), (void *)sem);
str_format(aBuf, sizeof(aBuf), "/%d-ddnet.tw-%p", pid(), (void *)sem);
*sem = sem_open(aBuf, O_CREAT | O_EXCL, S_IRWXU | S_IRWXG, 0);
}
void sphore_wait(SEMAPHORE *sem) { sem_wait(*sem); }
@ -638,7 +888,7 @@ void sphore_destroy(SEMAPHORE *sem)
{
char aBuf[64];
sem_close(*sem);
str_format(aBuf, sizeof(aBuf), "/%d-ddphore-%p", pid(), (void *)sem);
str_format(aBuf, sizeof(aBuf), "/%d-ddnet.tw-%p", pid(), (void *)sem);
sem_unlink(aBuf);
}
#elif defined(CONF_FAMILY_UNIX)

View file

@ -336,6 +336,18 @@ int io_close(IOHANDLE io);
*/
int io_flush(IOHANDLE io);
/*
Function: io_error
Checks whether an error occured during I/O with the file.
Parameters:
io - Handle to the file.
Returns:
Returns nonzero on error, 0 otherwise.
*/
int io_error(IOHANDLE io);
/*
Function: io_stdin
@ -355,6 +367,90 @@ IOHANDLE io_stdout();
*/
IOHANDLE io_stderr();
typedef struct ASYNCIO ASYNCIO;
/*
Function: aio_new
Wraps a <IOHANDLE> for asynchronous writing.
Parameters:
io - Handle to the file.
Returns:
Returns the handle for asynchronous writing.
*/
ASYNCIO *aio_new(IOHANDLE io);
/*
Function: aio_write
Queues a chunk of data for writing.
Parameters:
aio - Handle to the file.
buffer - Pointer to the data that should be written.
size - Number of bytes to write.
*/
void aio_write(ASYNCIO *aio, const void *buffer, unsigned size);
/*
Function: aio_write_newline
Queues a newline for writing.
Parameters:
aio - Handle to the file.
*/
void aio_write_newline(ASYNCIO *aio);
/*
Function: aio_error
Checks whether errors have occured during the asynchronous
writing.
Call this function regularly to see if there are errors. Call
this function after <aio_wait> to see if the process of writing
to the file succeeded.
Parameters:
aio - Handle to the file.
Returns:
Returns 0 if no error occured, and nonzero on error.
*/
int aio_error(ASYNCIO *aio);
/*
Function: aio_close
Queues file closing.
Parameters:
aio - Handle to the file.
*/
void aio_close(ASYNCIO *aio);
/*
Function: aio_wait
Wait for the asynchronous operations to complete.
Parameters:
aio - Handle to the file.
*/
void aio_wait(ASYNCIO *aio);
/*
Function: aio_free
Frees the resources associated to the asynchronous file handle.
Parameters:
aio - Handle to the file.
*/
void aio_free(ASYNCIO *aio);
/* Group: Threads */
@ -388,7 +484,7 @@ void *thread_init(void (*threadfunc)(void *), void *user);
void thread_wait(void *thread);
/*
Function: thread_yeild
Function: thread_yield
Yield the current threads execution slice.
*/
void thread_yield();
@ -1261,10 +1357,10 @@ void mem_debug_dump(IOHANDLE file);
void swap_endian(void *data, unsigned elem_size, unsigned num);
typedef void (*DBG_LOGGER)(const char *line);
void dbg_logger(DBG_LOGGER logger);
typedef void (*DBG_LOGGER)(const char *line, void *user);
typedef void (*DBG_LOGGER_FINISH)(void *user);
void dbg_logger(DBG_LOGGER logger, DBG_LOGGER_FINISH finish, void *user);
void dbg_enable_threaded();
void dbg_logger_stdout();
void dbg_logger_debugger();
void dbg_logger_file(const char *filename);

View file

@ -3444,10 +3444,6 @@ int main(int argc, const char **argv) // ignore_convention
}
}
#if !defined(CONF_PLATFORM_MACOSX)
dbg_enable_threaded();
#endif
if(secure_random_init() != 0)
{
dbg_msg("secure", "could not initialize secure RNG");

View file

@ -180,6 +180,8 @@ public:
virtual void ResetNetErrorString(int ClientID) = 0;
virtual bool SetTimedOut(int ClientID, int OrigID) = 0;
virtual void SetTimeoutProtected(int ClientID) = 0;
virtual void SetErrorShutdown(const char *pReason) = 0;
};
class IGameServer : public IInterface

View file

@ -334,6 +334,8 @@ CServer::CServer()
CSqlConnector::SetWriteServers(m_apSqlWriteServers);
#endif
m_aErrorShutdownReason[0] = 0;
Init();
}
@ -1746,6 +1748,10 @@ int CServer::Run()
Console()->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", aBuf);
GameServer()->OnInit();
if(ErrorShutdown())
{
return 1;
}
str_format(aBuf, sizeof(aBuf), "version %s", GameServer()->NetVersion());
Console()->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", aBuf);
@ -1808,6 +1814,10 @@ int CServer::Run()
m_ServerInfoFirstRequest = 0;
Kernel()->ReregisterInterface(GameServer());
GameServer()->OnInit();
if(ErrorShutdown())
{
break;
}
UpdateServerInfo();
}
else
@ -1885,6 +1895,10 @@ int CServer::Run()
}
GameServer()->OnTick();
if(ErrorShutdown())
{
break;
}
}
// snap game
@ -1942,11 +1956,16 @@ int CServer::Run()
}
}
}
const char *pDisconnectReason = "Server shutdown";
if(ErrorShutdown())
{
pDisconnectReason = m_aErrorShutdownReason;
}
// disconnect all clients on shutdown
for(int i = 0; i < MAX_CLIENTS; ++i)
{
if(m_aClients[i].m_State != CClient::STATE_EMPTY)
m_NetServer.Drop(i, "Server shutdown");
m_NetServer.Drop(i, pDisconnectReason);
}
m_Econ.Shutdown();
@ -1962,17 +1981,17 @@ int CServer::Run()
mem_free(m_pCurrentMapData);
#if defined (CONF_SQL)
for (int i = 0; i < MAX_SQLSERVERS; i++)
{
if (m_apSqlReadServers[i])
delete m_apSqlReadServers[i];
for (int i = 0; i < MAX_SQLSERVERS; i++)
{
if (m_apSqlReadServers[i])
delete m_apSqlReadServers[i];
if (m_apSqlWriteServers[i])
delete m_apSqlWriteServers[i];
}
if (m_apSqlWriteServers[i])
delete m_apSqlWriteServers[i];
}
#endif
return 0;
return ErrorShutdown();
}
void CServer::ConTestingCommands(CConsole::IResult *pResult, void *pUser)
@ -2684,10 +2703,6 @@ int main(int argc, const char **argv) // ignore_convention
}
}
#if !defined(CONF_PLATFORM_MACOSX) && !defined(FUZZING)
dbg_enable_threaded();
#endif
if(secure_random_init() != 0)
{
dbg_msg("secure", "could not initialize secure RNG");
@ -2764,6 +2779,7 @@ int main(int argc, const char **argv) // ignore_convention
// free
delete pKernel;
return 0;
}
@ -2815,12 +2831,13 @@ const char *CServer::GetAnnouncementLine(char const *pFileName)
return v[m_AnnouncementLastLine];
}
int* CServer::GetIdMap(int ClientID)
int *CServer::GetIdMap(int ClientID)
{
return (int*)(IdMap + VANILLA_MAX_CLIENTS * ClientID);
return (int *)(IdMap + VANILLA_MAX_CLIENTS * ClientID);
}
bool CServer::SetTimedOut(int ClientID, int OrigID) {
bool CServer::SetTimedOut(int ClientID, int OrigID)
{
if (!m_NetServer.SetTimedOut(ClientID, OrigID))
{
return false;
@ -2829,3 +2846,8 @@ bool CServer::SetTimedOut(int ClientID, int OrigID) {
m_aClients[ClientID].m_Authed = IServer::AUTHED_NO;
return true;
}
void CServer::SetErrorShutdown(const char *pReason)
{
str_copy(m_aErrorShutdownReason, pReason, sizeof(m_aErrorShutdownReason));
}

View file

@ -209,6 +209,8 @@ public:
int64 m_ServerInfoFirstRequest;
int m_ServerInfoNumRequests;
char m_aErrorShutdownReason[128];
CServer();
int TrySetClientName(int ClientID, const char *pName);
@ -353,6 +355,9 @@ public:
void ResetNetErrorString(int ClientID) { m_NetServer.ResetErrorString(ClientID); };
bool SetTimedOut(int ClientID, int OrigID);
void SetTimeoutProtected(int ClientID) { m_NetServer.SetTimeoutProtected(ClientID); };
bool ErrorShutdown() const { return m_aErrorShutdownReason[0] != 0; }
void SetErrorShutdown(const char *pReason);
};
#endif

View file

@ -59,9 +59,7 @@ public:
{
if(!Silent)
dbg_logger_stdout();
#if defined(CONF_FAMILY_WINDOWS)
dbg_logger_debugger();
#endif
//
dbg_msg("engine", "running on %s-%s-%s", CONF_FAMILY_STRING, CONF_PLATFORM_STRING, CONF_ARCH_STRING);

View file

@ -104,7 +104,7 @@ void CGameContext::Clear()
void CGameContext::TeeHistorianWrite(const void *pData, int DataSize, void *pUser)
{
CGameContext *pSelf = (CGameContext *)pUser;
io_write(pSelf->m_TeeHistorianFile, pData, DataSize);
aio_write(pSelf->m_pTeeHistorianFile, pData, DataSize);
}
void CGameContext::CommandCallback(int ClientID, int FlagMask, const char *pCmd, IConsole::IResult *pResult, void *pUser)
@ -604,6 +604,13 @@ void CGameContext::OnTick()
if(m_TeeHistorianActive)
{
int Error = aio_error(m_pTeeHistorianFile);
if(Error)
{
dbg_msg("teehistorian", "error writing to file, err=%d", Error);
Server()->SetErrorShutdown("teehistorian io error");
}
if(!m_TeeHistorian.Starting())
{
m_TeeHistorian.EndInputs();
@ -1048,10 +1055,6 @@ void CGameContext::OnClientConnected(int ClientID)
void CGameContext::OnClientDrop(int ClientID, const char *pReason)
{
if(m_TeeHistorianActive)
{
io_flush(m_TeeHistorianFile);
}
AbortVoteKickOnDisconnect(ClientID);
m_apPlayers[ClientID]->OnDisconnect(pReason);
delete m_apPlayers[ClientID];
@ -2451,6 +2454,7 @@ void CGameContext::OnConsoleInit()
{
m_pServer = Kernel()->RequestInterface<IServer>();
m_pConsole = Kernel()->RequestInterface<IConsole>();
m_pStorage = Kernel()->RequestInterface<IStorage>();
m_ChatPrintCBIndex = Console()->RegisterPrintCallback(0, SendChatResponse, this);
@ -2491,6 +2495,7 @@ void CGameContext::OnInit(/*class IKernel *pKernel*/)
{
m_pServer = Kernel()->RequestInterface<IServer>();
m_pConsole = Kernel()->RequestInterface<IConsole>();
m_pStorage = Kernel()->RequestInterface<IStorage>();
m_World.SetGameServer(this);
m_Events.SetGameServer(this);
@ -2576,16 +2581,18 @@ void CGameContext::OnInit(/*class IKernel *pKernel*/)
char aFilename[64];
str_format(aFilename, sizeof(aFilename), "teehistorian/%s.teehistorian", aGameUuid);
m_TeeHistorianFile = Kernel()->RequestInterface<IStorage>()->OpenFile(aFilename, IOFLAG_WRITE, IStorage::TYPE_SAVE);
if(!m_TeeHistorianFile)
IOHANDLE File = Storage()->OpenFile(aFilename, IOFLAG_WRITE, IStorage::TYPE_SAVE);
if(!File)
{
dbg_msg("teehistorian", "failed to open '%s'", aFilename);
exit(1);
Server()->SetErrorShutdown("teehistorian open error");
return;
}
else
{
dbg_msg("teehistorian", "recording to '%s'", aFilename);
}
m_pTeeHistorianFile = aio_new(File);
char aVersion[128];
#ifdef GIT_SHORTREV_HASH
@ -2610,7 +2617,6 @@ void CGameContext::OnInit(/*class IKernel *pKernel*/)
GameInfo.m_pMapName = aMapName;
m_TeeHistorian.Reset(&GameInfo, TeeHistorianWrite, this);
io_flush(m_TeeHistorianFile);
}
if(g_Config.m_SvSoloServer)
@ -2767,22 +2773,19 @@ void CGameContext::DeleteTempfile()
{
if(m_aDeleteTempfile[0] != 0)
{
IStorage *pStorage = Kernel()->RequestInterface<IStorage>();
pStorage->RemoveFile(m_aDeleteTempfile, IStorage::TYPE_SAVE);
Storage()->RemoveFile(m_aDeleteTempfile, IStorage::TYPE_SAVE);
m_aDeleteTempfile[0] = 0;
}
}
void CGameContext::OnMapChange(char *pNewMapName, int MapNameSize)
{
IStorage *pStorage = Kernel()->RequestInterface<IStorage>();
char aConfig[128];
char aTemp[128];
str_format(aConfig, sizeof(aConfig), "maps/%s.cfg", g_Config.m_SvMap);
str_format(aTemp, sizeof(aTemp), "%s.temp.%d", pNewMapName, pid());
IOHANDLE File = pStorage->OpenFile(aConfig, IOFLAG_READ, IStorage::TYPE_ALL);
IOHANDLE File = Storage()->OpenFile(aConfig, IOFLAG_READ, IStorage::TYPE_ALL);
if(!File)
{
// No map-specific config, just return.
@ -2815,7 +2818,7 @@ void CGameContext::OnMapChange(char *pNewMapName, int MapNameSize)
}
CDataFileReader Reader;
Reader.Open(pStorage, pNewMapName, IStorage::TYPE_ALL);
Reader.Open(Storage(), pNewMapName, IStorage::TYPE_ALL);
CDataFileWriter Writer;
Writer.Init();
@ -2893,7 +2896,7 @@ void CGameContext::OnMapChange(char *pNewMapName, int MapNameSize)
dbg_msg("mapchange", "imported settings");
Reader.Close();
Writer.OpenFile(pStorage, aTemp);
Writer.OpenFile(Storage(), aTemp);
Writer.Finish();
str_copy(pNewMapName, aTemp, MapNameSize);
@ -2908,7 +2911,15 @@ void CGameContext::OnShutdown(bool FullShutdown)
if(m_TeeHistorianActive)
{
m_TeeHistorian.Finish();
io_close(m_TeeHistorianFile);
aio_close(m_pTeeHistorianFile);
aio_wait(m_pTeeHistorianFile);
int Error = aio_error(m_pTeeHistorianFile);
if(Error)
{
dbg_msg("teehistorian", "error closing file, err=%d", Error);
Server()->SetErrorShutdown("teehistorian close error");
}
aio_free(m_pTeeHistorianFile);
}
DeleteTempfile();

View file

@ -52,10 +52,13 @@ enum
NUM_TUNEZONES = 256
};
class IStorage;
class CGameContext : public IGameServer
{
IServer *m_pServer;
class IConsole *m_pConsole;
IStorage *m_pStorage;
CLayers m_Layers;
CCollision m_Collision;
CNetObjHandler m_NetObjHandler;
@ -64,7 +67,7 @@ class CGameContext : public IGameServer
bool m_TeeHistorianActive;
CTeeHistorian m_TeeHistorian;
IOHANDLE m_TeeHistorianFile;
ASYNCIO *m_pTeeHistorianFile;
CUuid m_GameUuid;
static void CommandCallback(int ClientID, int FlagMask, const char *pCmd, IConsole::IResult *pResult, void *pUser);
@ -107,6 +110,7 @@ class CGameContext : public IGameServer
public:
IServer *Server() const { return m_pServer; }
class IConsole *Console() { return m_pConsole; }
IStorage *Storage() { return m_pStorage; }
CCollision *Collision() { return &m_Collision; }
CTuningParams *Tuning() { return &m_Tuning; }
CTuningParams *TuningList() { return &m_aTuningList[0]; }

124
src/test/aio.cpp Normal file
View file

@ -0,0 +1,124 @@
#include <gtest/gtest.h>
#include <base/system.h>
static const int BUF_SIZE = 64 * 1024;
class Async : public ::testing::Test
{
protected:
ASYNCIO *m_pAio;
char m_aFilename[64];
bool Delete;
Async()
{
const ::testing::TestInfo *pTestInfo =
::testing::UnitTest::GetInstance()->current_test_info();
const char *pTestName = pTestInfo->name();
str_format(m_aFilename, sizeof(m_aFilename), "Async.%s-%d.tmp", pTestName, pid());
m_pAio = aio_new(io_open(m_aFilename, IOFLAG_WRITE));
Delete = false;
}
~Async()
{
if(Delete)
{
fs_remove(m_aFilename);
}
}
void Write(const char *pText)
{
aio_write(m_pAio, pText, str_length(pText));
}
void Expect(const char *pOutput)
{
aio_close(m_pAio);
aio_wait(m_pAio);
aio_free(m_pAio);
char aBuf[BUF_SIZE];
IOHANDLE File = io_open(m_aFilename, IOFLAG_READ);
int Read = io_read(File, aBuf, sizeof(aBuf));
ASSERT_EQ(str_length(pOutput), Read);
ASSERT_TRUE(mem_comp(aBuf, pOutput, Read) == 0);
Delete = true;
}
};
TEST_F(Async, Empty)
{
Expect("");
}
TEST_F(Async, Simple)
{
static const char TEXT[] = "a\n";
Write(TEXT);
Expect(TEXT);
}
TEST_F(Async, Long)
{
char aText[BUF_SIZE + 1];
for(unsigned i = 0; i < sizeof(aText) - 1; i++)
{
aText[i] = 'a';
}
aText[sizeof(aText) - 1] = 0;
Write(aText);
Expect(aText);
}
TEST_F(Async, Pieces)
{
char aText[BUF_SIZE + 1];
for(unsigned i = 0; i < sizeof(aText) - 1; i++)
{
aText[i] = 'a';
}
aText[sizeof(aText) - 1] = 0;
for(unsigned i = 0; i < sizeof(aText) - 1; i++)
{
Write("a");
}
Expect(aText);
}
TEST_F(Async, Mixed)
{
char aText[BUF_SIZE + 1];
for(unsigned i = 0; i < sizeof(aText) - 1; i++)
{
aText[i] = 'a' + i % 26;
}
aText[sizeof(aText) - 1] = 0;
for(unsigned i = 0; i < sizeof(aText) - 1; i++)
{
char w = 'a' + i % 26;
aio_write(m_pAio, &w, 1);
}
Expect(aText);
}
TEST_F(Async, NonDivisor)
{
static const int NUM_LETTERS = 13;
static const int SIZE = BUF_SIZE / NUM_LETTERS * NUM_LETTERS;
char aText[SIZE + 1];
for(unsigned i = 0; i < sizeof(aText) - 1; i++)
{
aText[i] = 'a' + i % NUM_LETTERS;
}
aText[sizeof(aText) - 1] = 0;
for(unsigned i = 0; i < (sizeof(aText) - 1) / NUM_LETTERS; i++)
{
Write("abcdefghijklm");
}
Expect(aText);
}

81
src/test/thread.cpp Normal file
View file

@ -0,0 +1,81 @@
#include <gtest/gtest.h>
#include <base/system.h>
static void Nothing(void *pUser)
{
(void)pUser;
}
TEST(Thread, Detach)
{
void *pThread = thread_init(Nothing, 0);
thread_detach(pThread);
}
static void SetToOne(void *pUser)
{
*(int *)pUser = 1;
}
TEST(Thread, Wait)
{
int Integer = 0;
void *pThread = thread_init(SetToOne, &Integer);
thread_wait(pThread);
EXPECT_EQ(Integer, 1);
}
TEST(Thread, Yield)
{
thread_yield();
}
TEST(Thread, Semaphore)
{
SEMAPHORE Semaphore;
sphore_init(&Semaphore);
sphore_destroy(&Semaphore);
}
TEST(Thread, SemaphoreSingleThreaded)
{
SEMAPHORE Semaphore;
sphore_init(&Semaphore);
sphore_signal(&Semaphore);
sphore_signal(&Semaphore);
sphore_wait(&Semaphore);
sphore_wait(&Semaphore);
sphore_destroy(&Semaphore);
}
static void SemaphoreThread(void *pUser)
{
SEMAPHORE *pSemaphore = (SEMAPHORE *)pUser;
sphore_wait(pSemaphore);
}
TEST(Thread, SemaphoreMultiThreaded)
{
SEMAPHORE Semaphore;
sphore_init(&Semaphore);
sphore_signal(&Semaphore);
void *pThread = thread_init(SemaphoreThread, &Semaphore);
thread_wait(pThread);
sphore_destroy(&Semaphore);
}
static void LockThread(void *pUser)
{
LOCK *pLock = (LOCK *)pUser;
lock_wait(*pLock);
lock_unlock(*pLock);
}
TEST(Thread, Lock)
{
LOCK Lock = lock_create();
void *pThread = thread_init(LockThread, &Lock);
lock_unlock(Lock);
thread_wait(pThread);
}