Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatic poll tick estimation (WIP) #232

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions system/tasks/TaskingScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
#include "../PerformanceTools.hpp"

/// TODO: this should be based on some actual time-related metric so behavior is predictable across machines
DEFINE_int64( periodic_poll_ticks, 0, "number of ticks to wait before polling periodic queue for one core (set to 0 for auto-growth)");
DEFINE_int64( periodic_poll_ticks_base, 28000, "number of ticks to wait before polling periodic queue for one core (see _growth for increase)");
DEFINE_int64( periodic_poll_ticks_growth, 281, "number of ticks to add per core");
DEFINE_int64( periodic_poll_ticks, 0, "number of ticks to wait before polling periodic queue for one core (set to 0 for dynamic balance using poll_factor)");
DEFINE_double( poll_factor, 6.0, "ratio between time spent working and time spent polling network (default 6:1)");

DEFINE_bool(poll_on_idle, true, "have tasking layer poll aggregator if it has nothing better to do");

Expand Down Expand Up @@ -85,6 +84,7 @@ TaskingScheduler global_scheduler;
, work_args( NULL )
, previous_periodic_ts( 0 )
, periodic_poll_ticks( 0 )
, dynamic_poll_ticks( true )
, in_no_switch_region_( false )
, prev_ts( 0 )
, prev_stats_blob_ts( 0 )
Expand All @@ -101,13 +101,9 @@ void TaskingScheduler::init ( Worker * master_arg, TaskManager * taskman ) {
current_thread = master;
task_manager = taskman;
work_args = new task_worker_args( taskman, this );
if( 0 == FLAGS_periodic_poll_ticks ) {
periodic_poll_ticks = FLAGS_periodic_poll_ticks_base + global_communicator.cores * FLAGS_periodic_poll_ticks_growth;
if( 0 == global_communicator.mycore ) {
VLOG(2) << "Actual periodic poll ticks value is " << periodic_poll_ticks;
}
} else {
if( 0 != FLAGS_periodic_poll_ticks ) {
periodic_poll_ticks = FLAGS_periodic_poll_ticks;
dynamic_poll_ticks = false;
}
}

Expand Down
52 changes: 39 additions & 13 deletions system/tasks/TaskingScheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@
GRAPPA_DECLARE_METRIC( SimpleMetric<uint64_t>, scheduler_context_switches );
GRAPPA_DECLARE_METRIC( SimpleMetric<uint64_t>, scheduler_count);



// forward declarations
namespace Grappa {
namespace impl { void idle_flush_rdma_aggregator(); }
Expand All @@ -71,6 +69,8 @@ namespace Metrics { void sample_all(); }
bool idle_flush_aggregator();

DECLARE_int64( periodic_poll_ticks );
DECLARE_double( poll_factor );

DECLARE_bool(poll_on_idle);
DECLARE_bool(flush_on_idle);
DECLARE_bool(rdma_flush_on_idle);
Expand Down Expand Up @@ -130,8 +130,9 @@ class TaskingScheduler : public Scheduler {
// STUB: replace with real periodic threads
Grappa::Timestamp previous_periodic_ts;
Grappa::Timestamp periodic_poll_ticks;
bool dynamic_poll_ticks;
inline bool should_run_periodic( Grappa::Timestamp current_ts ) {
return current_ts - previous_periodic_ts > periodic_poll_ticks;
return (current_ts - previous_periodic_ts) > periodic_poll_ticks;
}

Worker * periodicDequeue(Grappa::Timestamp current_ts) {
Expand All @@ -140,6 +141,9 @@ class TaskingScheduler : public Scheduler {
// Grappa::Timestamp current_ts = Grappa::timestamp();

if( should_run_periodic( current_ts ) ) {
// record time at start of periodic worker execution
previous_periodic_ts = Grappa::force_tick();
// run periodic thread
return periodicQ.dequeue();
} else {
return NULL;
Expand Down Expand Up @@ -193,16 +197,19 @@ class TaskingScheduler : public Scheduler {
// Grappa::Metrics::dump_stats_blob();
// }

// check for periodic tasks
result = periodicDequeue(current_ts);
if (result != NULL) {
// DVLOG(5) << current_thread->id << " scheduler: pick periodic";
*(stats.state_timers[ stats.prev_state ]) += (current_ts - prev_ts) / tick_scale;
stats.prev_state = TaskingSchedulerMetrics::StatePoll;
prev_ts = current_ts;
return result;
// Check for periodic tasks. Skip if we ran the periodic
// thread in the last scheduling cycle to give new tasks a
// chance to start.
if( stats.prev_state != TaskingSchedulerMetrics::StatePoll ) {
result = periodicDequeue(current_ts);
if (result != NULL) {
// DVLOG(5) << current_thread->id << " scheduler: pick periodic";
*(stats.state_timers[ stats.prev_state ]) += (current_ts - prev_ts) / tick_scale;
stats.prev_state = TaskingSchedulerMetrics::StatePoll;
prev_ts = current_ts;
return result;
}
}


// check ready tasks
result = readyQ.dequeue();
Expand Down Expand Up @@ -379,8 +386,27 @@ class TaskingScheduler : public Scheduler {
/// Put the Worker into the periodic queue
void periodic( Worker * thr ) {
periodicQ.enqueue( thr );
Grappa::tick();

// record time at end of periodic worker execution
Grappa::Timestamp current_ts = Grappa::force_tick();

if( dynamic_poll_ticks ) { // set next timeout based on this poll time
// first make sure we have a starting timestamp.
if( previous_periodic_ts == 0 ) {
previous_periodic_ts = current_ts;
}

// now figure out how long polling took
auto poll_ticks = current_ts - previous_periodic_ts;


if( poll_ticks > 0 ) {
// assuming the timestamp counter hasn't rolled over,
// compute next time to run polling thread
periodic_poll_ticks = static_cast<int64_t>( FLAGS_poll_factor * poll_ticks );
}
}

previous_periodic_ts = current_ts;
}

Expand Down