Program Listing for File CUDAScatter.cu

Return to documentation for file (src/flamegpu/simulation/detail/CUDAScatter.cu)

#include "flamegpu/simulation/detail/CUDAScatter.cuh"

#include <cuda_runtime.h>
#include <vector>
#include <cassert>
#include <cstdio>
#include <list>
#include <map>
#include <string>
#include <algorithm>
#include <limits>

#include "flamegpu/simulation/detail/CUDAErrorChecking.cuh"
#include "flamegpu/simulation/detail/CUDAFatAgentStateList.h"
#include "flamegpu/detail/cuda.cuh"

#ifdef _MSC_VER
#pragma warning(push, 1)
#pragma warning(disable : 4706 4834)
#endif  // _MSC_VER
#ifdef __NVCC_DIAG_PRAGMA_SUPPORT__
#pragma nv_diag_suppress 1719
#else
#pragma diag_suppress 1719
#endif  // __NVCC_DIAG_PRAGMA_SUPPORT__
#include <cub/cub.cuh>
#ifdef __NVCC_DIAG_PRAGMA_SUPPORT__
#pragma nv_diag_default 1719
#else
#pragma diag_default 1719
#endif  // __NVCC_DIAG_PRAGMA_SUPPORT__
#ifdef _MSC_VER
#pragma warning(pop)
#endif  // _MSC_VER

namespace flamegpu {
namespace detail {

// @todo - Make _async variants of all functions which launch kernels. The non-async versions can then call the _async variant and immediately synchronise.

CUDAScatter::StreamData::StreamData()
    : d_data(nullptr)
    , data_len(0) {
}
CUDAScatter::StreamData::~StreamData() {
    /* @note - Do not clear CUDA memory in the destructor of singletons.
     The order of static destruction in C++ is undefined,
     so the CUDA driver is not guaranteed to still exist when the static is destroyed.
     As this is only ever destroyed at exit time, it is not a real memory leak either.
    */
    if (d_data) {
        gpuErrchk(flamegpu::detail::cuda::cudaFree(d_data));
    }
    d_data = nullptr;
    data_len = 0;
}
void CUDAScatter::StreamData::resize(const unsigned int newLen) {
    if (newLen > data_len) {
        if (d_data) {
            gpuErrchk(flamegpu::detail::cuda::cudaFree(d_data));
        }
        gpuErrchk(cudaMalloc(&d_data, newLen * sizeof(ScatterData)));
        data_len = newLen;
    }
}

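// Generic stream-compaction kernel.
// The first scatter_all_count items are copied unconditionally; each remaining item is copied
// only if its scan flag is set, to the contiguous output slot given by position (expected to be
// the exclusive scan of the scan flags). Each of the scatter_len ScatterData entries describes
// one variable's input and output buffers.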
template <typename T>
__global__ void scatter_generic(
    unsigned int threadCount,
    T scan_flag,
    unsigned int *position,
    CUDAScatter::ScatterData *scatter_data,
    const unsigned int scatter_len,
    const unsigned int out_index_offset = 0,
    const unsigned int scatter_all_count = 0) {
    // global thread index
    int index = (blockIdx.x*blockDim.x) + threadIdx.x;

    if (index >= threadCount) return;

    // Write this item if it is within the unconditionally-scattered prefix, or its scan flag is set
    if (index < scatter_all_count || scan_flag[index - scatter_all_count] == 1) {
        int output_index = index < scatter_all_count ? index : scatter_all_count + position[index - scatter_all_count];
        for (unsigned int i = 0; i < scatter_len; ++i) {
            memcpy(scatter_data[i].out + ((out_index_offset + output_index) * scatter_data[i].typeLen), scatter_data[i].in + (index * scatter_data[i].typeLen), scatter_data[i].typeLen);
        }
    }
}
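// Gather kernel: for each output slot index, copies the input item found at position[index]
// to output slot index, for every variable described by scatter_data.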
__global__ void scatter_position_generic(
    unsigned int threadCount,
    unsigned int *position,
    CUDAScatter::ScatterData *scatter_data,
    const unsigned int scatter_len) {
    // global thread index
    int index = (blockIdx.x*blockDim.x) + threadIdx.x;

    if (index >= threadCount) return;

    // Gather the input item for this output slot
    int input_index = position[index];
    for (unsigned int i = 0; i < scatter_len; ++i) {
        memcpy(scatter_data[i].out + (index * scatter_data[i].typeLen), scatter_data[i].in + (input_index * scatter_data[i].typeLen), scatter_data[i].typeLen);
    }
}
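// Unconditional scatter kernel: copies every input item straight to the corresponding output
// slot, shifted by out_index_offset, for every variable described by scatter_data.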
__global__ void scatter_all_generic(
    unsigned int threadCount,
    CUDAScatter::ScatterData *scatter_data,
    const unsigned int scatter_len,
    const unsigned int out_index_offset = 0) {
    // global thread index
    int index = (blockIdx.x*blockDim.x) + threadIdx.x;

    if (index >= threadCount) return;
    for (unsigned int i = 0; i < scatter_len; ++i) {
        memcpy(scatter_data[i].out + ((out_index_offset + index) * scatter_data[i].typeLen), scatter_data[i].in + (index * scatter_data[i].typeLen), scatter_data[i].typeLen);
    }
}

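// Builds one ScatterData entry per variable from the supplied input/output buffer maps,
// then forwards to the ScatterData overload below.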
unsigned int CUDAScatter::scatter(
    const unsigned int streamResourceId,
    const cudaStream_t stream,
    const Type &messageOrAgent,
    const VariableMap &vars,
    const std::map<std::string, void*> &in,
    const std::map<std::string, void*> &out,
    const unsigned int itemCount,
    const unsigned int out_index_offset,
    const bool invert_scan_flag,
    const unsigned int scatter_all_count) {
    std::vector<ScatterData> scatterData;
    for (const auto &v : vars) {
        char *in_p = reinterpret_cast<char*>(in.at(v.first));
        char *out_p = reinterpret_cast<char*>(out.at(v.first));
        scatterData.push_back({ v.second.type_size * v.second.elements, in_p, out_p });
    }
    return scatter(streamResourceId, stream, messageOrAgent, scatterData, itemCount, out_index_offset, invert_scan_flag, scatter_all_count);
}
unsigned int CUDAScatter::scatter(
    const unsigned int streamResourceId,
    const cudaStream_t stream,
    const Type &messageOrAgent,
    const std::vector<ScatterData> &sd,
    const unsigned int itemCount,
    const unsigned int out_index_offset,
    const bool invert_scan_flag,
    const unsigned int scatter_all_count) {
    int blockSize = 0;  // The launch configurator returned block size
    int minGridSize = 0;  // The minimum grid size needed to achieve maximum occupancy for a full device launch
    int gridSize = 0;  // The actual grid size needed, based on input size
    // Calculate the launch configuration for the scatter kernel
    gpuErrchk(cudaOccupancyMaxPotentialBlockSize(&minGridSize, &blockSize, scatter_generic<unsigned int*>, 0, itemCount));
    gridSize = (itemCount + blockSize - 1) / blockSize;
    // Make sure we have enough space to store the ScatterData
    streamResources[streamResourceId].resize(static_cast<unsigned int>(sd.size()));
    // Important that sd.size() is still used here, in case the allocated length (data_len) is bigger
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data, sd.data(), sizeof(ScatterData) * sd.size(), cudaMemcpyHostToDevice, stream));
    if (invert_scan_flag) {
        scatter_generic <<<gridSize, blockSize, 0, stream>>> (
            itemCount,
            InversionIterator(scan.Config(messageOrAgent, streamResourceId).d_ptrs.scan_flag),
            scan.Config(messageOrAgent, streamResourceId).d_ptrs.position,
            streamResources[streamResourceId].d_data, static_cast<unsigned int>(sd.size()),
            out_index_offset, scatter_all_count);
    } else {
        scatter_generic <<<gridSize, blockSize, 0, stream>>> (
            itemCount,
            scan.Config(messageOrAgent, streamResourceId).d_ptrs.scan_flag,
            scan.Config(messageOrAgent, streamResourceId).d_ptrs.position,
            streamResources[streamResourceId].d_data, static_cast<unsigned int>(sd.size()),
            out_index_offset, scatter_all_count);
    }
    gpuErrchkLaunch();
    // Read back the number of items scattered (used by the caller to update the count of live agents)
    unsigned int rtn = 0;
    gpuErrchk(cudaMemcpyAsync(&rtn, scan.Config(messageOrAgent, streamResourceId).d_ptrs.position + itemCount - scatter_all_count, sizeof(unsigned int), cudaMemcpyDeviceToHost, stream));
    gpuErrchk(cudaStreamSynchronize(stream));  // @todo - async + sync variants.
    return rtn + scatter_all_count;
}
void CUDAScatter::scatterPosition(
    unsigned int streamResourceId,
    cudaStream_t stream,
    Type messageOrAgent,
    const std::vector<ScatterData>& sd,
    unsigned int itemCount) {
    scatterPosition_async(streamResourceId, stream, scan.Config(messageOrAgent, streamResourceId).d_ptrs.position, sd, itemCount);
    gpuErrchk(cudaStreamSynchronize(stream));
}
void CUDAScatter::scatterPosition_async(
    unsigned int streamResourceId,
    cudaStream_t stream,
    Type messageOrAgent,
    const std::vector<ScatterData>& sd,
    unsigned int itemCount) {
    scatterPosition_async(streamResourceId, stream, scan.Config(messageOrAgent, streamResourceId).d_ptrs.position, sd, itemCount);
}
void CUDAScatter::scatterPosition_async(
    unsigned int streamResourceId,
    cudaStream_t stream,
    unsigned int *position,
    const std::vector<ScatterData> &sd,
    unsigned int itemCount) {
    int blockSize = 0;  // The launch configurator returned block size
    int minGridSize = 0;  // The minimum grid size needed to achieve maximum occupancy for a full device launch
    int gridSize = 0;  // The actual grid size needed, based on input size
    // Calculate the launch configuration for the scatter kernel
    gpuErrchk(cudaOccupancyMaxPotentialBlockSize(&minGridSize, &blockSize, scatter_position_generic, 0, itemCount));
    gridSize = (itemCount + blockSize - 1) / blockSize;
    // Make sure we have enough space to store the ScatterData
    streamResources[streamResourceId].resize(static_cast<unsigned int>(sd.size()));
    // Important that sd.size() is still used here, in case the allocated length (data_len) is bigger
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data, sd.data(), sizeof(ScatterData) * sd.size(), cudaMemcpyHostToDevice, stream));
    scatter_position_generic <<<gridSize, blockSize, 0, stream>>> (
        itemCount,
        position,
        streamResources[streamResourceId].d_data, static_cast<unsigned int>(sd.size()));
    gpuErrchkLaunch();
}
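// Returns the number of items whose scan flag was set, read back from the exclusive-scan
// position buffer; no data is moved.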
unsigned int CUDAScatter::scatterCount(
    const unsigned int streamResourceId,
    const cudaStream_t stream,
    const Type &messageOrAgent,
    const unsigned int itemCount,
    const unsigned int scatter_all_count) {
    unsigned int rtn = 0;
    gpuErrchk(cudaMemcpy(&rtn, scan.Config(messageOrAgent, streamResourceId).d_ptrs.position + itemCount - scatter_all_count, sizeof(unsigned int), cudaMemcpyDeviceToHost));
    return rtn;
}

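// Scatters every item for every variable to the output buffers (no scan flag is consulted)
// and returns the unchanged item count.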
unsigned int CUDAScatter::scatterAll(
    const unsigned int streamResourceId,
    const cudaStream_t stream,
    const std::vector<ScatterData> &sd,
    const unsigned int itemCount,
    const unsigned int out_index_offset) {
    if (!itemCount)
        return itemCount;  // No work to do
    int blockSize = 0;  // The launch configurator returned block size
    int minGridSize = 0;  // The minimum grid size needed to achieve maximum occupancy for a full device launch
    int gridSize = 0;  // The actual grid size needed, based on input size

    // Calculate the launch configuration for the scatter kernel
    gpuErrchk(cudaOccupancyMaxPotentialBlockSize(&minGridSize, &blockSize, scatter_all_generic, 0, itemCount));
    gridSize = (itemCount + blockSize - 1) / blockSize;
    streamResources[streamResourceId].resize(static_cast<unsigned int>(sd.size()));
    // Important that sd.size() is still used here, in case the allocated length (data_len) is bigger
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data, sd.data(), sizeof(ScatterData) * sd.size(), cudaMemcpyHostToDevice, stream));
    scatter_all_generic <<<gridSize, blockSize, 0, stream>>> (
        itemCount,
        streamResources[streamResourceId].d_data, static_cast<unsigned int>(sd.size()),
        out_index_offset);
    gpuErrchkLaunch();
    gpuErrchk(cudaStreamSynchronize(stream));  // @todo - async + sync variants.
    // Every item was scattered, so the item count is unchanged
    return itemCount;
}
unsigned int CUDAScatter::scatterAll(
    const unsigned int streamResourceId,
    const cudaStream_t stream,
    const VariableMap &vars,
    const std::map<std::string, void*> &in,
    const std::map<std::string, void*> &out,
    const unsigned int itemCount,
    const unsigned int out_index_offset) {
    std::vector<ScatterData> scatterData;
    for (const auto &v : vars) {
        char *in_p = reinterpret_cast<char*>(in.at(v.first));
        char *out_p = reinterpret_cast<char*>(out.at(v.first));
        scatterData.push_back({ v.second.type_size * v.second.elements, in_p, out_p });
    }
    return scatterAll(streamResourceId, stream, scatterData, itemCount, out_index_offset);
}

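// Reorders message data into partition boundary matrix (PBM) order: each item's sorted output
// index is pbm[bin_index[index]] + bin_sub_index[index].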
__global__ void pbm_reorder_generic(
    const unsigned int threadCount,
    const unsigned int * __restrict__ bin_index,
    const unsigned int * __restrict__ bin_sub_index,
    const unsigned int * __restrict__ pbm,
    CUDAScatter::ScatterData *scatter_data,
    const unsigned int scatter_len) {
    // global thread index
    int index = (blockIdx.x*blockDim.x) + threadIdx.x;

    if (index >= threadCount) return;

    const unsigned int sorted_index = pbm[bin_index[index]] + bin_sub_index[index];

    // Copy each variable from the unsorted input index to the sorted output index
    for (unsigned int i = 0; i < scatter_len; ++i) {
        memcpy(scatter_data[i].out + (sorted_index * scatter_data[i].typeLen), scatter_data[i].in + (index * scatter_data[i].typeLen), scatter_data[i].typeLen);
    }
}

void CUDAScatter::pbm_reorder(
    const unsigned int streamResourceId,
    const cudaStream_t stream,
    const VariableMap &vars,
    const std::map<std::string, void*> &in,
    const std::map<std::string, void*> &out,
    const unsigned int itemCount,
    const unsigned int *d_bin_index,
    const unsigned int *d_bin_sub_index,
    const unsigned int *d_pbm) {
    // If itemCount is 0, then there is no work to be done.
    if (itemCount == 0) {
        return;
    }

    int blockSize = 0;  // The launch configurator returned block size
    int minGridSize = 0;  // The minimum grid size needed to achieve maximum occupancy for a full device launch
    int gridSize = 0;  // The actual grid size needed, based on input size

    // Calculate the launch configuration for the reorder kernel
    gpuErrchk(cudaOccupancyMaxPotentialBlockSize(&minGridSize, &blockSize, pbm_reorder_generic, 0, itemCount));
    gridSize = (itemCount + blockSize - 1) / blockSize;
    // for each variable, scatter from swap to regular
    std::vector<ScatterData> sd;
    for (const auto &v : vars) {
        char *in_p = reinterpret_cast<char*>(in.at(v.first));
        char *out_p = reinterpret_cast<char*>(out.at(v.first));
        sd.push_back({ v.second.type_size * v.second.elements, in_p, out_p });
    }
    streamResources[streamResourceId].resize(static_cast<unsigned int>(sd.size()));
    // Important that sd.size() is still used here, in case the allocated length (data_len) is bigger
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data, sd.data(), sizeof(ScatterData) * sd.size(), cudaMemcpyHostToDevice, stream));
    pbm_reorder_generic <<<gridSize, blockSize, 0, stream>>> (
            itemCount,
            d_bin_index,
            d_bin_sub_index,
            d_pbm,
            streamResources[streamResourceId].d_data, static_cast<unsigned int>(sd.size()));
    gpuErrchkLaunch();
    gpuErrchk(cudaStreamSynchronize(stream));  // @todo - async + sync variants.
}

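// Scatters newly created agents from an array-of-structures staging buffer into the
// structure-of-arrays agent data. One thread per (agent, variable) pair: the input is read at a
// stride of agent_size (the byte size of one whole agent), the output at the variable's own stride.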
__global__ void scatter_new_agents(
    const unsigned int threadCount,
    const unsigned int agent_size,
    CUDAScatter::ScatterData *scatter_data,
    const unsigned int scatter_len,
    const unsigned int out_index_offset) {
    // global thread index
    int index = (blockIdx.x*blockDim.x) + threadIdx.x;

    if (index >= threadCount) return;

    // Which variable are we outputting
    const unsigned int var_out = index % scatter_len;
    const unsigned int agent_index = index / scatter_len;

    // Copy this variable for this agent from the AoS input to the SoA output
    char * const in_ptr = scatter_data[var_out].in + (agent_index * agent_size);
    char * const out_ptr = scatter_data[var_out].out + ((out_index_offset + agent_index) * scatter_data[var_out].typeLen);
    memcpy(out_ptr, in_ptr, scatter_data[var_out].typeLen);
}
void CUDAScatter::scatterNewAgents(
    const unsigned int streamResourceId,
    const cudaStream_t stream,
    const std::vector<ScatterData> &sd,
    const size_t totalAgentSize,
    const unsigned int inCount,
    const unsigned int outIndexOffset) {
    // 1 thread per agent variable
    const unsigned int threadCount = static_cast<unsigned int>(sd.size()) * inCount;
    int blockSize = 0;  // The launch configurator returned block size
    int minGridSize = 0;  // The minimum grid size needed to achieve maximum occupancy for a full device launch
    int gridSize = 0;  // The actual grid size needed, based on input size

    // Calculate the launch configuration for the scatter kernel
    gpuErrchk(cudaOccupancyMaxPotentialBlockSize(&minGridSize, &blockSize, scatter_new_agents, 0, threadCount));
    gridSize = (threadCount + blockSize - 1) / blockSize;
    streamResources[streamResourceId].resize(static_cast<unsigned int>(sd.size()));
    // Important that sd.size() is still used here, in case the allocated length (data_len) is bigger
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data, sd.data(), sizeof(ScatterData) * sd.size(), cudaMemcpyHostToDevice, stream));
    scatter_new_agents <<<gridSize, blockSize, 0, stream>>> (
        threadCount,
        static_cast<unsigned int>(totalAgentSize),
        streamResources[streamResourceId].d_data, static_cast<unsigned int>(sd.size()),
        outIndexOffset);
    gpuErrchkLaunch();
    gpuErrchk(cudaStreamSynchronize(stream));  // @todo - async + sync variants.
}
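// Broadcasts each variable's default value to every output slot.
// One thread per (item, variable) pair: the single default value held at scatter_data[var_out].in
// is copied to the item's position in that variable's output buffer.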
__global__ void broadcastInitKernel(
    const unsigned int threadCount,
    CUDAScatter::ScatterData *scatter_data,
    const unsigned int scatter_len,
    const unsigned int out_index_offset) {
    // global thread index
    int index = (blockIdx.x*blockDim.x) + threadIdx.x;

    if (index >= threadCount) return;

    // Which variable are we outputting
    const unsigned int var_out = index % scatter_len;
    const unsigned int agent_index = index / scatter_len;
    const unsigned int type_len = scatter_data[var_out].typeLen;
    // Copy the variable's default value to this item's output slot
    char * const in_ptr = scatter_data[var_out].in;
    char * const out_ptr = scatter_data[var_out].out + ((out_index_offset + agent_index) * type_len);
    memcpy(out_ptr, in_ptr, type_len);
}
void CUDAScatter::broadcastInit(
    unsigned int streamResourceId,
    cudaStream_t stream,
    const std::list<std::shared_ptr<VariableBuffer>> &vars,
    unsigned int inCount,
    unsigned int outIndexOffset) {
    broadcastInit_async(streamResourceId, stream, vars, inCount, outIndexOffset);
    gpuErrchk(cudaStreamSynchronize(stream));
}
void CUDAScatter::broadcastInit_async(
    unsigned int streamResourceId,
    cudaStream_t stream,
    const std::list<std::shared_ptr<VariableBuffer>>& vars,
    unsigned int inCount,
    unsigned int outIndexOffset) {
    // No variables means no work to do
    if (vars.size() == 0) return;
    // 1 thread per agent variable
    const unsigned int threadCount = static_cast<unsigned int>(vars.size()) * inCount;
    int blockSize = 0;  // The launch configurator returned block size
    int minGridSize = 0;  // The minimum grid size needed to achieve maximum occupancy for a full device launch
    int gridSize = 0;  // The actual grid size needed, based on input size

    // Calculate the launch configuration for the broadcast kernel
    gpuErrchk(cudaOccupancyMaxPotentialBlockSize(&minGridSize, &blockSize, broadcastInitKernel, 0, threadCount));
    gridSize = (threadCount + blockSize - 1) / blockSize;
    // Calculate the staging memory required (crudely over-allocated, as resize() counts in units of sizeof(ScatterData))
    ptrdiff_t offset = 0;
    for (const auto &v : vars) {
        offset += v->type_size * v->elements;
    }
    streamResources[streamResourceId].resize(static_cast<unsigned int>(offset + vars.size() * sizeof(ScatterData)));
    // Build scatter data structure and init data
    std::vector<ScatterData> sd;
    char *default_data = reinterpret_cast<char*>(malloc(offset));
    offset = 0;
    for (const auto &v : vars) {
        // Scatter data
        char *in_p = reinterpret_cast<char*>(streamResources[streamResourceId].d_data) + offset;
        char *out_p = reinterpret_cast<char*>(v->data_condition);
        sd.push_back({ v->type_size * v->elements, in_p, out_p });
        // Init data
        memcpy(default_data + offset, v->default_value, v->type_size * v->elements);
        // Update offset
        offset += v->type_size * v->elements;
    }
    // Important that offset and sd.size() are used for the copy sizes here, as the allocated length is bigger
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data, default_data, offset, cudaMemcpyHostToDevice, stream));
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data + offset, sd.data(), sizeof(ScatterData) * sd.size(), cudaMemcpyHostToDevice, stream));
    ::free(default_data);
    broadcastInitKernel <<<gridSize, blockSize, 0, stream>>> (
        threadCount,
        streamResources[streamResourceId].d_data + offset, static_cast<unsigned int>(sd.size()),
        outIndexOffset);
    gpuErrchkLaunch();
}
void CUDAScatter::broadcastInit(
    unsigned int streamResourceId,
    cudaStream_t stream,
    const VariableMap& vars,
    void* const d_newBuff,
    unsigned int inCount,
    unsigned int outIndexOffset) {
    broadcastInit_async(streamResourceId, stream, vars, d_newBuff, inCount, outIndexOffset);
    gpuErrchk(cudaStreamSynchronize(stream));
}
void CUDAScatter::broadcastInit_async(
    unsigned int streamResourceId,
    cudaStream_t stream,
    const VariableMap &vars,
    void * const d_newBuff,
    unsigned int inCount,
    unsigned int outIndexOffset) {
    // 1 thread per agent variable
    const unsigned int threadCount = static_cast<unsigned int>(vars.size()) * inCount;
    int blockSize = 0;  // The launch configurator returned block size
    int minGridSize = 0;  // The minimum grid size needed to achieve maximum occupancy for a full device launch
    int gridSize = 0;  // The actual grid size needed, based on input size

    // Calculate the launch configuration for the broadcast kernel
    gpuErrchk(cudaOccupancyMaxPotentialBlockSize(&minGridSize, &blockSize, broadcastInitKernel, 0, threadCount));
    gridSize = (threadCount + blockSize - 1) / blockSize;
    // Calculate the staging memory required (crudely over-allocated, as resize() counts in units of sizeof(ScatterData))
    std::vector<ScatterData> sd;
    ptrdiff_t offset = 0;
    for (const auto &v : vars) {
        offset += v.second.type_size * v.second.elements;
    }
    char *default_data = reinterpret_cast<char*>(malloc(offset));
    streamResources[streamResourceId].resize(static_cast<unsigned int>(offset + vars.size() * sizeof(ScatterData)));
    // Build scatter data structure
    offset = 0;
    char * d_var = static_cast<char*>(d_newBuff);
    for (const auto &v : vars) {
        // in points at this variable's default value in the staging buffer; out points at the start of this variable's block in the new buffer
        char *in_p = reinterpret_cast<char*>(streamResources[streamResourceId].d_data) + offset;
        char *out_p = d_var;
        sd.push_back({ v.second.type_size * v.second.elements, in_p, out_p });
        // Build init data
        memcpy(default_data + offset, v.second.default_value, v.second.type_size * v.second.elements);
        // Prep pointer for next var
        d_var += v.second.type_size * v.second.elements * inCount;
        // Update offset
        offset += v.second.type_size * v.second.elements;
    }
    // Important that sd.size() is still used here, in case the allocated length (data_len) is bigger
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data, default_data, offset, cudaMemcpyHostToDevice, stream));
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data + offset, sd.data(), sizeof(ScatterData) * sd.size(), cudaMemcpyHostToDevice, stream));
    ::free(default_data);
    broadcastInitKernel <<<gridSize, blockSize, 0, stream>>> (
        threadCount,
        streamResources[streamResourceId].d_data + offset, static_cast<unsigned int>(sd.size()),
        outIndexOffset);
    gpuErrchkLaunch();
}
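// Scatters array messages to their target elements: d_position holds each message's output index,
// and messages with an out-of-bounds index are not written. When FLAMEGPU_SEATBELTS is enabled,
// d_write_flag counts the writes to each element so that conflicts can be reported.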
__global__ void reorder_array_messages(
    const unsigned int threadCount,
    const unsigned int array_length,
    const unsigned int *d_position,
#if !defined(FLAMEGPU_SEATBELTS) || FLAMEGPU_SEATBELTS
    unsigned int *d_write_flag,
#endif
    CUDAScatter::ScatterData *scatter_data,
    const unsigned int scatter_len
) {
    // global thread index
    int index = (blockIdx.x*blockDim.x) + threadIdx.x;

    if (index >= threadCount) return;

    const unsigned int output_index = d_position[index];
    // Only scatter messages whose output index falls within the array bounds (out-of-bounds messages are not written)
    if (output_index < array_length) {
        for (unsigned int i = 0; i < scatter_len; ++i) {
            memcpy(scatter_data[i].out + (output_index * scatter_data[i].typeLen), scatter_data[i].in + (index * scatter_data[i].typeLen), scatter_data[i].typeLen);
        }
#if !defined(FLAMEGPU_SEATBELTS) || FLAMEGPU_SEATBELTS
        // Set err check flag
        atomicInc(d_write_flag + output_index, UINT_MAX);
#endif
    }
}
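// Reorders array message data according to each message's ___INDEX variable, then (when
// FLAMEGPU_SEATBELTS is enabled) reduces the per-element write counts with cub::DeviceReduce::Max
// to detect and report any element that was written more than once.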
void CUDAScatter::arrayMessageReorder(
    const unsigned int streamResourceId,
    const cudaStream_t stream,
    const VariableMap &vars,
    const std::map<std::string, void*> &in,
    const std::map<std::string, void*> &out,
    const unsigned int itemCount,
    const unsigned int array_length,
    unsigned int *d_write_flag) {
    // If itemCount is 0, then there is no work to be done.
    if (itemCount == 0) {
        return;
    }

    if (itemCount > array_length) {
        THROW exception::ArrayMessageWriteConflict("Too many messages output for array message structure (%u > %u).\n", itemCount, array_length);
    }
    int blockSize = 0;  // The launch configurator returned block size
    int minGridSize = 0;  // The minimum grid size needed to achieve maximum occupancy for a full device launch
    int gridSize = 0;  // The actual grid size needed, based on input size
    // Calculate the launch configuration for the reorder kernel
    gpuErrchk(cudaOccupancyMaxPotentialBlockSize(&minGridSize, &blockSize, reorder_array_messages, 0, itemCount));
    gridSize = (itemCount + blockSize - 1) / blockSize;
    unsigned int *d_position = nullptr;
    // Build AoS -> AoS list
    std::vector<ScatterData> sd;
    for (const auto &v : vars) {
        if (v.first != "___INDEX") {
            char *in_p = reinterpret_cast<char*>(in.at(v.first));
            char *out_p = reinterpret_cast<char*>(out.at(v.first));
            sd.push_back({ v.second.type_size * v.second.elements, in_p, out_p });
        } else {  // Special case, log index var
            d_position = reinterpret_cast<unsigned int*>(in.at(v.first));
            d_write_flag = d_write_flag ? d_write_flag : reinterpret_cast<unsigned int*>(out.at(v.first));
        }
    }
    assert(d_position);  // The ___INDEX variable is required; without it this is not an array message list
    size_t t_data_len = 0;
    {  // Decide per-stream resource memory requirements based on the scatter data, and potentially the CUB temporary storage
#if !defined(FLAMEGPU_SEATBELTS) || FLAMEGPU_SEATBELTS
        // Query cub to find the number of temporary bytes required.
        gpuErrchk(cub::DeviceReduce::Max(nullptr, t_data_len, d_write_flag, d_position, array_length, stream));
#endif
        // The number of bytes required is the maximum of the total size of the per-variable ScatterData entries and the CUB temporary storage requirement
        const size_t srBytesRequired = std::max(sd.size() * sizeof(ScatterData), t_data_len);
        // The number of bytes currently allocated
        const size_t srBytesAllocated = streamResources[streamResourceId].data_len * sizeof(ScatterData);
        // If not enough bytes are allocated, perform an appropriate resize
        if (srBytesRequired > srBytesAllocated) {
            const size_t elementsRequired = ((srBytesRequired - 1) / sizeof(ScatterData)) + 1;
            streamResources[streamResourceId].resize(static_cast<unsigned int>(elementsRequired));
        }
    }
    // Important that sd.size() is still used here, in case the allocated length (data_len) is bigger
    gpuErrchk(cudaMemcpyAsync(streamResources[streamResourceId].d_data, sd.data(), sizeof(ScatterData) * sd.size(), cudaMemcpyHostToDevice, stream));
    reorder_array_messages <<<gridSize, blockSize, 0, stream >>> (
        itemCount, array_length,
        d_position,
#if !defined(FLAMEGPU_SEATBELTS) || FLAMEGPU_SEATBELTS
        d_write_flag,
#endif
        streamResources[streamResourceId].d_data, static_cast<unsigned int>(sd.size()));
    gpuErrchkLaunch();
#if !defined(FLAMEGPU_SEATBELTS) || FLAMEGPU_SEATBELTS
    // Check d_write_flag for duplicate writes
    gpuErrchk(cub::DeviceReduce::Max(streamResources[streamResourceId].d_data, t_data_len, d_write_flag, d_position, array_length, stream));
    unsigned int maxBinSize = 0;
    gpuErrchk(cudaMemcpyAsync(&maxBinSize, d_position, sizeof(unsigned int), cudaMemcpyDeviceToHost, stream));
    gpuErrchk(cudaStreamSynchronize(stream));
    if (maxBinSize > 1) {
        // More than one message was written to a single array element
        // Report the offending indices
        unsigned int *hd_write_flag = (unsigned int *)malloc(sizeof(unsigned int) * array_length);
        gpuErrchk(cudaMemcpy(hd_write_flag, d_write_flag, sizeof(unsigned int)* array_length, cudaMemcpyDeviceToHost));
        unsigned int last_fail_index = std::numeric_limits<unsigned int>::max();
        for (unsigned int i = 0; i < array_length; ++i) {
            if (hd_write_flag[i] > 1) {
                fprintf(stderr, "Array message list contains %u messages at index %u!\n", hd_write_flag[i], i);
                last_fail_index = i;
            }
        }
        if (last_fail_index == 0) {
            fprintf(stderr, "This may occur if optional message output was not enabled, and some agents failed to create a message.\n");
        }
        THROW exception::ArrayMessageWriteConflict("Multiple threads output array messages to the same index, see stderr.\n");
    }
#endif
}

}  // namespace detail
}  // namespace flamegpu