Skip to content

Commit

Permalink
Merge pull request #47 from planetis-m/new_primitives
Browse files Browse the repository at this point in the history
New synchronization primitives
  • Loading branch information
Araq authored May 8, 2024
2 parents 11b315f + d0c8c50 commit f326fd9
Show file tree
Hide file tree
Showing 11 changed files with 554 additions and 0 deletions.
27 changes: 27 additions & 0 deletions tests/tbarrier.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import threading/barrier, std/[os, strformat]

const
NumThreads = 10
NumIters = 100

var
b = createBarrier(NumThreads)
phases: array[NumThreads, int]
threads: array[NumThreads, Thread[int]]

proc routine(id: int) =
for i in 0..<NumIters:
phases[id] = i
if (id + i) mod NumThreads == 0:
sleep 1
wait b
for j in 0..<NumThreads:
assert phases[j] == i, &"{id} in phase {i} sees {j} in phase {phases[j]}"
wait b

proc testBarrier =
for i in 0..<NumThreads:
createThread(threads[i], routine, i)
joinThreads(threads)

testBarrier()
11 changes: 11 additions & 0 deletions tests/tonce.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import threading/once

var o = createOnce()
proc smokeOnce() =
var a = 0
o.once(a += 1)
assert a == 1
o.once(a += 1)
assert a == 1

smokeOnce()
34 changes: 34 additions & 0 deletions tests/tonce2.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import threading/once

const
NumThreads = 10
NumIters = 1000

type
Singleton = object
data: int

var
threads: array[NumThreads, Thread[void]]
counter = 1
instance: ptr Singleton
o = createOnce()

proc getInstance(): ptr Singleton =
once(o):
instance = createSharedU(Singleton)
instance.data = counter
inc counter
result = instance

proc routine {.thread.} =
for i in 1..NumIters:
assert getInstance().data == 1

proc main =
for i in 0..<NumThreads:
createThread(threads[i], routine)
joinThreads(threads)
deallocShared(instance)

main()
28 changes: 28 additions & 0 deletions tests/trwlock.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import threading/rwlock, std/os

const
NumThreads = 10
NumIters = 100

var
rw = createRwLock()
data = 0
threads: array[NumThreads, Thread[void]]

proc routine =
for i in 0..<NumIters:
writeWith rw:
let tmp = data
data = -1
sleep 1
data = tmp + 1

proc frob =
for i in 0..<NumThreads:
createThread(threads[i], routine)
for i in 0..<NumIters:
readWith(rw, assert data >= 0)
joinThreads(threads)
assert data == NumIters * NumThreads

frob()
30 changes: 30 additions & 0 deletions tests/trwlock1.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import threading/rwlock, std/[random, os]

const
NumThreads = 10
NumIters = 100

var
rw = createRwLock()
data = 0
threads: array[NumThreads, Thread[void]]

proc routine =
var r = initRand(getThreadId())
for i in 0..<NumIters:
if r.rand(1.0) <= 1 / NumThreads:
writeWith rw:
let tmp = data
data = -1
sleep 1
data = tmp + 1
else:
readWith rw:
assert data >= 0

proc frob =
for i in 0..<NumThreads:
createThread(threads[i], routine)
joinThreads(threads)

frob()
38 changes: 38 additions & 0 deletions tests/tsemaphore.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import threading/semaphore, std/strformat

const
BufSize = 16
NumIters = 1000

var
thr1, thr2: Thread[void]
buf: array[BufSize, int]
head, tail = 0
chars = createSemaphore()
spaces = createSemaphore(BufSize)

template next(current: untyped): untyped = (current + 1) and BufSize - 1

proc producer =
for i in 0..<NumIters:
wait spaces
assert buf[head] <= i, &"Constraint: recv_{buf[tail]} < send_{i}+{BufSize}"
buf[head] = i
head = next(head)
signal chars

proc consumer =
for i in 0..<NumIters:
wait chars
assert buf[tail] == i, &"Constraint: send_{buf[tail]} < recv_{i}"
buf[tail] = i + BufSize
tail = next(tail)
signal spaces

proc testSemaphore =
createThread(thr1, producer)
createThread(thr2, consumer)
joinThread(thr1)
joinThread(thr2)

testSemaphore()
26 changes: 26 additions & 0 deletions tests/tsemaphore1.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import threading/semaphore

