Add assertions to all thread and semaphore functions

Assert on failures in all `thread_*` and `sphore_*` functions on all operating systems instead of only printing log messages on Unix, as these functions are only expected to fail when used with incorrect arguments or in some cases when a dead-lock is detected.

On macOS, `sphore_wait` was not correctly calling `sem_wait` in a loop to repeat the wait operation if it is interrupted by a signal.

On Windows, the AIO tests were failing with the additional assertions. The maximum count that semaphores on Windows could be incremented to was previously, arbitrarily limited to 10000, which was causing the `ReleaseSemaphore` call to fail as the AIO semaphore is signaled 65536 times (for each write operation) in multiple of the AIO tests.
This commit is contained in:
Robert Müller 2024-02-25 15:22:20 +01:00 committed by Dennis Felsing
parent 66cc7b5e5b
commit b544c3e839
2 changed files with 61 additions and 53 deletions

View file

@ -802,12 +802,12 @@ void *thread_init(void (*threadfunc)(void *), void *u, const char *name)
data->u = u;
#if defined(CONF_FAMILY_UNIX)
{
pthread_t id;
pthread_attr_t attr;
pthread_attr_init(&attr);
dbg_assert(pthread_attr_init(&attr) == 0, "pthread_attr_init failure");
#if defined(CONF_PLATFORM_MACOS) && defined(__MAC_10_10) && __MAC_OS_X_VERSION_MIN_REQUIRED >= __MAC_10_10
pthread_attr_set_qos_class_np(&attr, QOS_CLASS_USER_INTERACTIVE, 0);
dbg_assert(pthread_attr_set_qos_class_np(&attr, QOS_CLASS_USER_INTERACTIVE, 0) == 0, "pthread_attr_set_qos_class_np failure");
#endif
pthread_t id;
dbg_assert(pthread_create(&id, &attr, thread_run, data) == 0, "pthread_create failure");
return (void *)id;
}
@ -824,12 +824,10 @@ void *thread_init(void (*threadfunc)(void *), void *u, const char *name)
void thread_wait(void *thread)
{
#if defined(CONF_FAMILY_UNIX)
int result = pthread_join((pthread_t)thread, NULL);
if(result != 0)
dbg_msg("thread", "!! %d", result);
dbg_assert(pthread_join((pthread_t)thread, nullptr) == 0, "pthread_join failure");
#elif defined(CONF_FAMILY_WINDOWS)
WaitForSingleObject((HANDLE)thread, INFINITE);
CloseHandle(thread);
dbg_assert(WaitForSingleObject((HANDLE)thread, INFINITE) == WAIT_OBJECT_0, "WaitForSingleObject failure");
dbg_assert(CloseHandle(thread), "CloseHandle failure");
#else
#error not implemented
#endif
@ -838,9 +836,7 @@ void thread_wait(void *thread)
void thread_yield()
{
#if defined(CONF_FAMILY_UNIX)
int result = sched_yield();
if(result != 0)
dbg_msg("thread", "yield failed: %d", errno);
dbg_assert(sched_yield() == 0, "sched_yield failure");
#elif defined(CONF_FAMILY_WINDOWS)
Sleep(0);
#else
@ -851,76 +847,87 @@ void thread_yield()
void thread_detach(void *thread)
{
#if defined(CONF_FAMILY_UNIX)
int result = pthread_detach((pthread_t)(thread));
if(result != 0)
dbg_msg("thread", "detach failed: %d", result);
dbg_assert(pthread_detach((pthread_t)thread) == 0, "pthread_detach failure");
#elif defined(CONF_FAMILY_WINDOWS)
CloseHandle(thread);
dbg_assert(CloseHandle(thread), "CloseHandle failure");
#else
#error not implemented
#endif
}
bool thread_init_and_detach(void (*threadfunc)(void *), void *u, const char *name)
void thread_init_and_detach(void (*threadfunc)(void *), void *u, const char *name)
{
void *thread = thread_init(threadfunc, u, name);
if(thread)
thread_detach(thread);
return thread != nullptr;
thread_detach(thread);
}
#if defined(CONF_FAMILY_WINDOWS)
void sphore_init(SEMAPHORE *sem)
{
*sem = CreateSemaphore(0, 0, 10000, 0);
*sem = CreateSemaphoreW(nullptr, 0, std::numeric_limits<LONG>::max(), nullptr);
dbg_assert(*sem != nullptr, "CreateSemaphoreW failure");
}
void sphore_wait(SEMAPHORE *sem)
{
dbg_assert(WaitForSingleObject((HANDLE)*sem, INFINITE) == WAIT_OBJECT_0, "WaitForSingleObject failure");
}
void sphore_signal(SEMAPHORE *sem)
{
dbg_assert(ReleaseSemaphore((HANDLE)*sem, 1, nullptr), "ReleaseSemaphore failure");
}
void sphore_destroy(SEMAPHORE *sem)
{
dbg_assert(CloseHandle((HANDLE)*sem), "CloseHandle failure");
}
void sphore_wait(SEMAPHORE *sem) { WaitForSingleObject((HANDLE)*sem, INFINITE); }
void sphore_signal(SEMAPHORE *sem) { ReleaseSemaphore((HANDLE)*sem, 1, NULL); }
void sphore_destroy(SEMAPHORE *sem) { CloseHandle((HANDLE)*sem); }
#elif defined(CONF_PLATFORM_MACOS)
void sphore_init(SEMAPHORE *sem)
{
char aBuf[64];
char aBuf[32];
str_format(aBuf, sizeof(aBuf), "%p", (void *)sem);
*sem = sem_open(aBuf, O_CREAT | O_EXCL, S_IRWXU | S_IRWXG, 0);
if(*sem == SEM_FAILED)
dbg_msg("sphore", "init failed: %d", errno);
dbg_assert(*sem != SEM_FAILED, "sem_open failure");
}
void sphore_wait(SEMAPHORE *sem)
{
while(true)
{
if(sem_wait(*sem) == 0)
break;
dbg_assert(errno == EINTR, "sem_wait failure");
}
}
void sphore_signal(SEMAPHORE *sem)
{
dbg_assert(sem_post(*sem) == 0, "sem_post failure");
}
void sphore_wait(SEMAPHORE *sem) { sem_wait(*sem); }
void sphore_signal(SEMAPHORE *sem) { sem_post(*sem); }
void sphore_destroy(SEMAPHORE *sem)
{
char aBuf[64];
sem_close(*sem);
dbg_assert(sem_close(*sem) == 0, "sem_close failure");
char aBuf[32];
str_format(aBuf, sizeof(aBuf), "%p", (void *)sem);
sem_unlink(aBuf);
dbg_assert(sem_unlink(aBuf) == 0, "sem_unlink failure");
}
#elif defined(CONF_FAMILY_UNIX)
void sphore_init(SEMAPHORE *sem)
{
if(sem_init(sem, 0, 0) != 0)
dbg_msg("sphore", "init failed: %d", errno);
dbg_assert(sem_init(sem, 0, 0) == 0, "sem_init failure");
}
void sphore_wait(SEMAPHORE *sem)
{
do
while(true)
{
errno = 0;
if(sem_wait(sem) != 0)
dbg_msg("sphore", "wait failed: %d", errno);
} while(errno == EINTR);
if(sem_wait(sem) == 0)
break;
dbg_assert(errno == EINTR, "sem_wait failure");
}
}
void sphore_signal(SEMAPHORE *sem)
{
if(sem_post(sem) != 0)
dbg_msg("sphore", "post failed: %d", errno);
dbg_assert(sem_post(sem) == 0, "sem_post failure");
}
void sphore_destroy(SEMAPHORE *sem)
{
if(sem_destroy(sem) != 0)
dbg_msg("sphore", "destroy failed: %d", errno);
dbg_assert(sem_destroy(sem) == 0, "sem_destroy failure");
}
#endif

View file

@ -558,6 +558,7 @@ void aio_free(ASYNCIO *aio);
* Threading related functions.
*
* @see Locks
* @see Semaphore
*/
/**
@ -567,7 +568,9 @@ void aio_free(ASYNCIO *aio);
*
* @param threadfunc Entry point for the new thread.
* @param user Pointer to pass to the thread.
* @param name name describing the use of the thread
* @param name Name describing the use of the thread.
*
* @return Handle for the new thread.
*/
void *thread_init(void (*threadfunc)(void *), void *user, const char *name);
@ -581,35 +584,33 @@ void *thread_init(void (*threadfunc)(void *), void *user, const char *name);
void thread_wait(void *thread);
/**
* Yield the current threads execution slice.
* Yield the current thread's execution slice.
*
* @ingroup Threads
*/
void thread_yield();
/**
* Puts the thread in the detached thread, guaranteeing that
* Puts the thread in the detached state, guaranteeing that
* resources of the thread will be freed immediately when the
* thread terminates.
*
* @ingroup Threads
*
* @param thread Thread to detach
* @param thread Thread to detach.
*/
void thread_detach(void *thread);
/**
* Creates a new thread and if it succeeded detaches it.
* Creates a new thread and detaches it.
*
* @ingroup Threads
*
* @param threadfunc Entry point for the new thread.
* @param user Pointer to pass to the thread.
* @param name Name describing the use of the thread
*
* @return true on success, false on failure.
* @param name Name describing the use of the thread.
*/
bool thread_init_and_detach(void (*threadfunc)(void *), void *user, const char *name);
void thread_init_and_detach(void (*threadfunc)(void *), void *user, const char *name);
/**
* @defgroup Semaphore