#include <algorithm>
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <vector>
#include "flamegpu/simulation/detail/MPIEnsemble.h"
#include "flamegpu/detail/compute_capability.cuh"
namespace flamegpu {
namespace detail {
MPIEnsemble::MPIEnsemble(const CUDAEnsemble::EnsembleConfig &_config, const unsigned int _total_runs)
: config(_config)
, world_rank(queryMPIWorldRank())
, world_size(queryMPIWorldSize())
, local_rank(queryMPISharedGroupRank())
, local_size(queryMPISharedGroupSize())
, total_runs(_total_runs)
, MPI_ERROR_DETAIL(AbstractSimRunner::createErrorDetailMPIDatatype())
, rank_is_participating(false)
, comm_participating(MPI_COMM_NULL)
, participating_size(0)
, participating_rank(-1) { }
// Drain, without blocking, every ReportError message currently queued for rank 0.
// Each received ErrorDetail is stored in err_detail keyed by the sending rank and, unless
// verbosity is below Default or the error level is Fast, echoed to stderr.
// Returns the number of errors received by this call (always 0 on ranks other than 0).
int MPIEnsemble::receiveErrors(std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail) {
    int errCount = 0;
    if (world_rank == 0) {
        MPI_Status status;
        int flag = 0;
        // Check whether MPI runners have reported an error
        MPI_Iprobe(
            MPI_ANY_SOURCE,            // int source
            EnvelopeTag::ReportError,  // int tag
            MPI_COMM_WORLD,            // MPI_Comm communicator
            &flag,                     // int flag
            &status);                  // MPI_Status*
        while (flag) {
            // Receive the message
            memset(&status, 0, sizeof(MPI_Status));
            AbstractSimRunner::ErrorDetail e_detail;
            memset(&e_detail, 0, sizeof(AbstractSimRunner::ErrorDetail));
            MPI_Recv(
                &e_detail,                 // void* data
                1,                         // int count
                MPI_ERROR_DETAIL,          // MPI_Datatype datatype (can't use MPI_DATATYPE_NULL)
                MPI_ANY_SOURCE,            // int source
                EnvelopeTag::ReportError,  // int tag
                MPI_COMM_WORLD,            // MPI_Comm communicator
                &status);                  // MPI_Status*
            err_detail.emplace(status.MPI_SOURCE, e_detail);
            // Count it, otherwise callers are told no errors arrived
            ++errCount;
            // Progress flush
            if (config.verbosity >= Verbosity::Default && config.error_level != CUDAEnsemble::EnsembleConfig::Fast) {
                fprintf(stderr, "Warning: Run %u/%u failed on rank %d, device %d, thread %u with exception: \n%s\n",
                    e_detail.run_id + 1, total_runs, status.MPI_SOURCE, e_detail.device_id, e_detail.runner_id, e_detail.exception_string);
                fflush(stderr);
            }
            // Check again
            MPI_Iprobe(MPI_ANY_SOURCE, EnvelopeTag::ReportError, MPI_COMM_WORLD, &flag, &status);
        }
    }
    return errCount;
}
int MPIEnsemble::receiveJobRequests(unsigned int &next_run) {
int mpi_runners_fin = 0;
if (world_rank == 0) {
MPI_Status status;
int flag;
MPI_ANY_SOURCE, // int source
EnvelopeTag::RequestJob, // int tag
MPI_COMM_WORLD, // MPI_Comm communicator
&flag, // int flag
&status); // MPI_Status*
while (flag) {
// Receive the message (kind of redundant as we already have the status and it carrys no data)
memset(&status, 0, sizeof(MPI_Status));
nullptr, // void* data
0, // int count
MPI_CHAR, // MPI_Datatype datatype (can't use MPI_DATATYPE_NULL)
MPI_ANY_SOURCE, // int source
EnvelopeTag::RequestJob, // int tag
MPI_COMM_WORLD, // MPI_Comm communicator
&status); // MPI_Status*
// Respond to the sender with a job assignment
&next_run, // void* data
1, // int count
MPI_UNSIGNED, // MPI_Datatype datatype
status.MPI_SOURCE, // int destination
EnvelopeTag::AssignJob, // int tag
MPI_COMM_WORLD); // MPI_Comm communicator
if (next_run >= total_runs) ++mpi_runners_fin;
// Print progress to console
if (config.verbosity >= Verbosity::Default && next_run <= total_runs) {
fprintf(stdout, "MPI ensemble assigned run %d/%u to rank %d\n", next_run, total_runs, status.MPI_SOURCE);
// Check again
MPI_Iprobe(MPI_ANY_SOURCE, EnvelopeTag::RequestJob, MPI_COMM_WORLD, &flag, &status);
return mpi_runners_fin;
// Worker ranks only: forward an ErrorDetail to rank 0 with the ReportError tag.
// Does nothing on rank 0, which records its own errors locally.
void MPIEnsemble::sendErrorDetail(AbstractSimRunner::ErrorDetail &e_detail) {
    if (world_rank != 0) {
        MPI_Send(
            &e_detail,                 // void* data
            1,                         // int count
            MPI_ERROR_DETAIL,          // MPI_Datatype datatype (can't use MPI_DATATYPE_NULL)
            0,                         // int destination
            EnvelopeTag::ReportError,  // int tag
            MPI_COMM_WORLD);           // MPI_Comm communicator
    }
}
int MPIEnsemble::requestJob() {
unsigned int next_run = UINT_MAX;
if (world_rank != 0) {
// Send a job request to 0, these have no data
nullptr, // void* data
0, // int count
MPI_CHAR, // MPI_Datatype datatype (can't use MPI_DATATYPE_NULL)
0, // int destination
EnvelopeTag::RequestJob, // int tag
MPI_COMM_WORLD); // MPI_Comm communicator
// Wait for a job assignment from 0
MPI_Status status;
memset(&status, 0, sizeof(MPI_Status));
&next_run, // void* data
1, // int count
MPI_UNSIGNED, // MPI_Datatype datatype
0, // int source
EnvelopeTag::AssignJob, // int tag
MPI_COMM_WORLD, // MPI_Comm communicator
&status); // MPI_Status* status
return next_run;
void MPIEnsemble::worldBarrier() {
std::string MPIEnsemble::assembleGPUsString() {
std::string remote_device_names;
// One rank per node should notify rank 0 of their GPU devices. other ranks will send an empty message.
if (world_rank == 0) {
int bufflen = 256; // Length of name string in cudaDeviceProp
char *buff = static_cast<char*>(malloc(bufflen));
for (int i = 1; i < world_size; ++i) {
// Receive a message from each rank
MPI_Status status;
memset(&status, 0, sizeof(MPI_Status));
MPI_ANY_SOURCE, // int source
EnvelopeTag::TelemetryDevices, // int tag
MPI_COMM_WORLD, // MPI_Comm communicator
int strlen = 0;
// Ensure our receive buffer is long enough
MPI_Get_count(&status, MPI_CHAR, &strlen);
if (strlen > bufflen) {
buff = static_cast<char*>(malloc(strlen));
buff, // void* data
strlen, // int count
MPI_CHAR, // MPI_Datatype datatype (can't use MPI_DATATYPE_NULL)
MPI_ANY_SOURCE, // int source
EnvelopeTag::TelemetryDevices, // int tag
MPI_COMM_WORLD, // MPI_Comm communicator
&status); // MPI_Status*
if (strlen > 1) {
remote_device_names.append(", ");
} else {
const std::string d_string = local_rank == 0 ? compute_capability::getDeviceNames(config.devices) : "";
// Send GPU count
d_string.c_str(), // void* data
d_string.length() + 1, // int count
MPI_CHAR, // MPI_Datatype datatype
0, // int destination
EnvelopeTag::TelemetryDevices, // int tag
MPI_COMM_WORLD); // MPI_Comm communicator
return remote_device_names;
int MPIEnsemble::queryMPIWorldRank() {
int world_rank = -1;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
return world_rank;
int MPIEnsemble::queryMPIWorldSize() {
int world_size = -1;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
return world_size;
int MPIEnsemble::queryMPISharedGroupRank() {
int local_rank = -1;
MPI_Comm group;
MPI_Comm_rank(group, &local_rank);
return local_rank;
int MPIEnsemble::queryMPISharedGroupSize() {
int local_size = -1;
MPI_Comm group;
MPI_Comm_size(group, &local_size);
return local_size;
void MPIEnsemble::initMPI() {
int flag = 0;
// MPI can only be init once, for certain test cases we do some initial MPI comms for setup
if (!flag) {
// Init MPI, fetch rank and size
int thread_provided = 0;
// MPI single means that only the main thread will perform MPI actions
MPI_Init_thread(NULL, NULL, MPI_THREAD_SINGLE, &thread_provided);
if (thread_provided != MPI_THREAD_SINGLE) {
THROW exception::UnknownInternalError("MPI unable to provide MPI_THREAD_SINGLE support");
// Map local device slot j to a CUDA device index.
// If devices holds more than j entries, returns its j-th element in set order.
// Otherwise (including an empty set) returns j itself, i.e. direct device indexing.
unsigned int MPIEnsemble::getDeviceIndex(const int j, const std::set<int> devices) {
    int i = 0;
    for (const int d : devices) {
        if (i++ == j) {
            return d;
        }
    }
    return j;  // If set is empty, then direct index should be used
}
// Handle an error reported by local runner i.
// Under log_export_queue_mutex, the matching ErrorDetail is located in err_detail_local (matched
// on runner id and device index) and removed from it. On rank 0 it is recorded in err_detail.
// After the lock is released, rank 0 reports the error on stderr (subject to verbosity and error
// level), while other ranks forward it to rank 0 via sendErrorDetail().
// Throws exception::UnknownInternalError if no matching entry exists.
void MPIEnsemble::retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex, std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail,
std::vector<detail::AbstractSimRunner::ErrorDetail> &err_detail_local, const int i, std::set<int> devices) {
    // Fetch error detail
    detail::AbstractSimRunner::ErrorDetail e_detail;
    {
        // log_export_mutex is treated as our protection for race conditions on err_detail
        std::lock_guard<std::mutex> lck(log_export_queue_mutex);
        const unsigned int t_device_id = getDeviceIndex(i / config.concurrent_runs, devices);
        const unsigned int t_runner_id = i % config.concurrent_runs;
        // Fetch corresponding error detail
        const auto it = std::find_if(err_detail_local.begin(), err_detail_local.end(),
            [&](const detail::AbstractSimRunner::ErrorDetail &e) {
                return e.runner_id == t_runner_id && e.device_id == t_device_id;
            });
        if (it == err_detail_local.end()) {
            THROW exception::UnknownInternalError("[%d] Management thread failed to locate reported error from device %u runner %u from %u errors, in CUDAEnsemble::simulate()", world_rank, t_device_id, t_runner_id, static_cast<unsigned int>(err_detail_local.size()));
        }
        e_detail = *it;
        // Remove it so the same error cannot be retrieved twice
        err_detail_local.erase(it);
        if (world_rank == 0) {
            // Only rank 0 collects error details
            err_detail.emplace(world_rank, e_detail);
        } else {
            // fprintf(stderr, "[%d] Purged error from device %u runner %u\n", world_rank, t_device_id, t_runner_id);  // useful debug, breaks tests
        }
    }
    // Reporting happens outside the lock
    if (world_rank == 0) {
        // Progress flush
        if (config.verbosity >= Verbosity::Default && config.error_level != CUDAEnsemble::EnsembleConfig::Fast) {
            fprintf(stderr, "Warning: Run %u/%u failed on rank %d, device %d, thread %u with exception: \n%s\n",
                e_detail.run_id + 1, total_runs, world_rank, e_detail.device_id, e_detail.runner_id, e_detail.exception_string);
            fflush(stderr);
        }
    } else {
        // Notify 0 that an error occurred, with the error detail
        sendErrorDetail(e_detail);
    }
}
bool MPIEnsemble::createParticipatingCommunicator(const bool isParticipating) {
// If the communicator has not yet been created, create it and get the rank and size.
if (this->comm_participating == MPI_COMM_NULL) {
// determine if this thread is participating or not, i..e. the colour of the rank
this->rank_is_participating = isParticipating;
// Split the world communicator, if the split fails, abort (this makes the return type not useful tbh.)
if (MPI_Comm_split(MPI_COMM_WORLD, this->rank_is_participating, this->world_rank, &this->comm_participating) != MPI_SUCCESS) {
fprintf(stderr, "Error creating communicator\n");
return false;
// Get the size of the split pariticpating communicator
MPI_Comm_size(this->comm_participating, &this->participating_size);
// Get the local rank within the split communicator
MPI_Comm_rank(this->comm_participating, &this->participating_rank);
return true;
// Choose which of devicesToSelectFrom the rank local_rank (of local_size ranks sharing a node)
// should use:
//  - single rank on the node, or no devices: all of devicesToSelectFrom;
//  - 1 < local_size <= device count: a contiguous, balanced slice (lower ranks get one extra
//    device when the count does not divide evenly);
//  - more ranks than devices: device number local_rank alone, or none if local_rank >= device count.
std::set<int> MPIEnsemble::devicesForThisRank(const std::set<int> devicesToSelectFrom, int local_size, int local_rank) {
    // create a vector from the set to enable direct access.
    const std::vector<int> devicesToSelectFromVector(devicesToSelectFrom.begin(), devicesToSelectFrom.end());
    const int device_count = static_cast<int>(devicesToSelectFrom.size());
    // if there is only a single mpi rank on this shared memory system, assign all devices, or if there are no devices to select from
    if (local_size == 1 || device_count == 0) {
        return devicesToSelectFrom;
    }
    std::set<int> devices;
    if (local_size > 1 && local_size <= device_count) {
        // More than one rank per node, but no more ranks than gpus: load balance
        // find the balanced number of gpus per rank, and how many ranks will need +1
        const int gpusPerRank = device_count / local_size;
        const int unallocated = device_count - (gpusPerRank * local_size);
        // Compute the indices of the first and last gpu to be assigned to the current rank, based on how many lower ranks will have +1
        const int lowerRanksWithPlus1 = local_rank < unallocated ? local_rank : unallocated;
        const int lowerRanksWithPlus0 = std::max(0, local_rank - unallocated);
        const int first = (lowerRanksWithPlus1 * (gpusPerRank + 1)) + (lowerRanksWithPlus0 * gpusPerRank);
        const int last = local_rank < unallocated ? first + gpusPerRank + 1 : first + gpusPerRank;
        // Assign the devices for this rank
        for (int i = first; i < last; ++i) {
            devices.emplace(devicesToSelectFromVector[i]);
        }
    } else {
        // Otherwise, there are more ranks than gpus, so use up to one gpu per rank.
        if (local_rank < device_count) {
            devices.emplace(devicesToSelectFromVector[local_rank]);
        }
    }
    return devices;
}
// Convenience overload: select devices for this process using its own shared-node size and rank.
std::set<int> MPIEnsemble::devicesForThisRank(const std::set<int> devicesToSelectFrom) {
    return MPIEnsemble::devicesForThisRank(devicesToSelectFrom, this->local_size, this->local_rank);
}
} // namespace detail
} // namespace flamegpu