var
semS, semT = createSemaphore()
aArrived, cArrived = false
thread: Thread[void]

proc routine =
# Section C
cArrived = true
signal semT
wait semS
# Section D
assert aArrived, "Constraint: Section A precedes D"

proc testRendezvous =
createThread(thread, routine)
# Section A
aArrived = true
signal semS
wait semT
# Section B
assert cArrived, "Constraint: Section C precedes B"
joinThread thread

testRendezvous()
88 changes: 88 additions & 0 deletions threading/barrier.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2023 Nim contributors
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.

## Barrier for Nim.

runnableExamples:

import std / os

var phases: array[10, int]
var b = createBarrier(10)

proc worker(id: int) =
for i in 0 ..< 100:
phases[id] = i
if (id + i) mod 10 == 0:
sleep 1
wait b
for j in 0 ..< 10:
assert phases[j] == i
wait b

var threads: array[10, Thread[int]]
for i in 0..<10:
createThread(threads[i], worker, i)

joinThreads(threads)


import std / locks

type
Barrier* = object
## A barrier is a synchronization mechanism that allows a set of threads to
## all wait for each other to reach a common point. Barriers are useful in
## programs involving a fixed-size party of cooperating threads that must
## occasionally wait for each other. The barrier is called a cyclic barrier
## if it can be reused after the waiting threads are released.
##
## The barrier is initialized for a given number of threads. Each thread
## that calls `wait` on the barrier will block until all the threads have
## made that call. At this point, the barrier is reset to its initial state
## and all threads are released.
c: Cond
L: Lock
required: int # number of threads needed for the barrier to continue
left: int # current barrier count, number of threads still needed.
cycle: uint # generation count

when defined(nimAllowNonVarDestructor):
proc `=destroy`*(b: Barrier) {.inline.} =
let x = addr(b)
deinitCond(x.c)
deinitLock(x.L)
else:
proc `=destroy`*(b: var Barrier) {.inline.} =
deinitCond(b.c)
deinitLock(b.L)

proc `=sink`*(dest: var Barrier; src: Barrier) {.error.}
proc `=copy`*(dest: var Barrier; src: Barrier) {.error.}

proc createBarrier*(parties: Natural): Barrier =
result = default(Barrier)
result.required = parties
result.left = parties
initCond(result.c)
initLock(result.L)

proc wait*(b: var Barrier) =
## Wait for all threads to reach the barrier. When the last thread reaches
## the barrier, all threads are released.
acquire(b.L)
dec b.left
if b.left == 0:
inc b.cycle
b.left = b.required
broadcast(b.c)
else:
let cycle = b.cycle
while cycle == b.cycle:
wait(b.c, b.L)
release(b.L)
76 changes: 76 additions & 0 deletions threading/once.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2023 Nim contributors
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.

## Once for Nim.

runnableExamples:

type
Singleton = object
data: int

var
counter = 1
instance: ptr Singleton
o = createOnce()

proc getInstance(): ptr Singleton =
once(o):
instance = createSharedU(Singleton)
instance.data = counter
inc counter
result = instance

proc worker {.thread.} =
for i in 1..1000:
assert getInstance().data == 1

var threads: array[10, Thread[void]]
for i in 0..<10:
createThread(threads[i], worker)
joinThreads(threads)
deallocShared(instance)


import std / [locks, atomics]

type
Once* = object
## Once is a type that allows you to execute a block of code exactly once.
## The first call to `once` will execute the block of code and all other
## calls will be ignored.
L: Lock
finished: Atomic[bool]

when defined(nimAllowNonVarDestructor):
proc `=destroy`*(o: Once) {.inline.} =
let x = addr(o)
deinitLock(x.L)
else:
proc `=destroy`*(o: var Once) {.inline.} =
deinitLock(o.L)

proc `=sink`*(dest: var Once; source: Once) {.error.}
proc `=copy`*(dest: var Once; source: Once) {.error.}

proc createOnce*(): Once =
result = default(Once)
initLock(result.L)

template once*(o: Once, body: untyped) =
## Executes `body` exactly once.
if not o.finished.load(moAcquire):
acquire(o.L)
try:
if not o.finished.load(moRelaxed):
try:
body
finally:
o.finished.store(true, moRelease)
finally:
release(o.L)
Loading

0 comments on commit f326fd9

Please sign in to comment.