Added support to use adios2's flatten_step #5634

Merged
merged 13 commits on Mar 3, 2025
1 change: 1 addition & 0 deletions Source/Diagnostics/WarpXOpenPMD.H
@@ -176,6 +176,7 @@ private:
}
}

void flushCurrent(bool isBTD) const;

/** This function does initial setup for the fields when an iteration is newly created
* @param[in] meshes The meshes in a series
39 changes: 34 additions & 5 deletions Source/Diagnostics/WarpXOpenPMD.cpp
@@ -391,6 +391,7 @@ WarpXOpenPMDPlot::WarpXOpenPMDPlot (
{
m_OpenPMDoptions = detail::getSeriesOptions(operator_type, operator_parameters,
engine_type, engine_parameters);
amrex::Print()<<".... openPMD options used: "<<m_OpenPMDoptions<<" \n";
}

WarpXOpenPMDPlot::~WarpXOpenPMDPlot ()
@@ -402,6 +403,23 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot ()
}
}


void WarpXOpenPMDPlot::flushCurrent(bool isBTD) const
{
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent");
// Open files from all processors, in case some will not contribute below.
// Note: m_Series->flush() only flushes if data is dirty. When the underlying
// call is collective, like ADIOS2's PDW (PerformDataWrite), that causes trouble.
// The change: use PDW when not BTD, and Put when BTD, to avoid slowing down
// due to multiple writes of small data.
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
if (isBTD) {
currIteration.seriesFlush( "adios2.engine.preferred_flush_target = \"buffer\"" );
}
else {
currIteration.seriesFlush();
}
}
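
For context, a minimal standalone sketch of the mechanism used above, assuming openPMD-api's Iteration::seriesFlush() accepts a backend-configuration string (as the calls in this diff suggest); the file name, iteration index, and function name are placeholders:

#include <openPMD/openPMD.hpp>

void exampleFlushTargets ()
{
    openPMD::Series series("diags/example.bp", openPMD::Access::CREATE);
    openPMD::Iteration it = series.iterations[0];

    // BTD-style flush: target the ADIOS2 in-memory buffer, so many small
    // writes are aggregated instead of each one hitting disk
    it.seriesFlush( R"(adios2.engine.preferred_flush_target = "buffer")" );

    // non-BTD-style flush: default target, write the data out
    it.seriesFlush();

    series.close();
}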

std::string
WarpXOpenPMDPlot::GetFileName (std::string& filepath)
{
@@ -532,7 +550,6 @@ WarpXOpenPMDPlot::WriteOpenPMDParticles (const amrex::Vector<ParticleDiag>& part
WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDParticles()");

for (const auto & particle_diag : particle_diags) {

WarpXParticleContainer* pc = particle_diag.getParticleContainer();
PinnedMemoryParticleContainer* pinned_pc = particle_diag.getPinnedParticleContainer();
if (isBTD || use_pinned_pc) {
@@ -642,6 +659,16 @@ for (const auto & particle_diag : particle_diags) {
pc->getCharge(), pc->getMass(),
isBTD, isLastBTDFlush);
}

const auto flattenStepsPos = m_OpenPMDoptions.find("FlattenSteps");
const bool flattenSteps = isBTD && (m_Series->backend() == "ADIOS2") && (flattenStepsPos != std::string::npos);

if (flattenSteps)
{
// force a new step so data from each BTD batch can be flushed out
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
}
}
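
For reference, a hypothetical input-deck fragment showing how "FlattenSteps" could end up in the options string searched above, assuming WarpX forwards the diagnostic's adios2_engine.parameters into m_OpenPMDoptions; the diagnostic name and backend choice are placeholders:

diag1.format = openpmd
diag1.openpmd_backend = bp5
diag1.adios2_engine.parameters.FlattenSteps = on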

void
@@ -658,6 +685,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
const bool isLastBTDFlush
)
{
WARPX_PROFILE("WarpXOpenPMDPlot::DumpToFile()");
WARPX_ALWAYS_ASSERT_WITH_MESSAGE(m_Series != nullptr, "openPMD: series must be initialized");

AMREX_ALWAYS_ASSERT(write_real_comp.size() == pc->NumRealComps());
@@ -716,8 +744,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
SetConstParticleRecordsEDPIC(currSpecies, positionComponents, NewParticleVectorSize, charge, mass);
}

// open files from all processors, in case some will not contribute below
m_Series->flush();
flushCurrent(isBTD);

// dump individual particles
bool contributed_particles = false; // did the local MPI rank contribute particles?
@@ -758,6 +785,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
// BP4 (ADIOS 2.8): last MPI rank's `Put` meta-data wins
// BP5 (ADIOS 2.8): everyone has to write an empty block
if (is_resizing_flush && !contributed_particles && isBTD && m_Series->backend() == "ADIOS2") {
WARPX_PROFILE("WarpXOpenPMDPlot::ResizeInADIOS()");
for( auto & [record_name, record] : currSpecies ) {
for( auto & [comp_name, comp] : record ) {
if (comp.constant()) { continue; }
@@ -797,7 +825,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
}
}

m_Series->flush();
flushCurrent(isBTD);
}

void
@@ -1469,7 +1497,7 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename,
amrex::Gpu::streamSynchronize();
#endif
// Flush data to disk after looping over all components
m_Series->flush();
flushCurrent(isBTD);
} // levels loop (i)
}
#endif // WARPX_USE_OPENPMD
@@ -1483,6 +1511,7 @@ WarpXParticleCounter::WarpXParticleCounter (ParticleContainer* pc):
m_MPIRank{amrex::ParallelDescriptor::MyProc()},
m_MPISize{amrex::ParallelDescriptor::NProcs()}
{
WARPX_PROFILE("WarpXOpenPMDPlot::ParticleCounter()");
m_ParticleCounterByLevel.resize(pc->finestLevel()+1);
m_ParticleOffsetAtRank.resize(pc->finestLevel()+1);
m_ParticleSizeAtRank.resize(pc->finestLevel()+1);