Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

single-source multi-destination aggregation #96

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions system/Communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ extern HeapLeakChecker * Grappa_heapchecker;
/// Global communicator instance
Communicator global_communicator;

/// Global GASNet token, initialized to 0.
/// NOTE(review): where this token is assigned and consumed is not visible in this chunk.
gasnet_token_t global_gasnet_token = 0;


/// declare labels for histogram
std::string CommunicatorStatistics::hist_labels[16] = {
"\"comm_0_to_255_bytes\"",
Expand Down
24 changes: 24 additions & 0 deletions system/Communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ typedef Core Node;
/// some variables from gasnet
extern size_t gasnetc_inline_limit;

//typedef void *gasnet_token_t;
/// Global GASNet token; the definition (initialized to 0) is in Communicator.cpp.
extern gasnet_token_t global_gasnet_token;

/* from gasnet_internal.c:
* gasneti_nodemap_local_count = number of GASNet nodes collocated w/ gasneti_mynode
* gasneti_nodemap_local_rank = rank of gasneti_mynode among gasneti_nodemap_local_count
Expand Down Expand Up @@ -440,6 +443,27 @@ class Communicator {
GASNET_CHECK( gasnet_AMRequestMedium0( node, handler, buf, size ) );
}

/// Reply with a no-argument active message that carries a payload.
///
/// @param token   token handed to gasnet_AMReplyMedium0
/// @param handler handler handed to gasnet_AMReplyMedium0
/// @param buf     start of the payload buffer
/// @param size    payload size; must be below maximum_message_payload_size
inline void reply( gasnet_token_t token, int handler, void * buf, size_t size ) {
  // replies may only be issued while communication is enabled
  assert( communication_is_allowed_ );
  // reject payloads that are too large for a single message
  assert( size < maximum_message_payload_size );
  stats.record_message( size );
#if defined( VTRACE_FULL )
  VT_COUNT_UNSIGNED_VAL( send_ev_vt, size );
#endif
  GASNET_CHECK( gasnet_AMReplyMedium0( token, handler, buf, size ) );
}

/// Reply with a no-argument active message that carries no payload.
///
/// @param token   token handed to gasnet_AMReplyShort0
/// @param handler handler handed to gasnet_AMReplyShort0
inline void reply( gasnet_token_t token, int handler ) {
  // replies may only be issued while communication is enabled
  assert( communication_is_allowed_ );
  // there is no payload, so account for a zero-size message
  stats.record_message( 0 );
#ifdef VTRACE_FULL
  // this overload has no 'size' parameter; trace the same zero size recorded above
  VT_COUNT_UNSIGNED_VAL( send_ev_vt, 0 );
#endif
  GASNET_CHECK( gasnet_AMReplyShort0( token, handler ) );
}

/// Send no-argument active message with payload. This only allows
/// messages that will be immediately copied to the HCA.
/// TODO: can we avoid the copy onto gasnet's buffer? This is so small it probably doesn't matter.
Expand Down
11 changes: 11 additions & 0 deletions system/ConditionVariableLocal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ namespace Grappa {
}
}

/// Wake one waiter on a condition variable.
template< typename ConditionVariable >
inline void signal_hip( ConditionVariable * cv ) {
Worker * to_wake = Grappa::impl::get_waiters( cv );
if( to_wake != NULL ) {
Grappa::impl::set_waiters( cv, to_wake->next );
to_wake->next = NULL;
impl::global_scheduler.thread_hip_wake( to_wake );
}
}

/// Wake all waiters on a condition variable.
template< typename ConditionVariable >
inline void broadcast( ConditionVariable * cv ) {
Expand Down
3 changes: 2 additions & 1 deletion system/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ ifdef GOOGLE_PROFILER
#NOTE: no output is generated by default unless you call the API or set this variable:
#ENV_VARIABLES+= CPUPROFILE="\$$SLURM_JOB_NAME.\$$SLURM_JOB_ID.\$$SLURM_PROCID.prof"
# how often should we sample? (interrupts/sec)
ENV_VARIABLES+= CPUPROFILE_FREQUENCY=100
ENV_VARIABLES+= CPUPROFILE_FREQUENCY=10
CFLAGS+= -fno-omit-frame-pointer -DGOOGLE_PROFILER
LIBS+= -lprofiler
endif
Expand All @@ -266,6 +266,7 @@ endif
ifdef VTRACE_SAMPLED
#ENV_VARIABLES+= VT_VERBOSE=10
#ENV_VARIABLES+= VT_BUFFER_SIZE=512M
ENV_VARIABLES+= VT_FILE_UNIQUE=yes
ENV_VARIABLES+= VT_MAX_FLUSHES=0
ENV_VARIABLES+= VT_PFORM_GDIR=.
ENV_VARIABLES+= VT_PFORM_LDIR=/scratch
Expand Down
2 changes: 2 additions & 0 deletions system/MessageBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "ConditionVariable.hpp"

/// Defines the int64_t statistic mark_sent_enqueues (last macro argument is 0).
/// NOTE(review): presumably counts re-enqueues performed by mark_sent — confirm against its users.
GRAPPA_DEFINE_STAT( SimpleStatistic<int64_t>, mark_sent_enqueues, 0 );

namespace Grappa {

/// Internal messaging functions
Expand Down
14 changes: 10 additions & 4 deletions system/MessageBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

typedef int16_t Core;

/// Declares the int64_t statistic mark_sent_enqueues for use in this header.
/// NOTE(review): presumably counts re-enqueues performed by mark_sent — confirm against its users.
GRAPPA_DECLARE_STAT( SimpleStatistic<int64_t>, mark_sent_enqueues );

namespace Grappa {

/// Internal messaging functions
Expand Down Expand Up @@ -72,10 +74,11 @@ namespace Grappa {

//if( Grappa::mycore() != source_ ) {
if( (is_delivered_ == true) && (Grappa::mycore() != source_) ) {
DVLOG(5) << __func__ << ": " << this << " Re-enqueuing to " << source_;
DCHECK_EQ( this->is_sent_, false );
enqueue( source_ );

// DVLOG(5) << __func__ << ": " << this << " Re-enqueuing to " << source_;
// DCHECK_EQ( this->is_sent_, false );
// mark_sent_enqueues++;
// // should only happen for deaggregation now, so enqueue to locale
// locale_enqueue( source_ );
} else {
DVLOG(5) << __func__ << ": " << this << " Final mark_sent";
DCHECK_EQ( Grappa::mycore(), this->source_ );
Expand Down Expand Up @@ -234,6 +237,9 @@ namespace Grappa {
inline void send_immediate();
inline void send_immediate( Core c );

/// Reply counterparts of send_immediate(); both take the GASNet token to reply with.
/// NOTE(review): the overload taking a Core has no definition visible in this chunk — confirm it exists.
inline void reply_immediate( gasnet_token_t token );
inline void reply_immediate( gasnet_token_t token, Core c );

virtual void deliver_locally() = 0;

inline void delete_after_send() {
Expand Down
17 changes: 16 additions & 1 deletion system/MessageBaseImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace Grappa {

is_enqueued_ = true;
DVLOG(5) << this << " on " << Grappa::impl::global_scheduler.get_current_thread() << ": " << this->typestr()
<< " enqueuing to " << destination_ << " with is_enqueued_=" << is_enqueued_ << " and is_sent_= " << is_sent_;
<< " enqueuing to " << destination_ << " from " << source_ << " with is_enqueued_=" << is_enqueued_ << " and is_sent_= " << is_sent_;
#ifndef LEGACY_SEND
Grappa::impl::global_rdma_aggregator.enqueue( this, true );
#endif
Expand Down Expand Up @@ -98,6 +98,21 @@ namespace Grappa {
send_immediate();
}

/// Send this message immediately as a reply, using the given token.
///
/// Marks the message as enqueued and delivered, then hands it to the
/// RDMA aggregator's reply_immediate(). If the message was not already
/// enqueued, its source is set to this core first. Builds with
/// LEGACY_SEND fail the "not supported" check.
///
/// @param token token forwarded to the aggregator's reply_immediate()
inline void Grappa::impl::MessageBase::reply_immediate( gasnet_token_t token ) {
  CHECK( !is_moved_ ) << "Shouldn't be sending a message that has been moved!"
                      << " Your compiler's return value optimization failed you here.";
  // a message that was never enqueued originates on this core
  if( !is_enqueued_ ) {
    source_ = global_communicator.mycore();
  }
  is_enqueued_ = true;
  is_delivered_ = true;
#ifdef LEGACY_SEND
  CHECK_EQ( true, false ) << "not supported";
  legacy_send();
#else
  Grappa::impl::global_rdma_aggregator.reply_immediate( this, token );
#endif
}

/// @}
}
}
Expand Down
2 changes: 1 addition & 1 deletion system/PerformanceTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ char * Grappa_get_next_profiler_filename( ) {
char * jobid = getenv("SLURM_JOB_ID");
char * procid = getenv("SLURM_PROCID");
if( /*jobname != NULL &&*/ jobid != NULL && procid != NULL ) {
sprintf( profiler_filename, "%s.%s.rank%s.phase%d.prof", "exe", jobid, procid, profiler_phase );
sprintf( profiler_filename, "%s.%s.rank%s.phase%d.prof", argv0_for_profiler, jobid, procid, profiler_phase );
} else {
sprintf( profiler_filename, "%s.%d.pid%d.phase%d.prof", argv0_for_profiler, time_for_profiler, getpid(), profiler_phase );
}
Expand Down
Loading