From aa64f94f5e4dd6bc8c530cf66da0075fa3d85e5e Mon Sep 17 00:00:00 2001 From: Antonis Geralis Date: Sun, 24 Sep 2023 12:12:59 +0300 Subject: [PATCH 1/3] New synchronization primitives --- tests/tbarrier.nim | 28 ++++++++++ tests/tonce.nim | 12 +++++ tests/tonce2.nim | 35 ++++++++++++ tests/trwlock.nim | 29 ++++++++++ tests/trwlock1.nim | 31 +++++++++++ tests/tsemaphore.nim | 40 ++++++++++++++ tests/tsemaphore1.nim | 28 ++++++++++ threading/barrier.nim | 88 ++++++++++++++++++++++++++++++ threading/once.nim | 76 ++++++++++++++++++++++++++ threading/rwlock.nim | 115 ++++++++++++++++++++++++++++++++++++++++ threading/semaphore.nim | 84 +++++++++++++++++++++++++++++ 11 files changed, 566 insertions(+) create mode 100644 tests/tbarrier.nim create mode 100644 tests/tonce.nim create mode 100644 tests/tonce2.nim create mode 100644 tests/trwlock.nim create mode 100644 tests/trwlock1.nim create mode 100644 tests/tsemaphore.nim create mode 100644 tests/tsemaphore1.nim create mode 100644 threading/barrier.nim create mode 100644 threading/once.nim create mode 100644 threading/rwlock.nim create mode 100644 threading/semaphore.nim diff --git a/tests/tbarrier.nim b/tests/tbarrier.nim new file mode 100644 index 0000000..d70b03c --- /dev/null +++ b/tests/tbarrier.nim @@ -0,0 +1,28 @@ +import threading/barrier, std/[os, strformat] + +const + numThreads = 10 + numIters = 100 + +var + barrier: Barrier + phases: array[numThreads, int] + threads: array[numThreads, Thread[int]] + +proc routine(id: int) = + for i in 0 ..< numIters: + phases[id] = i + if (id + i) mod numThreads == 0: + sleep 1 + wait barrier + for j in 0 ..< numThreads: + assert phases[j] == i, &"{id} in phase {i} sees {j} in phase {phases[j]}" + wait barrier + +proc testBarrier = + init barrier, numThreads + for i in 0 ..< numThreads: + createThread(threads[i], routine, i) + joinThreads(threads) + +testBarrier() diff --git a/tests/tonce.nim b/tests/tonce.nim new file mode 100644 index 0000000..321a443 --- /dev/null +++ b/tests/tonce.nim @@ -0,0 +1,12 @@ +import threading/once + +var o: Once +proc smokeOnce() = + init o + var a = 0 + o.once(a += 1) + assert a == 1 + o.once(a += 1) + assert a == 1 + +smokeOnce() diff --git a/tests/tonce2.nim b/tests/tonce2.nim new file mode 100644 index 0000000..181d677 --- /dev/null +++ b/tests/tonce2.nim @@ -0,0 +1,35 @@ +import threading/once + +const + numThreads = 10 + maxIters = 1000 + +type + Singleton = object + data: int + +var + threads: array[numThreads, Thread[void]] + counter = 1 + instance: ptr Singleton + o: Once + +proc getInstance(): ptr Singleton = + once(o): + instance = createSharedU(Singleton) + instance.data = counter + inc counter + result = instance + +proc routine {.thread.} = + for i in 1 .. maxIters: + assert getInstance().data == 1 + +proc main = + init o + for i in 0 ..< numThreads: + createThread(threads[i], routine) + joinThreads(threads) + deallocShared(instance) + +main() diff --git a/tests/trwlock.nim b/tests/trwlock.nim new file mode 100644 index 0000000..37bc3ec --- /dev/null +++ b/tests/trwlock.nim @@ -0,0 +1,29 @@ +import threading/rwlock, std/os + +const + numThreads = 10 + numIters = 100 + +var + rw: RwLock + data = 0 + threads: array[numThreads, Thread[void]] + +proc routine = + for i in 0..= 0) + joinThreads(threads) + assert data == numIters * numThreads + +frob() diff --git a/tests/trwlock1.nim b/tests/trwlock1.nim new file mode 100644 index 0000000..8bf7e26 --- /dev/null +++ b/tests/trwlock1.nim @@ -0,0 +1,31 @@ +import threading/rwlock, std/[random, os] + +const + numThreads = 10 + numIters = 100 + +var + rw: RwLock + data = 0 + threads: array[numThreads, Thread[void]] + +proc routine = + var r = initRand(getThreadId()) + for i in 0..= 0 + +proc frob = + init rw + for i in 0..= 0) + joinThreads(threads) + assert data == 1000 + + +import std / locks + +type + RwLock* = object + ## Readers-writer lock. Multiple readers can acquire the lock at the same + ## time, but only one writer can acquire the lock at a time. + readPhase: Cond + writePhase: Cond + L: Lock + counter: int # can be in three states: free = 0, reading > 0, writing = -1 + +when defined(nimAllowNonVarDestructor): + proc `=destroy`*(rw: RwLock) {.inline.} = + let x = addr(rw) + deinitCond(x.readPhase) + deinitCond(x.writePhase) + deinitLock(x.L) +else: + proc `=destroy`*(rw: var RwLock) {.inline.} = + deinitCond(rw.readPhase) + deinitCond(rw.writePhase) + deinitLock(rw.L) + +proc `=sink`*(dest: var RwLock; source: RwLock) {.error.} +proc `=copy`*(dest: var RwLock; source: RwLock) {.error.} + +proc createRwLock*(): RwLock = + result = default(RwLock) + initCond(result.readPhase) + initCond(result.writePhase) + initLock(result.L) + +proc beginRead*(rw: var RwLock) = + ## Acquire a read lock. + acquire(rw.L) + while rw.counter == -1: + wait(rw.readPhase, rw.L) + inc rw.counter + release(rw.L) + +proc beginWrite*(rw: var RwLock) = + ## Acquire a write lock. + acquire(rw.L) + while rw.counter != 0: + wait(rw.writePhase, rw.L) + rw.counter = -1 + release(rw.L) + +proc endRead*(rw: var RwLock) {.inline.} = + ## Release a read lock. + acquire(rw.L) + dec rw.counter + if rw.counter == 0: + rw.writePhase.signal() + release(rw.L) + +proc endWrite*(rw: var RwLock) {.inline.} = + ## Release a write lock. + acquire(rw.L) + rw.counter = 0 + rw.readPhase.broadcast() + rw.writePhase.signal() + release(rw.L) + +template readWith*(a: RwLock, body: untyped) = + ## Acquire a read lock and execute `body`. Release the lock afterwards. + beginRead(a) + {.locks: [a].}: + try: + body + finally: + endRead(a) + +template writeWith*(a: RwLock, body: untyped) = + ## Acquire a write lock and execute `body`. Release the lock afterwards. + beginWrite(a) + {.locks: [a].}: + try: + body + finally: + endWrite(a) diff --git a/threading/semaphore.nim b/threading/semaphore.nim new file mode 100644 index 0000000..b47191c --- /dev/null +++ b/threading/semaphore.nim @@ -0,0 +1,84 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2023 Nim contributors +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. + +## Semaphore for Nim. + +runnableExamples: + + import std / os + + var arrived = createSemaphore(2) + + proc worker(i: int) = + echo i, " starts" + wait arrived + sleep 1 + echo i, " progresses" + signal arrived + + var threads: array[4, Thread[int]] + for i in 0..<4: + createThread(threads[i], worker, i) + joinThreads(threads) + + +import std/locks + +type + Semaphore* = object + ## A semaphore is a synchronization primitive that controls access to a + ## shared resource through the use of a counter. If the counter is greater + ## than zero, then access is allowed. If it is zero, then access is denied. + ## What the access is depends on the use of the semaphore. + ## + ## Semaphores are typically used to limit the number of threads than can + ## access some (physical or logical) resource. + ## + ## Semaphores are of two types: counting semaphores and binary semaphores. + ## Counting semaphores can take non-negative integer values to indicate the + ## number of units of a particular resource that are available. Binary + ## semaphores can only take the values 0 and 1 and are used to implement + ## locks. + c: Cond + L: Lock + counter: int + +when defined(nimAllowNonVarDestructor): + proc `=destroy`*(s: Semaphore) {.inline.} = + let x = addr(s) + deinitCond(x.c) + deinitLock(x.L) +else: + proc `=destroy`*(s: var Semaphore) {.inline.} = + deinitCond(s.c) + deinitLock(s.L) + +proc `=sink`*(dest: var Semaphore; src: Semaphore) {.error.} +proc `=copy`*(dest: var Semaphore; src: Semaphore) {.error.} + +proc createSemaphore*(count: Natural = 0): Semaphore = + result = default(Semaphore) + result.counter = count + initCond(result.c) + initLock(result.L) + +proc wait*(s: var Semaphore) = + ## Wait for the semaphore to be signaled. If the semaphore's counter is zero, + ## wait blocks until it becomes greater than zero. + acquire(s.L) + while s.counter <= 0: + wait(s.c, s.L) + dec s.counter + release(s.L) + +proc signal*(s: var Semaphore) {.inline.} = + ## Signal the semaphore. + acquire(s.L) + inc s.counter + signal(s.c) + release(s.L) From a909a467645ff995ead4bce72c754edc264ac910 Mon Sep 17 00:00:00 2001 From: Antonis Geralis Date: Sun, 24 Sep 2023 12:35:31 +0300 Subject: [PATCH 2/3] fix tests --- tests/tbarrier.nim | 23 +++++++++++------------ tests/tonce.nim | 3 +-- tests/tonce2.nim | 13 ++++++------- tests/trwlock.nim | 17 ++++++++--------- tests/trwlock1.nim | 15 +++++++-------- tests/tsemaphore.nim | 22 ++++++++++------------ tests/tsemaphore1.nim | 4 +--- 7 files changed, 44 insertions(+), 53 deletions(-) diff --git a/tests/tbarrier.nim b/tests/tbarrier.nim index d70b03c..34c373f 100644 --- a/tests/tbarrier.nim +++ b/tests/tbarrier.nim @@ -1,27 +1,26 @@ import threading/barrier, std/[os, strformat] const - numThreads = 10 - numIters = 100 + NumThreads = 10 + NumIters = 100 var - barrier: Barrier - phases: array[numThreads, int] - threads: array[numThreads, Thread[int]] + b = createBarrier(NumThreads) + phases: array[NumThreads, int] + threads: array[NumThreads, Thread[int]] proc routine(id: int) = - for i in 0 ..< numIters: + for i in 0..= 0) joinThreads(threads) - assert data == numIters * numThreads + assert data == NumIters * NumThreads frob() diff --git a/tests/trwlock1.nim b/tests/trwlock1.nim index 8bf7e26..6e826a2 100644 --- a/tests/trwlock1.nim +++ b/tests/trwlock1.nim @@ -1,18 +1,18 @@ import threading/rwlock, std/[random, os] const - numThreads = 10 - numIters = 100 + NumThreads = 10 + NumIters = 100 var - rw: RwLock + rw = createRwLock() data = 0 - threads: array[numThreads, Thread[void]] + threads: array[NumThreads, Thread[void]] proc routine = var r = initRand(getThreadId()) - for i in 0..= 0 proc frob = - init rw - for i in 0.. Date: Sun, 24 Sep 2023 12:47:34 +0300 Subject: [PATCH 3/3] algo variant from wiki implemented saves 144-112=32bytes --- threading/rwlock.nim | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/threading/rwlock.nim b/threading/rwlock.nim index 561d35a..22cfaef 100644 --- a/threading/rwlock.nim +++ b/threading/rwlock.nim @@ -38,21 +38,19 @@ type RwLock* = object ## Readers-writer lock. Multiple readers can acquire the lock at the same ## time, but only one writer can acquire the lock at a time. - readPhase: Cond - writePhase: Cond + c: Cond L: Lock - counter: int # can be in three states: free = 0, reading > 0, writing = -1 + activeReaders, waitingWriters: int + activeWriter: bool when defined(nimAllowNonVarDestructor): proc `=destroy`*(rw: RwLock) {.inline.} = let x = addr(rw) - deinitCond(x.readPhase) - deinitCond(x.writePhase) + deinitCond(x.c) deinitLock(x.L) else: proc `=destroy`*(rw: var RwLock) {.inline.} = - deinitCond(rw.readPhase) - deinitCond(rw.writePhase) + deinitCond(rw.c) deinitLock(rw.L) proc `=sink`*(dest: var RwLock; source: RwLock) {.error.} @@ -60,40 +58,39 @@ proc `=copy`*(dest: var RwLock; source: RwLock) {.error.} proc createRwLock*(): RwLock = result = default(RwLock) - initCond(result.readPhase) - initCond(result.writePhase) + initCond(result.c) initLock(result.L) proc beginRead*(rw: var RwLock) = ## Acquire a read lock. acquire(rw.L) - while rw.counter == -1: - wait(rw.readPhase, rw.L) - inc rw.counter + while rw.waitingWriters > 0 or rw.activeWriter: + wait(rw.c, rw.L) + inc rw.activeReaders release(rw.L) proc beginWrite*(rw: var RwLock) = ## Acquire a write lock. acquire(rw.L) - while rw.counter != 0: - wait(rw.writePhase, rw.L) - rw.counter = -1 + inc rw.waitingWriters + while rw.activeReaders > 0 or rw.activeWriter: + wait(rw.c, rw.L) + dec rw.waitingWriters + rw.activeWriter = true release(rw.L) proc endRead*(rw: var RwLock) {.inline.} = ## Release a read lock. acquire(rw.L) - dec rw.counter - if rw.counter == 0: - rw.writePhase.signal() + dec rw.activeReaders + rw.c.broadcast() release(rw.L) proc endWrite*(rw: var RwLock) {.inline.} = ## Release a write lock. acquire(rw.L) - rw.counter = 0 - rw.readPhase.broadcast() - rw.writePhase.signal() + rw.activeWriter = false + rw.c.broadcast() release(rw.L) template readWith*(a: RwLock, body: untyped) =