Program Listing for File CUDAFatAgent.cu

Return to documentation for file (src/flamegpu/simulation/detail/CUDAFatAgent.cu)

#include "flamegpu/simulation/detail/CUDAFatAgent.h"

#include "flamegpu/simulation/detail/CUDAScatter.cuh"
#include "flamegpu/runtime/HostAPI.h"
#include "flamegpu/util/nvtx.h"
#include "flamegpu/detail/cuda.cuh"

#ifdef _MSC_VER
#pragma warning(push, 1)
#pragma warning(disable : 4706 4834)
#endif  // _MSC_VER
#ifdef __NVCC_DIAG_PRAGMA_SUPPORT__
#pragma nv_diag_suppress 1719
#else
#pragma diag_suppress 1719
#endif  // __NVCC_DIAG_PRAGMA_SUPPORT__
#include <cub/cub.cuh>
#ifdef __NVCC_DIAG_PRAGMA_SUPPORT__
#pragma nv_diag_default 1719
#else
#pragma diag_default 1719
#endif  // __NVCC_DIAG_PRAGMA_SUPPORT__
#ifdef _MSC_VER
#pragma warning(pop)
#endif  // _MSC_VER

namespace flamegpu {
namespace detail {

CUDAFatAgent::CUDAFatAgent(const AgentData& description)
    : mappedAgentCount(0)
    , _nextID(ID_NOT_SET + 1)
    , d_nextID(nullptr)
    , hd_nextID(ID_NOT_SET) {
    for (const std::string &s : description.states) {
        // allocate memory for each state list by creating a new Agent State List
        AgentState state = {mappedAgentCount, s};
        states.emplace(state, std::make_shared<CUDAFatAgentStateList>(description));
    }
    mappedAgentCount++;
    // All initial states are unique
    for (const auto &s : states)
        states_unique.insert(s.second);
}
CUDAFatAgent::~CUDAFatAgent() {
    if (d_nextID) {
        gpuErrchk(flamegpu::detail::cuda::cudaFree(d_nextID));
    }
    for (auto &b : d_newLists) {
        gpuErrchk(flamegpu::detail::cuda::cudaFree(b.data));
    }
    d_newLists.clear();
}
std::unordered_map<std::string, std::shared_ptr<CUDAFatAgentStateList>> CUDAFatAgent::getStateMap(const unsigned int fat_index) {
    std::unordered_map<std::string, std::shared_ptr<CUDAFatAgentStateList>> rtn;
    // For each state
    for (const auto &s : states) {
        // If it corresponds to the correct agent
        if (s.first.agent == fat_index) {
            // Store in map
            rtn.emplace(s.first.state, s.second);
        }
    }
    return rtn;
}
void CUDAFatAgent::addSubAgent(
  const AgentData &description,
  const unsigned int master_fat_index,
  const std::shared_ptr<SubAgentData> &mapping) {
    assert(states.size());
    assert(states_unique.size());
    // Handle agent states
    for (const std::string &s : description.states) {
        const auto &mapped = mapping->states.find(s);
        AgentState sub_state = {mappedAgentCount, s};
        if (mapped != mapping->states.end()) {
            // State is mapped, so use existing state
            AgentState master_state = {master_fat_index, mapped->second};
            states.emplace(sub_state, states.at(master_state));
        } else {
            // State is not mapped, so create new state, simply clone any existing state
            // This works as the existing state should be uninitialised buffer per variable
            auto cloned_state = std::make_shared<CUDAFatAgentStateList>(*states_unique.begin()->get());
            states.emplace(sub_state, cloned_state);
            states_unique.insert(cloned_state);
        }
    }
    // Handle agent variables
    for (auto &state : states_unique) {
        // For each unique state of the fat agent
        // Add sub_agent vars
        // This includes some which are 'redundant', as they may only be used if an agent transfers between states
        // to prevent loss of data
        state->addSubAgentVariables(description, master_fat_index, mappedAgentCount, mapping);
    }
    mappedAgentCount++;
}

void CUDAFatAgent::processDeath(const unsigned int agent_fat_id, const std::string &state_name, detail::CUDAScatter &scatter, const unsigned int streamId, const cudaStream_t stream) {
    auto sm = states.find({agent_fat_id, state_name});
    if (sm == states.end()) {
        THROW exception::InvalidCudaAgentState("Error: Agent ('%s') state ('%s') was not found "
            "in CUDAFatAgent::processDeath()",
            "?", state_name.c_str());
    }

    CUDAScanCompactionConfig &scanCfg = scatter.Scan().Config(CUDAScanCompaction::Type::AGENT_DEATH, streamId);
    const unsigned int agent_count = sm->second->getSize();
    // Check if we need to resize cub storage
    auto& cub_temp = scatter.CubTemp(streamId);
    size_t tempByte = 0;
    gpuErrchk(cub::DeviceScan::ExclusiveSum(
        nullptr,
        tempByte,
        scanCfg.d_ptrs.scan_flag,
        scanCfg.d_ptrs.position,
        sm->second->getAllocatedSize() + 1,
        stream));
    cub_temp.resize(tempByte);
    gpuErrchk(cub::DeviceScan::ExclusiveSum(
        cub_temp.getPtr(),
        cub_temp.getSize(),
        scanCfg.d_ptrs.scan_flag,
        scanCfg.d_ptrs.position,
        agent_count + 1,
        stream));
    gpuErrchk(cudaStreamSynchronize(stream));  // Redundant? scatter occurs in same stream

    // Scatter
    sm->second->scatterDeath(scatter, streamId, stream);
}

void CUDAFatAgent::transitionState(unsigned int agent_fat_id, const std::string &_src, const std::string &_dest, detail::CUDAScatter &scatter, unsigned int streamId, cudaStream_t stream) {
    // Optionally process state transition
    if (_src != _dest) {
        auto src = states.find({agent_fat_id, _src});
        if (src == states.end()) {
            THROW exception::InvalidCudaAgentState("Error: Agent ('%s') state ('%s') was not found "
                "in CUDAFatAgent::transitionState()",
                "?", _src.c_str());
        }
        // If src list is empty we can skip
        if (src->second->getSizeWithDisabled() == 0)
            return;
        auto dest = states.find({agent_fat_id, _dest});
        if (dest == states.end()) {
            THROW exception::InvalidCudaAgentState("Error: Agent ('%s') state ('%s') was not found "
                "in CUDAFatAgent::transitionState()",
                "?", _dest.c_str());
        }
        // If dest list is empty and we are not in an agent function condition, we can swap the lists
        if (dest->second->getSizeWithDisabled() == 0 && src->second->getSize() == src->second->getSizeWithDisabled()) {
            // This swaps the master_lists entire states (std::swap would only swap pointers in fat_agent, we need to swap components to update copies of shared_ptr)
            states.at({agent_fat_id, _src})->swap(states.at({agent_fat_id, _dest}).get());
        } else {
            // Otherwise we must perform a scatter all operation
            // Resize destination list
            dest->second->resize(src->second->getSize() + dest->second->getSizeWithDisabled(), true, stream);
            // Build scatter data
            // It's assumed that each CUDAFatAgentStatelist has it's unique variables list in the same order, so we can map across from 1 to other
            auto &src_v = src->second->getUniqueVariables();
            auto &dest_v = dest->second->getUniqueVariables();
            std::vector<CUDAScatter::ScatterData> sd;
            for (auto src_it = src_v.begin(), dest_it = dest_v.begin(); src_it != src_v.end() && dest_it != dest_v.end(); ++src_it, ++dest_it) {
                char *in_p = reinterpret_cast<char*>((*src_it)->data_condition);
                char *out_p = reinterpret_cast<char*>((*dest_it)->data);
                sd.push_back({ (*src_it)->type_size * (*src_it)->elements, in_p, out_p });
                assert((*src_it)->type_size == (*dest_it)->type_size);
                assert((*src_it)->elements == (*dest_it)->elements);
            }
            // Perform scatter
            scatter.scatterAll(streamId, stream, sd, src->second->getSize(), dest->second->getSizeWithDisabled());
            // Update list sizes
            dest->second->setAgentCount(dest->second->getSize() + src->second->getSize());
            src->second->setAgentCount(0);
        }
    }
}

void CUDAFatAgent::processFunctionCondition(const unsigned int agent_fat_id, const std::string &state_name, detail::CUDAScatter &scatter, const unsigned int streamId, const cudaStream_t stream) {
    auto sm = states.find({agent_fat_id, state_name});
    if (sm == states.end()) {
        THROW exception::InvalidCudaAgentState("Error: Agent ('%s') state ('%s') was not found "
            "in CUDAFatAgent::processFunctionCondition()",
            "?", state_name.c_str());
    }

    CUDAScanCompactionConfig &scanCfg = scatter.Scan().Config(CUDAScanCompaction::Type::AGENT_DEATH, streamId);
    unsigned int agent_count = sm->second->getSize();
    // Check if we need to resize cub storage
    auto& cub_temp = scatter.CubTemp(streamId);
    size_t tempByte = 0;
    gpuErrchk(cub::DeviceScan::ExclusiveSum(
        nullptr,
        tempByte,
        scanCfg.d_ptrs.scan_flag,
        scanCfg.d_ptrs.position,
        sm->second->getAllocatedSize() + 1));
    cub_temp.resize(tempByte);
    // Perform scan (agent function conditions use death flag scan compact arrays as there is no overlap in use)
    gpuErrchk(cub::DeviceScan::ExclusiveSum(
        cub_temp.getPtr(),
        cub_temp.getSize(),
        scanCfg.d_ptrs.scan_flag,
        scanCfg.d_ptrs.position,
        agent_count + 1,
        stream));
    gpuErrchkLaunch();
    gpuErrchk(cudaStreamSynchronize(stream));
    // Use scan results to sort false agents into start of list (and don't swap buffers)
    const unsigned int conditionFailCount = sm->second->scatterAgentFunctionConditionFalse(scatter, streamId, stream);
    // Invert scan
    CUDAScatter::InversionIterator ii = CUDAScatter::InversionIterator(scanCfg.d_ptrs.scan_flag);
    cudaMemsetAsync(scanCfg.d_ptrs.position, 0, sizeof(unsigned int)*(agent_count + 1), stream);
    gpuErrchk(cub::DeviceScan::ExclusiveSum(
        cub_temp.getPtr(),
        cub_temp.getSize(),
        ii,
        scanCfg.d_ptrs.position,
        agent_count + 1,
        stream));
    gpuErrchkLaunch();
    gpuErrchk(cudaStreamSynchronize(stream));
    // Use inverted scan results to sort true agents into end of list (and swap buffers)
    const unsigned int conditionpassCount = sm->second->scatterAgentFunctionConditionTrue(conditionFailCount, scatter, streamId, stream);
    if (agent_count != conditionpassCount + conditionFailCount) {
        THROW exception::UnknownInternalError("Agent function condition pass + fail counts != agent count, %u + %u != %u this should not happen"
            ", in CUDAFatAgent::processFunctionCondition()\n", conditionpassCount, conditionFailCount, agent_count);
    }
}

void CUDAFatAgent::setConditionState(const unsigned int agent_fat_id, const std::string &state_name, const unsigned int numberOfDisabled) {
    // check the cuda agent state map to find the correct state list for functions starting state
    auto sm = states.find({agent_fat_id, state_name});
    if (sm == states.end()) {
        THROW exception::InvalidCudaAgentState("Error: Agent ('%s') state ('%s') was not found "
            "in CUDAFatAgent::setConditionState()",
            "?", state_name.c_str());
    }
    sm->second->setDisabledAgents(numberOfDisabled);
}

void *CUDAFatAgent::allocNewBuffer(const size_t total_agent_size, const unsigned int new_agents, const size_t varCount) {
    std::lock_guard<std::mutex> guard(d_newLists_mutex);
    // It is assumed that the buffer will be split into sub-buffers, each 64bit aligned
    // So for total number of variables-1, add 64 bits incase required for alignment.
    const size_t ALIGN_OVERFLOW = varCount > 0 ? (varCount - 1) * 8 : 0;
    const size_t SIZE_REQUIRED = total_agent_size * new_agents + ALIGN_OVERFLOW;
    const size_t ALLOCATION_SIZE = static_cast<size_t>(SIZE_REQUIRED * 1.25);  // Premept larger buffer req, reduce reallocations
    // Find the smallest existing buffer with enough size
    for (auto &b : d_newLists) {
        if (b.size >= SIZE_REQUIRED && !b.in_use) {
            // Buffer is suitable
            NewBuffer my_b = b;
            // Erase and reinsert to d_newLists to mark as in use
            d_newLists.erase(b);
            my_b.in_use = true;
            d_newLists.insert(my_b);
            // Return buffer
            return my_b.data;
        }
    }
    // Find the smallest free list and resize it to be big enough
    for (auto &b : d_newLists) {
        if (!b.in_use) {
            // Buffer is suitable
            NewBuffer my_b = b;
            // Erase and resize/reinsert to d_newLists to mark as in use
            d_newLists.erase(b);
            gpuErrchk(flamegpu::detail::cuda::cudaFree(my_b.data));
            gpuErrchk(cudaMalloc(&my_b.data, ALLOCATION_SIZE));
            my_b.size = ALLOCATION_SIZE;
            my_b.in_use = true;
            d_newLists.insert(my_b);
            // Return buffer
            return my_b.data;
        }
    }
    // No existing buffer available, so create a new one
    NewBuffer my_b;
    gpuErrchk(cudaMalloc(&my_b.data, ALLOCATION_SIZE));
    my_b.size = ALLOCATION_SIZE;
    my_b.in_use = true;
    d_newLists.insert(my_b);
    return my_b.data;
}

void CUDAFatAgent::freeNewBuffer(void *buff) {
    std::lock_guard<std::mutex> guard(d_newLists_mutex);
    // Find the right buffer
    for (auto &b : d_newLists) {
        if (b.data == buff) {
            assert(b.in_use);
            // Erase and reinsert to d_newLists to mark as free
            NewBuffer my_b = b;
            d_newLists.erase(b);
            my_b.in_use = false;
            d_newLists.insert(my_b);
            return;
        }
    }
    assert(false);
}
unsigned int CUDAFatAgent::getMappedAgentCount() const { return mappedAgentCount; }
__global__ void allocateIDs(id_t*agentIDs, unsigned int threads, id_t UNSET_FLAG, id_t _nextID) {
    const unsigned int tid = blockIdx.x * blockDim.x + threadIdx.x;
    if (tid < threads) {
        const id_t my_id = agentIDs[tid];
        if (my_id == UNSET_FLAG) {
            agentIDs[tid] = _nextID + tid;
        }
    }
}
id_t CUDAFatAgent::nextID(unsigned int count) {
    id_t rtn = _nextID;
    _nextID += count;
    return rtn;
}
id_t *CUDAFatAgent::getDeviceNextID() {
    if (!d_nextID) {
        gpuErrchk(cudaMalloc(&d_nextID, sizeof(id_t)));
    }
    if (hd_nextID != _nextID) {
        gpuErrchk(cudaMemcpy(d_nextID, &_nextID, sizeof(id_t), cudaMemcpyHostToDevice));
        hd_nextID = _nextID;
    }
    return d_nextID;
}
void CUDAFatAgent::notifyDeviceBirths(unsigned int newCount) {
    _nextID += newCount;
    hd_nextID += newCount;
#ifdef _DEBUG
    // Sanity validation, check hd_nextID == d_nextID
    assert(d_nextID);
    id_t t = 0;
    gpuErrchk(cudaMemcpy(&t, d_nextID, sizeof(id_t), cudaMemcpyDeviceToHost));
    assert(t == hd_nextID);
    assert(t == _nextID);  // At the end of device birth they should be equal, as no host birth can occur between pre and post processing agent fn
#endif
}
void CUDAFatAgent::assignIDs(HostAPI& hostapi, detail::CUDAScatter &scatter, cudaStream_t stream, const unsigned int streamId) {
    flamegpu::util::nvtx::Range range{"CUDAFatAgent::assignIDs"};
    if (agent_ids_have_init) return;
    id_t h_max = ID_NOT_SET;
    // Find the max ID within the current agents
    for (auto& s : states_unique) {
        if (!s->getSize())
            continue;
        // Agents should never be disabled at this point
        assert(s->getSizeWithDisabled() == s->getSize());
        auto vb = s->getVariableBuffer(0, ID_VARIABLE_NAME);  // _id always belongs to the root agent
        if (vb && vb->data) {
            // Check if we need to resize cub storage
            auto& cub_temp = scatter.CubTemp(streamId);
            size_t tempByte = 0;
            gpuErrchk(cub::DeviceReduce::Max(nullptr, tempByte, static_cast<id_t*>(vb->data), reinterpret_cast<id_t*>(hostapi.d_output_space), s->getSize(), stream));
            cub_temp.resize(tempByte);
            hostapi.resizeOutputSpace<id_t>();
            // Reduce for max
            gpuErrchk(cub::DeviceReduce::Max(cub_temp.getPtr(), cub_temp.getSize(), static_cast<id_t*>(vb->data), reinterpret_cast<id_t*>(hostapi.d_output_space), s->getSize(), stream));
            gpuErrchk(cudaMemcpyAsync(&h_max, hostapi.d_output_space, sizeof(id_t), cudaMemcpyDeviceToHost, stream));
            gpuErrchk(cudaStreamSynchronize(stream));
            _nextID = std::max(_nextID, h_max + 1);
        }
    }

    // For each agent state, assign IDs based on global thread index, skip if ID is already set
    // This process will waste IDs for populations with partially pre-existing IDs
    // But at the time of writing, we don't anticipate any models to come close to exceeding 4200m IDs
    // This would necessitate creating and killing thousands of agents per step
    // If this does affect a model, we can implement an ID reuse scheme in future.
    for (auto &s : states_unique) {
        auto vb = s->getVariableBuffer(0, ID_VARIABLE_NAME);  // _id always belongs to the root agent
        if (vb && vb->data && s->getSize()) {
            const unsigned int blockSize = 1024;
            const unsigned int blocks = ((s->getSize() - 1) / blockSize) + 1;
            allocateIDs<< <blocks, blockSize, 0, stream>> > (static_cast<id_t*>(vb->data), s->getSize(), ID_NOT_SET, _nextID);
            gpuErrchkLaunch();
        }
        _nextID += s->getSizeWithDisabled();
    }

    agent_ids_have_init = true;
    gpuErrchk(cudaStreamSynchronize(stream));
}
void CUDAFatAgent::resetIDCounter() {
    // Resetting ID whilst agents exist is a bad idea, so fail silently
    for (auto& s : states_unique)
        if (s->getSize())
            return;
    _nextID = ID_NOT_SET + 1;
}

}  // namespace detail
}  // namespace flamegpu