Program Listing for File DeviceAgentVector_impl.cu

Return to documentation for file (src/flamegpu/runtime/agent/DeviceAgentVector_impl.cu)

#include "flamegpu/runtime/agent/DeviceAgentVector_impl.h"
#include "flamegpu/simulation/detail/CUDAAgent.h"
#include "flamegpu/runtime/agent/HostNewAgentAPI.h"

namespace flamegpu {

DeviceAgentVector_impl::DeviceAgentVector_impl(detail::CUDAAgent& _cuda_agent, const std::string &_cuda_agent_state,
                                               const VarOffsetStruct& _agentOffsets, std::vector<NewAgentStorage>& _newAgentData,
                                               detail::CUDAScatter& _scatter, const unsigned int _streamId, const cudaStream_t _stream)
    : AgentVector(_cuda_agent.getAgentDescription(), 0)
    , unbound_buffers_has_changed(false)
    , known_device_buffer_size(_cuda_agent.getStateSize(_cuda_agent_state))
    , cuda_agent(_cuda_agent)
    , cuda_agent_state(_cuda_agent_state)
    , agentOffsets(_agentOffsets)
    , newAgentData(_newAgentData)
    , scatter(_scatter)
    , streamId(_streamId)
    , stream(_stream) {
    // Create an empty AgentVector and initialise it manually
    // For each variable create an uninitialised array of variable data
    _size = known_device_buffer_size;
    internal_resize(_size, false);
    // Mark all variables as Invalid
    for (const auto& v : agent->variables)
        invalid_variables.insert(v.first);
    // Grab the unbound variable buffers from the CUDAFatAgentStateList
    // Leave their host counterparts de-allocated until required
    {
        const auto buffs = cuda_agent.getUnboundVariableBuffers(cuda_agent_state);
        for (auto &d_buff : buffs)
            unbound_buffers.emplace_back(d_buff);
        unbound_host_buffer_invalid = true;
    }
}

void DeviceAgentVector_impl::syncChanges() {
    // Resize device buffers if necessary
    const unsigned int old_allocated_size = cuda_agent.getStateAllocatedSize(cuda_agent_state);
    if (_size > old_allocated_size) {
        const unsigned int old_size = cuda_agent.getStateSize(cuda_agent_state);
        // Resize the underlying variable buffers for this agent state and retain variable data
        cuda_agent.resizeState(cuda_agent_state, _size, true, stream);  // @todo Don't retain data for mapped buffers?
        // Init agent data for any variables of newly created agents which are only present in a parent model
        const unsigned int new_allocated_size = cuda_agent.getStateAllocatedSize(cuda_agent_state);
        // This call does not use streams properly internally
        cuda_agent.initExcludedVars(cuda_agent_state, new_allocated_size - old_size, old_size, scatter, streamId, stream);
    }
    _requireLength();
    // Copy all changes back to device
    for (const auto &ch : change_detail) {
        auto &v = agent->variables.at(ch.first);
        // Copy back variable data into each array
        const char* host_src = static_cast<const char*>(_data->at(ch.first)->getDataPtr());
        char* device_dest = static_cast<char*>(cuda_agent.getStateVariablePtr(cuda_agent_state, ch.first));
        const size_t copy_offset = ch.second.first * v.type_size * v.elements;
        const size_t copy_len = (ch.second.second - ch.second.first) * v.type_size * v.elements;
        gpuErrchk(cudaMemcpyAsync(device_dest + copy_offset, host_src + copy_offset, copy_len, cudaMemcpyHostToDevice, stream));
    }
    change_detail.clear();
    // Copy all unbound buffes
    if (unbound_buffers_has_changed) {
        if (unbound_host_buffer_size != _size) {
            THROW exception::InvalidOperation("Unbound buffers have gone out of sync, in DeviceAgentVector::syncChanges().\n");
        }
        for (auto &buff : unbound_buffers) {
            const size_t variable_size = buff.device->type_size * buff.device->elements;
            gpuErrchk(cudaMemcpyAsync(buff.device->data, buff.host, unbound_host_buffer_size * variable_size, cudaMemcpyHostToDevice, stream));
        }
        unbound_buffers_has_changed = false;
    }
    gpuErrchk(cudaStreamSynchronize(stream));
    // Update CUDAAgent statelist size
    cuda_agent.setStateAgentCount(cuda_agent_state, _size);
}
void DeviceAgentVector_impl::purgeCache() {
    _size = cuda_agent.getStateSize(cuda_agent_state);
    // All variables are now invalid
    for (const auto& v : agent->variables)
        invalid_variables.insert(v.first);
    // Mark all unbound host buffers as requiring update
    unbound_host_buffer_invalid = false;
    unbound_host_buffer_size = 0;
    known_device_buffer_size = cuda_agent.getStateSize(cuda_agent_state);
    unbound_buffers_has_changed = false;
}

void DeviceAgentVector_impl::initUnboundBuffers() {
    if (!_capacity)
      return;
    const unsigned int device_len = cuda_agent.getStateSize(cuda_agent_state);
    const unsigned int copy_len = _size < device_len ? _size : device_len;
    // Resize to match _capacity
    for (auto &buff : unbound_buffers) {
        if (buff.host) {
            THROW exception::InvalidOperation("Host buffer is already allocated, in DeviceAgentVector::initUnboundBuffers().\n");
        }
        // Alloc
        const size_t var_size = buff.device->type_size * buff.device->elements;
        buff.host = static_cast<char*>(malloc(_capacity * var_size));
        // DtH memcpy
        gpuErrchk(cudaMemcpyAsync(buff.host, buff.device->data, copy_len * var_size, cudaMemcpyDeviceToHost, stream));
        // Not sure this will ever happen, but better safe
        for (unsigned int i = device_len; i < _size; ++i) {
            // We have unknown agents, default init them
            memcpy(buff.host + i * var_size, buff.device->default_value, var_size);
        }
    }
    gpuErrchk(cudaStreamSynchronize(stream));
    unbound_host_buffer_capacity = _capacity;
    unbound_host_buffer_size = copy_len;
    unbound_buffers_has_changed = true;  // Probably not required, but if they are being init, high chance they're going to be changed
    unbound_host_buffer_invalid = false;
}
void DeviceAgentVector_impl::reinitUnboundBuffers() {
    const unsigned int device_len = cuda_agent.getStateSize(cuda_agent_state);
    const unsigned int copy_len = _size;
    if (device_len > _size) {
        THROW exception::InvalidOperation("Unexpected state, in DeviceAgentVector::reinitUnboundBuffers()\n");
    }
    // Resize to match _capacity
    for (auto& buff : unbound_buffers) {
        if (!buff.host) {
            THROW exception::InvalidOperation("Host buffer is not already allocated, in DeviceAgentVector::reinitUnboundBuffers().\n");
        }
        const size_t var_size = buff.device->type_size * buff.device->elements;
        if (unbound_host_buffer_capacity < _capacity) {
            free(buff.host);
            // Alloc
            buff.host = static_cast<char*>(malloc(_capacity * var_size));
        }
        // DtH memcpy
        gpuErrchk(cudaMemcpyAsync(buff.host, buff.device->data, copy_len * var_size, cudaMemcpyDeviceToHost, stream));
        // Not sure this will ever happen, but better safe
        for (unsigned int i = device_len; i < _size; ++i) {
            // We have unknown agents, default init them
            memcpy(buff.host + i * var_size, buff.device->default_value, var_size);
        }
    }
    gpuErrchk(cudaStreamSynchronize(stream));
    unbound_host_buffer_capacity = unbound_host_buffer_capacity < _capacity ?_capacity : unbound_host_buffer_capacity;
    unbound_host_buffer_size = copy_len;
    unbound_buffers_has_changed = true;  // Probably not required, but if they are being init, high chance they're going to be changed
    unbound_host_buffer_invalid = false;
}
void DeviceAgentVector_impl::resizeUnboundBuffers(const unsigned int new_capacity, bool init) {
    // Resize to match agent_count
    for (auto& buff : unbound_buffers) {
        if (!buff.host) {
            THROW exception::InvalidOperation("Not setup to resize before init");
        }
        // Alloc new buff
        const size_t var_size = buff.device->type_size * buff.device->elements;
        char *t = static_cast<char*>(malloc(new_capacity * var_size));
        // Copy data across
        const unsigned int copy_len = _size < unbound_host_buffer_capacity ? _size : unbound_host_buffer_capacity;
        memcpy(t, buff.host, copy_len * var_size);
        // Free old
        free(buff.host);
        // Replace old ptr
        buff.host = t;
        if (init) {
            for (unsigned int i = unbound_host_buffer_capacity; i < new_capacity; ++i) {
                // We have unknown agents, default init them
                memcpy(buff.host + i * var_size, buff.device->default_value, var_size);
            }
        }
    }
    unbound_host_buffer_capacity = new_capacity;
    // unbound_host_buffer_size = agent_count;  // This would only make sense for init, but consisent behaviour is better
    unbound_buffers_has_changed = true;  // Probably not required, but if they are resized, high chance theyre going to change
}

void DeviceAgentVector_impl::_insert(size_type pos, size_type count) {
    if (!count)
        return;
    // Init ID for all the inserted agents
    {
        auto d = _data->find(ID_VARIABLE_NAME);
        if (d != _data->end()) {
            _require(ID_VARIABLE_NAME);
            id_t *h_ptr = static_cast<id_t*>(d->second->getDataPtr());
            for (unsigned int i = pos; i < pos + count; ++i) {
                // Always assign ID, as AgentVector should reset these to unset, but this saves us checking
                // if (h_ptr[i] == ID_NOT_SET) {
                    h_ptr[i] = cuda_agent.nextID();
                // }
            }
            _changedAfter(ID_VARIABLE_NAME, pos);
        } else {
            THROW exception::InvalidOperation("Internal agent ID variable was not found, "
                "in DeviceAgentVector_impl._insert().");
        }
    }
    // No unbound buffers, return
    if (unbound_buffers.empty())
        return;
    // Unbound buffers first use, init
    // This updates unbound_host_buffer_size to match known_device_buffer_size
    if (!unbound_host_buffer_capacity)
        initUnboundBuffers();
    // Resizes unbound buffers if necessary
    const size_type new_size = known_device_buffer_size + count;
    if (new_size > unbound_host_buffer_capacity) {
        resizeUnboundBuffers(_capacity, false);
        // Init new agents that won't be init by the replacement below
        for (auto& buff : unbound_buffers) {
            const size_t variable_size = buff.device->type_size * buff.device->elements;
            for (unsigned int i = new_size; i < _capacity; ++i) {
                memcpy(buff.host + i * variable_size, buff.device->default_value, variable_size);
            }
        }
    }
    if (unbound_host_buffer_invalid) {
        // Redownload unbound buffers from device
        reinitUnboundBuffers();
    }
    //  Move all items behind pos, then init all the newly inserted
    for (auto& buff : unbound_buffers) {
        const size_t variable_size = buff.device->type_size * buff.device->elements;
        // Move all items after this index backwards count places
        for (unsigned int i = known_device_buffer_size - 1; i >= pos; --i) {
            // Copy items individually, incase the src and destination overlap
            memcpy(buff.host + (i + count) * variable_size, buff.host + i * variable_size, variable_size);
        }
        // Default init the inserted variables
        for (unsigned int i = pos; i < pos + count; ++i) {
            memcpy(buff.host + i * variable_size, buff.device->default_value, variable_size);
        }
    }
    // Update size
    unbound_buffers_has_changed = true;
    unbound_host_buffer_size = new_size;
    known_device_buffer_size = _size;
    if (unbound_host_buffer_size != _size) {
        THROW exception::InvalidOperation("Unbound buffers have gone out of sync, in DeviceAgentVector::_insert().\n");
    }
    // Update change detail for all variables
    for (const auto& v : agent->variables) {
        // Does it exist in change map
        auto change = change_detail.find(v.first);
        if (change == change_detail.end()) {
            change_detail.emplace(v.first, std::pair<size_type, size_type>{pos, _size});
        } else {
            // Inclusive min bound
            change->second.first = change->second.first > pos ? pos : change->second.first;
            // Exclusive max bound
            change->second.second = _size;
        }
    }
}
void DeviceAgentVector_impl::_erase(size_type pos, size_type count) {
    // No unbound buffers, return
    if (unbound_buffers.empty() || !count)
        return;
    // Unbound buffers first use, init
    if (!unbound_host_buffer_capacity)
        initUnboundBuffers();
    if (unbound_host_buffer_invalid) {
        // Redownload unbound buffers from device
        reinitUnboundBuffers();
    }
    const size_type new_size = known_device_buffer_size - count;
    const size_type copy_start = pos + count;
    for (auto& buff : unbound_buffers) {
        const size_t variable_size = buff.device->type_size * buff.device->elements;
        // Move all items after this index forwards count places
        for (unsigned int i = copy_start; i < unbound_host_buffer_size; ++i) {
            // Copy items individually, incase the src and destination overlap
            memcpy(buff.host + (i - count) * variable_size, buff.host + i * variable_size, variable_size);
        }
        // Default init the empty variables at the end
        for (unsigned int i = new_size; i < known_device_buffer_size; ++i) {
            memcpy(buff.host + i * variable_size, buff.device->default_value, variable_size);
        }
    }
    // Update size
    unbound_buffers_has_changed = true;
    unbound_host_buffer_size = new_size;
    known_device_buffer_size = _size;
    if (unbound_host_buffer_size != _size) {
        THROW exception::InvalidOperation("Unbound buffers have gone out of sync, in DeviceAgentVector::_erase().\n");
    }
    // Update change detail for all variables
    for (const auto &v : agent->variables) {
        // Does it exist in change map
        auto change = change_detail.find(v.first);
        if (change == change_detail.end()) {
            change_detail.emplace(v.first, std::pair<size_type, size_type>{pos, _size});
        } else {
            // Inclusive min bound
            change->second.first = change->second.first > pos ? pos : change->second.first;
            // Exclusive max bound
            change->second.second = _size;
        }
    }
}


void DeviceAgentVector_impl::_changed(const std::string& variable_name, size_type pos) {
    // Check the variable exists
    auto var = agent->variables.find(variable_name);
    if (var == agent->variables.end()) {
        THROW exception::InvalidAgentVar("Variable %s was not found, "
            "in DeviceAgentVector::_changed()\n",
            variable_name.c_str());
    }
    // Does it exist in change map
    auto change = change_detail.find(variable_name);
    if (change == change_detail.end()) {
        change_detail.emplace(variable_name, std::pair<size_type, size_type>{pos, pos + 1});
    } else {
        // Inclusive min bound
        change->second.first = change->second.first > pos ? pos : change->second.first;
        // Exclusive max bound
        change->second.second = change->second.second <= pos ? pos + 1 : change->second.second;
    }
}
void DeviceAgentVector_impl::_changedAfter(const std::string& variable_name, size_type pos) {
    // Check the variable exists
    auto var = agent->variables.find(variable_name);
    if (var == agent->variables.end()) {
        THROW exception::InvalidAgentVar("Variable %s was not found, "
            "in DeviceAgentVector::_changed()\n",
            variable_name.c_str());
    }
    // Does it exist in change map
    auto change = change_detail.find(variable_name);
    if (change == change_detail.end()) {
        change_detail.emplace(variable_name, std::pair<size_type, size_type>{pos, _size});
    } else {
        // Inclusive min bound
        change->second.first = change->second.first > pos ? pos : change->second.first;
        // Exclusive max bound
        change->second.second = _size;
    }
}
void DeviceAgentVector_impl::_require(const std::string& variable_name) const {
    if (invalid_variables.find(variable_name) !=invalid_variables.end()) {
        const auto& v = agent->variables.at(variable_name);
        // Copy back variable data into array
        void* host_dest = _data->at(variable_name)->getDataPtr();
        const void* device_src = cuda_agent.getStateVariablePtr(cuda_agent_state, variable_name);
        gpuErrchk(cudaMemcpyAsync(host_dest, device_src, _size * v.type_size * v.elements, cudaMemcpyDeviceToHost, stream));
        if (_capacity > _size) {
            // Default-init remaining buffer space
            const auto it = _data->find(variable_name);
            const size_t variable_size = v.type_size * v.elements;
            char* t_data = static_cast<char*>(it->second->getDataPtr());
            for (unsigned int i = _size; i < _capacity; ++i) {
                memcpy(t_data + i * variable_size, v.default_value, variable_size);
            }
        }
        // The invalid variable is now current
        invalid_variables.erase(variable_name);
        gpuErrchk(cudaStreamSynchronize(stream));
    }
}
void DeviceAgentVector_impl::_requireAll() const {
    for (const auto& vn : invalid_variables) {
        const auto &v = agent->variables.at(vn);
        // Copy back variable data into array
        void* host_dest = _data->at(vn)->getDataPtr();
        const void* device_src = cuda_agent.getStateVariablePtr(cuda_agent_state, vn);
        gpuErrchk(cudaMemcpyAsync(host_dest, device_src, _size * v.type_size * v.elements, cudaMemcpyDeviceToHost, stream));
    }
    // Perform the cuda ops in a separate loop to host inits, gives a slight bit of time to eat latency
    for (const auto& vn : invalid_variables) {
        if (_capacity > _size) {
            const auto& v = agent->variables.at(vn);
            // Default-init remaining buffer space
            const auto it = _data->find(vn);
            const size_t variable_size = v.type_size * v.elements;
            char* t_data = static_cast<char*>(it->second->getDataPtr());
            for (unsigned int i = _size; i < _capacity; ++i) {
                memcpy(t_data + i * variable_size, v.default_value, variable_size);
            }
        }
    }
    // All invalid variables are now current
    invalid_variables.clear();
    gpuErrchk(cudaStreamSynchronize(stream));
}
void DeviceAgentVector_impl::_requireLength() const {
    if (newAgentData.empty())
        return;
    if (_size + newAgentData.size() > _capacity) {
        // BEGIN: Re implementation of AgentVector::resize(size_type, bool)
        // Can't call it here, as would have huge knock-on effects to which methods can/can't be const
        const_cast<DeviceAgentVector_impl*>(this)->internal_resize(_size + static_cast<size_type>(newAgentData.size()), false);
        // END: Re implementation of AgentVector::resize(size_type, bool)
    }
    _requireAll();
    // Check if host new agent has any agents
    for (auto &newAgent : newAgentData) {
        // Manually insert them to device agent vector
        for (auto &v : agentOffsets.vars) {
            char* dst = static_cast<char*>(_data->at(v.first)->getDataPtr()) + _size * v.second.len;
            const char * src = newAgent.data + v.second.offset;
            memcpy(dst, src, v.second.len);
        }
        // Increase size
        ++_size;
    }
    // This updates unbound buffers
    // BEGIN: Re implementation of DeviceAgentVector_t::_insert(size_type, size_type)
    // Can't call it here, as would have huge knock-on effects to which methods can/can't be const
    const_cast<DeviceAgentVector_impl*>(this)->_insert(_size - static_cast<size_type>(newAgentData.size()), static_cast<size_type>(newAgentData.size()));
    // END: Re implementation of DeviceAgentVector_t::_insert(size_type, size_type)
    newAgentData.clear();
}


}  // namespace flamegpu