Program Listing for File MPIEnsemble.h

↰ Return to documentation for file (include/flamegpu/simulation/detail/MPIEnsemble.h)
#ifndef INCLUDE_FLAMEGPU_SIMULATION_DETAIL_MPIENSEMBLE_H_
#define INCLUDE_FLAMEGPU_SIMULATION_DETAIL_MPIENSEMBLE_H_
#include <mpi.h>
#include <map>
#include <set>
#include <string>
#include <mutex>
#include <vector>
#include "flamegpu/simulation/CUDAEnsemble.h"
#include "flamegpu/simulation/detail/MPISimRunner.h"
namespace flamegpu {
namespace detail {
class MPIEnsemble {
    const CUDAEnsemble::EnsembleConfig &config;
    // Tags to differentiate the MPI messages used in the protocol
    enum EnvelopeTag : int {
        // Sent from worker to manager to request a job index to process
        RequestJob = 0,
        // Sent from manager to worker to assign a job index to process, in response to RequestJob
        AssignJob = 1,
        // Sent from worker to manager to report an error during job execution
        // If fail fast is enabled, subsequent RequestJob messages will receive an exit job id (>= plans.size())
        ReportError = 2,
        // Sent from worker to manager to report GPUs for telemetry
        TelemetryDevices = 3,
    };
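    // Illustrative round-trip built from the tags above (a sketch of the protocol,
    // not additional API from this header):
    //   worker:  MPI_Send(nullptr, 0, MPI_BYTE, /*manager*/0, RequestJob, MPI_COMM_WORLD);
    //   manager: MPI_Send(&job, 1, MPI_UNSIGNED, worker, AssignJob, MPI_COMM_WORLD);
    //   worker:  MPI_Recv(&job, 1, MPI_UNSIGNED, 0, AssignJob, MPI_COMM_WORLD, MPI_STATUS_IGNORE);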
 public:
    // Rank of this process within the MPI world communicator
    const int world_rank;
    // Size of the MPI world communicator
    const int world_size;
    // Rank of this process within the shared-memory (per-node) communicator
    const int local_rank;
    // Size of the shared-memory (per-node) communicator
    const int local_size;
    // Total number of runs to be executed across all ranks
    const unsigned int total_runs;
    explicit MPIEnsemble(const CUDAEnsemble::EnsembleConfig &_config, unsigned int _total_runs);
    // Manager only: receive any pending ReportError messages, appending their details
    // to err_detail; returns the number of errors received
    int receiveErrors(std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail);
    // Manager only: answer pending RequestJob messages, advancing next_run for each AssignJob sent
    int receiveJobRequests(unsigned int &next_run);
    // Worker only: send the provided error detail to the manager (ReportError)
    void sendErrorDetail(AbstractSimRunner::ErrorDetail &e_detail);
    // Worker only: request a job index from the manager (RequestJob) and return the
    // assigned index; an index >= total_runs signals the worker to exit
    int requestJob();
    // Block until all ranks in the world communicator reach this barrier
    void worldBarrier();
    // Gather GPU details from all ranks (TelemetryDevices) into a single string for telemetry
    std::string assembleGPUsString();
    // Collect error details recorded by this rank's local runners into err_detail
    void retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex,
        std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail,
        std::vector<AbstractSimRunner::ErrorDetail> &err_detail_local, int i, std::set<int> devices);
    // Split the world communicator into participating and non-participating ranks,
    // e.g. so that ranks without an assigned device can be excluded from execution
    bool createParticipatingCommunicator(bool isParticipating);
    // Accessors for the result of the participating communicator split
    int getRankIsParticipating() { return this->rank_is_participating; }
    int getParticipatingCommSize() { return this->participating_size; }
    int getParticipatingCommRank() { return this->participating_rank; }
    // Select the subset of devices this rank should use, given how many ranks share the node
    static std::set<int> devicesForThisRank(std::set<int> devicesToSelectFrom, int local_size, int local_rank);
    // As above, using this instance's local_size and local_rank
    std::set<int> devicesForThisRank(std::set<int> devicesToSelectFrom);
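    // The partitioning scheme is not visible from this header; a plausible
    // round-robin split by local_rank would look like this (illustrative sketch only):
    //   std::set<int> mine;
    //   int i = 0;
    //   for (const int d : devicesToSelectFrom)
    //       if (i++ % local_size == local_rank) mine.insert(d);
    //   return mine;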
 private:
    // Query MPI for the world and shared-memory (per-node) rank/size
    static int queryMPIWorldRank();
    static int queryMPIWorldSize();
    static int queryMPISharedGroupRank();
    static int queryMPISharedGroupSize();
    // Initialise MPI if it has not already been initialised
    static void initMPI();
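    // initMPI presumably guards against double initialisation, e.g. (sketch):
    //   int initialised = 0;
    //   MPI_Initialized(&initialised);
    //   if (!initialised)
    //       MPI_Init(nullptr, nullptr);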
    // Resolve the index of device j within the provided set of devices
    unsigned int getDeviceIndex(const int j, std::set<int> devices);
    // Custom MPI datatype used to send/receive AbstractSimRunner::ErrorDetail structs
    const MPI_Datatype MPI_ERROR_DETAIL;
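    // MPI_ERROR_DETAIL is presumably built once with MPI_Type_create_struct() so an
    // ErrorDetail can travel as a single message; the field layout below is assumed
    // for illustration and is not the real struct definition:
    //   int          lengths[2] = {1, 1024};
    //   MPI_Aint     offsets[2] = {offsetof(ErrorDetail, device_id),
    //                              offsetof(ErrorDetail, exception_string)};
    //   MPI_Datatype types[2]   = {MPI_INT, MPI_CHAR};
    //   MPI_Datatype t;
    //   MPI_Type_create_struct(2, lengths, offsets, types, &t);
    //   MPI_Type_commit(&t);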
    // True if this rank participates in ensemble execution (i.e. it has a device assigned)
    bool rank_is_participating;
    // Communicator produced by createParticipatingCommunicator()
    MPI_Comm comm_participating;
    // Size of, and this rank's position within, the participating communicator
    int participating_size;
    int participating_rank;
};
} // namespace detail
} // namespace flamegpu
#endif // INCLUDE_FLAMEGPU_SIMULATION_DETAIL_MPIENSEMBLE_H_
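Taken together, the declarations suggest the worker-side loop below. This is a minimal sketch against the public API above, not FLAMEGPU's actual MPISimRunner code; runPlan() is a hypothetical stand-in, and the exit condition (an assigned index at or beyond total_runs) is inferred from the enum comments.

void workerLoop(flamegpu::detail::MPIEnsemble &mpi) {
    while (true) {
        // RequestJob -> AssignJob round-trip with the manager rank
        const int job = mpi.requestJob();
        // An out-of-range index is presumed to signal "no more work"
        if (job >= static_cast<int>(mpi.total_runs))
            break;
        // runPlan(job);  // hypothetical: execute the run plan at index `job`
    }
    // Synchronise all ranks before teardown
    mpi.worldBarrier();
}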