Fix parallel handling of data in repeated trajectory runs for 'rotate' and 'average' (Amber-MD#1113)

* Add test for handling a generated data set vs. a read-in data set

* Test that a generated set can be used as a read-in set once processing
is complete.

* Add broadcast function

* Ensure the set is properly broadcast if it was previously generated but
is now being used in a new trajectory processing run

* Enable parallel dataset handling test

* Add broadcast to the coords frames data set. Make sure we use long int when broadcasting dataset size.

* Add data broadcast to frames coords set

* No need for a broadcast; all data is on disk

* Add broadcast

* Add broadcast for string set

* Add Sync and Bcast to tensor dataset

* Add bcast

* Add broadcast for vector data

* Add bcast

* Ensure 'average' broadcasts the final average coords set to non-master
ranks for future use. Addresses Amber-MD#1096.

* Version 6.29.7. Revision bump: data created during one trajectory
processing run is now correctly broadcast for reuse in a subsequent run
by the 'rotate' and 'average' commands.

---------

Co-authored-by: Daniel R. Roe <[email protected]>
drroe and Daniel R. Roe authored Nov 5, 2024
1 parent b41f221 commit 65a2fbe
Showing 37 changed files with 613 additions and 15 deletions.
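
The changes share one idea: data sets generated during a trajectory-processing run (rotation matrices from 'rotate', the average structure from 'average') end up only on the master process after Sync(), so they must be broadcast back to all ranks before a subsequent run can treat them as read-in sets. The new Bcast methods below all follow the same idiom: broadcast the element count as a long int, resize on non-master ranks, then broadcast the payload. A standalone plain-MPI sketch of that idiom (the Mat3x3d struct and function names here are illustrative, not the cpptraj Parallel::Comm API):

// bcast_pattern.cpp -- sketch of the idiom used by the new Bcast methods:
// broadcast the element count as a long, resize on non-master ranks, then
// broadcast the payload (per element here, as in the Mat3x3/Tensor/Vector versions).
#include <mpi.h>
#include <cstdio>
#include <vector>

struct Mat3x3d { double m[9]; };  // stand-in for a 3x3 rotation matrix

static int bcast_matrices(std::vector<Mat3x3d>& mats, MPI_Comm comm) {
  int rank, nprocs;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &nprocs);
  if (nprocs == 1) return 0;
  // Assume all data is currently on rank 0 (the master).
  long totalSize = (long)mats.size();
  int err = MPI_Bcast(&totalSize, 1, MPI_LONG, 0, comm);
  if (rank != 0)
    mats.resize(totalSize);  // make room before receiving the payload
  // Broadcast each matrix separately (the NOTEs in the diff mark this for consolidation).
  for (long idx = 0; idx < totalSize; idx++)
    err += MPI_Bcast(mats[idx].m, 9, MPI_DOUBLE, 0, comm);
  return err;
}

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  std::vector<Mat3x3d> rmat;
  if (rank == 0) rmat.resize(5);  // only the master holds data after a prior run
  bcast_matrices(rmat, MPI_COMM_WORLD);
  std::printf("rank %d now has %zu matrices\n", rank, rmat.size());
  MPI_Finalize();
  return 0;
}

Compile with mpicxx and run under mpirun; after the call every rank holds the master's five matrices, which is the state 'rotate' needs before reusing a generated matrix set.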
14 changes: 14 additions & 0 deletions src/Action_Average.cpp
@@ -145,6 +145,20 @@ int Action_Average::SyncAction() {
AvgFrame_.SumToMaster(trajComm_);
if (trajComm_.Master())
Nframes_ = total_frames;
// If setting up a COORDS set, do it here for non-master ranks since Print()
// is only called by the master.
if (crdset_ != 0) {
trajComm_.MasterBcast( &Nframes_, 1, MPI_INT );
AvgFrame_.BcastFrame( trajComm_ );
// Do non-master rank setup of the DataSet here. Master will still do it
// in Print().
if (!trajComm_.Master()) {
AvgFrame_.Divide( (double)Nframes_ );
DataSet_Coords_REF& ref = static_cast<DataSet_Coords_REF&>( *crdset_ );
ref.CoordsSetup( AvgParm_, CoordinateInfo() ); // Coords Only
ref.AddFrame( AvgFrame_ );
}
}
return 0;
}
#endif
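
The hunk above mirrors on non-master ranks what the master already does in Print(): after the per-rank sums reach the master, the frame count and summed coordinates are broadcast, and every rank divides and stores the result so the COORDS set exists everywhere. A standalone plain-MPI sketch of that sum / broadcast / divide flow (array and variable names are illustrative, not the cpptraj Frame API):

// average_flow.cpp -- sketch of sum-to-master, broadcast, then divide on every rank.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  // Pretend each rank processed 10 frames and accumulated coordinate sums for one atom.
  int myframes = 10;
  std::vector<double> mysum(3, 10.0 * (rank + 1));
  int nframes = 0;
  std::vector<double> total(3, 0.0);
  // 1) Sum frame counts and coordinates to the master (the SumToMaster step).
  MPI_Reduce(&myframes, &nframes, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
  MPI_Reduce(mysum.data(), total.data(), 3, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
  // 2) Broadcast the totals so non-master ranks can also build the averaged set.
  MPI_Bcast(&nframes, 1, MPI_INT, 0, MPI_COMM_WORLD);
  MPI_Bcast(total.data(), 3, MPI_DOUBLE, 0, MPI_COMM_WORLD);
  // 3) Every rank divides by the total frame count to get the average coordinates.
  for (int i = 0; i < 3; i++) total[i] /= (double)nframes;
  std::printf("rank %d: average x = %g over %d frames\n", rank, total[0], nframes);
  MPI_Finalize();
  return 0;
}

Print() on the master performs the same division, so the resulting reference COORDS set is identical on every rank.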
6 changes: 5 additions & 1 deletion src/Action_Rotate.cpp
@@ -38,7 +38,8 @@ int Action_Rotate::Get3x3Set(DataSetList const& DSL, std::string const& dsname)
mprinterr("Error: No 3x3 matrices data set '%s'\n", dsname.c_str());
return 1;
}
- parallelNum_.SetForParallel( rmatrices_ );
+ if (parallelNum_.SetForParallel( rmatrices_ ))
+   return 1;
return 0;
}

@@ -77,6 +78,9 @@ int Action_Rotate::SetupOutputSets(DataSetList& DSL, std::string const& dsname,
/** Initialize action. */
Action::RetType Action_Rotate::Init(ArgList& actionArgs, ActionInit& init, int debugIn)
{
# ifdef MPI
parallelNum_.SetComm( init.TrajComm() );
# endif
double xrot = 0.0, yrot = 0.0, zrot = 0.0;
DataFile* outfile = 0;
std::string output_setname;
4 changes: 2 additions & 2 deletions src/DataSet_Coords_CRD.cpp
@@ -183,8 +183,8 @@ int DataSet_Coords_CRD::Sync(size_t total, std::vector<int> const& rank_frames,
int DataSet_Coords_CRD::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
- int totalSize = Size();
- int err = commIn.MasterBcast( &totalSize, 1, MPI_INT );
+ long int totalSize = Size();
+ int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
frames_.Resize( totalSize );
17 changes: 17 additions & 0 deletions src/DataSet_Coords_FRM.cpp
@@ -34,6 +34,23 @@ int DataSet_Coords_FRM::Sync(size_t total, std::vector<int> const& rank_frames,
}
return 0;
}

/** Broadcast data to all processes.
*/
int DataSet_Coords_FRM::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
frames_.resize( totalSize );
}
// Broadcast each frame separately
for (unsigned int idx = 0; idx < Size(); idx++)
err += frames_[idx].BcastFrame( commIn );
return commIn.CheckError( err );
}
#endif

/** Add a single element. */
2 changes: 2 additions & 0 deletions src/DataSet_Coords_FRM.h
@@ -16,6 +16,8 @@ class DataSet_Coords_FRM : public DataSet_Coords {
# ifdef MPI
/// Synchronize all data to the master process
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
/// Print info to stdout
void Info() const { return; }
9 changes: 9 additions & 0 deletions src/DataSet_Coords_REF.cpp
@@ -126,3 +126,12 @@ int DataSet_Coords_REF::StripRef(AtomMask const& stripMask) {
delete stripParm; // OK to free, parm has been copied by CoordsSetup.
return 0;
}

#ifdef MPI
/** Broadcast the frame to all processes.
*/
int DataSet_Coords_REF::Bcast(Parallel::Comm const& commIn) {
int err = frame_.BcastFrame( commIn );
return commIn.CheckError( err );
}
#endif /*MPI*/
2 changes: 2 additions & 0 deletions src/DataSet_Coords_REF.h
@@ -13,6 +13,8 @@ class DataSet_Coords_REF : public DataSet_Coords {
size_t Size() const { if (!frame_.empty()) return 1; else return 0; }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&) { return 1; }
/// Ensure each process has the frame
int Bcast(Parallel::Comm const&);
# endif
void Info() const;
void Add( size_t, const void* ) { return; }
2 changes: 2 additions & 0 deletions src/DataSet_Coords_TRJ.h
@@ -18,6 +18,8 @@ class DataSet_Coords_TRJ : public DataSet_Coords {
size_t Size() const { return IDX_.MaxFrames(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&) { return 1; }
/// All data is on disk, no need to broadcast
int Bcast(Parallel::Comm const&) { return 0; }
# endif
void Info() const;
void Add( size_t, const void* ) { return; }
19 changes: 19 additions & 0 deletions src/DataSet_Mat3x3.cpp
@@ -45,4 +45,23 @@ int DataSet_Mat3x3::Sync(size_t total, std::vector<int> const& rank_frames,
}
return 0;
}

/** Broadcast data to all processes.
* NOTE: For now, do multiple separate broadcasts for each element.
* In the future this should probably be consolidated.
*/
int DataSet_Mat3x3::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Resize( totalSize );
}
// Broadcast each matrix separately
for (unsigned int idx = 0; idx < Size(); idx++)
err += commIn.MasterBcast( data_[idx].Dptr(), 9, MPI_DOUBLE );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_Mat3x3.h
@@ -12,6 +12,8 @@ class DataSet_Mat3x3 : public DataSet {
size_t Size() const { return data_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
38 changes: 36 additions & 2 deletions src/DataSet_Tensor.cpp
@@ -7,9 +7,43 @@ DataSet_Tensor::DataSet_Tensor() :
{}

#ifdef MPI
- int DataSet_Tensor::Sync(size_t, std::vector<int> const&, Parallel::Comm const&)
+ int DataSet_Tensor::Sync(size_t total, std::vector<int> const& rank_frames,
+                          Parallel::Comm const& commIn)
{
- return 1;
+ if (commIn.Size()==1) return 0;
if (commIn.Master()) {
// Resize to accept data from other ranks.
Data_.resize( total );
int midx = rank_frames[0]; // Index on master
for (int rank = 1; rank < commIn.Size(); rank++) {
for (int ridx = 0; ridx != rank_frames[rank]; ridx++, midx++)
// TODO: Consolidate to 1 send/recv via arrays?
commIn.SendMaster( Data_[midx].Ptr(), 6, rank, MPI_DOUBLE );
}
} else { // Send data to master
for (unsigned int ridx = 0; ridx != Data_.size(); ++ridx)
commIn.SendMaster( Data_[ridx].Ptr(), 6, commIn.Rank(), MPI_DOUBLE );
}
return 0;
}

/** Broadcast data to all processes.
* NOTE: For now, do multiple separate broadcasts for each element.
* In the future this should probably be consolidated.
*/
int DataSet_Tensor::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Data_.resize( totalSize );
}
// Broadcast each tensor separately
for (unsigned int idx = 0; idx < Size(); idx++)
err += commIn.MasterBcast( Data_[idx].Ptr(), 6, MPI_DOUBLE );
return commIn.CheckError( err );
}
#endif
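
For reference, the new DataSet_Tensor::Sync above follows the same gather-to-master convention as the other Sync implementations: the master keeps its own elements first, then appends each remaining rank's elements in rank order. A hedged plain-MPI sketch of that convention (the uniform two-elements-per-rank setup is illustrative; rank_frames and the 6-double element size mirror the code above):

// gather_to_master.cpp -- sketch of appending each rank's elements to the master in rank order.
#include <mpi.h>
#include <algorithm>
#include <cstdio>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank, nprocs;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
  // Each rank owns two 6-double "tensor" elements, filled with its rank number.
  std::vector<double> mine(2 * 6, (double)rank);
  if (rank == 0) {
    std::vector<int> rank_frames(nprocs, 2);           // elements held by each rank
    std::vector<double> all((size_t)nprocs * 2 * 6);
    std::copy(mine.begin(), mine.end(), all.begin());  // master's own elements come first
    int midx = rank_frames[0];                          // next free element index on the master
    for (int r = 1; r < nprocs; r++)
      for (int ridx = 0; ridx < rank_frames[r]; ridx++, midx++)
        MPI_Recv(&all[(size_t)midx * 6], 6, MPI_DOUBLE, r, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    std::printf("master gathered %zu doubles\n", all.size());
  } else {
    for (int ridx = 0; ridx < 2; ridx++)
      MPI_Send(&mine[(size_t)ridx * 6], 6, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
  }
  MPI_Finalize();
  return 0;
}

Because MPI preserves message order between a given sender/receiver pair, the plain Send/Recv loop reproduces the rank_frames bookkeeping without extra tags.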

2 changes: 2 additions & 0 deletions src/DataSet_Tensor.h
@@ -25,6 +25,8 @@ class DataSet_Tensor : public DataSet {
size_t Size() const { return Data_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
29 changes: 29 additions & 0 deletions src/DataSet_Vector.cpp
@@ -265,4 +265,33 @@ int DataSet_Vector::Sync(size_t total, std::vector<int> const& rank_frames,
}
return 0;
}

/** Broadcast data in given array to all processes.
* NOTE: For now, do multiple separate broadcasts for each element.
* In the future this should probably be consolidated.
*/
int DataSet_Vector::bcast_Varray(Varray& vecs, Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = vecs.size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
vecs.resize( totalSize );
}
// Broadcast each vector separately
for (unsigned int idx = 0; idx < vecs.size(); idx++)
err += commIn.MasterBcast( vecs[idx].Dptr(), 3, MPI_DOUBLE );
return err;
}

/** Broadcast data to all processes. */
int DataSet_Vector::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
int err = bcast_Varray( vectors_, commIn );
if (!origins_.empty())
err += bcast_Varray( origins_, commIn );
return commIn.CheckError( err );
}

#endif
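
The NOTE on bcast_Varray above (repeated in the Mat3x3 and Tensor versions) flags the per-element broadcasts as a candidate for consolidation. A possible approach, sketched here in plain MPI under the assumption that the elements are plain fixed-size triplets that can be packed contiguously (names are illustrative, not the cpptraj API):

// bcast_packed.cpp -- broadcast N fixed-size vectors with one size bcast plus one data bcast.
#include <mpi.h>
#include <cstdio>
#include <vector>

struct Vec3d { double xyz[3]; };  // stand-in for a 3-double vector element

static void bcast_vec_array(std::vector<Vec3d>& vecs, MPI_Comm comm) {
  int rank, nprocs;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &nprocs);
  if (nprocs == 1) return;
  long total = (long)vecs.size();
  MPI_Bcast(&total, 1, MPI_LONG, 0, comm);
  if (rank != 0) vecs.resize(total);
  // Vec3d is a plain struct of 3 contiguous doubles, so the whole array is one flat
  // buffer: a single broadcast replaces the per-element loop used in bcast_Varray.
  MPI_Bcast(vecs.data(), (int)(3 * total), MPI_DOUBLE, 0, comm);
}

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  std::vector<Vec3d> v;
  if (rank == 0) v.assign(4, Vec3d{{1.0, 2.0, 3.0}});
  bcast_vec_array(v, MPI_COMM_WORLD);
  std::printf("rank %d has %zu vectors\n", rank, v.size());
  MPI_Finalize();
  return 0;
}

A single packed broadcast costs one staging copy but replaces N latency-bound MPI calls, which is the consolidation the NOTE has in mind.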
6 changes: 6 additions & 0 deletions src/DataSet_Vector.h
@@ -16,6 +16,8 @@ class DataSet_Vector : public DataSet {
size_t Size() const { return vectors_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
@@ -64,6 +66,10 @@
/// \return Constant for normalization via spherical harmonics addition theorem.
static double SphericalHarmonicsNorm(int);
private:
# ifdef MPI
/// Used to broadcast a Varray
static int bcast_Varray(Varray&, Parallel::Comm const&);
# endif
int order_; ///< Order for spherical harmonics calculations
Varray vectors_;
Varray origins_;
22 changes: 22 additions & 0 deletions src/DataSet_Vector_Scalar.cpp
@@ -77,4 +77,26 @@ int DataSet_Vector_Scalar::Sync(size_t total, std::vector<int> const& rank_frames,
}
return 0;
}

/** Broadcast data to all processes.
* NOTE: For now, do multiple separate broadcasts for each element.
* In the future this should probably be consolidated.
*/
int DataSet_Vector_Scalar::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
vecs_.resize( totalSize );
vals_.resize( totalSize );
}
// Broadcast each vector separately
for (unsigned int idx = 0; idx < vecs_.size(); idx++)
err += commIn.MasterBcast( vecs_[idx].Dptr(), 3, MPI_DOUBLE );
// Broadcast data
err += commIn.MasterBcast( &vals_[0], totalSize, MPI_DOUBLE );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_Vector_Scalar.h
@@ -20,6 +20,8 @@ class DataSet_Vector_Scalar : public DataSet {
size_t MemUsageInBytes() const;
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
// -------------------------------------------
Vec3 const& Vec(unsigned int i) const { return vecs_[i]; }
16 changes: 16 additions & 0 deletions src/DataSet_double.cpp
@@ -80,4 +80,20 @@ int DataSet_double::Sync(size_t total, std::vector<int> const& rank_frames,
commIn.SendMaster( &(Data_[0]), Data_.size(), commIn.Rank(), MPI_DOUBLE );
return 0;
}

/** Broadcast data to all processes.
*/
int DataSet_double::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Data_.resize( totalSize );
}
// Broadcast data
err += commIn.MasterBcast( &Data_[0], totalSize, MPI_DOUBLE );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_double.h
@@ -24,6 +24,8 @@ class DataSet_double : public DataSet_1D {
size_t Size() const { return Data_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
15 changes: 15 additions & 0 deletions src/DataSet_float.cpp
@@ -78,4 +78,19 @@ int DataSet_float::Sync(size_t total, std::vector<int> const& rank_frames,
commIn.SendMaster( &(Data_[0]), Data_.size(), commIn.Rank(), MPI_FLOAT );
return 0;
}
/** Broadcast data to all processes.
*/
int DataSet_float::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Data_.resize( totalSize );
}
// Broadcast data
err += commIn.MasterBcast( &Data_[0], totalSize, MPI_FLOAT );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_float.h
@@ -18,6 +18,8 @@ class DataSet_float : public DataSet_1D {
size_t Size() const { return Data_.size(); }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);
2 changes: 2 additions & 0 deletions src/DataSet_integer_disk.h
@@ -12,6 +12,8 @@ class DataSet_integer_disk : public DataSet_integer {
size_t Size() const { return nvals_; }
# ifdef MPI
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
/// No need to broadcast, all data on disk
int Bcast(Parallel::Comm const&) { return 0; }
# endif
void Info() const;
int Allocate(SizeArray const&);
16 changes: 16 additions & 0 deletions src/DataSet_integer_mem.cpp
@@ -99,4 +99,20 @@ const
//rprintf("DEBUG: Send %zu frames toRank %i tag %i\n", Data_.size(), toRank, tag);
return commIn.Send( (void*)&(Data_[0]), Data_.size(), MPI_INT, toRank, tag );
}

/** Broadcast data to all processes.
*/
int DataSet_integer_mem::Bcast(Parallel::Comm const& commIn) {
if (commIn.Size() == 1) return 0;
// Assume all data is currently on the master process.
long int totalSize = Size();
int err = commIn.MasterBcast( &totalSize, 1, MPI_LONG );
if (!commIn.Master()) {
//rprintf("DEBUG: Resizing array to %i\n", totalSize);
Data_.resize( totalSize );
}
// Broadcast data
err += commIn.MasterBcast( &Data_[0], totalSize, MPI_INT );
return commIn.CheckError( err );
}
#endif
2 changes: 2 additions & 0 deletions src/DataSet_integer_mem.h
@@ -27,6 +27,8 @@ class DataSet_integer_mem : public DataSet_integer {
int Sync(size_t, std::vector<int> const&, Parallel::Comm const&);
int Recv(size_t, unsigned int, int, int, int, Parallel::Comm const&);
int Send(int, int, Parallel::Comm const&) const;
/// Ensure each process has all frames
int Bcast(Parallel::Comm const&);
# endif
void Info() const { return; }
int Allocate(SizeArray const&);