Skip to content

Commit

Permalink
Introduce Futex for backpressure on Wards (#20)
Browse files Browse the repository at this point in the history
* wip

* add a cast so it compiles

* cont tests

* progr

* futex wait for wards

* move tests

* some cleans and fixes

* Added destroy proc and suggested changes

Co-authored-by: Smooth Operator <[email protected]>
  • Loading branch information
shayanhabibi and disruptek authored Nov 26, 2021
1 parent 37ea875 commit a8c3adb
Show file tree
Hide file tree
Showing 10 changed files with 607 additions and 87 deletions.
3 changes: 3 additions & 0 deletions failtests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
These tests fail without changing nims system arc implementation to use atomic operations.

Therefore they do not have to be run by the CI for now.
109 changes: 109 additions & 0 deletions failtests/tloony.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import loony/utils/arc

import std/osproc
import std/strutils
import std/logging
import std/atomics
import std/os
import std/macros
import std/hashes

import balls

import loony
import loony/ward

const
continuationCount = 10_000
let
threadCount = 11

type
C = object
r: bool
e: int
Cop = ref object
r: bool
e: int
# Cop = ptr C

addHandler newConsoleLogger()
setLogFilter:
when defined(danger):
lvlNotice
elif defined(release):
lvlInfo
else:
lvlDebug

var w: LoonyQueue[Cop]
w = newLoonyQueue[Cop]()
var q = w.newWard({PoolWaiter})
var counter {.global.}: Atomic[int]

proc enqueue(c: Cop) =
check not q.isNil
c.r = true
c.e = c.e + 1
discard q.push(c)


proc runThings() {.thread.} =
while true:
var job = pop q
var i: bool
if job.isNil():
echo "breaking"
break
else:
if job.e < 3:
enqueue job
else:
# freeShared(job)
atomicThreadFence(ATOMIC_RELEASE)
discard counter.fetchAdd(1)

template expectCounter(n: int): untyped =
## convenience
try:
check counter.load == n
except Exception:
checkpoint " counter: ", load counter
checkpoint "expected: ", n
raise

# suite "loony":

block:
proc main =
## run some ref objects through the queue in many threads
var threads: seq[Thread[void]]
newSeq(threads, threadCount)
counter.store 0
dumpAllocStats:
debugNodeCounter:
# If `loonyDebug` is defined this will output number of nodes you started
# with - the number of nodes you end with (= non-deallocated nodes)
for thread in threads.mitems:
createThread(thread, runThings)
checkpoint "created $# threads" % [ $threadCount ]
echo "Sleep for a bit"
sleep(500)
echo "Lets begin"
for i in 0 ..< continuationCount:
discard counter.load()
atomicThreadFence(ATOMIC_ACQUIRE)
var c = new Cop

# var c = cast[Cop](createShared(C))
# c[] = C()
enqueue c
checkpoint "queued $# continuations" % [ $continuationCount ]
sleep(1_000)
q.killWaiters
for thread in threads.mitems:
joinThread thread
checkpoint "joined $# threads" % [ $threadCount ]

expectCounter continuationCount
main()
137 changes: 137 additions & 0 deletions failtests/tward.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import std/osproc
import std/strutils
import std/logging
import std/atomics
import std/os
import std/macros

import balls
import cps

import loony
import loony/ward

const
continuationCount = 1000
let
threadCount = 11

type
C = ref object of Continuation

addHandler newConsoleLogger()
setLogFilter:
when defined(danger):
lvlNotice
elif defined(release):
lvlInfo
else:
lvlDebug

var w: LoonyQueue[Continuation]
w = initLoonyQueue[Continuation]()
var q = w.newWard({PoolWaiter})


proc runThings() {.thread.} =
while true:
var job = pop q
if job.dismissed:
break
else:
while job.running:
job = trampoline job

proc enqueue(c: C): C {.cpsMagic.} =
check not q.isNil
discard q.push(c)

var counter {.global.}: Atomic[int]

import std/hashes

# try to delay a reasonable amount of time despite platform
when defined(windows):
proc noop(c: C): C {.cpsMagic.} =
var i: int
# while true:
# let v = hash i
# if i == 300: break
# inc i
# sleep:
# when defined(danger) and false: # Reduce cont count on windows before adding sleep
# 1
# else:
# # 0
# 1 # 🤔
c
else:
import posix
proc noop(c: C): C {.cpsMagic.} =
const
ns = when defined(danger): 1_000 else: 10_000
var x = Timespec(tv_sec: 0.Time, tv_nsec: ns)
var y: Timespec
if 0 != nanosleep(x, y):
raise
c

proc doContinualThings() {.cps: C.} =
enqueue()
noop()
enqueue()
discard counter.fetchAdd(1)

template expectCounter(n: int): untyped =
## convenience
try:
check counter.load == n
except Exception:
checkpoint " counter: ", load counter
checkpoint "expected: ", n
raise

suite "loony":

block:
## run some continuations through the queue in another thread
skip "boring"
var thr: Thread[void]

counter.store 0
dumpAllocStats:
for i in 0 ..< continuationCount:
var c = whelp doContinualThings()
discard enqueue c
createThread(thr, runThings)
joinThread thr
expectCounter continuationCount

block:
## run some continuations through the queue in many threads
var threads: seq[Thread[void]]
newSeq(threads, threadCount)
counter.store 0
dumpAllocStats:
debugNodeCounter:
# If `loonyDebug` is defined this will output number of nodes you started
# with - the number of nodes you end with (= non-deallocated nodes)
for thread in threads.mitems:
createThread(thread, runThings)
checkpoint "created $# threads" % [ $threadCount ]
echo "gonna sleep"
sleep(5_000)
echo "passed"
for i in 0 ..< continuationCount:
var c = whelp doContinualThings()
discard enqueue c
checkpoint "queued $# continuations" % [ $continuationCount ]
echo "passed 2"
sleep(5_000)
echo counter.load()
killWaiters(q)
for thread in threads.mitems:
joinThread thread
checkpoint "joined $# threads" % [ $threadCount ]

expectCounter continuationCount
Loading

0 comments on commit a8c3adb

Please sign in to comment.