Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Worker Loop #1924

Merged
merged 2 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 167 additions & 113 deletions src/core/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ QuicWorkerGetNextOperation(
_IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicWorkerProcessTimers(
_In_ QUIC_WORKER* Worker
_In_ QUIC_WORKER* Worker,
_In_ CXPLAT_THREAD_ID ThreadID
)
{
//
Expand All @@ -417,7 +418,7 @@ QuicWorkerProcessTimers(
QUIC_CONNECTION* Connection =
CXPLAT_CONTAINING_RECORD(Entry, QUIC_CONNECTION, TimerLink);

Connection->WorkerThreadID = Worker->ThreadID;
Connection->WorkerThreadID = ThreadID;
QuicConfigurationAttachSilo(Connection->Configuration);
QuicConnTimerExpired(Connection, TimeNow);
QuicConfigurationDetachSilo();
Expand All @@ -429,7 +430,8 @@ _IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicWorkerProcessConnection(
_In_ QUIC_WORKER* Worker,
_In_ QUIC_CONNECTION* Connection
_In_ QUIC_CONNECTION* Connection,
_In_ CXPLAT_THREAD_ID ThreadID
)
{
QuicTraceEvent(
Expand All @@ -450,7 +452,7 @@ QuicWorkerProcessConnection(
//
// Set the thread ID so reentrant API calls will execute inline.
//
Connection->WorkerThreadID = Worker->ThreadID;
Connection->WorkerThreadID = ThreadID;
Connection->Stats.Schedule.DrainCount++;

if (Connection->State.UpdateWorker) {
Expand Down Expand Up @@ -546,145 +548,197 @@ QuicWorkerProcessConnection(
}
}

CXPLAT_THREAD_CALLBACK(QuicWorkerThread, Context)
{
QUIC_WORKER* Worker = (QUIC_WORKER*)Context;
//
// Result of a single QuicWorkerLoop iteration. It tells the worker thread
// what to do before it runs the next iteration.
//
typedef enum QUIC_WORKER_LOOP_STATE {
    //
    // The worker is no longer enabled and its queues have been drained; the
    // thread stops looping.
    //
    QUIC_WORKER_LOOP_EXIT       = 0,

    //
    // Run the next iteration immediately, without waiting.
    //
    QUIC_WORKER_LOOP_CONTINUE   = 1,

    //
    // No timer is pending; wait on the worker's ready event with no timeout.
    //
    QUIC_WORKER_LOOP_WAIT       = 2,

    //
    // Wait on the worker's ready event, but for no longer than the delay
    // written to the loop's Delay output.
    //
    QUIC_WORKER_LOOP_WAIT_TIMER = 3,
} QUIC_WORKER_LOOP_STATE;

Worker->ThreadID = CxPlatCurThreadID();
Worker->IsActive = TRUE;
QuicTraceEvent(
WorkerStart,
"[wrkr][%p] Start",
Worker);
//
// Why QuicWorkerLoop is being invoked for this iteration. The worker thread
// derives it from how the previous iteration ended.
//
typedef enum QUIC_WORKER_LOOP_REASON {
    //
    // First iteration, or the previous iteration asked to continue without
    // waiting.
    //
    QUIC_WORKER_LOOP_REASON_NORMAL = 0,

    //
    // The thread waited on the ready event and the event was set.
    //
    QUIC_WORKER_LOOP_REASON_WAIT   = 1,

    //
    // The timed wait on the ready event expired without the event being set;
    // the loop processes timers first.
    //
    QUIC_WORKER_LOOP_REASON_TIMER  = 2,
} QUIC_WORKER_LOOP_REASON;

//
// Runs one iteration of the worker loop.
//
_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_WORKER_LOOP_STATE
QuicWorkerLoop(
nibanks marked this conversation as resolved.
Show resolved Hide resolved
_In_ QUIC_WORKER* Worker,
_In_ CXPLAT_THREAD_ID ThreadID,
_In_ QUIC_WORKER_LOOP_REASON Reason,
_When_(return==QUIC_WORKER_LOOP_WAIT_TIMER, _Out_)
uint64_t* Delay
)
{
//
// TODO - Review how often CxPlatTimeUs64() is called in the thread. Perhaps
// we can get it down to once per loop, passing the value along.
//

while (Worker->Enabled) {
//
// For every loop of the worker thread, in an attempt to balance things,
// a single connection will be processed (if available), followed by a
// single stateless operation (if available), and then by any expired
// timers (which just queue more operations on connections).
//

if (Reason == QUIC_WORKER_LOOP_REASON_WAIT) {
QuicWorkerToggleActivityState(Worker, TRUE);

} else if (Reason == QUIC_WORKER_LOOP_REASON_TIMER) {
QuicWorkerToggleActivityState(Worker, FALSE);
QuicWorkerProcessTimers(Worker, ThreadID);
}

if (!Worker->Enabled) {
//
// For every loop of the worker thread, in an attempt to balance things,
// a single connection will be processed (if available), followed by a
// single stateless operation (if available), and then by any expired
// timers (which just queue more operations on connections).
// Because the registration layer only waits for the rundown to complete,
// and because the connection releases the rundown on handle close,
// not free, it's possible that the worker thread still had the connection
// in its list by the time clean up started. So it needs to release any
// remaining references on connections.
//

QUIC_CONNECTION* Connection = QuicWorkerGetNextConnection(Worker);
if (Connection != NULL) {
QuicWorkerProcessConnection(Worker, Connection);
int64_t Dequeue = 0;
while (!CxPlatListIsEmpty(&Worker->Connections)) {
QUIC_CONNECTION* Connection =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink);
if (!Connection->State.ExternalOwner) {
//
// If there is no external owner, shut down the connection so
// that it's not leaked.
//
QuicTraceLogConnVerbose(
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
QuicConnOnShutdownComplete(Connection);
}
QuicConnRelease(Connection, QUIC_CONN_REF_WORKER);
--Dequeue;
}
QuicPerfCounterAdd(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH, Dequeue);

QUIC_OPERATION* Operation = QuicWorkerGetNextOperation(Worker);
if (Operation != NULL) {
QuicBindingProcessStatelessOperation(
Operation->Type,
Operation->STATELESS.Context);
Dequeue = 0;
while (!CxPlatListIsEmpty(&Worker->Operations)) {
QUIC_OPERATION* Operation =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Operations), QUIC_OPERATION, Link);
#if DEBUG
Operation->Link.Flink = NULL;
#endif
QuicOperationFree(Worker, Operation);
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_WORK_OPER_COMPLETED);
--Dequeue;
}
QuicPerfCounterAdd(QUIC_PERF_COUNTER_WORK_OPER_QUEUE_DEPTH, Dequeue);

return QUIC_WORKER_LOOP_EXIT;
}

QUIC_CONNECTION* Connection = QuicWorkerGetNextConnection(Worker);
if (Connection != NULL) {
QuicWorkerProcessConnection(Worker, Connection, ThreadID);
}

uint64_t TimeNow = CxPlatTimeUs64();
QUIC_OPERATION* Operation = QuicWorkerGetNextOperation(Worker);
if (Operation != NULL) {
QuicBindingProcessStatelessOperation(
Operation->Type,
Operation->STATELESS.Context);
QuicOperationFree(Worker, Operation);
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_WORK_OPER_COMPLETED);
}

uint64_t TimeNow = CxPlatTimeUs64();

//
// Opportunistically try to snap-shot performance counters and do
// some validation.
//
QuicPerfCounterTrySnapShot(TimeNow);

//
// Get the delay until the next timer expires. Check to see if any timers
// have expired; if so, process them. If not, only wait for the next timer
// if we have run out of connections and stateless operations to process.
//
*Delay = QuicTimerWheelGetWaitTime(&Worker->TimerWheel, TimeNow);

if (*Delay == 0) {
//
// Opportunistically try to snap-shot performance counters and do
// some validation.
// Timers are ready to be processed.
//
QuicPerfCounterTrySnapShot(TimeNow);
QuicWorkerProcessTimers(Worker, ThreadID);
return QUIC_WORKER_LOOP_CONTINUE;
}

if (Connection != NULL || Operation != NULL) {
//
// Get the delay until the next timer expires. Check to see if any
// timers have expired; if so, process them. If not, only wait for the
// next timer if we have run out of connections and stateless operations
// to process.
// There still may be more connections or stateless operations to be
// processed. Continue processing until there are no more. Then the
// thread can wait for the timer delay.
//
uint64_t Delay = QuicTimerWheelGetWaitTime(&Worker->TimerWheel, TimeNow);
return QUIC_WORKER_LOOP_CONTINUE;
}

if (Delay == 0) {
//
// Timers are ready to be processed.
//
QuicWorkerProcessTimers(Worker);
if (*Delay != UINT64_MAX) {
//
// Since we have no connections and no stateless operations to process
// at the moment, we need to wait for the ready event or the next timer
// to expire.
//
if (*Delay >= (uint64_t)UINT32_MAX) {
*Delay = UINT32_MAX - 1; // Max has special meaning for most platforms.
}
QuicWorkerToggleActivityState(Worker, (uint32_t)*Delay);
QuicWorkerResetQueueDelay(Worker);
return QUIC_WORKER_LOOP_WAIT_TIMER;
}

} else if (Connection != NULL || Operation != NULL) {
//
// There still may be more connections or stateless operations to be
// processed. Continue processing until there are no more. Then the
// thread can wait for the timer delay.
//
continue;
//
// No active timers running, so just wait for the ready event.
//
QuicWorkerToggleActivityState(Worker, UINT32_MAX);
QuicWorkerResetQueueDelay(Worker);
return QUIC_WORKER_LOOP_WAIT;
}

} else if (Delay != UINT64_MAX) {
//
// Since we have no connections and no stateless operations to
// process at the moment, we need to wait for the ready event or the
// next timer to expire.
//
if (Delay >= (uint64_t)UINT32_MAX) {
Delay = UINT32_MAX - 1; // Max has special meaning for most platforms.
}
QuicWorkerToggleActivityState(Worker, (uint32_t)Delay);
QuicWorkerResetQueueDelay(Worker);
BOOLEAN ReadySet =
CxPlatEventWaitWithTimeout(Worker->Ready, (uint32_t)Delay);
QuicWorkerToggleActivityState(Worker, ReadySet);

if (!ReadySet) {
QuicWorkerProcessTimers(Worker);
}
CXPLAT_THREAD_CALLBACK(QuicWorkerThread, Context)
{
QUIC_WORKER* Worker = (QUIC_WORKER*)Context;
CXPLAT_THREAD_ID ThreadID = CxPlatCurThreadID();
QUIC_WORKER_LOOP_REASON LoopReason = QUIC_WORKER_LOOP_REASON_NORMAL;
QUIC_WORKER_LOOP_STATE LoopState;
uint64_t Delay = 0;

} else {
//
// No active timers running, so just wait for the ready event.
//
QuicWorkerToggleActivityState(Worker, UINT32_MAX);
QuicWorkerResetQueueDelay(Worker);
CxPlatEventWaitForever(Worker->Ready);
QuicWorkerToggleActivityState(Worker, TRUE);
}
}
Worker->IsActive = TRUE;
QuicTraceEvent(
WorkerStart,
"[wrkr][%p] Start",
Worker);

//
// Because the registration layer only waits for the rundown to complete,
// and because the connection releases the rundown on handle close,
// not free, it's possible that the worker thread still had the connection
// in its list by the time clean up started. So it needs to release any
// remaining references on connections.
// Keep looping until the exit result. Wait on the ready event as necessary.
//
int64_t Dequeue = 0;
while (!CxPlatListIsEmpty(&Worker->Connections)) {
QUIC_CONNECTION* Connection =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink);
if (!Connection->State.ExternalOwner) {
//
// If there is no external owner, shut down the connection so that
// it's not leaked.
//
QuicTraceLogConnVerbose(
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
QuicConnOnShutdownComplete(Connection);
}
QuicConnRelease(Connection, QUIC_CONN_REF_WORKER);
--Dequeue;
}
QuicPerfCounterAdd(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH, Dequeue);
while ((LoopState = QuicWorkerLoop(Worker, ThreadID, LoopReason, &Delay)) != QUIC_WORKER_LOOP_EXIT) {

Dequeue = 0;
while (!CxPlatListIsEmpty(&Worker->Operations)) {
QUIC_OPERATION* Operation =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Operations), QUIC_OPERATION, Link);
#if DEBUG
Operation->Link.Flink = NULL;
#endif
QuicOperationFree(Worker, Operation);
--Dequeue;
if (LoopState == QUIC_WORKER_LOOP_CONTINUE) {
LoopReason = QUIC_WORKER_LOOP_REASON_NORMAL;

} else if (LoopState == QUIC_WORKER_LOOP_WAIT) {
CxPlatEventWaitForever(Worker->Ready);
LoopReason = QUIC_WORKER_LOOP_REASON_WAIT;

} else {
LoopReason =
CxPlatEventWaitWithTimeout(Worker->Ready, (uint32_t)Delay) ?
QUIC_WORKER_LOOP_REASON_WAIT : QUIC_WORKER_LOOP_REASON_TIMER;
}
}
QuicPerfCounterAdd(QUIC_PERF_COUNTER_WORK_OPER_QUEUE_DEPTH, Dequeue);

QuicTraceEvent(
WorkerStop,
Expand Down
5 changes: 0 additions & 5 deletions src/core/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ typedef struct QUIC_CACHEALIGN QUIC_WORKER {
//
uint16_t IdealProcessor;

//
// The identifier of the platform thread.
//
CXPLAT_THREAD_ID ThreadID;

//
// The average queue delay connections experience, in microseconds.
//
Expand Down
6 changes: 3 additions & 3 deletions src/generated/linux/worker.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ tracepoint(CLOG_WORKER_C, IndicateIdealProcChanged , arg1);\
// Decoder Ring for AbandonOnLibShutdown
// [conn][%p] Abandoning on shutdown
// QuicTraceLogConnVerbose(
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
// arg1 = arg1 = Connection
----------------------------------------------------------*/
#define _clog_3_ARGS_TRACE_AbandonOnLibShutdown(uniqueId, arg1, encoded_arg_string)\
Expand Down
6 changes: 3 additions & 3 deletions src/generated/linux/worker.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ TRACEPOINT_EVENT(CLOG_WORKER_C, IndicateIdealProcChanged,
// Decoder Ring for AbandonOnLibShutdown
// [conn][%p] Abandoning on shutdown
// QuicTraceLogConnVerbose(
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
AbandonOnLibShutdown,
Connection,
"Abandoning on shutdown");
// arg1 = arg1 = Connection
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_WORKER_C, AbandonOnLibShutdown,
Expand Down