Program Listing for File MPISimRunner.cu

Return to documentation for file (src/flamegpu/simulation/detail/MPISimRunner.cu)

#include "flamegpu/simulation/detail/MPISimRunner.h"

#include <atomic>
#include <cstring>
#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

#include "flamegpu/model/ModelData.h"
#include "flamegpu/simulation/CUDASimulation.h"
#include "flamegpu/simulation/RunPlanVector.h"

#ifdef _MSC_VER
#include <windows.h>
#else
#include <pthread.h>
#endif

namespace flamegpu {
namespace detail {

/**
 * Construct an MPI simulation runner.
 *
 * Every argument is forwarded unchanged, and in the same order, to the AbstractSimRunner
 * base-class constructor. This constructor performs no further initialisation (its body is empty).
 */
MPISimRunner::MPISimRunner(const std::shared_ptr<const ModelData> _model,
    std::atomic<unsigned int>& _err_ct,
    std::atomic<unsigned int>& _next_run,
    const RunPlanVector& _plans,
    std::shared_ptr<const StepLoggingConfig> _step_log_config,
    std::shared_ptr<const LoggingConfig> _exit_log_config,
    int _device_id,
    unsigned int _runner_id,
    flamegpu::Verbosity _verbosity,
    std::map<unsigned int, RunLog>& _run_logs,
    std::queue<unsigned int>& _log_export_queue,
    std::mutex& _log_export_queue_mutex,
    std::condition_variable& _log_export_queue_cdn,
    std::vector<ErrorDetail>& _err_detail_local,
    const unsigned int _total_runners,
    bool _isSWIG)
    : AbstractSimRunner(_model, _err_ct, _next_run, _plans,
          _step_log_config, _exit_log_config,
          _device_id, _runner_id, _verbosity,
          _run_logs, _log_export_queue, _log_export_queue_mutex, _log_export_queue_cdn,
          _err_detail_local, _total_runners, _isSWIG) {}

/**
 * Worker loop for an MPI simulation runner thread.
 *
 * The loop polls the shared atomic next_run:
 *  - A value < plans.size() is a run index to execute. After the run, next_run is
 *    exchanged for Signal::RequestJob on success. On failure (a std::exception escaping
 *    runSimulation), an ErrorDetail is appended to err_detail, err_ct is updated, and
 *    next_run is exchanged for Signal::RunFailed.
 *  - Signal::RequestJob or Signal::RunFailed: the thread yields and polls again.
 *  - Any other value ends the loop.
 * The loop also ends if the value replaced by a post-run exchange is >= plans.size(),
 * meaning next_run no longer held a run index when this thread tried to report back.
 */
void MPISimRunner::main() {
    while (true) {
        const unsigned int run_id = next_run.load();
        if (run_id < plans.size()) {
            // Process the assigned job
            try {
                runSimulation(run_id);
                // Report success by requesting another job.
                if (next_run.exchange(Signal::RequestJob) >= plans.size()) {
                    break;
                }
                // MPI workers don't print progress
            } catch (const std::exception &e) {
                // log_export_queue_mutex is treated as our protection for race conditions on err_detail
                std::lock_guard<std::mutex> lck(log_export_queue_mutex);
                // Build the error detail. exception_string is a fixed-length char array, so the
                // message is truncated to fit and explicitly NUL-terminated.
                // fprintf(stderr, "Fail: run: %u device: %u, runner: %u\n", run_id, device_id, runner_id);  // useful debug, breaks tests
                err_detail.push_back(ErrorDetail{run_id, static_cast<unsigned int>(device_id), runner_id, });
                std::strncpy(err_detail.back().exception_string, e.what(), sizeof(ErrorDetail::exception_string) - 1);
                err_detail.back().exception_string[sizeof(ErrorDetail::exception_string) - 1] = '\0';
                // err_ct holds unsigned int; store the count without round-tripping through signed int.
                err_ct.store(static_cast<unsigned int>(err_detail.size()));
                // Need to notify manager that run failed
                if (next_run.exchange(Signal::RunFailed) >= plans.size()) {
                    break;
                }
            }
        } else if (run_id == Signal::RequestJob || run_id == Signal::RunFailed) {
            // Waiting for the manager to assign a new run; give up the time slice and poll again.
            std::this_thread::yield();
        } else {
            break;
        }
    }
}

}  // namespace detail
}  // namespace flamegpu