Program Listing for File MPIEnsemble.h

Return to documentation for file (include/flamegpu/simulation/detail/MPIEnsemble.h)

#ifndef INCLUDE_FLAMEGPU_SIMULATION_DETAIL_MPIENSEMBLE_H_
#define INCLUDE_FLAMEGPU_SIMULATION_DETAIL_MPIENSEMBLE_H_

#include <mpi.h>

#include <map>
#include <mutex>
#include <set>
#include <string>
#include <vector>

#include "flamegpu/simulation/CUDAEnsemble.h"
#include "flamegpu/simulation/detail/MPISimRunner.h"

namespace flamegpu {
namespace detail {

class MPIEnsemble {
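    // Reference to the configuration of the ensemble being executed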
    const CUDAEnsemble::EnsembleConfig &config;
    // Tags to differentiate the MPI messages used in the protocol
    enum EnvelopeTag : int {
        // Sent from worker to manager to request a job index to process
        RequestJob = 0,
        // Sent from manager to worker to assign a job index to process, in response to RequestJob
        AssignJob = 1,
        // Sent from worker to manager to report an error during job execution
        // If fail fast is enabled, subsequent RequestJob messages will receive an exit job id (>= plans.size())
        ReportError = 2,
        // Sent from worker to manager to report GPUs for telemetry
        TelemetryDevices = 3,
    };

 public:
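    // Rank of this process within MPI_COMM_WORLD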
    const int world_rank;
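    // Total number of ranks within MPI_COMM_WORLD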
    const int world_size;
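    // Rank of this process within the shared-memory (per-node) communicator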
    const int local_rank;
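    // Total number of ranks within the shared-memory (per-node) communicator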
    const int local_size;
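    // Total number of runs to be executed by the ensemble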
    const unsigned int total_runs;
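    // Construct the MPI state for executing an ensemble of _total_runs runs with the given config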
    explicit MPIEnsemble(const CUDAEnsemble::EnsembleConfig &_config, unsigned int _total_runs);
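    // Manager (world_rank == 0) only: receive any pending ReportError messages into err_detail; returns the number of errors received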
    int receiveErrors(std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail);
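    // Manager only: respond to pending RequestJob messages, assigning runs in order via next_run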
    int receiveJobRequests(unsigned int &next_run);
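    // Worker only: send the detail of a local error to the manager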
    void sendErrorDetail(AbstractSimRunner::ErrorDetail &e_detail);
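    // Worker only: request a job index from the manager; an index >= the number of plans signals that the worker should exit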
    int requestJob();
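    // Block until all ranks in MPI_COMM_WORLD have reached this barrier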
    void worldBarrier();
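    // Gather each rank's GPU names for telemetry; the assembled string is only available on the manager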
    std::string assembleGPUsString();
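    // Collect the error detail of local runner i from err_detail_local into the global err_detail map, guarded by log_export_queue_mutex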
    void retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex,
        std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail,
        std::vector<AbstractSimRunner::ErrorDetail> &err_detail_local, int i, std::set<int> devices);
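    // Split MPI_COMM_WORLD according to whether each rank is participating (has a device assigned); returns whether creation succeeded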
    bool createParticipatingCommunicator(bool isParticipating);
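    // Accessors for the state of the participating communicator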
    bool getRankIsParticipating() { return this->rank_is_participating; }
    int getParticipatingCommSize() { return this->participating_size; }
    int getParticipatingCommRank() { return this->participating_rank; }

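    // Partition devicesToSelectFrom across the local_size ranks of this node, returning the subset assigned to local_rank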
    static std::set<int> devicesForThisRank(std::set<int> devicesToSelectFrom, int local_size, int local_rank);

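    // As above, using this rank's own local_rank and local_size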
    std::set<int> devicesForThisRank(std::set<int> devicesToSelectFrom);

 private:
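    // Query the rank/size of the world and shared-memory (per-node) communicators during construction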
    static int queryMPIWorldRank();
    static int queryMPIWorldSize();
    static int queryMPISharedGroupRank();
    static int queryMPISharedGroupSize();
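    // Initialise MPI if it has not been initialised already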
    static void initMPI();
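    // Resolve which device from the provided set the j'th runner should use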
    unsigned int getDeviceIndex(const int j, std::set<int> devices);
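    // Custom MPI datatype mirroring AbstractSimRunner::ErrorDetail, so error reports can be sent directly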
    const MPI_Datatype MPI_ERROR_DETAIL;
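    // True if this rank has a device assigned, and hence participates in execution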
    bool rank_is_participating;
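    // Communicator containing the ranks on this side of the participating split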
    MPI_Comm comm_participating;
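    // Size of comm_participating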
    int participating_size;
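    // Rank of this process within comm_participating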
    int participating_rank;
};
}  // namespace detail
}  // namespace flamegpu

#endif  // INCLUDE_FLAMEGPU_SIMULATION_DETAIL_MPIENSEMBLE_H_