From 440f77617f0fcfe2cc21689b1d99046b659bf6c0 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Wed, 18 Aug 2021 10:00:07 -0400 Subject: [PATCH 1/2] Refactor Worker Loop --- src/core/worker.c | 280 +++++++++++++++++++++++++++------------------- src/core/worker.h | 5 - 2 files changed, 167 insertions(+), 118 deletions(-) diff --git a/src/core/worker.c b/src/core/worker.c index 8dc5531e69..1d524f2b2e 100644 --- a/src/core/worker.c +++ b/src/core/worker.c @@ -396,7 +396,8 @@ QuicWorkerGetNextOperation( _IRQL_requires_max_(PASSIVE_LEVEL) void QuicWorkerProcessTimers( - _In_ QUIC_WORKER* Worker + _In_ QUIC_WORKER* Worker, + _In_ CXPLAT_THREAD_ID ThreadID ) { // @@ -417,7 +418,7 @@ QuicWorkerProcessTimers( QUIC_CONNECTION* Connection = CXPLAT_CONTAINING_RECORD(Entry, QUIC_CONNECTION, TimerLink); - Connection->WorkerThreadID = Worker->ThreadID; + Connection->WorkerThreadID = ThreadID; QuicConfigurationAttachSilo(Connection->Configuration); QuicConnTimerExpired(Connection, TimeNow); QuicConfigurationDetachSilo(); @@ -429,7 +430,8 @@ _IRQL_requires_max_(PASSIVE_LEVEL) void QuicWorkerProcessConnection( _In_ QUIC_WORKER* Worker, - _In_ QUIC_CONNECTION* Connection + _In_ QUIC_CONNECTION* Connection, + _In_ CXPLAT_THREAD_ID ThreadID ) { QuicTraceEvent( @@ -450,7 +452,7 @@ QuicWorkerProcessConnection( // // Set the thread ID so reentrant API calls will execute inline. // - Connection->WorkerThreadID = Worker->ThreadID; + Connection->WorkerThreadID = ThreadID; Connection->Stats.Schedule.DrainCount++; if (Connection->State.UpdateWorker) { @@ -546,145 +548,197 @@ QuicWorkerProcessConnection( } } -CXPLAT_THREAD_CALLBACK(QuicWorkerThread, Context) -{ - QUIC_WORKER* Worker = (QUIC_WORKER*)Context; +typedef enum QUIC_WORKER_LOOP_STATE { + QUIC_WORKER_LOOP_EXIT, + QUIC_WORKER_LOOP_CONTINUE, + QUIC_WORKER_LOOP_WAIT, + QUIC_WORKER_LOOP_WAIT_TIMER, +} QUIC_WORKER_LOOP_STATE; - Worker->ThreadID = CxPlatCurThreadID(); - Worker->IsActive = TRUE; - QuicTraceEvent( - WorkerStart, - "[wrkr][%p] Start", - Worker); +typedef enum QUIC_WORKER_LOOP_REASON { + QUIC_WORKER_LOOP_REASON_NORMAL, + QUIC_WORKER_LOOP_REASON_WAIT, + QUIC_WORKER_LOOP_REASON_TIMER, +} QUIC_WORKER_LOOP_REASON; +// +// Runs one iteration of the worker loop. +// +_IRQL_requires_max_(PASSIVE_LEVEL) +QUIC_WORKER_LOOP_STATE +QuicWorkerLoop( + _In_ QUIC_WORKER* Worker, + _In_ CXPLAT_THREAD_ID ThreadID, + _In_ QUIC_WORKER_LOOP_REASON Reason, + _When_(return==QUIC_WORKER_LOOP_WAIT_TIMER, _Out_) + uint64_t* Delay + ) +{ // // TODO - Review how often CxPlatTimeUs64() is called in the thread. Perhaps // we can get it down to once per loop, passing the value along. // - while (Worker->Enabled) { + // + // For every loop of the worker thread, in an attempt to balance things, + // a single connection will be processed (if available), followed by a + // single stateless operation (if available), and then by any expired + // timers (which just queue more operations on connections). + // + + if (Reason == QUIC_WORKER_LOOP_REASON_WAIT) { + QuicWorkerToggleActivityState(Worker, TRUE); + } else if (Reason == QUIC_WORKER_LOOP_REASON_TIMER) { + QuicWorkerToggleActivityState(Worker, FALSE); + QuicWorkerProcessTimers(Worker, ThreadID); + } + + if (!Worker->Enabled) { // - // For every loop of the worker thread, in an attempt to balance things, - // a single connection will be processed (if available), followed by a - // single stateless operation (if available), and then by any expired - // timers (which just queue more operations on connections). + // Because the registration layer only waits for the rundown to complete, + // and because the connection releases the rundown on handle close, + // not free, it's possible that the worker thread still had the connection + // in it's list by the time clean up started. So it needs to release any + // remaining references on connections. // - - QUIC_CONNECTION* Connection = QuicWorkerGetNextConnection(Worker); - if (Connection != NULL) { - QuicWorkerProcessConnection(Worker, Connection); + int64_t Dequeue = 0; + while (!CxPlatListIsEmpty(&Worker->Connections)) { + QUIC_CONNECTION* Connection = + CXPLAT_CONTAINING_RECORD( + CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink); + if (!Connection->State.ExternalOwner) { + // + // If there is no external owner, shut down the connection so + // that it's not leaked. + // + QuicTraceLogConnVerbose( + AbandonOnLibShutdown, + Connection, + "Abandoning on shutdown"); + QuicConnOnShutdownComplete(Connection); + } + QuicConnRelease(Connection, QUIC_CONN_REF_WORKER); + --Dequeue; } + QuicPerfCounterAdd(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH, Dequeue); - QUIC_OPERATION* Operation = QuicWorkerGetNextOperation(Worker); - if (Operation != NULL) { - QuicBindingProcessStatelessOperation( - Operation->Type, - Operation->STATELESS.Context); + Dequeue = 0; + while (!CxPlatListIsEmpty(&Worker->Operations)) { + QUIC_OPERATION* Operation = + CXPLAT_CONTAINING_RECORD( + CxPlatListRemoveHead(&Worker->Operations), QUIC_OPERATION, Link); +#if DEBUG + Operation->Link.Flink = NULL; +#endif QuicOperationFree(Worker, Operation); - QuicPerfCounterIncrement(QUIC_PERF_COUNTER_WORK_OPER_COMPLETED); + --Dequeue; } + QuicPerfCounterAdd(QUIC_PERF_COUNTER_WORK_OPER_QUEUE_DEPTH, Dequeue); + + return QUIC_WORKER_LOOP_EXIT; + } + + QUIC_CONNECTION* Connection = QuicWorkerGetNextConnection(Worker); + if (Connection != NULL) { + QuicWorkerProcessConnection(Worker, Connection, ThreadID); + } - uint64_t TimeNow = CxPlatTimeUs64(); + QUIC_OPERATION* Operation = QuicWorkerGetNextOperation(Worker); + if (Operation != NULL) { + QuicBindingProcessStatelessOperation( + Operation->Type, + Operation->STATELESS.Context); + QuicOperationFree(Worker, Operation); + QuicPerfCounterIncrement(QUIC_PERF_COUNTER_WORK_OPER_COMPLETED); + } + + uint64_t TimeNow = CxPlatTimeUs64(); + + // + // Opportunistically try to snap-shot performance counters and do + // some validation. + // + QuicPerfCounterTrySnapShot(TimeNow); + + // + // Get the delay until the next timer expires. Check to see if any timers + // have expired; if so, process them. If not, only wait for the next timer + // if we have run out of connections and stateless operations to process. + // + *Delay = QuicTimerWheelGetWaitTime(&Worker->TimerWheel, TimeNow); + if (*Delay == 0) { // - // Opportunistically try to snap-shot performance counters and do - // some validation. + // Timers are ready to be processed. // - QuicPerfCounterTrySnapShot(TimeNow); + QuicWorkerProcessTimers(Worker, ThreadID); + return QUIC_WORKER_LOOP_CONTINUE; + } + if (Connection != NULL || Operation != NULL) { // - // Get the delay until the next timer expires. Check to see if any - // timers have expired; if so, process them. If not, only wait for the - // next timer if we have run out of connections and stateless operations - // to process. + // There still may be more connections or stateless operations to be + // processed. Continue processing until there are no more. Then the + // thread can wait for the timer delay. // - uint64_t Delay = QuicTimerWheelGetWaitTime(&Worker->TimerWheel, TimeNow); + return QUIC_WORKER_LOOP_CONTINUE; + } - if (Delay == 0) { - // - // Timers are ready to be processed. - // - QuicWorkerProcessTimers(Worker); + if (*Delay != UINT64_MAX) { + // + // Since we have no connections and no stateless operations to process + // at the moment, we need to wait for the ready event or the next timer + // to expire. + // + if (*Delay >= (uint64_t)UINT32_MAX) { + *Delay = UINT32_MAX - 1; // Max has special meaning for most platforms. + } + QuicWorkerToggleActivityState(Worker, (uint32_t)*Delay); + QuicWorkerResetQueueDelay(Worker); + return QUIC_WORKER_LOOP_WAIT_TIMER; + } - } else if (Connection != NULL || Operation != NULL) { - // - // There still may be more connections or stateless operations to be - // processed. Continue processing until there are no more. Then the - // thread can wait for the timer delay. - // - continue; + // + // No active timers running, so just wait for the ready event. + // + QuicWorkerToggleActivityState(Worker, UINT32_MAX); + QuicWorkerResetQueueDelay(Worker); + return QUIC_WORKER_LOOP_WAIT; +} - } else if (Delay != UINT64_MAX) { - // - // Since we have no connections and no stateless operations to - // process at the moment, we need to wait for the ready event or the - // next timer to expire. - // - if (Delay >= (uint64_t)UINT32_MAX) { - Delay = UINT32_MAX - 1; // Max has special meaning for most platforms. - } - QuicWorkerToggleActivityState(Worker, (uint32_t)Delay); - QuicWorkerResetQueueDelay(Worker); - BOOLEAN ReadySet = - CxPlatEventWaitWithTimeout(Worker->Ready, (uint32_t)Delay); - QuicWorkerToggleActivityState(Worker, ReadySet); - - if (!ReadySet) { - QuicWorkerProcessTimers(Worker); - } +CXPLAT_THREAD_CALLBACK(QuicWorkerThread, Context) +{ + QUIC_WORKER* Worker = (QUIC_WORKER*)Context; + CXPLAT_THREAD_ID ThreadID = CxPlatCurThreadID(); + QUIC_WORKER_LOOP_REASON LoopReason = QUIC_WORKER_LOOP_REASON_NORMAL; + QUIC_WORKER_LOOP_STATE LoopState; + uint64_t Delay = 0; - } else { - // - // No active timers running, so just wait for the ready event. - // - QuicWorkerToggleActivityState(Worker, UINT32_MAX); - QuicWorkerResetQueueDelay(Worker); - CxPlatEventWaitForever(Worker->Ready); - QuicWorkerToggleActivityState(Worker, TRUE); - } - } + Worker->IsActive = TRUE; + QuicTraceEvent( + WorkerStart, + "[wrkr][%p] Start", + Worker); // - // Because the registration layer only waits for the rundown to complete, - // and because the connection releases the rundown on handle close, - // not free, it's possible that the worker thread still had the connection - // in it's list by the time clean up started. So it needs to release any - // remaining references on connections. + // Keep looping until the exit result. Wait on the ready event as necessary. // - int64_t Dequeue = 0; - while (!CxPlatListIsEmpty(&Worker->Connections)) { - QUIC_CONNECTION* Connection = - CXPLAT_CONTAINING_RECORD( - CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink); - if (!Connection->State.ExternalOwner) { - // - // If there is no external owner, shut down the connection so that - // it's not leaked. - // - QuicTraceLogConnVerbose( - AbandonOnLibShutdown, - Connection, - "Abandoning on shutdown"); - QuicConnOnShutdownComplete(Connection); - } - QuicConnRelease(Connection, QUIC_CONN_REF_WORKER); - --Dequeue; - } - QuicPerfCounterAdd(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH, Dequeue); + while ((LoopState = QuicWorkerLoop(Worker, ThreadID, LoopReason, &Delay)) != QUIC_WORKER_LOOP_EXIT) { - Dequeue = 0; - while (!CxPlatListIsEmpty(&Worker->Operations)) { - QUIC_OPERATION* Operation = - CXPLAT_CONTAINING_RECORD( - CxPlatListRemoveHead(&Worker->Operations), QUIC_OPERATION, Link); -#if DEBUG - Operation->Link.Flink = NULL; -#endif - QuicOperationFree(Worker, Operation); - --Dequeue; + if (LoopState == QUIC_WORKER_LOOP_CONTINUE) { + LoopReason = QUIC_WORKER_LOOP_REASON_NORMAL; + + } else if (LoopState == QUIC_WORKER_LOOP_WAIT) { + CxPlatEventWaitForever(Worker->Ready); + LoopReason = QUIC_WORKER_LOOP_REASON_WAIT; + + } else { + LoopReason = + CxPlatEventWaitWithTimeout(Worker->Ready, (uint32_t)Delay) ? + QUIC_WORKER_LOOP_REASON_WAIT : QUIC_WORKER_LOOP_REASON_TIMER; + } } - QuicPerfCounterAdd(QUIC_PERF_COUNTER_WORK_OPER_QUEUE_DEPTH, Dequeue); QuicTraceEvent( WorkerStop, diff --git a/src/core/worker.h b/src/core/worker.h index dd4b8ac27e..c6d5b1d1fa 100644 --- a/src/core/worker.h +++ b/src/core/worker.h @@ -30,11 +30,6 @@ typedef struct QUIC_CACHEALIGN QUIC_WORKER { // uint16_t IdealProcessor; - // - // The identifier of the platform thread. - // - CXPLAT_THREAD_ID ThreadID; - // // The average queue delay connections experience, in microseconds. // From fa8bb0196eabbc64e180d939121a6bb1b8b65387 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Wed, 18 Aug 2021 11:49:28 -0400 Subject: [PATCH 2/2] Fix clog's ws failure --- src/generated/linux/worker.c.clog.h | 6 +++--- src/generated/linux/worker.c.clog.h.lttng.h | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/generated/linux/worker.c.clog.h b/src/generated/linux/worker.c.clog.h index 3bcef7be0e..66dfe3b129 100644 --- a/src/generated/linux/worker.c.clog.h +++ b/src/generated/linux/worker.c.clog.h @@ -52,9 +52,9 @@ tracepoint(CLOG_WORKER_C, IndicateIdealProcChanged , arg1);\ // Decoder Ring for AbandonOnLibShutdown // [conn][%p] Abandoning on shutdown // QuicTraceLogConnVerbose( - AbandonOnLibShutdown, - Connection, - "Abandoning on shutdown"); + AbandonOnLibShutdown, + Connection, + "Abandoning on shutdown"); // arg1 = arg1 = Connection ----------------------------------------------------------*/ #define _clog_3_ARGS_TRACE_AbandonOnLibShutdown(uniqueId, arg1, encoded_arg_string)\ diff --git a/src/generated/linux/worker.c.clog.h.lttng.h b/src/generated/linux/worker.c.clog.h.lttng.h index 2defc81d8b..3bf5eabb70 100644 --- a/src/generated/linux/worker.c.clog.h.lttng.h +++ b/src/generated/linux/worker.c.clog.h.lttng.h @@ -24,9 +24,9 @@ TRACEPOINT_EVENT(CLOG_WORKER_C, IndicateIdealProcChanged, // Decoder Ring for AbandonOnLibShutdown // [conn][%p] Abandoning on shutdown // QuicTraceLogConnVerbose( - AbandonOnLibShutdown, - Connection, - "Abandoning on shutdown"); + AbandonOnLibShutdown, + Connection, + "Abandoning on shutdown"); // arg1 = arg1 = Connection ----------------------------------------------------------*/ TRACEPOINT_EVENT(CLOG_WORKER_C, AbandonOnLibShutdown,