
Added support to use adios2's flatten_step #5634

Merged · 13 commits · Mar 3, 2025
25 changes: 16 additions & 9 deletions Docs/source/usage/parameters.rst
@@ -2773,7 +2773,7 @@ In-situ capabilities can be used by turning on Sensei or Ascent (provided they a
Only read if ``<diag_name>.format = sensei``.
When 1, the lower left corner of the mesh is pinned to 0.,0.,0.

- * ``<diag_name>.openpmd_backend`` (``bp``, ``h5`` or ``json``) optional, only used if ``<diag_name>.format = openpmd``
+ * ``<diag_name>.openpmd_backend`` (``bp5``, ``bp4``, ``h5`` or ``json``) optional, only used if ``<diag_name>.format = openpmd``
`I/O backend <https://openpmd-api.readthedocs.io/en/latest/backends/overview.html>`_ for `openPMD <https://www.openPMD.org>`_ data dumps.
``bp5`` and ``bp4`` are versions of the `ADIOS I/O library <https://csmd.ornl.gov/adios>`_, ``h5`` is the `HDF5 format <https://www.hdfgroup.org/solutions/hdf5/>`_, and ``json`` is a `simple text format <https://en.wikipedia.org/wiki/JSON>`_.
``json`` only works with serial/single-rank jobs.
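A typical backend selection then looks like the following sketch (``diag1`` is a placeholder diagnostic name):

.. code-block:: text

   diag1.format = openpmd
   diag1.openpmd_backend = bp5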
@@ -2795,19 +2795,26 @@ In-situ capabilities can be used by turning on Sensei or Ascent (provided they a

.. code-block:: text

   <diag_name>.adios2_operator.type = blosc
   <diag_name>.adios2_operator.parameters.compressor = zstd
   <diag_name>.adios2_operator.parameters.clevel = 1
   <diag_name>.adios2_operator.parameters.doshuffle = BLOSC_BITSHUFFLE
   <diag_name>.adios2_operator.parameters.threshold = 2048
   <diag_name>.adios2_operator.parameters.nthreads = 6  # per MPI rank (and thus per GPU)

or for the lossy ZFP compressor using very strong compression per scalar:

.. code-block:: text

   <diag_name>.adios2_operator.type = zfp
   <diag_name>.adios2_operator.parameters.precision = 3

For back-transformed diagnostics with ADIOS BP5, we are experimenting with a new option for variable-based encoding that "flattens" the output steps, aiming to increase write and read performance:

.. code-block:: text

   <diag_name>.openpmd_backend = bp5
   <diag_name>.adios2_engine.parameters.FlattenSteps = on
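
For reference, a complete back-transformed diagnostic using this option might be configured as in the sketch below (``diag1`` is a placeholder name; the remaining BTD parameters are omitted):

.. code-block:: text

   diag1.diag_type = BackTransformed
   diag1.format = openpmd
   diag1.openpmd_backend = bp5
   diag1.adios2_engine.parameters.FlattenSteps = on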
Review discussion:

Member: @guj does this only have an effect for variable-based encoding or generally (e.g., file-based encoding)?

Contributor Author: General.

Contributor Author: But right now FlattenSteps is only applied to BTD.


* ``<diag_name>.adios2_engine.type`` (``bp4``, ``sst``, ``ssc``, ``dataman``) optional,
`ADIOS2 Engine type <https://openpmd-api.readthedocs.io/en/0.16.1/details/backendconfig.html#adios2>`__ for `openPMD <https://www.openPMD.org>`_ data dumps.
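  For example, to stream data with the SST engine instead of writing BP files (a sketch; any of the listed engine types is selected the same way):

  .. code-block:: text

     <diag_name>.adios2_engine.type = sst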
13 changes: 13 additions & 0 deletions Source/Diagnostics/WarpXOpenPMD.H
@@ -176,6 +176,19 @@
}
}

    /** Flush out data of the current openPMD iteration
     *
     * @param[in] isBTD whether the current diagnostic is a back-transformed diagnostic (BTD)
     *
     * If isBTD=false, the default flush behaviour is applied.
     * If isBTD=true, ADIOS Put() is advised instead of PDW for better performance.
     *
     * iteration.seriesFlush() is used instead of series.flush(),
     * because the latter flushes only if data is dirty;
     * that causes trouble when the underlying writing function is collective (like PDW).
     */
    void flushCurrent (bool isBTD) const;
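
As a minimal illustration of that distinction (a sketch, not WarpX code; assumes an open openPMD::Series s and a valid iteration index step):

    // series-level flush: skipped when no record is dirty; if the underlying
    // write is collective (like PDW), ranks that skip it can hang the others
    s.flush();
    // iteration-level flush: always routed through the backend for this
    // iteration, so collective calls stay matched across MPI ranks
    s.iterations[step].seriesFlush();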

    /** This function does initial setup for the fields when the iteration is newly created
     * @param[in] meshes The meshes in a series
40 changes: 35 additions & 5 deletions Source/Diagnostics/WarpXOpenPMD.cpp
@@ -402,6 +402,24 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot ()
}
}

void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const
{
    WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent");

    // plain substring search on the user-provided ADIOS2 options string
    auto hasOption = m_OpenPMDoptions.find("FlattenSteps");
    const bool flattenSteps = isBTD && (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);

    openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
    if (flattenSteps) {
        // delayed until all fields and particles are registered for flush
        // and dumped once via FlattenSteps
        currIteration.seriesFlush( "adios2.engine.preferred_flush_target = \"buffer\"" );
    }
    else {
        currIteration.seriesFlush();
    }
}

std::string
WarpXOpenPMDPlot::GetFileName (std::string& filepath)
{
@@ -532,7 +550,6 @@ WarpXOpenPMDPlot::WriteOpenPMDParticles (const amrex::Vector<ParticleDiag>& part
WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDParticles()");

for (const auto & particle_diag : particle_diags) {

WarpXParticleContainer* pc = particle_diag.getParticleContainer();
PinnedMemoryParticleContainer* pinned_pc = particle_diag.getPinnedParticleContainer();
if (isBTD || use_pinned_pc) {
@@ -642,6 +659,17 @@ for (const auto & particle_diag : particle_diags) {
               pc->getCharge(), pc->getMass(),
               isBTD, isLastBTDFlush);
}   // particle_diags loop

auto hasOption = m_OpenPMDoptions.find("FlattenSteps");
const bool flattenSteps = isBTD && (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);

if (flattenSteps)
{
    // force a new step so that the data of each BTD batch staged with
    // preferred_flush_target="buffer" can be flushed out
    openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
    currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
}
}
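
To see how the two flush targets cooperate, here is a standalone openPMD-api sketch (assumptions, not WarpX code: the file name diags/btd.bp5, iteration 0, and the FlattenSteps spelling shown in the docs above):

#include <openPMD/openPMD.hpp>

int main ()
{
    // request ADIOS2 (BP5) with flattened steps via backend options (assumed JSON spelling)
    openPMD::Series series(
        "diags/btd.bp5", openPMD::Access::CREATE,
        R"({"adios2": {"engine": {"parameters": {"FlattenSteps": "on"}}}})");
    auto it = series.iterations[0];

    // ... register and store chunks for one BTD batch here ...

    // stage the batch in the engine buffer; no ADIOS step is closed yet
    it.seriesFlush(R"(adios2.engine.preferred_flush_target = "buffer")");

    // after the last batch: emit everything staged so far as one step
    it.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");

    series.close();
}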

void
@@ -658,6 +686,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
    const bool isLastBTDFlush
)
{
    WARPX_PROFILE("WarpXOpenPMDPlot::DumpToFile()");
    WARPX_ALWAYS_ASSERT_WITH_MESSAGE(m_Series != nullptr, "openPMD: series must be initialized");

    AMREX_ALWAYS_ASSERT(write_real_comp.size() == pc->NumRealComps());
@@ -716,8 +745,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
SetConstParticleRecordsEDPIC(currSpecies, positionComponents, NewParticleVectorSize, charge, mass);
}

-    // open files from all processors, in case some will not contribute below
-    m_Series->flush();
+    flushCurrent(isBTD);

// dump individual particles
bool contributed_particles = false; // did the local MPI rank contribute particles?
@@ -758,6 +786,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
    // BP4 (ADIOS 2.8): last MPI rank's `Put` meta-data wins
    // BP5 (ADIOS 2.8): everyone has to write an empty block
    if (is_resizing_flush && !contributed_particles && isBTD && m_Series->backend() == "ADIOS2") {
        WARPX_PROFILE("WarpXOpenPMDPlot::ResizeInADIOS()");
        for( auto & [record_name, record] : currSpecies ) {
            for( auto & [comp_name, comp] : record ) {
                if (comp.constant()) { continue; }
Expand Down Expand Up @@ -797,7 +826,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
}
}

-    m_Series->flush();
+    flushCurrent(isBTD);
}

void
@@ -1469,7 +1498,7 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename,
amrex::Gpu::streamSynchronize();
#endif
// Flush data to disk after looping over all components
-    m_Series->flush();
+    flushCurrent(isBTD);
} // levels loop (i)
}
#endif // WARPX_USE_OPENPMD
@@ -1483,6 +1512,7 @@ WarpXParticleCounter::WarpXParticleCounter (ParticleContainer* pc):
    m_MPIRank{amrex::ParallelDescriptor::MyProc()},
    m_MPISize{amrex::ParallelDescriptor::NProcs()}
{
    WARPX_PROFILE("WarpXOpenPMDPlot::ParticleCounter()");
    m_ParticleCounterByLevel.resize(pc->finestLevel()+1);
    m_ParticleOffsetAtRank.resize(pc->finestLevel()+1);
    m_ParticleSizeAtRank.resize(pc->finestLevel()+1);