From a8c3adb2c1a20f95ed6a4c1f3ba80e170bb74bcc Mon Sep 17 00:00:00 2001
From: cabboose <57953499+shayanhabibi@users.noreply.github.com>
Date: Fri, 26 Nov 2021 14:27:14 +0800
Subject: [PATCH] Introduce Futex for backpressure on Wards (#20)

* wip

* add a cast so it compiles

* cont tests

* progr

* futex wait for wards

* move tests

* some cleans and fixes

* Added destroy proc and suggested changes

Co-authored-by: Smooth Operator
---
 failtests/README.md                 |   3 +
 failtests/tloony.nim                | 109 ++++++++++++++
 failtests/tward.nim                 | 137 +++++++++++++++++
 loony.nim                           | 221 +++++++++++++++++++---------
 loony/node.nim                      |   1 -
 loony/utils/arc.nim                 |  58 ++++++++
 loony/utils/futex.nim               |   6 +
 loony/utils/futex/futex_linux.nim   |  43 ++++++
 loony/utils/futex/futex_windows.nim |  20 +++
 loony/ward.nim                      |  96 +++++++++---
 10 files changed, 607 insertions(+), 87 deletions(-)
 create mode 100644 failtests/README.md
 create mode 100644 failtests/tloony.nim
 create mode 100644 failtests/tward.nim
 create mode 100644 loony/utils/arc.nim
 create mode 100644 loony/utils/futex.nim
 create mode 100644 loony/utils/futex/futex_linux.nim
 create mode 100644 loony/utils/futex/futex_windows.nim

diff --git a/failtests/README.md b/failtests/README.md
new file mode 100644
index 0000000..80943c1
--- /dev/null
+++ b/failtests/README.md
@@ -0,0 +1,3 @@
+These tests fail without changing Nim's system ARC implementation to use atomic operations.
+
+Therefore they do not have to be run by the CI for now.
\ No newline at end of file
diff --git a/failtests/tloony.nim b/failtests/tloony.nim
new file mode 100644
index 0000000..b217edc
--- /dev/null
+++ b/failtests/tloony.nim
@@ -0,0 +1,109 @@
+import loony/utils/arc
+
+import std/osproc
+import std/strutils
+import std/logging
+import std/atomics
+import std/os
+import std/macros
+import std/hashes
+
+import balls
+
+import loony
+import loony/ward
+
+const
+  continuationCount = 10_000
+let
+  threadCount = 11
+
+type
+  C = object
+    r: bool
+    e: int
+  Cop = ref object
+    r: bool
+    e: int
+  # Cop = ptr C
+
+addHandler newConsoleLogger()
+setLogFilter:
+  when defined(danger):
+    lvlNotice
+  elif defined(release):
+    lvlInfo
+  else:
+    lvlDebug
+
+var w: LoonyQueue[Cop]
+w = newLoonyQueue[Cop]()
+var q = w.newWard({PoolWaiter})
+var counter {.global.}: Atomic[int]
+
+proc enqueue(c: Cop) =
+  check not q.isNil
+  c.r = true
+  c.e = c.e + 1
+  discard q.push(c)
+
+
+proc runThings() {.thread.} =
+  while true:
+    var job = pop q
+    var i: bool
+    if job.isNil():
+      echo "breaking"
+      break
+    else:
+      if job.e < 3:
+        enqueue job
+      else:
+        # freeShared(job)
+        atomicThreadFence(ATOMIC_RELEASE)
+        discard counter.fetchAdd(1)
+
+template expectCounter(n: int): untyped =
+  ## convenience
+  try:
+    check counter.load == n
+  except Exception:
+    checkpoint " counter: ", load counter
+    checkpoint "expected: ", n
+    raise
+
+# suite "loony":

+block:
+  proc main =
+    ## run some ref objects through the queue in many threads
+    var threads: seq[Thread[void]]
+    newSeq(threads, threadCount)
+    counter.store 0
+    dumpAllocStats:
+      debugNodeCounter:
+        # If `loonyDebug` is defined this will output the number of nodes you
+        # started with minus the number you end with (= non-deallocated nodes)
+        for thread in threads.mitems:
+          createThread(thread, runThings)
+        checkpoint "created $# threads" % [ $threadCount ]
+        echo "Sleep for a bit"
+        sleep(500)
+        echo "Let's begin"
+        for i in 0 ..< continuationCount:
+          discard counter.load()
+          atomicThreadFence(ATOMIC_ACQUIRE)
+          var c = new Cop
+
+          # var c = cast[Cop](createShared(C))
+          # c[] = C()
+          enqueue c
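+
+        # All continuations are queued; give the workers time to drain the
+        # queue before the waiters are killed below.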
checkpoint "queued $# continuations" % [ $continuationCount ] + sleep(1_000) + q.killWaiters + for thread in threads.mitems: + joinThread thread + checkpoint "joined $# threads" % [ $threadCount ] + + expectCounter continuationCount + main() \ No newline at end of file diff --git a/failtests/tward.nim b/failtests/tward.nim new file mode 100644 index 0000000..931669e --- /dev/null +++ b/failtests/tward.nim @@ -0,0 +1,137 @@ +import std/osproc +import std/strutils +import std/logging +import std/atomics +import std/os +import std/macros + +import balls +import cps + +import loony +import loony/ward + +const + continuationCount = 1000 +let + threadCount = 11 + +type + C = ref object of Continuation + +addHandler newConsoleLogger() +setLogFilter: + when defined(danger): + lvlNotice + elif defined(release): + lvlInfo + else: + lvlDebug + +var w: LoonyQueue[Continuation] +w = initLoonyQueue[Continuation]() +var q = w.newWard({PoolWaiter}) + + +proc runThings() {.thread.} = + while true: + var job = pop q + if job.dismissed: + break + else: + while job.running: + job = trampoline job + +proc enqueue(c: C): C {.cpsMagic.} = + check not q.isNil + discard q.push(c) + +var counter {.global.}: Atomic[int] + +import std/hashes + +# try to delay a reasonable amount of time despite platform +when defined(windows): + proc noop(c: C): C {.cpsMagic.} = + var i: int + # while true: + # let v = hash i + # if i == 300: break + # inc i + # sleep: + # when defined(danger) and false: # Reduce cont count on windows before adding sleep + # 1 + # else: + # # 0 + # 1 # đŸ¤” + c +else: + import posix + proc noop(c: C): C {.cpsMagic.} = + const + ns = when defined(danger): 1_000 else: 10_000 + var x = Timespec(tv_sec: 0.Time, tv_nsec: ns) + var y: Timespec + if 0 != nanosleep(x, y): + raise + c + +proc doContinualThings() {.cps: C.} = + enqueue() + noop() + enqueue() + discard counter.fetchAdd(1) + +template expectCounter(n: int): untyped = + ## convenience + try: + check counter.load == n + except Exception: + checkpoint " counter: ", load counter + checkpoint "expected: ", n + raise + +suite "loony": + + block: + ## run some continuations through the queue in another thread + skip "boring" + var thr: Thread[void] + + counter.store 0 + dumpAllocStats: + for i in 0 ..< continuationCount: + var c = whelp doContinualThings() + discard enqueue c + createThread(thr, runThings) + joinThread thr + expectCounter continuationCount + + block: + ## run some continuations through the queue in many threads + var threads: seq[Thread[void]] + newSeq(threads, threadCount) + counter.store 0 + dumpAllocStats: + debugNodeCounter: + # If `loonyDebug` is defined this will output number of nodes you started + # with - the number of nodes you end with (= non-deallocated nodes) + for thread in threads.mitems: + createThread(thread, runThings) + checkpoint "created $# threads" % [ $threadCount ] + echo "gonna sleep" + sleep(5_000) + echo "passed" + for i in 0 ..< continuationCount: + var c = whelp doContinualThings() + discard enqueue c + checkpoint "queued $# continuations" % [ $continuationCount ] + echo "passed 2" + sleep(5_000) + echo counter.load() + killWaiters(q) + for thread in threads.mitems: + joinThread thread + checkpoint "joined $# threads" % [ $threadCount ] + + expectCounter continuationCount diff --git a/loony.nim b/loony.nim index 04eaaa1..41656d5 100644 --- a/loony.nim +++ b/loony.nim @@ -20,7 +20,9 @@ export # raise Defect(nil) type - LoonyQueue*[T] = ref object + + LoonyQueue*[T] = ref LoonyQueueImpl[T] + 
+  LoonyQueueImpl*[T] = object
     head     : Atomic[TagPtr]     ## Whereby node contains the slots and idx
     tail     : Atomic[TagPtr]     ## is the uint16 index of the slot array
     currTail : Atomic[NodePtr]    ## 8 bytes Current NodePtr
@@ -34,15 +36,17 @@ type
     QueueEmpty # 0000_0000
     Advanced   # 0000_0001
 
-# TagPtr is an alias for 8 byte uint (pointer). We reserve a portion of
-# the tail to contain the index of the slot to its corresponding node
-# by aligning the node pointers on allocation. Since the index value is
-# stored in the same memory word as its associated node pointer, the FAA
-# operations could potentially affect both values if too many increments
-# were to occur. This is accounted for in the algorithm and with space
-# for overflow in the alignment. See Section 5.2 for the paper to see
-# why an overflow would prove impossible except under extraordinarily
-# large number of thread contention.
+#[
+  TagPtr is an alias for 8 byte uint (pointer). We reserve a portion of
+  the tail to contain the index of the slot to its corresponding node
+  by aligning the node pointers on allocation. Since the index value is
+  stored in the same memory word as its associated node pointer, the FAA
+  operations could potentially affect both values if too many increments
+  were to occur. This is accounted for in the algorithm, with space for
+  overflow in the alignment. See Section 5.2 of the paper for why an
+  overflow is impossible except under an extraordinarily large number of
+  contending threads.
+]#
 
 template nptr(tag: TagPtr): NodePtr = toNodePtr(tag and PTRMASK)
 template node(tag: TagPtr): var Node = cast[ptr Node](nptr(tag))[]
@@ -92,12 +96,90 @@ template compareAndSwapCurrTail(queue: LoonyQueue, expect: var uint,
   swap: uint | TagPtr): bool {.used.} =
   queue.currTail.compareExchange(expect, swap)
 
-# Both enqueue and dequeue enter FAST PATH operations 99% of the time,
-# however in cases we enter the SLOW PATH operations represented in both
-# enq and deq by advTail and advHead respectively.
-#
-# This path requires the threads to first help updating the linked list
-# struct before retrying and entering the fast path in the next attempt.
+proc `=destroy`*[T](x: var LoonyQueueImpl[T]) =
+  ## The destructor assumes that no other threads are operating on the
+  ## queue at the same time. Violating this assumption will result in
+  ## SIGSEGVs and undefined behaviour.
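+  ## It walks the queue's remaining nodes, releasing any elements that were
+  ## written but never consumed, before deallocating the nodes themselves.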
+
+  var loadedLine: int # we want to track what cache line we have loaded and
+                      # ensure we perform an atomic load at least once on each cache line
+  var headNodeIdx: (NodePtr, uint16)
+  var tailNode: ptr Node
+  var tailIdx: uint16
+  var slotptr: ptr uint
+  var slotval: uint
+  block:
+
+    template getHead: untyped =
+      let tptr = x.head.load()
+      headNodeIdx = (tptr.nptr, tptr.idx)
+
+    template getTail: untyped =
+      if tailNode.isNil():
+        let tptr = x.tail.load()
+        tailNode = cast[ptr Node](tptr.nptr)
+        tailIdx = tptr.idx
+        loadedLine = cast[int](tailNode)
+      else:
+        let oldNode = tailNode
+        tailNode = cast[ptr Node](tailNode.next.load().nptr())
+        tailIdx = 0'u16
+        deallocNode oldNode
+
+    template loadSlot: untyped =
+      slotptr = cast[ptr uint](tailNode.slots[tailIdx].addr())
+      if (loadedLine + 64) < cast[int](slotptr):
+        slotval = slotptr.atomicLoadN(ATOMIC_RELAXED)
+        loadedLine = cast[int](slotptr)
+      elif not slotptr.isNil():
+        slotval = slotptr[]
+      else:
+        slotval = 0'u
+
+    template truthy: bool =
+      (cast[NodePtr](tailNode), tailIdx) == headNodeIdx
+    template idxTruthy: bool =
+      if cast[NodePtr](tailNode) == headNodeIdx[0]: # compare node pointers
+        tailIdx < N
+      else:
+        tailIdx <= headNodeIdx[1]
+
+    getHead()
+    getTail()
+    if (loadedLine mod 64) != 0:
+      loadedLine = loadedLine - (loadedLine mod 64)
+
+    while not truthy:
+
+      while idxTruthy:
+        loadSlot()
+        if (slotval and spec.WRITER) == spec.WRITER:
+          if (slotval and CONSUMED) == CONSUMED:
+            inc tailIdx
+          elif (slotval and PTRMASK) != 0'u:
+            var el = cast[T](slotval and PTRMASK)
+            when T is ref:
+              GC_unref el
+            else:
+              `=destroy`(el)
+            inc tailIdx
+        else:
+          break
+      getTail()
+      if tailNode.isNil():
+        break
+    if not tailNode.isNil():
+      deallocNode(tailNode)
+
+#[
+  Both enqueue and dequeue enter FAST PATH operations 99% of the time,
+  however in rare cases we enter the SLOW PATH operations, represented in
+  enq and deq by advTail and advHead respectively.
+
+  This path requires the threads to first help update the linked-list
+  structure before retrying and entering the fast path in the next attempt.
+]#
 
 proc advTail[T](queue: LoonyQueue[T]; pel: uint; tag: TagPtr): AdvTail =
   # Modified version of Michael-Scott algorithm
@@ -178,39 +260,42 @@ proc advHead(queue: LoonyQueue; curr, h, t: var TagPtr): AdvHead =
       incrDeqCount(h.node, curr.idx - N)
       Advanced
 
-# Fundamentally, both enqueue and dequeue operations attempt to
-# exclusively reserve access to a slot in the array of their associated
-# queue node by automatically incremementing the appropriate index value
-# and retrieving the previous value of the index as well as the current
-# node pointer.
-#
-# Threads that retrieve an index i < N (length of the slots array) gain
-# *exclusive* rights to perform either write/consume operation on the
-# corresponding slot.
-#
-# This guarantees there can only be exactly one of each for any given
-# slot.
-#
-# Where i < N, we use FAST PATH operations. These operations are
-# designed to be as fast as possible while only dealing with memory
-# contention in rare edge cases.
-#
-# if not i < N, we enter SLOW PATH operations. See AdvTail and AdvHead
-# above.
-#
-# Fetch And Add (FAA) primitives are used for both incrementing index
-# values as well as performing read(consume) and write operations on
-# reserved slots which drastically improves scalability compared to
-# Compare And Swap (CAS) primitives.
-#
-# Note that all operations on slots must modify the slots state bits to
-# announce both operations completion (in case of a read) and also makes
-# determining the order in which two operations occured possible.
+#[
+  Fundamentally, both enqueue and dequeue operations attempt to
+  exclusively reserve access to a slot in the array of their associated
+  queue node by atomically incrementing the appropriate index value
+  and retrieving the previous value of the index as well as the current
+  node pointer.
+
+  Threads that retrieve an index i < N (length of the slots array) gain
+  *exclusive* rights to perform either a write or a consume operation on
+  the corresponding slot.
+
+  This guarantees there can only be exactly one of each for any given
+  slot.
+
+  Where i < N, we use FAST PATH operations. These operations are
+  designed to be as fast as possible while only dealing with memory
+  contention in rare edge cases.
+
+  If i >= N, we enter SLOW PATH operations. See AdvTail and AdvHead
+  above.
+
+  Fetch And Add (FAA) primitives are used both for incrementing index
+  values and for performing read (consume) and write operations on
+  reserved slots, which drastically improves scalability compared to
+  Compare And Swap (CAS) primitives.
+
+  Note that all operations on slots must modify the slot's state bits to
+  announce their completion (in the case of a read); this also makes it
+  possible to determine the order in which two operations occurred.
+]#
 
 proc pushImpl[T](queue: LoonyQueue[T], el: T,
                  forcedCoherance: static bool = false) =
   doAssert not queue.isNil(), "The queue has not been initialised"
-  # Begin by tagging pointer el with WRITER bit
+  # Begin by tagging pointer el with the WRITER bit and increasing the ref
+  # count if necessary
   var pel = prepareElement el
   # Ensure all writes in STOREBUFFER are committed. By far the most costly
   # primitive; it will be preferred while proving safety before working towards
@@ -310,9 +395,9 @@ proc popImpl[T](queue: LoonyQueue[T]; forcedCoherance: static bool = false): T =
         # CPU to completely commit its STOREBUFFER
         result = cast[T](prev and SLOTMASK)
-        assert not result.isNil
-        when result is ref:
-          GC_unref result
+        when T is ref:
+          GC_unref result # We incref on the push, so we have to make sure
+                          # to decref or we will leak memory
         break
       else: # SLOW PATH OPS
@@ -335,22 +420,27 @@ proc unsafePop*[T](queue: LoonyQueue[T]): T =
   ## related to the item.
   popImpl(queue, forcedCoherance = false)
 
-# Consumed slots have been written to and then read. If a concurrent
-# deque operation outpaces the corresponding enqueue operation then both
-# operations have to abandon and try again. Once all slots in the node
-# have been consumed or abandoned, the node is considered drained and
-# unlinked from the list. Node can be reclaimed and de-allocated.
-#
-# Queue manages an enqueue index and a dequeue index. Each are modified
-# by fetchAndAdd; gives thread reserves previous index for itself which
-# may be used to address a slot in the respective nodes array.
-#
-# both node pointers are tagged with their assoc index value ->
-# they store both address to respective node as well as the current
-# index value in the same memory word.
-#
-# Requires a sufficient number of available bits that are not used to
-# present the nodes addresses themselves.
+#[
+  Consumed slots have been written to and then read. If a concurrent
+  dequeue operation outpaces the corresponding enqueue operation then
+  both operations have to abandon and try again.
+  Once all slots in the node have been consumed or abandoned, the node
+  is considered drained and unlinked from the list. The node can then be
+  reclaimed and deallocated.
+
+  The queue manages an enqueue index and a dequeue index. Each is
+  modified by fetchAndAdd; the thread thereby reserves the previous
+  index for itself, which it may use to address a slot in the
+  respective node's array.
+
+  Both node pointers are tagged with their associated index value ->
+  they store both the address of the respective node and the current
+  index value in the same memory word.
+
+  This requires a sufficient number of available bits that are not used
+  to represent the nodes' addresses themselves.
+]#
+
+
 
 proc initLoonyQueue*(q: LoonyQueue) =
   ## Initialize an existing LoonyQueue.
@@ -364,8 +454,7 @@ proc initLoonyQueue*(q: LoonyQueue) =
     var t = load tailTag.toNode().slots[i]
     assert h == 0, "Slot found to not be nil on initialisation"
     assert t == 0, "Slot found to not be nil on initialisation"
-  # I mean the enqueue and dequeue pretty well handle any issues with
-  # initialising, but I might as well help allocate the first ones right?
+  # Allocate the first nodes on initialisation to optimise use.
 
 proc initLoonyQueue*[T](): LoonyQueue[T] {.deprecated: "Use newLoonyQueue instead".} =
   ## Return an initialized LoonyQueue.
diff --git a/loony/node.nim b/loony/node.nim
index 596b610..019cf15 100644
--- a/loony/node.nim
+++ b/loony/node.nim
@@ -152,7 +152,6 @@ proc incrEnqCount*(node: var Node; final: uint16 = 0) =
       `=destroy` node
       incEnqCounter()
   else:
-    # echo finalCount - ((mask.uint16 and MASK) + 1), "\n"
     incEnqPathCounter()
 
 proc incrDeqCount*(node: var Node; final: uint16 = 0) =
diff --git a/loony/utils/arc.nim b/loony/utils/arc.nim
new file mode 100644
index 0000000..c0d0fbf
--- /dev/null
+++ b/loony/utils/arc.nim
@@ -0,0 +1,58 @@
+#[
+
+Some types and constants copied from stdlib so we can mimic rc against the
+standard ref header.
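+
+For example, with ARC (rcShift == 3) a header value of 0b10000 decodes to a
+reference count of 2 (16 shr 3 == 2), and each incref adds rcIncrement
+(0b1000), leaving the low three bits untouched for the runtime's own use.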
+
+]#
+
+when defined(gcOrc):
+  const
+    rcIncrement = 0b10000 # so that lowest 4 bits are not touched
+    rcMask = 0b1111
+    rcShift = 4 # shift by rcShift to get the reference counter
+else:
+  const
+    rcIncrement = 0b1000 # so that lowest 3 bits are not touched
+    rcMask = 0b111
+    rcShift = 3 # shift by rcShift to get the reference counter
+
+template shit(n: int): int = n shl rcShift
+template unshit(n: int): int = n shr rcShift
+
+type
+  RefHeader = object
+    rc: int
+    when defined(gcOrc):
+      rootIdx: int
+    when defined(nimArcDebug) or defined(nimArcIds):
+      refId: int
+
+  Cell = ptr RefHeader
+
+template head(p: ref): Cell =
+  cast[Cell](cast[int](p) -% sizeof(RefHeader))
+
+template rcPtr(p: ref): ptr int = addr head(p)[].rc
+
+proc atomicRC*(p: ref, order: AtomMemModel = ATOMIC_SEQ_CST): int =
+  ## returns the current rc
+  atomicLoad(rcPtr(p), addr result, order)
+  result = unshit result
+
+proc atomicRC*(p: ref; n: int, order: AtomMemModel = ATOMIC_SEQ_CST) =
+  ## sets the rc to the provided value
+  let old = atomicFetchAnd(rcPtr(p), rcMask, order)
+  let n = (shit n) or (old and rcMask) # preserve the header's low flag bits
+  atomicStore(rcPtr(p), unsafeAddr n, order)
+
+proc atomicIncRef*(p: ref, order: AtomMemModel = ATOMIC_SEQ_CST): int =
+  ## returns the old value
+  unshit atomicFetchAdd(rcPtr(p), rcIncrement, order)
+
+proc atomicDecRef*(p: ref, order: AtomMemModel = ATOMIC_SEQ_CST): int =
+  ## returns the old value
+  unshit atomicFetchSub(rcPtr(p), rcIncrement, order)
+
+template isIsolated*(p: ref, order: AtomMemModel = ATOMIC_SEQ_CST): bool =
+  ## true when p holds the only reference
+  atomicRC(p) == 0
diff --git a/loony/utils/futex.nim b/loony/utils/futex.nim
new file mode 100644
index 0000000..07cec2a
--- /dev/null
+++ b/loony/utils/futex.nim
@@ -0,0 +1,6 @@
+when defined(windows):
+  import ./futex/futex_windows
+  export futex_windows
+else:
+  import ./futex/futex_linux
+  export futex_linux
\ No newline at end of file
diff --git a/loony/utils/futex/futex_linux.nim b/loony/utils/futex/futex_linux.nim
new file mode 100644
index 0000000..c9d5de3
--- /dev/null
+++ b/loony/utils/futex/futex_linux.nim
@@ -0,0 +1,43 @@
+# Weave
+# Copyright (c) 2019 Mamy AndrĂ©-Ratsimbazafy
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+# A wrapper for linux futex.
+# Condition variables do not always wake on signal which can deadlock the runtime
+# so we need to roll up our sleeves and use the low-level futex API.
+
+import std/atomics
+export MemoryOrder
+
+type
+  FutexOp = distinct cint
+
+var NR_Futex {.importc: "__NR_futex", header: "<sys/syscall.h>".}: cint
+var FutexWaitPrivate {.importc: "FUTEX_WAIT_PRIVATE", header: "<linux/futex.h>".}: FutexOp
+var FutexWakePrivate {.importc: "FUTEX_WAKE_PRIVATE", header: "<linux/futex.h>".}: FutexOp
+
+proc syscall(sysno: clong): cint {.header: "<unistd.h>", varargs.}
+
+proc sysFutex(
+  futex: pointer, op: FutexOp, val1: cint,
+  timeout: pointer = nil, val2: pointer = nil, val3: cint = 0): cint {.inline.} =
+  syscall(NR_Futex, futex, op, val1, timeout, val2, val3)
+
+proc wait*[T](monitor: ptr T, compare: T) {.inline.} =
+  ## Suspend a thread if the value of the futex is the same as compare.
+
+  # Returns 0 in case of a successful suspend
+  # If the values differ, it returns EWOULDBLOCK
+  # We discard as this is not needed and simplifies compat with Windows futex
+  discard sysFutex(monitor, FutexWaitPrivate, cast[cint](compare))
+
+proc wake*(monitor: pointer) {.inline.} =
+  ## Wake one thread (from the same process)
+
+  # Returns the number of actually woken threads
+  # or a Posix error code (if negative)
+  # We discard as this is not needed and simplifies compat with Windows futex
+  discard sysFutex(monitor, FutexWakePrivate, 1)
\ No newline at end of file
diff --git a/loony/utils/futex/futex_windows.nim b/loony/utils/futex/futex_windows.nim
new file mode 100644
index 0000000..cb0abfc
--- /dev/null
+++ b/loony/utils/futex/futex_windows.nim
@@ -0,0 +1,20 @@
+const
+  INFINITE = -1
+
+proc waitOnAddress[T](address: ptr T; compare: ptr T; size: int32;
+                      dwMilliseconds: int32): bool {.stdcall, dynlib: "API-MS-Win-Core-Synch-l1-2-0", importc: "WaitOnAddress".}
+proc wakeByAddressSingle(address: pointer) {.stdcall, dynlib: "API-MS-Win-Core-Synch-l1-2-0", importc: "WakeByAddressSingle".}
+proc wakeByAddressAll(address: pointer) {.stdcall, dynlib: "API-MS-Win-Core-Synch-l1-2-0", importc: "WakeByAddressAll".}
+
+proc wait*[T](monitor: ptr T; compare: T) {.inline.} =
+  # The Win API says this can wake spuriously and should sit in a loop that
+  # re-checks the value; the ward's pop loop performs that re-check for us.
+  # while monitor[] == compare:
+  #   discard waitOnAddress(monitor, compare.unsafeAddr, sizeof(T).int32, INFINITE)
+  discard waitOnAddress(monitor, compare.unsafeAddr, sizeof(T).int32, INFINITE)
+
+proc wake*(monitor: pointer) {.inline.} =
+  wakeByAddressSingle(monitor)
+
+proc wakeAll*(monitor: pointer) {.inline.} =
+  wakeByAddressAll(monitor)
\ No newline at end of file
diff --git a/loony/ward.nim b/loony/ward.nim
index bf81cb1..2680701 100644
--- a/loony/ward.nim
+++ b/loony/ward.nim
@@ -20,8 +20,11 @@ import loony/spec {.all.}
 import loony/node {.all.}
 import loony {.all.}
 
+import loony/utils/futex
+
 import std/atomics
 import std/setutils
+import std/sets
 
 type
   WardFlag* {.size: sizeof(uint16).} = enum
@@ -30,6 +33,8 @@ type
     Clearable =     "this ward is capable of clearing the queue"
     Pausable =      "accessing the queue with this ward can be paused"
     # This flag will automatically infer PopPausable and PushPausable.
+    PoolWaiter =    "this is for use by threadpools waiting on loony for work; " &
+                    "they will use futex wait and wake."
 
   WardFlags = uint16
 
@@ -44,7 +49,7 @@ converter toWardFlags*(flags: set[WardFlag]): WardFlags =
   when nimvm:
     for flag in items(flags):
       block:
-        if flag == Pausable:
+        if flag in {Pausable, PoolWaiter}:
           result = result or (1'u16 shl PopPausable.ord)
           result = result or (1'u16 shl PushPausable.ord)
         result = result or (1'u16 shl flag.ord)
@@ -59,7 +64,7 @@ converter toWardFlags*(flags: set[WardFlag]): WardFlags =
       PushPausable in flags and not (Pausable in flags):
     result = result or (1'u16 shl Pausable.ord)
 
-  if flags.contains Pausable:
+  if flags * {Pausable, PoolWaiter} != {}:
     result = result or (cast[uint16]({PopPausable, PushPausable}))
 
 converter toFlags*(value: WardFlags): set[WardFlag] =
@@ -107,47 +112,98 @@ proc push*[T, F](ward: Ward[T, F], el: T): bool =
   ## Push the element through the ward onto the queue. If the ward is paused or
   ## there is some restriction on access, a false is returned (which means the
   ## el is still a valid reference/pointer).
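+  ## On wards flagged with PoolWaiter, a successful push additionally wakes
+  ## one thread sleeping in pop via the futex.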
-  if not ward.isFlagOn PushPausable:
-    ward.queue.push el
-    result = true
+
+  when PoolWaiter in F:
+    if not ward.isFlagOn PushPausable:
+      ward.queue.pushImpl(el, true)
+      result = true
+      wake(ward.values.addr())
+  else:
+    if not ward.isFlagOn PushPausable:
+      ward.queue.push el
+      result = true
 
 proc unsafePush*[T, F](ward: Ward[T, F], el: T): bool =
   ## unsafePush the element through the ward onto the queue. If the ward is paused or
   ## there is some restriction on access, a false is returned (which means the
   ## el is still a valid reference/pointer)
-  if not ward.isFlagOn PushPausable:
-    ward.queue.unsafePush el
-    result = true
+  when PoolWaiter in F:
+    if not ward.isFlagOn PushPausable:
+      ward.queue.pushImpl(el, false)
+      result = true
+      wake(ward.values.addr())
+  else:
+    if not ward.isFlagOn PushPausable:
+      ward.queue.unsafePush el
+      result = true
+
 
 proc pop*[T, F](ward: Ward[T, F]): T =
   ## Pop an element off the queue in the ward. If the ward is paused or
   ## there is some restriction on access, a nil pointer is returned
-  if not ward.isFlagOn PopPausable:
-    ward.queue.pop()
+  when PoolWaiter in F:
+    when T is pointer or T is ref:
+      template truthy: untyped =
+        not ward.isFlagOn(PopPausable) and
+          (res = ward.queue.popImpl(true); res).isNil()
+    else:
+      template truthy: untyped =
+        not ward.isFlagOn(PopPausable) and
+          (res = ward.queue.popImpl(true); res) == default(T)
+
+    var res: T
+    while truthy:
+      wait(ward.values.addr(), ward.values)
+    return res
+
+  else:
+    if not ward.isFlagOn PopPausable:
+      result = ward.queue.pop()
+
 
 proc unsafePop*[T, F](ward: Ward[T, F]): T =
   ## unsafePop an element off the queue in the ward. If the ward is paused or
   ## there is some restriction on access, a nil pointer is returned
-  if not ward.isFlagOn PopPausable:
-    ward.queue.unsafePop()
+  when PoolWaiter in F:
+    if not ward.isFlagOn PopPausable:
+      while result.isNil:
+        result = ward.queue.popImpl(false)
+        if result.isNil:
+          wait(ward.values.addr(), ward.values)
+  else:
+    if not ward.isFlagOn PopPausable:
+      result = ward.queue.unsafePop()
 
-template pauseImpl[T, F](ward: Ward[T, F], flags: set[WardFlag]): bool =
-  when flags.intersection ward.flags == flags:
-    if `and`(ward.values.fetchOr(flags, moRelease), flags) > 0'u16:
+template pauseImpl*[T, F](ward: Ward[T, F], flagset: set[WardFlag]): bool =
+  when flagset * ward.flags == flagset:
+    if `and`(cast[ptr uint16](ward.values.addr()).atomicFetchOr(flagset, ATOMIC_RELEASE), flagset) > 0'u16:
       true
     else:
       false
   else:
     raise ValueError.newException:
-      "You require this flag on the ward: " & $flags
-template resumeImpl[T, F](ward: Ward[T, F], flag: set[WardFlag]): bool =
-  when flags.intersection ward.flags == flags:
-    if `and`(ward.values.fetchAnd(complement flags, moRelease), flags) > 0'u16:
+      "You require this flag on the ward: " & $flagset
+template resumeImpl*[T, F](ward: Ward[T, F], flagset: set[WardFlag]): bool =
+  when flagset * ward.flags == flagset:
+    if `and`(ward.values.fetchAnd(complement flagset, moRelease), flagset) > 0'u16:
      true
    else:
      false
  else:
    raise ValueError.newException:
-      "You require this flag on the ward: " & $flags
+      "You require this flag on the ward: " & $flagset
+
+proc killWaiters*[T, F](ward: Ward[T, F]) =
+  ## This will pause pops on the ward and then wake all waiters, which will
+  ## cause them to return a null value.
+  ## NOTE: the ward therefore remains paused after killWaiters is used. Since
+  ## it is usually called while cleaning up resources this shouldn't be an
+  ## issue, but it can be reverted using resume, as with other pauses.
+  when PoolWaiter in F:
+    discard ward.pauseImpl({PopPausable})
+    wakeAll(ward.values.addr())
+  else:
+    raise ValueError.newException:
+      "You require this flag on the ward: PoolWaiter"
 
 # These pause functions will only stop ward access that have not yet begun.
 # This must be kept in mind when considering activity on the queue else.
 proc pause*[T, F](ward: Ward[T, F]): bool =