Skip to content

Commit

Permalink
Refactor Worker Loop (#1924)
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks authored Aug 18, 2021
1 parent 7dd212c commit 242b085
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 124 deletions.
280 changes: 167 additions & 113 deletions src/core/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ QuicWorkerGetNextOperation(
_IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicWorkerProcessTimers(
_In_ QUIC_WORKER* Worker
_In_ QUIC_WORKER* Worker,
_In_ CXPLAT_THREAD_ID ThreadID
)
{
//
Expand All @@ -417,7 +418,7 @@ QuicWorkerProcessTimers(
QUIC_CONNECTION* Connection =
CXPLAT_CONTAINING_RECORD(Entry, QUIC_CONNECTION, TimerLink);

Connection->WorkerThreadID = Worker->ThreadID;
Connection->WorkerThreadID = ThreadID;
QuicConfigurationAttachSilo(Connection->Configuration);
QuicConnTimerExpired(Connection, TimeNow);
QuicConfigurationDetachSilo();
Expand All @@ -429,7 +430,8 @@ _IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicWorkerProcessConnection(
_In_ QUIC_WORKER* Worker,
_In_ QUIC_CONNECTION* Connection
_In_ QUIC_CONNECTION* Connection,
_In_ CXPLAT_THREAD_ID ThreadID
)
{
QuicTraceEvent(
Expand All @@ -450,7 +452,7 @@ QuicWorkerProcessConnection(
//
// Set the thread ID so reentrant API calls will execute inline.
//
Connection->WorkerThreadID = Worker->ThreadID;
Connection->WorkerThreadID = ThreadID;
Connection->Stats.Schedule.DrainCount++;

if (Connection->State.UpdateWorker) {
Expand Down Expand Up @@ -546,145 +548,197 @@ QuicWorkerProcessConnection(
}
}

CXPLAT_THREAD_CALLBACK(QuicWorkerThread, Context)
{
QUIC_WORKER* Worker = (QUIC_WORKER*)Context;
//
// Outcome of one QuicWorkerLoop iteration. It tells the worker thread what to
// do before it runs the next iteration.
//
typedef enum QUIC_WORKER_LOOP_STATE {
    //
    // The worker is no longer enabled and its queues were drained; stop
    // looping.
    //
    QUIC_WORKER_LOOP_EXIT       = 0,

    //
    // Work or timers were processed; run another iteration immediately.
    //
    QUIC_WORKER_LOOP_CONTINUE   = 1,

    //
    // Nothing to process and no timer pending; wait on the ready event
    // without a timeout.
    //
    QUIC_WORKER_LOOP_WAIT       = 2,

    //
    // Nothing to process but a timer is pending; wait on the ready event for
    // at most the delay returned by the loop.
    //
    QUIC_WORKER_LOOP_WAIT_TIMER = 3,
} QUIC_WORKER_LOOP_STATE;

Worker->ThreadID = CxPlatCurThreadID();
Worker->IsActive = TRUE;
QuicTraceEvent(
WorkerStart,
"[wrkr][%p] Start",
Worker);
//
// Tells QuicWorkerLoop why it is being invoked, i.e. what the worker thread
// did since the previous iteration.
//
typedef enum QUIC_WORKER_LOOP_REASON {
    //
    // No wait took place: first iteration, or the previous one asked to
    // continue.
    //
    QUIC_WORKER_LOOP_REASON_NORMAL = 0,

    //
    // The thread waited and the ready event was signaled.
    //
    QUIC_WORKER_LOOP_REASON_WAIT   = 1,

    //
    // The thread's timed wait elapsed without the ready event being signaled.
    //
    QUIC_WORKER_LOOP_REASON_TIMER  = 2,
} QUIC_WORKER_LOOP_REASON;

//
// Runs one iteration of the worker loop.
//
_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_WORKER_LOOP_STATE
QuicWorkerLoop(
_In_ QUIC_WORKER* Worker,
_In_ CXPLAT_THREAD_ID ThreadID,
_In_ QUIC_WORKER_LOOP_REASON Reason,
_When_(return==QUIC_WORKER_LOOP_WAIT_TIMER, _Out_)
uint64_t* Delay
)
{
//
// TODO - Review how often CxPlatTimeUs64() is called in the thread. Perhaps
// we can get it down to once per loop, passing the value along.
//

while (Worker->Enabled) {
//
// For every loop of the worker thread, in an attempt to balance things,
// a single connection will be processed (if available), followed by a
// single stateless operation (if available), and then by any expired
// timers (which just queue more operations on connections).
//

if (Reason == QUIC_WORKER_LOOP_REASON_WAIT) {
QuicWorkerToggleActivityState(Worker, TRUE);

} else if (Reason == QUIC_WORKER_LOOP_REASON_TIMER) {
QuicWorkerToggleActivityState(Worker, FALSE);
QuicWorkerProcessTimers(Worker, ThreadID);
}

if (!Worker->Enabled) {
//
// For every loop of the worker thread, in an attempt to balance things,
// a single connection will be processed (if available), followed by a
// single stateless operation (if available), and then by any expired
// timers (which just queue more operations on connections).
// Because the registration layer only waits for the rundown to complete,
// and because the connection releases the rundown on handle close,
// not free, it's possible that the worker thread still had the connection
// in its list by the time clean up started. So it needs to release any
// remaining references on connections.
//

QUIC_CONNECTION* Connection = QuicWorkerGetNextConnection(Worker);
if (Connection != NULL) {
QuicWorkerProcessConnection(Worker, Connection);
int64_t Dequeue = 0;
while (!CxPlatListIsEmpty(&Worker->Connections)) {
QUIC_CONNECTION* Connection =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink);
if (!Connection->State.ExternalOwner) {
//
// If there is no external owner, shut down the connection so
// that it's not leaked.
//
QuicTraceLogConnVerbose(
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
QuicConnOnShutdownComplete(Connection);
}
QuicConnRelease(Connection, QUIC_CONN_REF_WORKER);
--Dequeue;
}
QuicPerfCounterAdd(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH, Dequeue);

QUIC_OPERATION* Operation = QuicWorkerGetNextOperation(Worker);
if (Operation != NULL) {
QuicBindingProcessStatelessOperation(
Operation->Type,
Operation->STATELESS.Context);
Dequeue = 0;
while (!CxPlatListIsEmpty(&Worker->Operations)) {
QUIC_OPERATION* Operation =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Operations), QUIC_OPERATION, Link);
#if DEBUG
Operation->Link.Flink = NULL;
#endif
QuicOperationFree(Worker, Operation);
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_WORK_OPER_COMPLETED);
--Dequeue;
}
QuicPerfCounterAdd(QUIC_PERF_COUNTER_WORK_OPER_QUEUE_DEPTH, Dequeue);

return QUIC_WORKER_LOOP_EXIT;
}

QUIC_CONNECTION* Connection = QuicWorkerGetNextConnection(Worker);
if (Connection != NULL) {
QuicWorkerProcessConnection(Worker, Connection, ThreadID);
}

uint64_t TimeNow = CxPlatTimeUs64();
QUIC_OPERATION* Operation = QuicWorkerGetNextOperation(Worker);
if (Operation != NULL) {
QuicBindingProcessStatelessOperation(
Operation->Type,
Operation->STATELESS.Context);
QuicOperationFree(Worker, Operation);
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_WORK_OPER_COMPLETED);
}

uint64_t TimeNow = CxPlatTimeUs64();

//
// Opportunistically try to snap-shot performance counters and do
// some validation.
//
QuicPerfCounterTrySnapShot(TimeNow);

//
// Get the delay until the next timer expires. Check to see if any timers
// have expired; if so, process them. If not, only wait for the next timer
// if we have run out of connections and stateless operations to process.
//
*Delay = QuicTimerWheelGetWaitTime(&Worker->TimerWheel, TimeNow);

if (*Delay == 0) {
//
// Opportunistically try to snap-shot performance counters and do
// some validation.
// Timers are ready to be processed.
//
QuicPerfCounterTrySnapShot(TimeNow);
QuicWorkerProcessTimers(Worker, ThreadID);
return QUIC_WORKER_LOOP_CONTINUE;
}

if (Connection != NULL || Operation != NULL) {
//
// Get the delay until the next timer expires. Check to see if any
// timers have expired; if so, process them. If not, only wait for the
// next timer if we have run out of connections and stateless operations
// to process.
// There still may be more connections or stateless operations to be
// processed. Continue processing until there are no more. Then the
// thread can wait for the timer delay.
//
uint64_t Delay = QuicTimerWheelGetWaitTime(&Worker->TimerWheel, TimeNow);
return QUIC_WORKER_LOOP_CONTINUE;
}

if (Delay == 0) {
//
// Timers are ready to be processed.
//
QuicWorkerProcessTimers(Worker);
if (*Delay != UINT64_MAX) {
//
// Since we have no connections and no stateless operations to process
// at the moment, we need to wait for the ready event or the next timer
// to expire.
//
if (*Delay >= (uint64_t)UINT32_MAX) {
*Delay = UINT32_MAX - 1; // Max has special meaning for most platforms.
}
QuicWorkerToggleActivityState(Worker, (uint32_t)*Delay);
QuicWorkerResetQueueDelay(Worker);
return QUIC_WORKER_LOOP_WAIT_TIMER;
}

} else if (Connection != NULL || Operation != NULL) {
//
// There still may be more connections or stateless operations to be
// processed. Continue processing until there are no more. Then the
// thread can wait for the timer delay.
//
continue;
//
// No active timers running, so just wait for the ready event.
//
QuicWorkerToggleActivityState(Worker, UINT32_MAX);
QuicWorkerResetQueueDelay(Worker);
return QUIC_WORKER_LOOP_WAIT;
}

} else if (Delay != UINT64_MAX) {
//
// Since we have no connections and no stateless operations to
// process at the moment, we need to wait for the ready event or the
// next timer to expire.
//
if (Delay >= (uint64_t)UINT32_MAX) {
Delay = UINT32_MAX - 1; // Max has special meaning for most platforms.
}
QuicWorkerToggleActivityState(Worker, (uint32_t)Delay);
QuicWorkerResetQueueDelay(Worker);
BOOLEAN ReadySet =
CxPlatEventWaitWithTimeout(Worker->Ready, (uint32_t)Delay);
QuicWorkerToggleActivityState(Worker, ReadySet);

if (!ReadySet) {
QuicWorkerProcessTimers(Worker);
}
CXPLAT_THREAD_CALLBACK(QuicWorkerThread, Context)
{
QUIC_WORKER* Worker = (QUIC_WORKER*)Context;
CXPLAT_THREAD_ID ThreadID = CxPlatCurThreadID();
QUIC_WORKER_LOOP_REASON LoopReason = QUIC_WORKER_LOOP_REASON_NORMAL;
QUIC_WORKER_LOOP_STATE LoopState;
uint64_t Delay = 0;

} else {
//
// No active timers running, so just wait for the ready event.
//
QuicWorkerToggleActivityState(Worker, UINT32_MAX);
QuicWorkerResetQueueDelay(Worker);
CxPlatEventWaitForever(Worker->Ready);
QuicWorkerToggleActivityState(Worker, TRUE);
}
}
Worker->IsActive = TRUE;
QuicTraceEvent(
WorkerStart,
"[wrkr][%p] Start",
Worker);

//
// Because the registration layer only waits for the rundown to complete,
// and because the connection releases the rundown on handle close,
// not free, it's possible that the worker thread still had the connection
// in its list by the time clean up started. So it needs to release any
// remaining references on connections.
// Keep looping until the exit result. Wait on the ready event as necessary.
//
int64_t Dequeue = 0;
while (!CxPlatListIsEmpty(&Worker->Connections)) {
QUIC_CONNECTION* Connection =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink);
if (!Connection->State.ExternalOwner) {
//
// If there is no external owner, shut down the connection so that
// it's not leaked.
//
QuicTraceLogConnVerbose(
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
QuicConnOnShutdownComplete(Connection);
}
QuicConnRelease(Connection, QUIC_CONN_REF_WORKER);
--Dequeue;
}
QuicPerfCounterAdd(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH, Dequeue);
while ((LoopState = QuicWorkerLoop(Worker, ThreadID, LoopReason, &Delay)) != QUIC_WORKER_LOOP_EXIT) {

Dequeue = 0;
while (!CxPlatListIsEmpty(&Worker->Operations)) {
QUIC_OPERATION* Operation =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Operations), QUIC_OPERATION, Link);
#if DEBUG
Operation->Link.Flink = NULL;
#endif
QuicOperationFree(Worker, Operation);
--Dequeue;
if (LoopState == QUIC_WORKER_LOOP_CONTINUE) {
LoopReason = QUIC_WORKER_LOOP_REASON_NORMAL;

} else if (LoopState == QUIC_WORKER_LOOP_WAIT) {
CxPlatEventWaitForever(Worker->Ready);
LoopReason = QUIC_WORKER_LOOP_REASON_WAIT;

} else {
LoopReason =
CxPlatEventWaitWithTimeout(Worker->Ready, (uint32_t)Delay) ?
QUIC_WORKER_LOOP_REASON_WAIT : QUIC_WORKER_LOOP_REASON_TIMER;
}
}
QuicPerfCounterAdd(QUIC_PERF_COUNTER_WORK_OPER_QUEUE_DEPTH, Dequeue);

QuicTraceEvent(
WorkerStop,
Expand Down
5 changes: 0 additions & 5 deletions src/core/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ typedef struct QUIC_CACHEALIGN QUIC_WORKER {
//
uint16_t IdealProcessor;

//
// The identifier of the platform thread.
//
CXPLAT_THREAD_ID ThreadID;

//
// The average queue delay connections experience, in microseconds.
//
Expand Down
6 changes: 3 additions & 3 deletions src/generated/linux/worker.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ tracepoint(CLOG_WORKER_C, IndicateIdealProcChanged , arg1);\
// Decoder Ring for AbandonOnLibShutdown
// [conn][%p] Abandoning on shutdown
// QuicTraceLogConnVerbose(
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
// arg1 = arg1 = Connection
----------------------------------------------------------*/
#define _clog_3_ARGS_TRACE_AbandonOnLibShutdown(uniqueId, arg1, encoded_arg_string)\
Expand Down
6 changes: 3 additions & 3 deletions src/generated/linux/worker.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ TRACEPOINT_EVENT(CLOG_WORKER_C, IndicateIdealProcChanged,
// Decoder Ring for AbandonOnLibShutdown
// [conn][%p] Abandoning on shutdown
// QuicTraceLogConnVerbose(
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
// arg1 = arg1 = Connection
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_WORKER_C, AbandonOnLibShutdown,
Expand Down

0 comments on commit 242b085

Please sign in to comment.