diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b5d22913..b157f5a17 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -891,8 +891,10 @@ add_custom_target(everything DEPENDS ${TARGETS_OWN}) if(GTEST_FOUND) set_glob(TESTS GLOB src/test + aio.cpp strip_path_and_extension.cpp teehistorian.cpp + thread.cpp ) set(TESTS_EXTRA src/game/server/teehistorian.cpp diff --git a/src/base/system.c b/src/base/system.c index 0148c0b46..8d1d78b24 100644 --- a/src/base/system.c +++ b/src/base/system.c @@ -82,7 +82,14 @@ IOHANDLE io_stdin() { return (IOHANDLE)stdin; } IOHANDLE io_stdout() { return (IOHANDLE)stdout; } IOHANDLE io_stderr() { return (IOHANDLE)stderr; } -static DBG_LOGGER loggers[16]; +typedef struct +{ + DBG_LOGGER logger; + DBG_LOGGER_FINISH finish; + void *user; +} DBG_LOGGER_DATA; + +static DBG_LOGGER_DATA loggers[16]; static int num_loggers = 0; static NETSTATS network_stats = {0}; @@ -103,73 +110,11 @@ void dbg_assert_imp(const char *filename, int line, int test, const char *msg) void dbg_break_imp() { +#ifdef __GNUC__ + __builtin_trap(); +#else *((volatile unsigned*)0) = 0x0; -} - -#define QUEUE_SIZE 64 - -typedef struct -{ - char q[QUEUE_SIZE][1024*4]; - int begin; - int end; - LOCK mutex; - SEMAPHORE notempty; - SEMAPHORE notfull; -} Queue; - -static int dbg_msg_threaded = 0; -static Queue log_queue; - -int queue_empty(Queue *q) -{ - return q->begin == q->end; -} - -int queue_full(Queue *q) -{ - return ((q->end+1) % QUEUE_SIZE) == q->begin; -} - -void dbg_msg_thread(void *v) -{ - char str[1024*4]; - int i; - int num; - while(1) - { - sphore_wait(&log_queue.notempty); - lock_wait(log_queue.mutex); - - str_copy(str, log_queue.q[log_queue.begin], sizeof(str)); - log_queue.begin = (log_queue.begin + 1) % QUEUE_SIZE; - - sphore_signal(&log_queue.notfull); - - num = num_loggers; - lock_unlock(log_queue.mutex); - - for(i = 0; i < num; i++) - loggers[i](str); - } -} - -void dbg_enable_threaded() -{ - Queue *q; - void *Thread; - - q = &log_queue; - q->begin = 0; - q->end = 0; - q->mutex = lock_create(); - sphore_init(&q->notempty); - sphore_init(&q->notfull); - - dbg_msg_threaded = 1; - - Thread = thread_init(dbg_msg_thread, 0); - thread_detach(Thread); +#endif } void dbg_msg(const char *sys, const char *fmt, ...) @@ -178,104 +123,108 @@ void dbg_msg(const char *sys, const char *fmt, ...) char *msg; int len; + char str[1024*4]; + int i; + //str_format(str, sizeof(str), "[%08x][%s]: ", (int)time(0), sys); char timestr[80]; str_timestamp_format(timestr, sizeof(timestr), FORMAT_SPACE); - if(dbg_msg_threaded) - { - while(queue_full(&log_queue)) - sphore_wait(&log_queue.notfull); - lock_wait(log_queue.mutex); + str_format(str, sizeof(str), "[%s][%s]: ", timestr, sys); - str_format(log_queue.q[log_queue.end], sizeof(log_queue.q[log_queue.end]), "[%s][%s]: ", timestr, sys); + len = strlen(str); + msg = (char *)str + len; - len = strlen(log_queue.q[log_queue.end]); - msg = (char *)log_queue.q[log_queue.end] + len; - - va_start(args, fmt); + va_start(args, fmt); #if defined(CONF_FAMILY_WINDOWS) - _vsnprintf(msg, sizeof(log_queue.q[log_queue.end])-len, fmt, args); + _vsnprintf(msg, sizeof(str)-len, fmt, args); #else - vsnprintf(msg, sizeof(log_queue.q[log_queue.end])-len, fmt, args); + vsnprintf(msg, sizeof(str)-len, fmt, args); #endif - va_end(args); + va_end(args); - log_queue.end = (log_queue.end + 1) % QUEUE_SIZE; - - sphore_signal(&log_queue.notempty); - - lock_unlock(log_queue.mutex); - } - else - { - char str[1024*4]; - int i; - - str_format(str, sizeof(str), "[%s][%s]: ", timestr, sys); - - len = strlen(str); - msg = (char *)str + len; - - va_start(args, fmt); -#if defined(CONF_FAMILY_WINDOWS) - _vsnprintf(msg, sizeof(str)-len, fmt, args); -#else - vsnprintf(msg, sizeof(str)-len, fmt, args); -#endif - va_end(args); - - for(i = 0; i < num_loggers; i++) - loggers[i](str); - } + for(i = 0; i < num_loggers; i++) + loggers[i].logger(str, loggers[i].user); } -static void logger_stdout(const char *line) -{ - printf("%s\n", line); - fflush(stdout); -#if defined(__ANDROID__) - __android_log_print(ANDROID_LOG_INFO, "DDNet", "%s", line); -#endif -} - -static void logger_debugger(const char *line) +#if defined(CONF_FAMILY_WINDOWS) || defined(__ANDROID__) +static void logger_debugger(const char *line, void *user) { + (void)user; #if defined(CONF_FAMILY_WINDOWS) OutputDebugString(line); OutputDebugString("\n"); +#elif defined(__ANDROID__) + __android_log_print(ANDROID_LOG_INFO, "DDNet", "%s", line); +#endif +} +#endif + + +static void logger_file(const char *line, void *user) +{ + ASYNCIO *logfile = (ASYNCIO *)user; + aio_write(logfile, line, strlen(line)); + aio_write_newline(logfile); +} + +static void logger_stdout_finish(void *user) +{ + ASYNCIO *logfile = (ASYNCIO *)user; + aio_wait(logfile); + aio_free(logfile); +} + +static void logger_file_finish(void *user) +{ + ASYNCIO *logfile = (ASYNCIO *)user; + aio_close(logfile); + logger_stdout_finish(user); +} + +static void dbg_logger_finish(void) +{ + int i; + for(i = 0; i < num_loggers; i++) + { + if(loggers[i].finish) + { + loggers[i].finish(loggers[i].user); + } + } +} + +void dbg_logger(DBG_LOGGER logger, DBG_LOGGER_FINISH finish, void *user) +{ + DBG_LOGGER_DATA data; + if(num_loggers == 0) + { + atexit(dbg_logger_finish); + } + data.logger = logger; + data.finish = finish; + data.user = user; + loggers[num_loggers] = data; + num_loggers++; +} + +void dbg_logger_stdout() +{ + dbg_logger(logger_file, logger_stdout_finish, aio_new(io_stdout())); +} + +void dbg_logger_debugger() +{ +#if defined(CONF_FAMILY_WINDOWS) || defined(__ANDROID__) + dbg_logger(logger_debugger, 0, 0); #endif } - -static IOHANDLE logfile = 0; -static void logger_file(const char *line) -{ - io_write(logfile, line, strlen(line)); - io_write_newline(logfile); - io_flush(logfile); -} - -void dbg_logger(DBG_LOGGER logger) -{ - if(dbg_msg_threaded) - lock_wait(log_queue.mutex); - - loggers[num_loggers] = logger; - num_loggers++; - - if(dbg_msg_threaded) - lock_unlock(log_queue.mutex); -} - -void dbg_logger_stdout() { dbg_logger(logger_stdout); } - -void dbg_logger_debugger() { dbg_logger(logger_debugger); } void dbg_logger_file(const char *filename) { - logfile = io_open(filename, IOFLAG_WRITE); + IOHANDLE logfile = io_open(filename, IOFLAG_WRITE); if(logfile) - dbg_logger(logger_file); + dbg_logger(logger_file, logger_file_finish, aio_new(logfile)); else dbg_msg("dbg/logger", "failed to open '%s' for logging", filename); @@ -465,6 +414,11 @@ long int io_length(IOHANDLE io) return length; } +int io_error(IOHANDLE io) +{ + return ferror((FILE*)io); +} + unsigned io_write(IOHANDLE io, const void *buffer, unsigned size) { return fwrite(buffer, 1, size, (FILE*)io); @@ -491,11 +445,306 @@ int io_flush(IOHANDLE io) return 0; } + +#define ASYNC_BUFSIZE 8 * 1024 + +typedef struct ASYNCIO +{ + LOCK lock; + IOHANDLE io; + SEMAPHORE sphore; + void *thread; + + unsigned char *old_buffer; + + unsigned char *buffer; + unsigned int buffer_size; + unsigned int read_pos; + unsigned int write_pos; + + int error; + unsigned char finish; + unsigned char refcount; +} ASYNCIO; + +enum +{ + ASYNCIO_RUNNING, + ASYNCIO_CLOSE, + ASYNCIO_EXIT, +}; + +struct BUFFERS +{ + unsigned char *buf1; + unsigned int len1; + unsigned char *buf2; + unsigned int len2; +}; + +static void buffer_ptrs(ASYNCIO *aio, struct BUFFERS *buffers) +{ + mem_zero(buffers, sizeof(*buffers)); + if(aio->read_pos < aio->write_pos) + { + buffers->buf1 = aio->buffer + aio->read_pos; + buffers->len1 = aio->write_pos - aio->read_pos; + } + else if(aio->read_pos > aio->write_pos) + { + buffers->buf1 = aio->buffer + aio->read_pos; + buffers->len1 = aio->buffer_size - aio->read_pos; + buffers->buf2 = aio->buffer; + buffers->len2 = aio->write_pos; + } +} + +static void aio_handle_free_and_unlock(ASYNCIO *aio) +{ + int do_free; + aio->refcount--; + + do_free = aio->refcount == 0; + lock_unlock(aio->lock); + if(do_free) + { + mem_free(aio->buffer); + sphore_destroy(&aio->sphore); + lock_destroy(aio->lock); + mem_free(aio); + } +} + +static void aio_thread(void *user) +{ + ASYNCIO *aio = user; + + lock_wait(aio->lock); + while(1) + { + struct BUFFERS buffers; + int result_io_error; + + if(aio->read_pos == aio->write_pos) + { + if(aio->finish != ASYNCIO_RUNNING) + { + if(aio->finish == ASYNCIO_CLOSE) + { + io_close(aio->io); + } + aio_handle_free_and_unlock(aio); + break; + } + lock_unlock(aio->lock); + sphore_wait(&aio->sphore); + lock_wait(aio->lock); + continue; + } + + buffer_ptrs(aio, &buffers); + lock_unlock(aio->lock); + + io_write(aio->io, buffers.buf1, buffers.len1); + if(buffers.buf2) + { + io_write(aio->io, buffers.buf2, buffers.len2); + } + io_flush(aio->io); + result_io_error = io_error(aio->io); + + lock_wait(aio->lock); + aio->error = result_io_error; + aio->read_pos = (aio->read_pos + buffers.len1 + buffers.len2) % aio->buffer_size; + if(aio->old_buffer) + { + mem_free(aio->old_buffer); + aio->old_buffer = 0; + } + } +} + +ASYNCIO *aio_new(IOHANDLE io) +{ + ASYNCIO *aio = mem_alloc(sizeof(*aio), sizeof(void *)); + if(!aio) + { + return 0; + } + aio->io = io; + aio->lock = lock_create(); + sphore_init(&aio->sphore); + aio->thread = 0; + + aio->old_buffer = 0; + aio->buffer = mem_alloc(ASYNC_BUFSIZE, 1); + if(!aio->buffer) + { + sphore_destroy(&aio->sphore); + lock_destroy(aio->lock); + mem_free(aio); + return 0; + } + aio->buffer_size = ASYNC_BUFSIZE; + aio->read_pos = 0; + aio->write_pos = 0; + aio->error = 0; + aio->finish = ASYNCIO_RUNNING; + aio->refcount = 2; + + aio->thread = thread_init(aio_thread, aio); + if(!aio->thread) + { + mem_free(aio->buffer); + sphore_destroy(&aio->sphore); + lock_destroy(aio->lock); + mem_free(aio); + return 0; + } + return aio; +} + +static unsigned int buffer_len(ASYNCIO *aio) +{ + if(aio->write_pos >= aio->read_pos) + { + return aio->write_pos - aio->read_pos; + } + else + { + return aio->buffer_size + aio->write_pos - aio->read_pos; + } +} + +static unsigned int next_buffer_size(unsigned int cur_size, unsigned int need_size) +{ + while(cur_size < need_size) + { + cur_size *= 2; + } + return cur_size; +} + +void aio_write(ASYNCIO *aio, const void *buffer, unsigned size) +{ + unsigned int remaining; + lock_wait(aio->lock); + remaining = aio->buffer_size - buffer_len(aio); + + // Don't allow full queue to distinguish between empty and full queue. + if(size < remaining) + { + unsigned int remaining_contiguous = aio->buffer_size - aio->write_pos; + if(size > remaining_contiguous) + { + mem_copy(aio->buffer + aio->write_pos, buffer, remaining_contiguous); + size -= remaining_contiguous; + buffer = ((unsigned char *)buffer) + remaining_contiguous; + aio->write_pos = 0; + } + mem_copy(aio->buffer + aio->write_pos, buffer, size); + aio->write_pos = (aio->write_pos + size) % aio->buffer_size; + } + else + { + // Add 1 so the new buffer isn't completely filled. + unsigned int new_written = buffer_len(aio) + size + 1; + unsigned int next_size = next_buffer_size(aio->buffer_size, new_written); + unsigned int next_len = 0; + unsigned char *next_buffer = mem_alloc(next_size, 1); + + struct BUFFERS buffers; + buffer_ptrs(aio, &buffers); + if(buffers.buf1) + { + mem_copy(next_buffer + next_len, buffers.buf1, buffers.len1); + next_len += buffers.len1; + if(buffers.buf2) + { + mem_copy(next_buffer + next_len, buffers.buf2, buffers.len2); + next_len += buffers.len2; + } + } + mem_copy(next_buffer + next_len, buffer, size); + next_len += size; + + if(!aio->old_buffer) + { + aio->old_buffer = aio->buffer; + } + else + { + mem_free(aio->buffer); + } + aio->buffer = next_buffer; + aio->buffer_size = next_size; + aio->read_pos = 0; + aio->write_pos = next_len; + } + lock_unlock(aio->lock); + sphore_signal(&aio->sphore); +} + +void aio_write_newline(ASYNCIO *aio) +{ +#if defined(CONF_FAMILY_WINDOWS) + aio_write(aio, "\r\n", 2); +#else + aio_write(aio, "\n", 1); +#endif +} + +int aio_error(ASYNCIO *aio) +{ + int result; + lock_wait(aio->lock); + result = aio->error; + lock_unlock(aio->lock); + return result; +} + +void aio_free(ASYNCIO *aio) +{ + lock_wait(aio->lock); + if(aio->thread) + { + thread_detach(aio->thread); + aio->thread = 0; + } + aio_handle_free_and_unlock(aio); +} + +void aio_close(ASYNCIO *aio) +{ + lock_wait(aio->lock); + aio->finish = ASYNCIO_CLOSE; + lock_unlock(aio->lock); + sphore_signal(&aio->sphore); +} + +void aio_wait(ASYNCIO *aio) +{ + void *thread; + lock_wait(aio->lock); + thread = aio->thread; + aio->thread = 0; + if(aio->finish == ASYNCIO_RUNNING) + { + aio->finish = ASYNCIO_EXIT; + } + lock_unlock(aio->lock); + sphore_signal(&aio->sphore); + thread_wait(thread); +} + void *thread_init(void (*threadfunc)(void *), void *u) { #if defined(CONF_FAMILY_UNIX) pthread_t id; - pthread_create(&id, NULL, (void *(*)(void*))threadfunc, u); + if(pthread_create(&id, NULL, (void *(*)(void*))threadfunc, u) != 0) + { + return 0; + } return (void*)id; #elif defined(CONF_FAMILY_WINDOWS) return CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)threadfunc, u, 0, NULL); @@ -512,6 +761,7 @@ void thread_wait(void *thread) dbg_msg("thread", "!! %d", result); #elif defined(CONF_FAMILY_WINDOWS) WaitForSingleObject((HANDLE)thread, INFINITE); + CloseHandle(thread); #else #error not implemented #endif @@ -629,7 +879,7 @@ void sphore_destroy(SEMAPHORE *sem) { CloseHandle((HANDLE)*sem); } void sphore_init(SEMAPHORE *sem) { char aBuf[64]; - str_format(aBuf, sizeof(aBuf), "/%d-ddphore-%p", pid(), (void *)sem); + str_format(aBuf, sizeof(aBuf), "/%d-ddnet.tw-%p", pid(), (void *)sem); *sem = sem_open(aBuf, O_CREAT | O_EXCL, S_IRWXU | S_IRWXG, 0); } void sphore_wait(SEMAPHORE *sem) { sem_wait(*sem); } @@ -638,7 +888,7 @@ void sphore_destroy(SEMAPHORE *sem) { char aBuf[64]; sem_close(*sem); - str_format(aBuf, sizeof(aBuf), "/%d-ddphore-%p", pid(), (void *)sem); + str_format(aBuf, sizeof(aBuf), "/%d-ddnet.tw-%p", pid(), (void *)sem); sem_unlink(aBuf); } #elif defined(CONF_FAMILY_UNIX) diff --git a/src/base/system.h b/src/base/system.h index 9a05cb2f4..00f2c850f 100644 --- a/src/base/system.h +++ b/src/base/system.h @@ -336,6 +336,18 @@ int io_close(IOHANDLE io); */ int io_flush(IOHANDLE io); +/* + Function: io_error + Checks whether an error occured during I/O with the file. + + Parameters: + io - Handle to the file. + + Returns: + Returns nonzero on error, 0 otherwise. +*/ +int io_error(IOHANDLE io); + /* Function: io_stdin @@ -355,6 +367,90 @@ IOHANDLE io_stdout(); */ IOHANDLE io_stderr(); +typedef struct ASYNCIO ASYNCIO; + +/* + Function: aio_new + Wraps a for asynchronous writing. + + Parameters: + io - Handle to the file. + + Returns: + Returns the handle for asynchronous writing. + +*/ +ASYNCIO *aio_new(IOHANDLE io); + +/* + Function: aio_write + Queues a chunk of data for writing. + + Parameters: + aio - Handle to the file. + buffer - Pointer to the data that should be written. + size - Number of bytes to write. + +*/ +void aio_write(ASYNCIO *aio, const void *buffer, unsigned size); + +/* + Function: aio_write_newline + Queues a newline for writing. + + Parameters: + aio - Handle to the file. + +*/ +void aio_write_newline(ASYNCIO *aio); + +/* + Function: aio_error + Checks whether errors have occured during the asynchronous + writing. + + Call this function regularly to see if there are errors. Call + this function after to see if the process of writing + to the file succeeded. + + Parameters: + aio - Handle to the file. + + Returns: + Returns 0 if no error occured, and nonzero on error. + +*/ +int aio_error(ASYNCIO *aio); + +/* + Function: aio_close + Queues file closing. + + Parameters: + aio - Handle to the file. + +*/ +void aio_close(ASYNCIO *aio); + +/* + Function: aio_wait + Wait for the asynchronous operations to complete. + + Parameters: + aio - Handle to the file. + +*/ +void aio_wait(ASYNCIO *aio); + +/* + Function: aio_free + Frees the resources associated to the asynchronous file handle. + + Parameters: + aio - Handle to the file. + +*/ +void aio_free(ASYNCIO *aio); /* Group: Threads */ @@ -388,7 +484,7 @@ void *thread_init(void (*threadfunc)(void *), void *user); void thread_wait(void *thread); /* - Function: thread_yeild + Function: thread_yield Yield the current threads execution slice. */ void thread_yield(); @@ -1261,10 +1357,10 @@ void mem_debug_dump(IOHANDLE file); void swap_endian(void *data, unsigned elem_size, unsigned num); -typedef void (*DBG_LOGGER)(const char *line); -void dbg_logger(DBG_LOGGER logger); +typedef void (*DBG_LOGGER)(const char *line, void *user); +typedef void (*DBG_LOGGER_FINISH)(void *user); +void dbg_logger(DBG_LOGGER logger, DBG_LOGGER_FINISH finish, void *user); -void dbg_enable_threaded(); void dbg_logger_stdout(); void dbg_logger_debugger(); void dbg_logger_file(const char *filename); diff --git a/src/engine/client/client.cpp b/src/engine/client/client.cpp index 3c79b7147..86c617a9d 100644 --- a/src/engine/client/client.cpp +++ b/src/engine/client/client.cpp @@ -3444,10 +3444,6 @@ int main(int argc, const char **argv) // ignore_convention } } -#if !defined(CONF_PLATFORM_MACOSX) - dbg_enable_threaded(); -#endif - if(secure_random_init() != 0) { dbg_msg("secure", "could not initialize secure RNG"); diff --git a/src/engine/server.h b/src/engine/server.h index 87bb5ae3c..4bdc3041f 100644 --- a/src/engine/server.h +++ b/src/engine/server.h @@ -180,6 +180,8 @@ public: virtual void ResetNetErrorString(int ClientID) = 0; virtual bool SetTimedOut(int ClientID, int OrigID) = 0; virtual void SetTimeoutProtected(int ClientID) = 0; + + virtual void SetErrorShutdown(const char *pReason) = 0; }; class IGameServer : public IInterface diff --git a/src/engine/server/server.cpp b/src/engine/server/server.cpp index da909891d..86ca758c4 100644 --- a/src/engine/server/server.cpp +++ b/src/engine/server/server.cpp @@ -334,6 +334,8 @@ CServer::CServer() CSqlConnector::SetWriteServers(m_apSqlWriteServers); #endif + m_aErrorShutdownReason[0] = 0; + Init(); } @@ -1746,6 +1748,10 @@ int CServer::Run() Console()->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", aBuf); GameServer()->OnInit(); + if(ErrorShutdown()) + { + return 1; + } str_format(aBuf, sizeof(aBuf), "version %s", GameServer()->NetVersion()); Console()->Print(IConsole::OUTPUT_LEVEL_STANDARD, "server", aBuf); @@ -1808,6 +1814,10 @@ int CServer::Run() m_ServerInfoFirstRequest = 0; Kernel()->ReregisterInterface(GameServer()); GameServer()->OnInit(); + if(ErrorShutdown()) + { + break; + } UpdateServerInfo(); } else @@ -1885,6 +1895,10 @@ int CServer::Run() } GameServer()->OnTick(); + if(ErrorShutdown()) + { + break; + } } // snap game @@ -1942,11 +1956,16 @@ int CServer::Run() } } } + const char *pDisconnectReason = "Server shutdown"; + if(ErrorShutdown()) + { + pDisconnectReason = m_aErrorShutdownReason; + } // disconnect all clients on shutdown for(int i = 0; i < MAX_CLIENTS; ++i) { if(m_aClients[i].m_State != CClient::STATE_EMPTY) - m_NetServer.Drop(i, "Server shutdown"); + m_NetServer.Drop(i, pDisconnectReason); } m_Econ.Shutdown(); @@ -1962,17 +1981,17 @@ int CServer::Run() mem_free(m_pCurrentMapData); #if defined (CONF_SQL) - for (int i = 0; i < MAX_SQLSERVERS; i++) - { - if (m_apSqlReadServers[i]) - delete m_apSqlReadServers[i]; + for (int i = 0; i < MAX_SQLSERVERS; i++) + { + if (m_apSqlReadServers[i]) + delete m_apSqlReadServers[i]; - if (m_apSqlWriteServers[i]) - delete m_apSqlWriteServers[i]; - } + if (m_apSqlWriteServers[i]) + delete m_apSqlWriteServers[i]; + } #endif - return 0; + return ErrorShutdown(); } void CServer::ConTestingCommands(CConsole::IResult *pResult, void *pUser) @@ -2684,10 +2703,6 @@ int main(int argc, const char **argv) // ignore_convention } } -#if !defined(CONF_PLATFORM_MACOSX) && !defined(FUZZING) - dbg_enable_threaded(); -#endif - if(secure_random_init() != 0) { dbg_msg("secure", "could not initialize secure RNG"); @@ -2764,6 +2779,7 @@ int main(int argc, const char **argv) // ignore_convention // free delete pKernel; + return 0; } @@ -2815,12 +2831,13 @@ const char *CServer::GetAnnouncementLine(char const *pFileName) return v[m_AnnouncementLastLine]; } -int* CServer::GetIdMap(int ClientID) +int *CServer::GetIdMap(int ClientID) { - return (int*)(IdMap + VANILLA_MAX_CLIENTS * ClientID); + return (int *)(IdMap + VANILLA_MAX_CLIENTS * ClientID); } -bool CServer::SetTimedOut(int ClientID, int OrigID) { +bool CServer::SetTimedOut(int ClientID, int OrigID) +{ if (!m_NetServer.SetTimedOut(ClientID, OrigID)) { return false; @@ -2829,3 +2846,8 @@ bool CServer::SetTimedOut(int ClientID, int OrigID) { m_aClients[ClientID].m_Authed = IServer::AUTHED_NO; return true; } + +void CServer::SetErrorShutdown(const char *pReason) +{ + str_copy(m_aErrorShutdownReason, pReason, sizeof(m_aErrorShutdownReason)); +} diff --git a/src/engine/server/server.h b/src/engine/server/server.h index 291d8e59c..6d40b062a 100644 --- a/src/engine/server/server.h +++ b/src/engine/server/server.h @@ -209,6 +209,8 @@ public: int64 m_ServerInfoFirstRequest; int m_ServerInfoNumRequests; + char m_aErrorShutdownReason[128]; + CServer(); int TrySetClientName(int ClientID, const char *pName); @@ -353,6 +355,9 @@ public: void ResetNetErrorString(int ClientID) { m_NetServer.ResetErrorString(ClientID); }; bool SetTimedOut(int ClientID, int OrigID); void SetTimeoutProtected(int ClientID) { m_NetServer.SetTimeoutProtected(ClientID); }; + + bool ErrorShutdown() const { return m_aErrorShutdownReason[0] != 0; } + void SetErrorShutdown(const char *pReason); }; #endif diff --git a/src/engine/shared/engine.cpp b/src/engine/shared/engine.cpp index 9c7f28cc8..dc401c476 100644 --- a/src/engine/shared/engine.cpp +++ b/src/engine/shared/engine.cpp @@ -59,9 +59,7 @@ public: { if(!Silent) dbg_logger_stdout(); -#if defined(CONF_FAMILY_WINDOWS) dbg_logger_debugger(); -#endif // dbg_msg("engine", "running on %s-%s-%s", CONF_FAMILY_STRING, CONF_PLATFORM_STRING, CONF_ARCH_STRING); diff --git a/src/game/server/gamecontext.cpp b/src/game/server/gamecontext.cpp index 4bd5e3bc7..ce4a50537 100644 --- a/src/game/server/gamecontext.cpp +++ b/src/game/server/gamecontext.cpp @@ -104,7 +104,7 @@ void CGameContext::Clear() void CGameContext::TeeHistorianWrite(const void *pData, int DataSize, void *pUser) { CGameContext *pSelf = (CGameContext *)pUser; - io_write(pSelf->m_TeeHistorianFile, pData, DataSize); + aio_write(pSelf->m_pTeeHistorianFile, pData, DataSize); } void CGameContext::CommandCallback(int ClientID, int FlagMask, const char *pCmd, IConsole::IResult *pResult, void *pUser) @@ -604,6 +604,13 @@ void CGameContext::OnTick() if(m_TeeHistorianActive) { + int Error = aio_error(m_pTeeHistorianFile); + if(Error) + { + dbg_msg("teehistorian", "error writing to file, err=%d", Error); + Server()->SetErrorShutdown("teehistorian io error"); + } + if(!m_TeeHistorian.Starting()) { m_TeeHistorian.EndInputs(); @@ -1048,10 +1055,6 @@ void CGameContext::OnClientConnected(int ClientID) void CGameContext::OnClientDrop(int ClientID, const char *pReason) { - if(m_TeeHistorianActive) - { - io_flush(m_TeeHistorianFile); - } AbortVoteKickOnDisconnect(ClientID); m_apPlayers[ClientID]->OnDisconnect(pReason); delete m_apPlayers[ClientID]; @@ -2451,6 +2454,7 @@ void CGameContext::OnConsoleInit() { m_pServer = Kernel()->RequestInterface(); m_pConsole = Kernel()->RequestInterface(); + m_pStorage = Kernel()->RequestInterface(); m_ChatPrintCBIndex = Console()->RegisterPrintCallback(0, SendChatResponse, this); @@ -2491,6 +2495,7 @@ void CGameContext::OnInit(/*class IKernel *pKernel*/) { m_pServer = Kernel()->RequestInterface(); m_pConsole = Kernel()->RequestInterface(); + m_pStorage = Kernel()->RequestInterface(); m_World.SetGameServer(this); m_Events.SetGameServer(this); @@ -2576,16 +2581,18 @@ void CGameContext::OnInit(/*class IKernel *pKernel*/) char aFilename[64]; str_format(aFilename, sizeof(aFilename), "teehistorian/%s.teehistorian", aGameUuid); - m_TeeHistorianFile = Kernel()->RequestInterface()->OpenFile(aFilename, IOFLAG_WRITE, IStorage::TYPE_SAVE); - if(!m_TeeHistorianFile) + IOHANDLE File = Storage()->OpenFile(aFilename, IOFLAG_WRITE, IStorage::TYPE_SAVE); + if(!File) { dbg_msg("teehistorian", "failed to open '%s'", aFilename); - exit(1); + Server()->SetErrorShutdown("teehistorian open error"); + return; } else { dbg_msg("teehistorian", "recording to '%s'", aFilename); } + m_pTeeHistorianFile = aio_new(File); char aVersion[128]; #ifdef GIT_SHORTREV_HASH @@ -2610,7 +2617,6 @@ void CGameContext::OnInit(/*class IKernel *pKernel*/) GameInfo.m_pMapName = aMapName; m_TeeHistorian.Reset(&GameInfo, TeeHistorianWrite, this); - io_flush(m_TeeHistorianFile); } if(g_Config.m_SvSoloServer) @@ -2767,22 +2773,19 @@ void CGameContext::DeleteTempfile() { if(m_aDeleteTempfile[0] != 0) { - IStorage *pStorage = Kernel()->RequestInterface(); - pStorage->RemoveFile(m_aDeleteTempfile, IStorage::TYPE_SAVE); + Storage()->RemoveFile(m_aDeleteTempfile, IStorage::TYPE_SAVE); m_aDeleteTempfile[0] = 0; } } void CGameContext::OnMapChange(char *pNewMapName, int MapNameSize) { - IStorage *pStorage = Kernel()->RequestInterface(); - char aConfig[128]; char aTemp[128]; str_format(aConfig, sizeof(aConfig), "maps/%s.cfg", g_Config.m_SvMap); str_format(aTemp, sizeof(aTemp), "%s.temp.%d", pNewMapName, pid()); - IOHANDLE File = pStorage->OpenFile(aConfig, IOFLAG_READ, IStorage::TYPE_ALL); + IOHANDLE File = Storage()->OpenFile(aConfig, IOFLAG_READ, IStorage::TYPE_ALL); if(!File) { // No map-specific config, just return. @@ -2815,7 +2818,7 @@ void CGameContext::OnMapChange(char *pNewMapName, int MapNameSize) } CDataFileReader Reader; - Reader.Open(pStorage, pNewMapName, IStorage::TYPE_ALL); + Reader.Open(Storage(), pNewMapName, IStorage::TYPE_ALL); CDataFileWriter Writer; Writer.Init(); @@ -2893,7 +2896,7 @@ void CGameContext::OnMapChange(char *pNewMapName, int MapNameSize) dbg_msg("mapchange", "imported settings"); Reader.Close(); - Writer.OpenFile(pStorage, aTemp); + Writer.OpenFile(Storage(), aTemp); Writer.Finish(); str_copy(pNewMapName, aTemp, MapNameSize); @@ -2908,7 +2911,15 @@ void CGameContext::OnShutdown(bool FullShutdown) if(m_TeeHistorianActive) { m_TeeHistorian.Finish(); - io_close(m_TeeHistorianFile); + aio_close(m_pTeeHistorianFile); + aio_wait(m_pTeeHistorianFile); + int Error = aio_error(m_pTeeHistorianFile); + if(Error) + { + dbg_msg("teehistorian", "error closing file, err=%d", Error); + Server()->SetErrorShutdown("teehistorian close error"); + } + aio_free(m_pTeeHistorianFile); } DeleteTempfile(); diff --git a/src/game/server/gamecontext.h b/src/game/server/gamecontext.h index 045d2379f..aa1ab3785 100644 --- a/src/game/server/gamecontext.h +++ b/src/game/server/gamecontext.h @@ -52,10 +52,13 @@ enum NUM_TUNEZONES = 256 }; +class IStorage; + class CGameContext : public IGameServer { IServer *m_pServer; class IConsole *m_pConsole; + IStorage *m_pStorage; CLayers m_Layers; CCollision m_Collision; CNetObjHandler m_NetObjHandler; @@ -64,7 +67,7 @@ class CGameContext : public IGameServer bool m_TeeHistorianActive; CTeeHistorian m_TeeHistorian; - IOHANDLE m_TeeHistorianFile; + ASYNCIO *m_pTeeHistorianFile; CUuid m_GameUuid; static void CommandCallback(int ClientID, int FlagMask, const char *pCmd, IConsole::IResult *pResult, void *pUser); @@ -107,6 +110,7 @@ class CGameContext : public IGameServer public: IServer *Server() const { return m_pServer; } class IConsole *Console() { return m_pConsole; } + IStorage *Storage() { return m_pStorage; } CCollision *Collision() { return &m_Collision; } CTuningParams *Tuning() { return &m_Tuning; } CTuningParams *TuningList() { return &m_aTuningList[0]; } diff --git a/src/test/aio.cpp b/src/test/aio.cpp new file mode 100644 index 000000000..be591e960 --- /dev/null +++ b/src/test/aio.cpp @@ -0,0 +1,124 @@ +#include + +#include + +static const int BUF_SIZE = 64 * 1024; + +class Async : public ::testing::Test +{ +protected: + ASYNCIO *m_pAio; + char m_aFilename[64]; + bool Delete; + + Async() + { + const ::testing::TestInfo *pTestInfo = + ::testing::UnitTest::GetInstance()->current_test_info(); + const char *pTestName = pTestInfo->name(); + + str_format(m_aFilename, sizeof(m_aFilename), "Async.%s-%d.tmp", pTestName, pid()); + m_pAio = aio_new(io_open(m_aFilename, IOFLAG_WRITE)); + Delete = false; + } + + ~Async() + { + if(Delete) + { + fs_remove(m_aFilename); + } + } + + void Write(const char *pText) + { + aio_write(m_pAio, pText, str_length(pText)); + } + + void Expect(const char *pOutput) + { + aio_close(m_pAio); + aio_wait(m_pAio); + aio_free(m_pAio); + + char aBuf[BUF_SIZE]; + IOHANDLE File = io_open(m_aFilename, IOFLAG_READ); + int Read = io_read(File, aBuf, sizeof(aBuf)); + + ASSERT_EQ(str_length(pOutput), Read); + ASSERT_TRUE(mem_comp(aBuf, pOutput, Read) == 0); + Delete = true; + } +}; + +TEST_F(Async, Empty) +{ + Expect(""); +} + +TEST_F(Async, Simple) +{ + static const char TEXT[] = "a\n"; + Write(TEXT); + Expect(TEXT); +} + +TEST_F(Async, Long) +{ + char aText[BUF_SIZE + 1]; + for(unsigned i = 0; i < sizeof(aText) - 1; i++) + { + aText[i] = 'a'; + } + aText[sizeof(aText) - 1] = 0; + Write(aText); + Expect(aText); +} + +TEST_F(Async, Pieces) +{ + char aText[BUF_SIZE + 1]; + for(unsigned i = 0; i < sizeof(aText) - 1; i++) + { + aText[i] = 'a'; + } + aText[sizeof(aText) - 1] = 0; + for(unsigned i = 0; i < sizeof(aText) - 1; i++) + { + Write("a"); + } + Expect(aText); +} + +TEST_F(Async, Mixed) +{ + char aText[BUF_SIZE + 1]; + for(unsigned i = 0; i < sizeof(aText) - 1; i++) + { + aText[i] = 'a' + i % 26; + } + aText[sizeof(aText) - 1] = 0; + for(unsigned i = 0; i < sizeof(aText) - 1; i++) + { + char w = 'a' + i % 26; + aio_write(m_pAio, &w, 1); + } + Expect(aText); +} + +TEST_F(Async, NonDivisor) +{ + static const int NUM_LETTERS = 13; + static const int SIZE = BUF_SIZE / NUM_LETTERS * NUM_LETTERS; + char aText[SIZE + 1]; + for(unsigned i = 0; i < sizeof(aText) - 1; i++) + { + aText[i] = 'a' + i % NUM_LETTERS; + } + aText[sizeof(aText) - 1] = 0; + for(unsigned i = 0; i < (sizeof(aText) - 1) / NUM_LETTERS; i++) + { + Write("abcdefghijklm"); + } + Expect(aText); +} diff --git a/src/test/thread.cpp b/src/test/thread.cpp new file mode 100644 index 000000000..6ea9fe375 --- /dev/null +++ b/src/test/thread.cpp @@ -0,0 +1,81 @@ +#include + +#include + +static void Nothing(void *pUser) +{ + (void)pUser; +} + +TEST(Thread, Detach) +{ + void *pThread = thread_init(Nothing, 0); + thread_detach(pThread); +} + +static void SetToOne(void *pUser) +{ + *(int *)pUser = 1; +} + +TEST(Thread, Wait) +{ + int Integer = 0; + void *pThread = thread_init(SetToOne, &Integer); + thread_wait(pThread); + EXPECT_EQ(Integer, 1); +} + +TEST(Thread, Yield) +{ + thread_yield(); +} + +TEST(Thread, Semaphore) +{ + SEMAPHORE Semaphore; + sphore_init(&Semaphore); + sphore_destroy(&Semaphore); +} + +TEST(Thread, SemaphoreSingleThreaded) +{ + SEMAPHORE Semaphore; + sphore_init(&Semaphore); + sphore_signal(&Semaphore); + sphore_signal(&Semaphore); + sphore_wait(&Semaphore); + sphore_wait(&Semaphore); + sphore_destroy(&Semaphore); +} + +static void SemaphoreThread(void *pUser) +{ + SEMAPHORE *pSemaphore = (SEMAPHORE *)pUser; + sphore_wait(pSemaphore); +} + +TEST(Thread, SemaphoreMultiThreaded) +{ + SEMAPHORE Semaphore; + sphore_init(&Semaphore); + sphore_signal(&Semaphore); + void *pThread = thread_init(SemaphoreThread, &Semaphore); + thread_wait(pThread); + sphore_destroy(&Semaphore); +} + +static void LockThread(void *pUser) +{ + LOCK *pLock = (LOCK *)pUser; + lock_wait(*pLock); + lock_unlock(*pLock); +} + +TEST(Thread, Lock) +{ + LOCK Lock = lock_create(); + void *pThread = thread_init(LockThread, &Lock); + lock_unlock(Lock); + thread_wait(pThread); +}