From 7dad488b9663339d2a84eccdad1b04ae602d176a Mon Sep 17 00:00:00 2001 From: ThePedroo Date: Mon, 6 Feb 2023 16:54:09 -0300 Subject: [PATCH] feat(thread): Replace pthread to CThreads CThreads can also work in Windows using Windows threads, due to this, pthread was replaced by it. --- core/Makefile | 3 +- core/cthreads.c | 268 +++++++++++++++++++++++++++++++++++ core/cthreads.h | 119 ++++++++++++++++ core/threadpool.c | 53 ++++--- core/user-agent.c | 16 +-- core/websockets.c | 46 +++--- include/discord-internal.h | 23 +-- src/Makefile | 3 +- src/discord-cache.c | 43 +++--- src/discord-client.c | 12 +- src/discord-gateway.c | 8 +- src/discord-refcount.c | 28 ++-- src/discord-rest_ratelimit.c | 4 +- src/discord-rest_request.c | 53 +++---- src/discord-timer.c | 36 ++--- src/discord-voice.c | 22 +-- src/discord-worker.c | 17 ++- 17 files changed, 572 insertions(+), 182 deletions(-) create mode 100644 core/cthreads.c create mode 100644 core/cthreads.h diff --git a/core/Makefile b/core/Makefile index 043545f99..bfdfccec2 100644 --- a/core/Makefile +++ b/core/Makefile @@ -12,7 +12,8 @@ OBJS = cog-utils.o \ priority_queue.o \ anomap.o \ sha1.o \ - threadpool.o + threadpool.o \ + cthreads.o WFLAGS = -Wall -Wextra -Wpedantic CFLAGS += -std=c99 -pthread -D_XOPEN_SOURCE=600 -DLOG_USE_COLOR \ diff --git a/core/cthreads.c b/core/cthreads.c new file mode 100644 index 000000000..d32e390d8 --- /dev/null +++ b/core/cthreads.c @@ -0,0 +1,268 @@ +#include +#include + +#include "cthreads.h" + +#if _WIN32 +#include +DWORD WINAPI __cthreads_winthreads_function_wrapper(void *data) { + struct cthreads_args *args = data; + args->func(args->data); + + free(data); + + return TRUE; +} +#else +#include +void *__cthreads_pthread_function_wrapper(void *data) { + struct cthreads_args *args = data; + args->func(args->data); + + free(data); + + return NULL; +} +#endif + +int cthreads_thread_create(struct cthreads_thread *thread, struct cthreads_thread_attr *attr, void *(*func)(void *data), void *data) { + #ifdef _WIN32 + struct cthreads_args *args = malloc(sizeof(struct cthreads_args)); + args->func = func; + args->data = data; + + if (attr) thread->wThread = CreateThread(NULL, attr->stacksize ? attr->stacksize : 0, + __cthreads_winthreads_function_wrapper, args, + attr->dwCreationFlags ? (DWORD)attr->dwCreationFlags : 0, NULL); + else thread->wThread = CreateThread(NULL, 0, __cthreads_winthreads_function_wrapper, args, 0, NULL); + + return 0; + #else + pthread_t pthread; + + struct cthreads_args *args = malloc(sizeof(struct cthreads_args)); + args->func = func; + args->data = data; + + pthread_attr_t pAttr; + if (attr) { + if (attr->detachstate) pthread_attr_setdetachstate(&pAttr, attr->detachstate); + if (attr->guardsize) pthread_attr_setguardsize(&pAttr, attr->guardsize); + if (attr->inheritsched) pthread_attr_setinheritsched(&pAttr, attr->inheritsched); + if (attr->schedpolicy) pthread_attr_setschedpolicy(&pAttr, attr->schedpolicy); + if (attr->scope) pthread_attr_setscope(&pAttr, attr->scope); + if (attr->stack) pthread_attr_setstack(&pAttr, attr->stackaddr, attr->stack); + if (attr->stacksize) pthread_attr_setstacksize(&pAttr, attr->stacksize); + } + + int res = pthread_create(&pthread, attr ? &pAttr : NULL, __cthreads_pthread_function_wrapper, (void *)args); + + thread->pThread = pthread; + + return res; + #endif +} + +int cthreads_thread_detach(struct cthreads_thread *thread) { + #ifdef _WIN32 + return CloseHandle(thread->wThread); + #else + return pthread_detach(thread->pThread); + #endif +} + +void cthreads_thread_close(void *code) { + #ifdef _WIN32 + return ExitThread((DWORD)code); + #else + return pthread_exit(code); + #endif +} + +int cthreads_mutex_init(struct cthreads_mutex *mutex, struct cthreads_mutex_attr *attr) { + #ifdef _WIN32 + HANDLE wMutex; + if (attr) wMutex= CreateMutex(NULL, attr->bInitialOwner ? TRUE : FALSE, + attr->lpName ? (LPCSTR)attr->lpName : NULL); + else wMutex= CreateMutex(NULL, FALSE, NULL); + + if (wMutex == NULL) return 1; + else { + mutex->wMutex = wMutex; + return 0; + } + #else + pthread_mutexattr_t pAttr; + if (attr) { + if (attr->pshared) pthread_mutexattr_setpshared(&pAttr, attr->pshared); + if (attr->type) pthread_mutexattr_settype(&pAttr, attr->type); + if (attr->protocol) pthread_mutexattr_setprotocol(&pAttr, attr->protocol); + if (attr->robust) pthread_mutexattr_setrobust(&pAttr, attr->robust); + if (attr->prioceiling) pthread_mutexattr_setprioceiling(&pAttr, attr->prioceiling); + } + return pthread_mutex_init(&mutex->pMutex, attr ? &pAttr : NULL); + #endif +} + +int cthreads_mutex_lock(struct cthreads_mutex *mutex) { + #ifdef _WIN32 + DWORD ret = WaitForSingleObject(mutex->wMutex, INFINITE); + + if (ret == WAIT_OBJECT_0) return 0; + return 1; + #else + return pthread_mutex_lock(&mutex->pMutex); + #endif +} + +int cthreads_mutex_trylock(struct cthreads_mutex *mutex) { + #ifdef _WIN32 + DWORD ret = WaitForSingleObject(mutex->wMutex, 0); + + if (ret == WAIT_OBJECT_0) return 0; + return 1; + #else + return pthread_mutex_trylock(&mutex->pMutex); + #endif +} + +int cthreads_mutex_unlock(struct cthreads_mutex *mutex) { + #ifdef _WIN32 + return ReleaseMutex(mutex->wMutex) == 0 ? 1 : 0; + #else + return pthread_mutex_unlock(&mutex->pMutex); + #endif +} + +int cthreads_mutex_destroy(struct cthreads_mutex *mutex) { + #ifdef _WIN32 + return CloseHandle(mutex->wMutex) == 0 ? 1 : 0; + #else + return pthread_mutex_destroy(&mutex->pMutex); + #endif +} + +int cthreads_cond_init(struct cthreads_cond *cond, struct cthreads_cond_attr *attr) { + #ifdef _WIN32 + if (attr) cond->wCond = CreateEvent(NULL, attr->bManualReset ? TRUE : FALSE, + attr->bInitialState ? TRUE : FALSE, + attr->lpName ? (LPTSTR)attr->lpName : NULL); + else cond->wCond = CreateEvent(NULL, FALSE, FALSE, NULL); + + if (cond->wCond == NULL) return 1; + return 0; + #else + pthread_condattr_t pAttr; + if (attr) { + if (attr->pshared) pthread_condattr_setpshared(&pAttr, attr->pshared); + if (attr->clock) pthread_condattr_setclock(&pAttr, attr->clock); + } + return pthread_cond_init(&cond->pCond, attr ? &pAttr : NULL); + #endif +} + +int cthreads_cond_signal(struct cthreads_cond *cond) { + #ifdef _WIN32 + return SetEvent(cond->wCond) == 0 ? 1 : 0; + #else + return pthread_cond_signal(&cond->pCond); + #endif +} + +int cthreads_cond_broadcast(struct cthreads_cond *cond) { + #ifdef _WIN32 + return SetEvent(cond->wCond) == 0 ? 1 : 0; + #else + return pthread_cond_broadcast(&cond->pCond); + #endif +} + +int cthreads_cond_destroy(struct cthreads_cond *cond) { + #ifdef _WIN32 + return CloseHandle(cond->wCond) == 0 ? 1 : 0; + #else + return pthread_cond_destroy(&cond->pCond); + #endif +} + +int cthreads_cond_wait(struct cthreads_cond *cond, struct cthreads_mutex *mutex) { + #ifdef _WIN32 + if (cthreads_mutex_unlock(mutex) == 1) return 1; + if (WaitForSingleObject(cond->wCond, INFINITE) == WAIT_FAILED) return 1; + return cthreads_mutex_lock(mutex) == 1 ? 1 : 0; + #else + return pthread_cond_wait(&cond->pCond, &mutex->pMutex); + #endif +} + +int cthreads_join(struct cthreads_thread *thread, void *code) { + #ifdef _WIN32 + if (WaitForSingleObject(thread->wThread, INFINITE) == WAIT_FAILED) return 0; + return GetExitCodeThread(thread->wThread, (LPDWORD)&code) == 0 ? 1 : 0; + #else + return pthread_join(thread->pThread, code ? &code : NULL); + #endif +} + +int cthreads_rwlock_init(struct cthreads_rwlock *rwlock) { + #ifdef _WIN32 + rwlock->wRWLock = CreateMutex(NULL, FALSE, NULL); + + if (rwlock->wRWLock == NULL) return 1; + return 0; + #else + return pthread_rwlock_init(&rwlock->pRWLock, NULL); + #endif +} + +int cthreads_rwlock_rdlock(struct cthreads_rwlock *rwlock) { + #ifdef _WIN32 + return WaitForSingleObject(rwlock->wRWLock, INFINITE) == WAIT_FAILED ? 1 : 0; + #else + return pthread_rwlock_rdlock(&rwlock->pRWLock); + #endif +} + +int cthreads_rwlock_unlock(struct cthreads_rwlock *rwlock) { + #ifdef _WIN32 + return ReleaseMutex(rwlock->wRWLock) == 0 ? 1 : 0; + #else + return pthread_rwlock_unlock(&rwlock->pRWLock); + #endif +} + +int cthreads_rwlock_wrlock(struct cthreads_rwlock *rwlock) { + #ifdef _WIN32 + return WaitForSingleObject(rwlock->wRWLock, INFINITE) == WAIT_FAILED ? 1 : 0; + #else + return pthread_rwlock_wrlock(&rwlock->pRWLock); + #endif +} + +int cthreads_rwlock_destroy(struct cthreads_rwlock *rwlock) { + #ifdef _WIN32 + return CloseHandle(rwlock->wRWLock) == 0 ? 1 : 0; + #else + return pthread_rwlock_destroy(&rwlock->pRWLock); + #endif +} + +int cthreads_equal(struct cthreads_thread thread1, struct cthreads_thread thread2) { + #ifdef _WIN32 + return thread1.wThread == thread2.wThread; + #else + return pthread_equal(thread1.pThread, thread2.pThread); + #endif +} + +struct cthreads_thread cthreads_self() { + struct cthreads_thread t; + + #ifdef _WIN32 + t.wThread = GetCurrentThread(); + #else + t.pThread = pthread_self(); + #endif + + return t; +} diff --git a/core/cthreads.h b/core/cthreads.h new file mode 100644 index 000000000..5f8a7c651 --- /dev/null +++ b/core/cthreads.h @@ -0,0 +1,119 @@ +#ifndef CTHREADS_H +#define CTHREADS_H + +struct cthreads_args { + void *(*func)(void *data); + void *data; +}; + +#if _WIN32 +#include +#else +#include +#endif + +struct cthreads_thread { + #ifdef _WIN32 + HANDLE wThread; + #else + pthread_t pThread; + #endif +}; + +struct cthreads_thread_attr { + int detachstate; + size_t guardsize; + int inheritsched; + int schedpolicy; + int scope; + size_t stack; + void *stackaddr; + size_t stacksize; + int dwCreationFlags; +}; + +struct cthreads_mutex { + #ifdef _WIN32 + HANDLE wMutex; + #else + pthread_mutex_t pMutex; + #endif +}; + +struct cthreads_mutex_attr { + int pshared; + int type; + int protocol; + int robust; + int prioceiling; + int bInitialOwner; + char *lpName; +}; + +struct cthreads_cond { + #ifdef _WIN32 + HANDLE wCond; + #else + pthread_cond_t pCond; + #endif +}; + +struct cthreads_cond_attr { + int pshared; + int clock; + int bManualReset; + int bInitialState; + char *lpName; +}; + +struct cthreads_rwlock { + #ifdef _WIN32 + HANDLE wRWLock; + #else + pthread_rwlock_t pRWLock; + #endif +}; + +int cthreads_thread_create(struct cthreads_thread *thread, struct cthreads_thread_attr *attr, void *(*func)(void *data), void *data); + +int cthreads_thread_detach(struct cthreads_thread *thread); + +void cthreads_thread_close(void *code); + +int cthreads_mutex_init(struct cthreads_mutex *mutex, struct cthreads_mutex_attr *attr); + +int cthreads_mutex_lock(struct cthreads_mutex *mutex); + +int cthreads_mutex_trylock(struct cthreads_mutex *mutex); + +int cthreads_mutex_unlock(struct cthreads_mutex *mutex); + +int cthreads_mutex_destroy(struct cthreads_mutex *mutex); + +int cthreads_cond_init(struct cthreads_cond *cond, struct cthreads_cond_attr *attr); + +int cthreads_cond_signal(struct cthreads_cond *cond); + +int cthreads_cond_broadcast(struct cthreads_cond *cond); + +int cthreads_cond_destroy(struct cthreads_cond *cond); + +int cthreads_cond_wait(struct cthreads_cond *cond, struct cthreads_mutex *mutex); + +int cthreads_join(struct cthreads_thread *thread, void *code); + +int cthreads_rwlock_init(struct cthreads_rwlock *rwlock); + +int cthreads_rwlock_rdlock(struct cthreads_rwlock *rwlock); + +int cthreads_rwlock_unlock(struct cthreads_rwlock *rwlock); + +int cthreads_rwlock_wrlock(struct cthreads_rwlock *rwlock); + +int cthreads_rwlock_destroy(struct cthreads_rwlock *rwlock); + +int cthreads_equal(struct cthreads_thread thread1, struct cthreads_thread thread2); + +struct cthreads_thread cthreads_self(); + +#endif /* CTHREADS_H */ diff --git a/core/threadpool.c b/core/threadpool.c index 1fe3fb084..667bea1d9 100644 --- a/core/threadpool.c +++ b/core/threadpool.c @@ -32,10 +32,9 @@ */ #include -#include -#include #include "threadpool.h" +#include "cthreads.h" typedef enum { immediate_shutdown = 1, @@ -71,9 +70,9 @@ typedef struct { * @var started Number of started threads */ struct threadpool_t { - pthread_mutex_t lock; - pthread_cond_t notify; - pthread_t *threads; + struct cthreads_mutex lock; + struct cthreads_cond notify; + struct cthreads_thread *threads; threadpool_task_t *queue; int thread_count; int queue_size; @@ -114,13 +113,13 @@ threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) pool->shutdown = pool->started = 0; /* Allocate thread and task queue */ - pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); + pool->threads = (struct cthreads_thread *)malloc(sizeof(struct cthreads_thread) * thread_count); pool->queue = (threadpool_task_t *)malloc (sizeof(threadpool_task_t) * queue_size); /* Initialize mutex and conditional variable first */ - if((pthread_mutex_init(&(pool->lock), NULL) != 0) || - (pthread_cond_init(&(pool->notify), NULL) != 0) || + if((cthreads_mutex_init(&(pool->lock), NULL) != 0) || + (cthreads_cond_init(&(pool->notify), NULL) != 0) || (pool->threads == NULL) || (pool->queue == NULL)) { goto err; @@ -128,7 +127,7 @@ threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) /* Start worker threads */ for(i = 0; i < thread_count; i++) { - if(pthread_create(&(pool->threads[i]), NULL, + if(cthreads_thread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool) != 0) { threadpool_destroy(pool, 0); return NULL; @@ -157,7 +156,7 @@ int threadpool_add(threadpool_t *pool, void (*function)(void *), return threadpool_invalid; } - if(pthread_mutex_lock(&(pool->lock)) != 0) { + if(cthreads_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } @@ -182,14 +181,14 @@ int threadpool_add(threadpool_t *pool, void (*function)(void *), pool->tail = next; pool->count += 1; - /* pthread_cond_broadcast */ - if(pthread_cond_signal(&(pool->notify)) != 0) { + /* cthreads_cond_signal */ + if(cthreads_cond_signal(&(pool->notify)) != 0) { err = threadpool_lock_failure; break; } } while(0); - if(pthread_mutex_unlock(&pool->lock) != 0) { + if(cthreads_mutex_unlock(&pool->lock) != 0) { err = threadpool_lock_failure; } @@ -204,7 +203,7 @@ int threadpool_destroy(threadpool_t *pool, int flags) return threadpool_invalid; } - if(pthread_mutex_lock(&(pool->lock)) != 0) { + if(cthreads_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } @@ -219,15 +218,15 @@ int threadpool_destroy(threadpool_t *pool, int flags) graceful_shutdown : immediate_shutdown; /* Wake up all worker threads */ - if((pthread_cond_broadcast(&(pool->notify)) != 0) || - (pthread_mutex_unlock(&(pool->lock)) != 0)) { + if((cthreads_cond_broadcast(&(pool->notify)) != 0) || + (cthreads_mutex_unlock(&(pool->lock)) != 0)) { err = threadpool_lock_failure; break; } /* Join all worker thread */ for(i = 0; i < pool->thread_count; i++) { - if(pthread_join(pool->threads[i], NULL) != 0) { + if(cthreads_join(&pool->threads[i], NULL) != 0) { err = threadpool_thread_failure; } } @@ -254,9 +253,9 @@ int threadpool_free(threadpool_t *pool) /* Because we allocate pool->threads after initializing the mutex and condition variable, we're sure they're initialized. Let's lock the mutex just in case. */ - pthread_mutex_lock(&(pool->lock)); - pthread_mutex_destroy(&(pool->lock)); - pthread_cond_destroy(&(pool->notify)); + cthreads_mutex_lock(&(pool->lock)); + cthreads_mutex_destroy(&(pool->lock)); + cthreads_cond_destroy(&(pool->notify)); } free(pool); return 0; @@ -270,12 +269,12 @@ static void *threadpool_thread(void *threadpool) for(;;) { /* Lock must be taken to wait on conditional variable */ - pthread_mutex_lock(&(pool->lock)); + cthreads_mutex_lock(&(pool->lock)); /* Wait on condition variable, check for spurious wakeups. - When returning from pthread_cond_wait(), we own the lock. */ + When returning from cthreads_cond_wait(), we own the lock. */ while((pool->count == 0) && (!pool->shutdown)) { - pthread_cond_wait(&(pool->notify), &(pool->lock)); + cthreads_cond_wait(&(pool->notify), &(pool->lock)); } if((pool->shutdown == immediate_shutdown) || @@ -291,7 +290,7 @@ static void *threadpool_thread(void *threadpool) pool->count -= 1; /* Unlock */ - pthread_mutex_unlock(&(pool->lock)); + cthreads_mutex_unlock(&(pool->lock)); /* Get to work */ (*(task.function))(task.argument); @@ -299,7 +298,7 @@ static void *threadpool_thread(void *threadpool) pool->started--; - pthread_mutex_unlock(&(pool->lock)); - pthread_exit(NULL); + cthreads_mutex_unlock(&(pool->lock)); + cthreads_thread_close(NULL); return(NULL); -} +} \ No newline at end of file diff --git a/core/user-agent.c b/core/user-agent.c index b207d5a90..72fb5bd75 100644 --- a/core/user-agent.c +++ b/core/user-agent.c @@ -5,11 +5,11 @@ #include /* isspace() */ #include #include -#include #include "user-agent.h" #include "cog-utils.h" #include "queue.h" +#include "cthreads.h" #define CURLE_LOG(conn, ecode) \ logconf_fatal(&conn->ua->conf, "(CURLE code: %d) %s", ecode, \ @@ -51,7 +51,7 @@ struct ua_conn_queue { /** total amount of created connection handles */ int total; /** lock for blocking queue operations */ - pthread_mutex_t lock; + struct cthreads_mutex lock; }; struct ua_conn { @@ -450,7 +450,7 @@ ua_conn_start(struct user_agent *ua) QUEUE(struct ua_conn) *qelem = NULL; struct ua_conn *conn = NULL; - pthread_mutex_lock(&ua->connq->lock); + cthreads_mutex_lock(&ua->connq->lock); if (QUEUE_EMPTY(&ua->connq->idle)) { conn = _ua_conn_init(ua); @@ -465,7 +465,7 @@ ua_conn_start(struct user_agent *ua) } QUEUE_INSERT_TAIL(&ua->connq->busy, &conn->entry); - pthread_mutex_unlock(&ua->connq->lock); + cthreads_mutex_unlock(&ua->connq->lock); return conn; } @@ -526,10 +526,10 @@ ua_conn_stop(struct ua_conn *conn) } /* move conn from 'busy' to 'idle' queue */ - pthread_mutex_lock(&ua->connq->lock); + cthreads_mutex_lock(&ua->connq->lock); QUEUE_REMOVE(&conn->entry); QUEUE_INSERT_TAIL(&ua->connq->idle, &conn->entry); - pthread_mutex_unlock(&ua->connq->lock); + cthreads_mutex_unlock(&ua->connq->lock); } struct user_agent * @@ -543,7 +543,7 @@ ua_init(struct ua_attr *attr) QUEUE_INIT(&new_ua->connq->idle); QUEUE_INIT(&new_ua->connq->busy); - if (pthread_mutex_init(&new_ua->connq->lock, NULL)) { + if (cthreads_mutex_init(&new_ua->connq->lock, NULL)) { logconf_fatal(&new_ua->conf, "Couldn't initialize mutex"); abort(); } @@ -569,7 +569,7 @@ ua_cleanup(struct user_agent *ua) _ua_conn_cleanup(conn); } } - pthread_mutex_destroy(&ua->connq->lock); + cthreads_mutex_destroy(&ua->connq->lock); free(ua->connq); /* cleanup logging module */ diff --git a/core/websockets.c b/core/websockets.c index 982400fbd..5a31e2b09 100644 --- a/core/websockets.c +++ b/core/websockets.c @@ -1,12 +1,12 @@ #include #include #include -#include #include "curl-websocket.h" #include "websockets.h" #include "cog-utils.h" +#include "cthreads.h" #define CURLM_LOG(ws, mcode) \ logconf_fatal(&ws->conf, "(CURLM code: %d) %s", mcode, \ @@ -48,9 +48,9 @@ struct websockets { */ char errbuf[CURL_ERROR_SIZE]; /** lock for functions that may be called in other threads */ - pthread_mutex_t lock; + struct cthreads_mutex lock; /** lock for reading/writing the event-loop timestamp */ - pthread_rwlock_t rwlock; + struct cthreads_rwlock rwlock; /** * user-triggered actions @@ -181,9 +181,9 @@ _ws_set_status_nolock(struct websockets *ws, enum ws_status status) static void _ws_set_status(struct websockets *ws, enum ws_status status) { - pthread_mutex_lock(&ws->lock); + cthreads_mutex_lock(&ws->lock); _ws_set_status_nolock(ws, status); - pthread_mutex_unlock(&ws->lock); + cthreads_mutex_unlock(&ws->lock); } static void @@ -345,7 +345,7 @@ _ws_check_action_cb(void *p_userdata, (void)ultotal; (void)ulnow; - pthread_mutex_lock(&ws->lock); + cthreads_mutex_lock(&ws->lock); switch (ws->action) { case WS_ACTION_BEGIN_CLOSE: logconf_warn(&ws->conf, @@ -364,7 +364,7 @@ _ws_check_action_cb(void *p_userdata, break; } ws->action = WS_ACTION_NONE; - pthread_mutex_unlock(&ws->lock); + cthreads_mutex_unlock(&ws->lock); return ret; } @@ -456,9 +456,9 @@ ws_get_status(struct websockets *ws) { enum ws_status status; - pthread_mutex_lock(&ws->lock); + cthreads_mutex_lock(&ws->lock); status = ws->status; - pthread_mutex_unlock(&ws->lock); + cthreads_mutex_unlock(&ws->lock); return status; } @@ -494,10 +494,10 @@ ws_init(struct ws_callbacks *cbs, CURLM *mhandle, struct ws_attr *attr) /** respond ping with a pong by default */ if (!new_ws->cbs.on_ping) new_ws->cbs.on_ping = &default_on_ping; - if (pthread_mutex_init(&new_ws->lock, NULL)) - ERR("[%s] Couldn't initialize pthread mutex", new_ws->conf.id); - if (pthread_rwlock_init(&new_ws->rwlock, NULL)) - ERR("[%s] Couldn't initialize pthread rwlock", new_ws->conf.id); + if (cthreads_mutex_init(&new_ws->lock, NULL)) + ERR("[%s] Couldn't initialize cthreads mutex", new_ws->conf.id); + if (cthreads_rwlock_init(&new_ws->rwlock)) + ERR("[%s] Couldn't initialize cthreads rwlock", new_ws->conf.id); return new_ws; } @@ -509,7 +509,7 @@ ws_set_url(struct websockets *ws, { size_t len; - pthread_mutex_lock(&ws->lock); + cthreads_mutex_lock(&ws->lock); if (!*ws->base_url) logconf_debug(&ws->conf, "Websockets new URL: %s", base_url); @@ -529,15 +529,15 @@ ws_set_url(struct websockets *ws, "[%s] Out of bounds write attempt", ws->conf.id); } - pthread_mutex_unlock(&ws->lock); + cthreads_mutex_unlock(&ws->lock); } void ws_cleanup(struct websockets *ws) { if (ws->ehandle) cws_free(ws->ehandle); - pthread_mutex_destroy(&ws->lock); - pthread_rwlock_destroy(&ws->rwlock); + cthreads_mutex_destroy(&ws->lock); + cthreads_rwlock_destroy(&ws->rwlock); free(ws); } @@ -815,9 +815,9 @@ ws_timestamp(struct websockets *ws) { uint64_t now_tstamp; - pthread_rwlock_rdlock(&ws->rwlock); + cthreads_rwlock_rdlock(&ws->rwlock); now_tstamp = ws->now_tstamp; - pthread_rwlock_unlock(&ws->rwlock); + cthreads_rwlock_unlock(&ws->rwlock); return now_tstamp; } @@ -827,9 +827,9 @@ ws_timestamp_update(struct websockets *ws) { uint64_t now_tstamp; - pthread_rwlock_wrlock(&ws->rwlock); + cthreads_rwlock_wrlock(&ws->rwlock); now_tstamp = ws->now_tstamp = cog_timestamp_ms(); - pthread_rwlock_unlock(&ws->rwlock); + cthreads_rwlock_unlock(&ws->rwlock); return now_tstamp; } @@ -844,14 +844,14 @@ ws_close(struct websockets *ws, "Attempting to close WebSockets connection with %s : %.*s", ws_close_opcode_print(code), (int)len, reason); - pthread_mutex_lock(&ws->lock); + cthreads_mutex_lock(&ws->lock); ws->action = WS_ACTION_BEGIN_CLOSE; ws->pending_close.code = code; snprintf(ws->pending_close.reason, sizeof(ws->pending_close.reason), "%.*s", (int)len, reason); - pthread_mutex_unlock(&ws->lock); + cthreads_mutex_unlock(&ws->lock); } void diff --git a/include/discord-internal.h b/include/discord-internal.h index 0c580c77a..c256ff8ed 100644 --- a/include/discord-internal.h +++ b/include/discord-internal.h @@ -28,6 +28,7 @@ extern "C" { #include "io_poller.h" #include "queue.h" #include "priority_queue.h" +#include "cthreads.h" /** @brief Return 1 if string isn't considered empty */ #define NOT_EMPTY_STR(str) ((str) && *(str)) @@ -89,12 +90,12 @@ struct discord_timers { struct io_poller *io; struct { bool is_active; - pthread_t thread; + struct cthreads_thread thread; struct discord_timer *timer; bool skip_update_phase; } active; - pthread_mutex_t lock; - pthread_cond_t cond; + struct cthreads_mutex lock; + struct cthreads_cond cond; }; /** @@ -459,7 +460,7 @@ struct discord_request { /** current retry attempt (stop at rest->retry_limit) */ int retry_attempt; /** synchronize synchronous requests */ - pthread_cond_t *cond; + struct cthreads_cond *cond; /** entry for @ref discord_ratelimiter and @ref discord_bucket queues */ QUEUE entry; }; @@ -494,11 +495,11 @@ struct discord_requestor { /** queue locks */ struct { /** recycling queue lock */ - pthread_mutex_t recycling; + struct cthreads_mutex recycling; /** pending queue lock */ - pthread_mutex_t pending; + struct cthreads_mutex pending; /** finished queue lock */ - pthread_mutex_t finished; + struct cthreads_mutex finished; } * qlocks; }; @@ -775,7 +776,7 @@ struct discord_gateway { */ int ping_ms; /** ping rwlock */ - pthread_rwlock_t rwlock; + struct cthreads_rwlock rwlock; } * timer; /** the identify structure for client authentication */ @@ -944,7 +945,7 @@ struct discord_refcounter { */ struct _discord_ref *refs; /** global lock */ - pthread_mutex_t *g_lock; + struct cthreads_mutex *g_lock; }; /** @@ -1228,9 +1229,9 @@ struct discord { /** amount of worker-threads currently being used by client */ int count; /** synchronize `count` between workers */ - pthread_mutex_t lock; + struct cthreads_mutex lock; /** notify of `count` decrement */ - pthread_cond_t cond; + struct cthreads_cond cond; } * workers; #ifdef CCORD_VOICE diff --git a/src/Makefile b/src/Makefile index 5d3777933..0d2ef8a45 100644 --- a/src/Makefile +++ b/src/Makefile @@ -19,7 +19,8 @@ CORE_OBJS = $(CORE_DIR)/cog-utils.o \ $(CORE_DIR)/priority_queue.o \ $(CORE_DIR)/anomap.o \ $(CORE_DIR)/sha1.o \ - $(CORE_DIR)/threadpool.o + $(CORE_DIR)/threadpool.o \ + $(CORE_DIR)/cthreads.o GENCODECS_OBJ = $(GENCODECS_DIR)/discord_codecs.o VOICE_OBJS = discord-voice.o diff --git a/src/discord-cache.c b/src/discord-cache.c index 1a8d02b27..cc992227b 100644 --- a/src/discord-cache.c +++ b/src/discord-cache.c @@ -1,4 +1,3 @@ -#include #include #include "discord.h" @@ -21,7 +20,7 @@ _calculate_shard(u64snowflake guild_id, int total_shards) } struct _discord_shard_cache { - pthread_mutex_t lock; + struct cthreads_mutex lock; bool valid; struct anomap *guild_map; struct anomap *msg_map; @@ -38,7 +37,7 @@ static void _discord_shard_cache_cleanup(struct discord *client, struct _discord_shard_cache *cache) { - pthread_mutex_lock(&cache->lock); + cthreads_mutex_lock(&cache->lock); for (size_t i = 0; i < anomap_length(cache->guild_map); i++) { struct discord_guild *guild; anomap_at_index(cache->guild_map, i, NULL, &guild); @@ -51,7 +50,7 @@ _discord_shard_cache_cleanup(struct discord *client, } anomap_clear(cache->guild_map); anomap_clear(cache->msg_map); - pthread_mutex_unlock(&cache->lock); + cthreads_mutex_unlock(&cache->lock); } #define EV_CB(name, data) \ @@ -61,18 +60,18 @@ _discord_shard_cache_cleanup(struct discord *client, struct _discord_cache_data *const DATA = client->cache.data; \ const int SHARD = _calculate_shard(GUILD_ID, DATA->total_shards); \ struct _discord_shard_cache *const cache = &data->caches[SHARD]; \ - pthread_mutex_lock(&CACHE->lock) + cthreads_mutex_lock(&CACHE->lock) -#define CACHE_END(CACHE) pthread_mutex_unlock(&CACHE->lock) +#define CACHE_END(CACHE) cthreads_mutex_unlock(&CACHE->lock) EV_CB(ready, discord_ready) { int shard = ev->shard ? ev->shard->array[0] : 0; struct _discord_cache_data *data = client->cache.data; struct _discord_shard_cache *cache = &data->caches[shard]; - pthread_mutex_lock(&cache->lock); + cthreads_mutex_lock(&cache->lock); cache->valid = true; - pthread_mutex_unlock(&cache->lock); + cthreads_mutex_unlock(&cache->lock); } static void @@ -81,9 +80,9 @@ _on_shard_resumed(struct discord *client, const struct discord_identify *ev) int shard = ev->shard ? ev->shard->array[0] : 0; struct _discord_cache_data *data = client->cache.data; struct _discord_shard_cache *cache = &data->caches[shard]; - pthread_mutex_lock(&cache->lock); + cthreads_mutex_lock(&cache->lock); cache->valid = true; - pthread_mutex_unlock(&cache->lock); + cthreads_mutex_unlock(&cache->lock); } static void @@ -94,9 +93,9 @@ _on_shard_reconnected(struct discord *client, struct _discord_cache_data *data = client->cache.data; struct _discord_shard_cache *cache = &data->caches[shard]; _discord_shard_cache_cleanup(client, cache); - pthread_mutex_lock(&cache->lock); + cthreads_mutex_lock(&cache->lock); cache->valid = true; - pthread_mutex_unlock(&cache->lock); + cthreads_mutex_unlock(&cache->lock); } static void @@ -108,9 +107,9 @@ _on_shard_disconnected(struct discord *client, struct _discord_cache_data *data = client->cache.data; struct _discord_shard_cache *cache = &data->caches[shard]; if (!resumable) _discord_shard_cache_cleanup(client, cache); - pthread_mutex_lock(&cache->lock); + cthreads_mutex_lock(&cache->lock); cache->valid = false; - pthread_mutex_unlock(&cache->lock); + cthreads_mutex_unlock(&cache->lock); } #define GUILD_BEGIN(guild) \ @@ -206,7 +205,7 @@ _on_garbage_collection(struct discord *client, struct discord_timer *timer) struct _discord_cache_data *data = timer->data; for (int i = 0; i < data->total_shards; i++) { struct _discord_shard_cache *const cache = &data->caches[i]; - pthread_mutex_lock(&cache->lock); + cthreads_mutex_lock(&cache->lock); { // DELETE MESSAGES u64snowflake delete_before = ((cog_timestamp_ms() - DISCORD_EPOCH) - 10 * 60 * 1000) << 22; @@ -223,7 +222,7 @@ _on_garbage_collection(struct discord *client, struct discord_timer *timer) (void *)vals[j]); } } // !DELETE MESSAGES - pthread_mutex_unlock(&cache->lock); + cthreads_mutex_unlock(&cache->lock); } timer->repeat = 1; timer->interval = 1000 * 60; @@ -238,7 +237,7 @@ _discord_cache_cleanup(struct discord *client) _discord_shard_cache_cleanup(client, cache); anomap_destroy(cache->guild_map); anomap_destroy(cache->msg_map); - pthread_mutex_destroy(&cache->lock); + cthreads_mutex_destroy(&cache->lock); } free(data->caches); discord_internal_timer_ctl(client, @@ -267,7 +266,7 @@ discord_cache_enable(struct discord *client, data->caches = calloc(nshards, sizeof *data->caches); for (int i = 0; i < data->total_shards; i++) { struct _discord_shard_cache *cache = &data->caches[i]; - pthread_mutex_init(&cache->lock, NULL); + cthreads_mutex_init(&cache->lock, NULL); cache->guild_map = anomap_create(sizeof(u64snowflake), sizeof(void *), cmp_sf); cache->msg_map = @@ -314,13 +313,13 @@ discord_cache_get_channel_message(struct discord *client, for (int i = 0; i < data->total_shards; i++) { struct _discord_shard_cache *cache = &data->caches[i]; struct discord_message *message = NULL; - pthread_mutex_lock(&cache->lock); + cthreads_mutex_lock(&cache->lock); anomap_do(cache->msg_map, anomap_getval, &message_id, &message); const bool found = message; const bool valid = cache->valid; if (found && message->channel_id != channel_id) message = NULL; if (message && valid) (void)discord_claim(client, message); - pthread_mutex_unlock(&cache->lock); + cthreads_mutex_unlock(&cache->lock); if (found) return valid ? message : NULL; } return NULL; @@ -334,11 +333,11 @@ discord_cache_get_guild(struct discord *client, u64snowflake guild_id) for (int i = 0; i < data->total_shards; i++) { struct _discord_shard_cache *cache = &data->caches[i]; struct discord_guild *guild = NULL; - pthread_mutex_lock(&cache->lock); + cthreads_mutex_lock(&cache->lock); anomap_do(cache->guild_map, anomap_getval, &guild_id, &guild); const bool valid = cache->valid; if (guild && valid) (void)discord_claim(client, guild); - pthread_mutex_unlock(&cache->lock); + cthreads_mutex_unlock(&cache->lock); if (guild) return valid ? guild : NULL; } return NULL; diff --git a/src/discord-client.c b/src/discord-client.c index 10a350223..6579dc1b2 100644 --- a/src/discord-client.c +++ b/src/discord-client.c @@ -59,9 +59,9 @@ _discord_init(struct discord *new_client) discord_timers_init(&new_client->timers.user, new_client->io_poller); new_client->workers = calloc(1, sizeof *new_client->workers); - ASSERT_S(!pthread_mutex_init(&new_client->workers->lock, NULL), + ASSERT_S(!cthreads_mutex_init(&new_client->workers->lock, NULL), "Couldn't initialize Client's mutex"); - ASSERT_S(!pthread_cond_init(&new_client->workers->cond, NULL), + ASSERT_S(!cthreads_cond_init(&new_client->workers->cond, NULL), "Couldn't initialize Client's cond"); discord_refcounter_init(&new_client->refcounter, &new_client->conf); @@ -229,8 +229,8 @@ discord_cleanup(struct discord *client) io_poller_destroy(client->io_poller); logconf_cleanup(&client->conf); if (client->token) free(client->token); - pthread_mutex_destroy(&client->workers->lock); - pthread_cond_destroy(&client->workers->cond); + cthreads_mutex_destroy(&client->workers->lock); + cthreads_cond_destroy(&client->workers->cond); free(client->workers); } free(client); @@ -310,9 +310,9 @@ discord_get_ping(struct discord *client) { int ping_ms; - pthread_rwlock_rdlock(&client->gw.timer->rwlock); + cthreads_rwlock_rdlock(&client->gw.timer->rwlock); ping_ms = client->gw.timer->ping_ms; - pthread_rwlock_unlock(&client->gw.timer->rwlock); + cthreads_rwlock_unlock(&client->gw.timer->rwlock); return ping_ms; } diff --git a/src/discord-gateway.c b/src/discord-gateway.c index 0a8754925..df811165f 100644 --- a/src/discord-gateway.c +++ b/src/discord-gateway.c @@ -317,9 +317,9 @@ static void _discord_on_heartbeat_ack(struct discord_gateway *gw) { /* get request / response interval in milliseconds */ - pthread_rwlock_wrlock(&gw->timer->rwlock); + cthreads_rwlock_wrlock(&gw->timer->rwlock); gw->timer->ping_ms = (int)(gw->timer->now - gw->timer->hbeat_last); - pthread_rwlock_unlock(&gw->timer->rwlock); + cthreads_rwlock_unlock(&gw->timer->rwlock); logconf_trace(&gw->conf, "PING: %d ms", gw->timer->ping_ms); } @@ -548,7 +548,7 @@ discord_gateway_init(struct discord_gateway *gw, logconf_branch(&gw->conf, conf, "DISCORD_GATEWAY"); gw->timer = calloc(1, sizeof *gw->timer); - ASSERT_S(!pthread_rwlock_init(&gw->timer->rwlock, NULL), + ASSERT_S(!cthreads_rwlock_init(&gw->timer->rwlock), "Couldn't initialize Gateway's rwlock"); /* client connection status */ @@ -589,7 +589,7 @@ discord_gateway_cleanup(struct discord_gateway *gw) curl_multi_cleanup(gw->mhandle); ws_cleanup(gw->ws); /* cleanup timers */ - pthread_rwlock_destroy(&gw->timer->rwlock); + cthreads_rwlock_destroy(&gw->timer->rwlock); free(gw->timer); /* cleanup bot identification */ free(gw->id.properties); diff --git a/src/discord-refcount.c b/src/discord-refcount.c index 8e67cfdd9..67868c2c3 100644 --- a/src/discord-refcount.c +++ b/src/discord-refcount.c @@ -100,7 +100,7 @@ discord_refcounter_init(struct discord_refcounter *rc, struct logconf *conf) __chash_init(rc, REFCOUNTER_TABLE); rc->g_lock = malloc(sizeof *rc->g_lock); - ASSERT_S(!pthread_mutex_init(rc->g_lock, NULL), + ASSERT_S(!cthreads_mutex_init(rc->g_lock, NULL), "Couldn't initialize refcounter mutex"); } @@ -108,7 +108,7 @@ void discord_refcounter_cleanup(struct discord_refcounter *rc) { __chash_free(rc, REFCOUNTER_TABLE); - pthread_mutex_destroy(rc->g_lock); + cthreads_mutex_destroy(rc->g_lock); free(rc->g_lock); } @@ -172,14 +172,14 @@ discord_refcounter_claim(struct discord_refcounter *rc, const void *data) { CCORDcode code = CCORD_UNAVAILABLE; - pthread_mutex_lock(rc->g_lock); + cthreads_mutex_lock(rc->g_lock); if (_discord_refcounter_contains(rc, data)) { struct _discord_refvalue *value = _discord_refvalue_find(rc, data); ++value->claims; code = _discord_refcounter_incr_no_lock(rc, (void *)data); } - pthread_mutex_unlock(rc->g_lock); + cthreads_mutex_unlock(rc->g_lock); return code; } @@ -188,7 +188,7 @@ discord_refcounter_unclaim(struct discord_refcounter *rc, void *data) { CCORDcode code = CCORD_UNAVAILABLE; - pthread_mutex_lock(rc->g_lock); + cthreads_mutex_lock(rc->g_lock); if (_discord_refcounter_contains(rc, data)) { struct _discord_refvalue *value = _discord_refvalue_find(rc, data); @@ -200,7 +200,7 @@ discord_refcounter_unclaim(struct discord_refcounter *rc, void *data) code = _discord_refcounter_decr_no_lock(rc, data); } } - pthread_mutex_unlock(rc->g_lock); + cthreads_mutex_unlock(rc->g_lock); return code; } @@ -211,14 +211,14 @@ discord_refcounter_add_internal(struct discord_refcounter *rc, void (*cleanup)(void *data), bool should_free) { - pthread_mutex_lock(rc->g_lock); + cthreads_mutex_lock(rc->g_lock); _discord_refvalue_init(rc, data, &(struct _discord_refvalue){ .expects_client = false, .cleanup.internal = cleanup, .should_free = should_free, }); - pthread_mutex_unlock(rc->g_lock); + cthreads_mutex_unlock(rc->g_lock); } void @@ -228,23 +228,23 @@ discord_refcounter_add_client(struct discord_refcounter *rc, void *data), bool should_free) { - pthread_mutex_lock(rc->g_lock); + cthreads_mutex_lock(rc->g_lock); _discord_refvalue_init(rc, data, &(struct _discord_refvalue){ .expects_client = true, .cleanup.client = cleanup, .should_free = should_free, }); - pthread_mutex_unlock(rc->g_lock); + cthreads_mutex_unlock(rc->g_lock); } CCORDcode discord_refcounter_incr(struct discord_refcounter *rc, void *data) { CCORDcode code; - pthread_mutex_lock(rc->g_lock); + cthreads_mutex_lock(rc->g_lock); code = _discord_refcounter_incr_no_lock(rc, data); - pthread_mutex_unlock(rc->g_lock); + cthreads_mutex_unlock(rc->g_lock); return code; } @@ -252,8 +252,8 @@ CCORDcode discord_refcounter_decr(struct discord_refcounter *rc, void *data) { CCORDcode code; - pthread_mutex_lock(rc->g_lock); + cthreads_mutex_lock(rc->g_lock); code = _discord_refcounter_decr_no_lock(rc, data); - pthread_mutex_unlock(rc->g_lock); + cthreads_mutex_unlock(rc->g_lock); return code; } diff --git a/src/discord-rest_ratelimit.c b/src/discord-rest_ratelimit.c index 69b3567f2..1ad3ee75e 100644 --- a/src/discord-rest_ratelimit.c +++ b/src/discord-rest_ratelimit.c @@ -171,9 +171,9 @@ _discord_bucket_cancel_all(struct discord_ratelimiter *rl, if (b->busy_req) discord_request_cancel(rqtor, b->busy_req); /* move pending tranfers to recycling */ - pthread_mutex_lock(&rqtor->qlocks->recycling); + cthreads_mutex_lock(&rqtor->qlocks->recycling); QUEUE_ADD(&rqtor->queues->recycling, &b->queues.next); - pthread_mutex_unlock(&rqtor->qlocks->recycling); + cthreads_mutex_unlock(&rqtor->qlocks->recycling); QUEUE_INIT(&b->queues.next); } diff --git a/src/discord-rest_request.c b/src/discord-rest_request.c index 5872a6b82..9ceda1fee 100644 --- a/src/discord-rest_request.c +++ b/src/discord-rest_request.c @@ -52,11 +52,11 @@ discord_requestor_init(struct discord_requestor *rqtor, QUEUE_INIT(&rqtor->queues->finished); rqtor->qlocks = malloc(sizeof *rqtor->qlocks); - ASSERT_S(!pthread_mutex_init(&rqtor->qlocks->recycling, NULL), + ASSERT_S(!cthreads_mutex_init(&rqtor->qlocks->recycling, NULL), "Couldn't initialize requestor's recycling queue mutex"); - ASSERT_S(!pthread_mutex_init(&rqtor->qlocks->pending, NULL), + ASSERT_S(!cthreads_mutex_init(&rqtor->qlocks->pending, NULL), "Couldn't initialize requestor's pending queue mutex"); - ASSERT_S(!pthread_mutex_init(&rqtor->qlocks->finished, NULL), + ASSERT_S(!cthreads_mutex_init(&rqtor->qlocks->finished, NULL), "Couldn't initialize requestor's finished queue mutex"); rqtor->mhandle = curl_multi_init(); @@ -94,9 +94,9 @@ discord_requestor_cleanup(struct discord_requestor *rqtor) free(rqtor->queues); /* cleanup queue locks */ - pthread_mutex_destroy(&rqtor->qlocks->recycling); - pthread_mutex_destroy(&rqtor->qlocks->pending); - pthread_mutex_destroy(&rqtor->qlocks->finished); + cthreads_mutex_destroy(&rqtor->qlocks->recycling); + cthreads_mutex_destroy(&rqtor->qlocks->pending); + cthreads_mutex_destroy(&rqtor->qlocks->finished); free(rqtor->qlocks); /* cleanup curl's multi handle */ @@ -278,9 +278,9 @@ discord_request_cancel(struct discord_requestor *rqtor, memset(req, 0, sizeof(struct discord_attributes)); QUEUE_REMOVE(&req->entry); - pthread_mutex_lock(&rqtor->qlocks->recycling); + cthreads_mutex_lock(&rqtor->qlocks->recycling); QUEUE_INSERT_TAIL(&rqtor->queues->recycling, &req->entry); - pthread_mutex_unlock(&rqtor->qlocks->recycling); + cthreads_mutex_unlock(&rqtor->qlocks->recycling); } static CCORDcode @@ -313,10 +313,10 @@ _discord_request_dispatch_response(struct discord_requestor *rqtor, void discord_requestor_dispatch_responses(struct discord_requestor *rqtor) { - if (0 == pthread_mutex_trylock(&rqtor->qlocks->finished)) { + if (0 == cthreads_mutex_trylock(&rqtor->qlocks->finished)) { QUEUE(struct discord_request) queue; QUEUE_MOVE(&rqtor->queues->finished, &queue); - pthread_mutex_unlock(&rqtor->qlocks->finished); + cthreads_mutex_unlock(&rqtor->qlocks->finished); if (!QUEUE_EMPTY(&queue)) { struct discord_rest *rest = @@ -432,14 +432,14 @@ discord_requestor_info_read(struct discord_requestor *rqtor) req); if (req->dispatch.sync) { - pthread_mutex_lock(&rqtor->qlocks->pending); - pthread_cond_signal(req->cond); - pthread_mutex_unlock(&rqtor->qlocks->pending); + cthreads_mutex_lock(&rqtor->qlocks->pending); + cthreads_cond_signal(req->cond); + cthreads_mutex_unlock(&rqtor->qlocks->pending); } else { - pthread_mutex_lock(&rqtor->qlocks->finished); + cthreads_mutex_lock(&rqtor->qlocks->finished); QUEUE_INSERT_TAIL(&rqtor->queues->finished, &req->entry); - pthread_mutex_unlock(&rqtor->qlocks->finished); + cthreads_mutex_unlock(&rqtor->qlocks->finished); } } } @@ -491,9 +491,9 @@ discord_requestor_start_pending(struct discord_requestor *rqtor) struct discord_request *req; struct discord_bucket *b; - pthread_mutex_lock(&rqtor->qlocks->pending); + cthreads_mutex_lock(&rqtor->qlocks->pending); QUEUE_MOVE(&rqtor->queues->pending, &queue); - pthread_mutex_unlock(&rqtor->qlocks->pending); + cthreads_mutex_unlock(&rqtor->qlocks->pending); /* match pending requests to their buckets */ while (!QUEUE_EMPTY(&queue)) { @@ -562,7 +562,7 @@ _discord_request_get(struct discord_requestor *rqtor) { struct discord_request *req; - pthread_mutex_lock(&rqtor->qlocks->recycling); + cthreads_mutex_lock(&rqtor->qlocks->recycling); if (QUEUE_EMPTY(&rqtor->queues->recycling)) { /* new request struct */ req = _discord_request_init(); } @@ -573,7 +573,7 @@ _discord_request_get(struct discord_requestor *rqtor) QUEUE_REMOVE(qelem); req = QUEUE_DATA(qelem, struct discord_request, entry); } - pthread_mutex_unlock(&rqtor->qlocks->recycling); + cthreads_mutex_unlock(&rqtor->qlocks->recycling); QUEUE_INIT(&req->entry); @@ -627,19 +627,20 @@ discord_request_begin(struct discord_requestor *rqtor, req->dispatch.cleanup, false); } - pthread_mutex_lock(&rqtor->qlocks->pending); + cthreads_mutex_lock(&rqtor->qlocks->pending); QUEUE_INSERT_TAIL(&rqtor->queues->pending, &req->entry); io_poller_wakeup(rest->io_poller); if (!req->dispatch.sync) { - pthread_mutex_unlock(&rqtor->qlocks->pending); + cthreads_mutex_unlock(&rqtor->qlocks->pending); code = CCORD_PENDING; } else { /* wait for request's completion if sync mode is active */ - pthread_cond_t temp_cond = PTHREAD_COND_INITIALIZER; - req->cond = &temp_cond; - pthread_cond_wait(req->cond, &rqtor->qlocks->pending); - req->cond = NULL; - pthread_mutex_unlock(&rqtor->qlocks->pending); + struct cthreads_cond cond; + cthreads_cond_init(&cond, NULL); + req->cond = &cond; + cthreads_cond_wait(req->cond, &rqtor->qlocks->pending); + cthreads_cond_destroy(&cond); + cthreads_mutex_unlock(&rqtor->qlocks->pending); code = _discord_request_dispatch_response(rqtor, req); } return code; diff --git a/src/discord-timer.c b/src/discord-timer.c index 3cda106a6..c9e8c7d3d 100644 --- a/src/discord-timer.c +++ b/src/discord-timer.c @@ -32,8 +32,8 @@ discord_timers_init(struct discord_timers *timers, struct io_poller *io) timers->q = priority_queue_create( sizeof(int64_t), sizeof(struct discord_timer), cmp_timers, 0); timers->io = io; - pthread_mutex_init(&timers->lock, NULL); - pthread_cond_init(&timers->cond, NULL); + cthreads_mutex_init(&timers->lock, NULL); + cthreads_cond_init(&timers->cond, NULL); } static void @@ -52,8 +52,8 @@ discord_timers_cleanup(struct discord *client, struct discord_timers *timers) { priority_queue_set_max_capacity(timers->q, 0); discord_timers_cancel_all(client, timers); - pthread_cond_destroy(&timers->cond); - pthread_mutex_destroy(&timers->lock); + cthreads_cond_destroy(&timers->cond); + cthreads_mutex_destroy(&timers->lock); priority_queue_destroy(timers->q); memset(timers, 0, sizeof *timers); } @@ -68,7 +68,7 @@ discord_timers_get_next_trigger(struct discord_timers *const timers[], for (unsigned i = 0; i < n; i++) { int64_t trigger; - if (0 != pthread_mutex_trylock(&timers[i]->lock)) return 0; + if (0 != cthreads_mutex_trylock(&timers[i]->lock)) return 0; if (priority_queue_peek(timers[i]->q, &trigger, NULL)) { if (trigger < 0) goto unlock; @@ -79,7 +79,7 @@ discord_timers_get_next_trigger(struct discord_timers *const timers[], max_time = trigger - now; } unlock: - pthread_mutex_unlock(&timers[i]->lock); + cthreads_mutex_unlock(&timers[i]->lock); } return max_time; } @@ -128,16 +128,16 @@ _discord_timer_ctl_no_lock(struct discord *client, #define LOCK_TIMERS(timers) \ do { \ - pthread_mutex_lock(&timers.lock); \ + cthreads_mutex_lock(&timers.lock); \ if (timers.active.is_active \ - && !pthread_equal(pthread_self(), timers.active.thread)) \ - pthread_cond_wait(&timers.cond, &timers.lock); \ + && !cthreads_equal(cthreads_self(), timers.active.thread)) \ + cthreads_cond_wait(&timers.cond, &timers.lock); \ } while (0); #define UNLOCK_TIMERS(timers) \ do { \ bool should_wakeup = !timers.active.is_active; \ - pthread_mutex_unlock(&timers.lock); \ + cthreads_mutex_unlock(&timers.lock); \ if (should_wakeup) io_poller_wakeup(timers.io); \ } while (0) @@ -157,9 +157,9 @@ _discord_timer_ctl(struct discord *client, if (timer.flags & DISCORD_TIMER_DELETE) { \ priority_queue_del(timers->q, timer.id); \ if (timer.on_status_changed) { \ - pthread_mutex_unlock(&timers->lock); \ + cthreads_mutex_unlock(&timers->lock); \ timer.on_status_changed(client, &timer); \ - pthread_mutex_lock(&timers->lock); \ + cthreads_mutex_lock(&timers->lock); \ } \ timers->active.skip_update_phase = false; \ continue; \ @@ -171,9 +171,9 @@ discord_timers_run(struct discord *client, struct discord_timers *timers) int64_t now = (int64_t)discord_timestamp_us(client); const int64_t start_time = now; - pthread_mutex_lock(&timers->lock); + cthreads_mutex_lock(&timers->lock); timers->active.is_active = true; - timers->active.thread = pthread_self(); + timers->active.thread = cthreads_self(); struct discord_timer timer; timers->active.timer = &timer; @@ -206,9 +206,9 @@ discord_timers_run(struct discord *client, struct discord_timers *timers) enum discord_timer_flags prev_flags = timer.flags; if (cb) { - pthread_mutex_unlock(&timers->lock); + cthreads_mutex_unlock(&timers->lock); cb(client, &timer); - pthread_mutex_lock(&timers->lock); + cthreads_mutex_lock(&timers->lock); } timer.flags &= ~(enum discord_timer_flags)DISCORD_TIMER_TICK; @@ -252,8 +252,8 @@ discord_timers_run(struct discord *client, struct discord_timers *timers) timers->active.is_active = false; timers->active.timer = NULL; - pthread_cond_broadcast(&timers->cond); - pthread_mutex_unlock(&timers->lock); + cthreads_cond_broadcast(&timers->cond); + cthreads_mutex_unlock(&timers->lock); } unsigned diff --git a/src/discord-voice.c b/src/discord-voice.c index cceb2bd69..1a9dff723 100644 --- a/src/discord-voice.c +++ b/src/discord-voice.c @@ -126,7 +126,7 @@ struct discord_voice { return #code /* TODO: use a per-client lock instead */ -static pthread_mutex_t client_lock = PTHREAD_MUTEX_INITIALIZER; +struct cthreads_mutex client_lock; static const char * opcode_print(enum discord_voice_opcodes opcode) @@ -643,6 +643,8 @@ discord_voice_join(struct discord *client, bool self_mute, bool self_deaf) { + cthreads_mutex_init(&client_lock, NULL); + struct discord_update_voice_state state = { .guild_id = guild_id, .channel_id = vchannel_id, .self_mute = self_mute, @@ -652,7 +654,7 @@ discord_voice_join(struct discord *client, if (!ws_is_functional(client->gw.ws)) return DISCORD_VOICE_ERROR; - pthread_mutex_lock(&client_lock); + cthreads_mutex_lock(&client_lock); for (int i = 0; i < DISCORD_MAX_VCS; ++i) { if (0 == client->vcs[i].guild_id) { vc = client->vcs + i; @@ -667,7 +669,7 @@ discord_voice_join(struct discord *client, break; } } - pthread_mutex_unlock(&client_lock); + cthreads_mutex_unlock(&client_lock); if (!vc) { logconf_error(&client->conf, @@ -697,7 +699,7 @@ _discord_on_voice_state_update(struct discord *client, { struct discord_voice *vc = NULL; - pthread_mutex_lock(&client_lock); + cthreads_mutex_lock(&client_lock); for (int i = 0; i < DISCORD_MAX_VCS; ++i) { if (event->guild_id == client->vcs[i].guild_id) { vc = client->vcs + i; @@ -714,7 +716,7 @@ _discord_on_voice_state_update(struct discord *client, break; } } - pthread_mutex_unlock(&client_lock); + cthreads_mutex_unlock(&client_lock); if (!vc) { if (event->channel_id) { @@ -821,14 +823,14 @@ _discord_on_voice_server_update(struct discord *client, struct discord_voice *vc = NULL; int len; - pthread_mutex_lock(&client_lock); + cthreads_mutex_lock(&client_lock); for (int i = 0; i < DISCORD_MAX_VCS; ++i) { if (event->guild_id == client->vcs[i].guild_id) { vc = client->vcs + i; break; } } - pthread_mutex_unlock(&client_lock); + cthreads_mutex_unlock(&client_lock); if (!vc) { logconf_fatal(&client->conf, "Couldn't match voice server to client"); @@ -851,15 +853,15 @@ _discord_on_voice_server_update(struct discord *client, ws_close(vc->ws, WS_CLOSE_REASON_NORMAL, reason, sizeof(reason)); } else { - pthread_t tid; + struct cthreads_thread tid; memcpy(vc->token, vc->new_token, sizeof(vc->new_token)); ws_set_url(vc->ws, vc->new_url, NULL); /** TODO: replace with a threadpool */ - if (pthread_create(&tid, NULL, &start_voice_ws_thread, vc)) + if (cthreads_thread_create(&tid, NULL, &start_voice_ws_thread, vc)) ERR("Couldn't create thread"); - if (pthread_detach(tid)) ERR("Couldn't detach thread"); + if (cthreads_thread_detach(&tid)) ERR("Couldn't detach thread"); } } diff --git a/src/discord-worker.c b/src/discord-worker.c index d4381a016..67ffb7860 100644 --- a/src/discord-worker.c +++ b/src/discord-worker.c @@ -1,6 +1,5 @@ #include #include -#include #include #include "threadpool.h" @@ -57,16 +56,16 @@ _discord_worker_cb(void *p_cxt) { struct discord_worker_context *cxt = p_cxt; - pthread_mutex_lock(&cxt->client->workers->lock); + cthreads_mutex_lock(&cxt->client->workers->lock); ++cxt->client->workers->count; - pthread_mutex_unlock(&cxt->client->workers->lock); + cthreads_mutex_unlock(&cxt->client->workers->lock); cxt->callback(cxt->data); - pthread_mutex_lock(&cxt->client->workers->lock); + cthreads_mutex_lock(&cxt->client->workers->lock); --cxt->client->workers->count; - pthread_cond_signal(&cxt->client->workers->cond); - pthread_mutex_unlock(&cxt->client->workers->lock); + cthreads_cond_signal(&cxt->client->workers->cond); + cthreads_mutex_unlock(&cxt->client->workers->lock); free(cxt); } @@ -87,11 +86,11 @@ discord_worker_add(struct discord *client, CCORDcode discord_worker_join(struct discord *client) { - pthread_mutex_lock(&client->workers->lock); + cthreads_mutex_lock(&client->workers->lock); while (client->workers->count != 0) { - pthread_cond_wait(&client->workers->cond, &client->workers->lock); + cthreads_cond_wait(&client->workers->cond, &client->workers->lock); } - pthread_mutex_unlock(&client->workers->lock); + cthreads_mutex_unlock(&client->workers->lock); return CCORD_OK; }