Skip to content

Commit

Permalink
XXX
Browse files Browse the repository at this point in the history
  • Loading branch information
igchor committed Dec 3, 2024
1 parent 38c8396 commit 7713188
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 110 deletions.
4 changes: 2 additions & 2 deletions source/adapters/level_zero/v2/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ ur_result_t ur_event_handle_t_::release() {
return UR_RESULT_SUCCESS;

// Need to take a lock before checking if the event is timestamped.
std::unique_lock<ur_shared_mutex> lock(Mutex);
// std::unique_lock<ur_shared_mutex> lock(Mutex);

if (isTimestamped() && !getEventEndTimestamp()) {
// L0 will write end timestamp to this event some time in the future,
Expand All @@ -161,7 +161,7 @@ ur_result_t ur_event_handle_t_::release() {

// Need to unlock now, as forceRelease might deallocate memory backing
// the Mutex.
lock.unlock();
// lock.unlock();

return this->forceRelease();
}
Expand Down
4 changes: 4 additions & 0 deletions source/adapters/level_zero/v2/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ struct ur_event_handle_t_ : _ur_object {
this->completed = completed;
}

// Returns whether this event has been marked complete via setCompleted().
bool isCompleted() const { return completed; }

// Record the start timestamp of the event, to be obtained by
// urEventGetProfilingInfo. resetQueueAndCommand should be
// called before this.
Expand Down
46 changes: 3 additions & 43 deletions source/adapters/level_zero/v2/event_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,54 +50,14 @@ ur_pooled_event_t *event_pool::allocate() {
return event;
}

// Unconditionally recycle every event on the executing list back onto the
// freelist, resetting each one first. Unlike cleanupExecuting(), this does
// not check whether the events have actually signaled. Thread safe.
void event_pool::forceCleanupExecuting() {
  TRACK_SCOPE_LATENCY("event_pool::forceCleanupExecuting");

  std::unique_lock<std::mutex> lock(*mutex);

  // Drain the executing list front-to-back, preserving order on the freelist.
  while (!executing.empty()) {
    auto *recycled = executing.front();
    executing.pop_front();
    recycled->reset();
    freelist.push_back(recycled);
  }
}

// Opportunistically move completed events from the executing list back to the
// freelist. Scans at most CHECK_EXECUTING entries and stops at the first
// event that has not signaled yet, so only a consecutive completed prefix of
// the executing list is recycled. No-op until the executing list reaches
// EVENT_POOL_CLEANUP_SIZE. Thread safe.
void event_pool::cleanupExecuting() {
  TRACK_SCOPE_LATENCY("event_pool::cleanupExecuting");

  std::unique_lock<std::mutex> lock(*mutex);

  // Amortize the cost: only scan once enough events have accumulated.
  if (executing.size() < EVENT_POOL_CLEANUP_SIZE) {
    return;
  }

  const size_t scanLimit = std::min(CHECK_EXECUTING, executing.size());
  size_t recycled = 0;
  for (size_t idx = 0; idx < scanLimit; ++idx) {
    // Stop at the first event that is still outstanding.
    if (zeEventQueryStatus(executing[idx]->getZeEvent()) != ZE_RESULT_SUCCESS) {
      break;
    }
    executing[idx]->reset();
    freelist.push_back(executing[idx]);
    ++recycled;
  }

  executing.erase(executing.begin(), executing.begin() + recycled);
}

void event_pool::free(ur_pooled_event_t *event, bool completed) {
TRACK_SCOPE_LATENCY("event_pool::free");

std::unique_lock<std::mutex> lock(*mutex);

if (completed) {
event->reset();
event->setCompleted(false);
freelist.push_back(event);
} else {
executing.push_back(event);
}
event->reset();
event->setCompleted(false);
freelist.push_back(event);

// The event is still in the pool, so we need to increment the refcount
assert(event->RefCount.load() == 0);
Expand Down
14 changes: 1 addition & 13 deletions source/adapters/level_zero/v2/event_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,12 @@ class event_pool {
event_pool(const event_pool &) = delete;
event_pool &operator=(const event_pool &) = delete;

// Block until every still-executing event has signaled before the pool (and
// the events it owns) is destroyed.
~event_pool() {
  for (size_t i = 0; i < executing.size(); ++i) {
    zeEventHostSynchronize(executing[i]->getZeEvent(), UINT64_MAX);
  }
}

// Allocate an event from the pool. Thread safe.
ur_pooled_event_t *allocate();

// Free an event back to the pool. Thread safe.
void free(ur_pooled_event_t *event, bool completed);

// Move events from the executing list to the freelist (if they are
// completed). Thread safe.
void cleanupExecuting();
void forceCleanupExecuting();

event_provider *getProvider() const;
event_flags_t getFlags() const;

Expand All @@ -65,8 +54,7 @@ class event_pool {
std::unique_ptr<event_provider> provider;

std::deque<ur_pooled_event_t> events;
std::deque<ur_pooled_event_t *> executing;
std::vector<ur_pooled_event_t *> freelist;
std::deque<ur_pooled_event_t *> freelist;

std::unique_ptr<std::mutex> mutex;
};
Expand Down
106 changes: 58 additions & 48 deletions source/adapters/level_zero/v2/queue_immediate_in_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,50 +26,49 @@ static uint64_t CHECK_EXECUTING = []() {
}();

static uint64_t EVENT_POOL_CLEANUP_SIZE = []() {
return getenv_to_unsigned("UR_L0_V2_EVENT_POOL_CLEANUP_SIZE").value_or(100);
return getenv_to_unsigned("UR_L0_V2_EVENT_POOL_CLEANUP_SIZE").value_or(1024);
}();

void ur_command_list_handler_t::cleanupEvents(bool force){
size_t numToCleanup = std::min(CHECK_EXECUTING, executing.size());

if (executing.size() > EVENT_POOL_CLEANUP_SIZE || force) {
if (force) numToCleanup = executing.size();
size_t completed = 0;
for (size_t i = 0; i < numToCleanup; i++) {
if (zeEventQueryStatus(executing[i]->getZeEvent()) == ZE_RESULT_SUCCESS) {
executing[i]->setCompleted(true);
ur::level_zero::urEventRelease(executing[i]);
completed++;
} else {
break;
}
}
std::pair<ze_event_handle_t *, uint32_t>
ur_queue_immediate_in_order_t::getWaitListView(
const ur_event_handle_t *phWaitEvents, uint32_t numWaitEvents) {

executing.erase(executing.begin(), executing.begin() + completed);
}
counter++;

if (force) {
lastEvent = nullptr;
if (counter % EVENT_POOL_CLEANUP_SIZE == 0) {
handler.signals.push_back(eventPool->allocate());
auto zeEvent = handler.executing.back()->getZeEvent();
ZE2UR_CALL_THROWS(zeCommandListAppendWaitOnEvents, (handler.commandList.get(), 1, &zeEvent));
ZE2UR_CALL_THROWS(zeCommandListAppendSignalEvent, (handler.commandList.get(), handler.signals.back()->getZeEvent()));
}
}

std::pair<ze_event_handle_t *, uint32_t>
ur_queue_immediate_in_order_t::getWaitListView(
const ur_event_handle_t *phWaitEvents, uint32_t numWaitEvents) {
if (handler.signals.size()) {
if (zeEventQueryStatus(handler.signals.front()->getZeEvent()) == ZE_RESULT_SUCCESS) {
if (handler.executing.size() < EVENT_POOL_CLEANUP_SIZE - 1)
throw std::runtime_error("Event pool cleanup failed");

for (int i = 0; i < EVENT_POOL_CLEANUP_SIZE - 1; i++) {
handler.executing[i]->setCompleted(true);
//handler.executing[i]->reset();
ur::level_zero::urEventRelease(handler.executing[i]);
}

handler.executing.erase(handler.executing.begin(), handler.executing.begin() + EVENT_POOL_CLEANUP_SIZE - 1);
handler.signals.pop_front();
}
}

// this will always be true for native events (i.e. we will always add native
// event to the wait list)
bool useLastEvent = handler.lastEvent;

handler.cleanupEvents();
bool useLastEvent = handler.executing.size() && !handler.executing.back()->isCompleted() && counter % EVENT_POOL_CLEANUP_SIZE != 0;

waitList.resize(numWaitEvents + uint32_t(useLastEvent));
for (uint32_t i = 0; i < numWaitEvents; i++) {
waitList[i] = phWaitEvents[i]->getZeEvent();
}

if (useLastEvent) {
waitList[numWaitEvents] = handler.lastEvent->getZeEvent();
waitList[numWaitEvents] = handler.executing.back()->getZeEvent();
}

return {waitList.data(), static_cast<uint32_t>(waitList.size())};
Expand Down Expand Up @@ -105,7 +104,7 @@ ur_command_list_handler_t::ur_command_list_handler_t(
const ur_queue_properties_t *pProps)
: commandList(hContext->commandListCache.getImmediateCommandList(
hDevice->ZeDevice, false, getZeOrdinal(hDevice),
true /* always enable copy offload */,
true,
ZE_COMMAND_QUEUE_MODE_ASYNCHRONOUS,
getZePriority(pProps ? pProps->flags : ur_queue_flags_t{}),
getZeIndex(pProps))) {}
Expand Down Expand Up @@ -148,13 +147,13 @@ ur_queue_immediate_in_order_t::getSignalEvent(ur_event_handle_t *hUserEvent,
ur_command_t commandType) {
if (hUserEvent) {
*hUserEvent = eventPool->allocate();
handler.lastEvent = *hUserEvent;
handler.executing.push_back(*hUserEvent);
} else {
handler.lastEvent = eventPool->allocate();
handler.executing.push_back(eventPool->allocate());
}

handler.lastEvent->resetQueueAndCommand(this, commandType);
return handler.lastEvent;
handler.executing.back()->resetQueueAndCommand(this, commandType);
return handler.executing.back();
}

ur_result_t
Expand Down Expand Up @@ -229,9 +228,20 @@ ur_result_t ur_queue_immediate_in_order_t::queueFinish() {
ZE2UR_CALL(zeCommandListHostSynchronize,
(handler.commandList.get(), UINT64_MAX));

// eventPool->forceCleanupExecuting();
for (auto event: handler.executing) {
event->setCompleted(true);
ur::level_zero::urEventRelease(event);
}

for (auto event: handler.signals) {
event->setCompleted(true);
ur::level_zero::urEventRelease(event);
}

counter = 0;

handler.cleanupEvents(true);
handler.executing.clear();
handler.signals.clear();

// Free deferred events
for (auto &hEvent : deferredEvents) {
Expand Down Expand Up @@ -301,7 +311,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueKernelLaunch(
(handler.commandList.get(), hZeKernel, &zeThreadGroupDimensions,
zeSignalEvent, waitList.second, waitList.first));

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -335,7 +345,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueEventsWait(
(handler.commandList.get(), signalEvent->getZeEvent()));
}

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -363,7 +373,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueEventsWaitWithBarrier(
(handler.commandList.get(), signalEvent->getZeEvent(),
numWaitEvents, pWaitEvents));

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -421,7 +431,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueGenericCopyUnlocked(
ZE2UR_CALL(zeEventHostSynchronize, (zeSignalEvent, UINT64_MAX));
}

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -513,7 +523,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueRegionCopyUnlocked(
ZE2UR_CALL(zeEventHostSynchronize, (zeSignalEvent, UINT64_MAX));
}

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -707,7 +717,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueMemBufferMap(
(handler.commandList.get(), UINT64_MAX));
}

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -743,7 +753,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueMemUnmap(
(handler.commandList.get(), signalEvent->getZeEvent()));
}

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -785,7 +795,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueGenericFillUnlocked(
(handler.commandList.get(), pDst, pPattern, patternSize, size,
zeSignalEvent, waitList.second, waitList.first));

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -828,7 +838,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueUSMMemcpy(
ZE2UR_CALL(zeEventHostSynchronize, (zeSignalEvent, UINT64_MAX));
}

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -862,7 +872,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueUSMPrefetch(
(handler.commandList.get(), signalEvent->getZeEvent()));
}

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -899,7 +909,7 @@ ur_queue_immediate_in_order_t::enqueueUSMAdvise(const void *pMem, size_t size,
(handler.commandList.get(), signalEvent->getZeEvent()));
}

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -1130,7 +1140,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueCooperativeKernelLaunchExp(
(handler.commandList.get(), hZeKernel, &zeThreadGroupDimensions,
zeSignalEvent, waitList.second, waitList.first));

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down Expand Up @@ -1167,7 +1177,7 @@ ur_result_t ur_queue_immediate_in_order_t::enqueueTimestampRecordingExp(
ZE2UR_CALL(zeEventHostSynchronize, (zeSignalEvent, UINT64_MAX));
}

handler.executing.push_back(signalEvent);

if (phEvent) signalEvent->retain();

return UR_RESULT_SUCCESS;
Expand Down
7 changes: 3 additions & 4 deletions source/adapters/level_zero/v2/queue_immediate_in_order.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,8 @@ struct ur_command_list_handler_t {
ur_command_list_handler_t(ze_command_list_handle_t hZeCommandList,
bool ownZeHandle);

void cleanupEvents(bool force = false);

raii::command_list_unique_handle commandList;
ur_event_handle_t lastEvent = nullptr;

std::deque<ur_event_handle_t> signals;
std::deque<ur_event_handle_t> executing;
};

Expand All @@ -45,6 +42,8 @@ struct ur_queue_immediate_in_order_t : _ur_object, public ur_queue_handle_t_ {
ur_device_handle_t hDevice;
ur_queue_flags_t flags;

size_t counter = 0;

raii::cache_borrowed_event_pool eventPool;

ur_command_list_handler_t handler;
Expand Down

0 comments on commit 7713188

Please sign in to comment.