21 : _threadNamePrefix( workerName )
27 DIVIDE_ASSERT(
_activeThreads.load() == 0u,
"Task pool is still active! Threads should be joined before destroying the pool. Call TaskPool::shutdown() first");
33 if (threadCount == 0u)
40 _threads.reserve( threadCount );
43 for (
U32 idx = 0u; idx < threadCount; ++idx )
45 _threads.emplace_back(
91 bool hasOnCompletionFunction =
false;
92 if ( onCompletionFunction )
94 hasOnCompletionFunction =
true;
102 entry._cbk = onCompletionFunction;
116 const auto poolTask = [
this, &task, hasOnCompletionFunction](
const bool isIdleCall )
142 if ( hasOnCompletionFunction )
176 if ( onCompletionFunction )
178 onCompletionFunction();
188 using namespace std::chrono_literals;
207 constexpr I32 maxDequeueItems = 1 << 3;
208 U32 completedTaskIndices[maxDequeueItems];
223 for (
size_t i = 0u; i < count; ++i )
225 const U32 idx = completedTaskIndices[i];
226 for (
size_t j = 0u; j < callbackCount; ++j )
229 if (
entry._taskID == idx)
260 if ( flushCallbacks )
294 if ( parentTask !=
nullptr )
296 parentTask->_unfinishedJobs.fetch_add( 1u );
299 constexpr U8 s_maxTaskRetry = 10u;
301 Task* task =
nullptr;
306 if constexpr ( false )
309 Task& crtTask = g_taskAllocator[idx];
310 if ( idx == 0u && ++retryCount > s_maxTaskRetry )
328 while ( task ==
nullptr );
330 if ( task->
_id == 0u )
332 task->
_id = g_taskIDCounter.fetch_add( 1u );
351 for ( std::thread& thread : _threads )
353 if (!thread.joinable())
374 std::this_thread::yield();
386 if (
deque( isIdleCall, task ) &&
387 !task( isIdleCall ) )
399 return _queue.try_dequeue( taskOut );
410 std::this_thread::yield();
415 while ( !
_queue.try_dequeue( taskOut ) )
421 std::this_thread::yield();
432 if ( descriptor.
_iterCount == 0u ) [[unlikely]]
438 const U32 partitionCount = descriptor.
_iterCount / crtPartitionSize;
439 const U32 remainder = descriptor.
_iterCount % crtPartitionSize;
442 std::atomic_uint jobCount = adjustedCount + (remainder > 0u ? 1u : 0u);
443 for (
U32 i = 0u; i < adjustedCount; ++i )
445 const U32 start = i * crtPartitionSize;
446 const U32 end = start + crtPartitionSize;
450 [&cbk, &jobCount, start, end](
Task& parentTask )
452 cbk( &parentTask, start, end );
453 jobCount.fetch_sub( 1 );
460 if ( remainder > 0u )
466 [&cbk, &jobCount, count, remainder](
Task& parentTask )
468 cbk( &parentTask, count - remainder, count );
469 jobCount.fetch_sub( 1 );
479 const U32 start = adjustedCount * crtPartitionSize;
480 cbk(
nullptr, start, start + crtPartitionSize );
487 while ( jobCount.load() > 0 )
#define WAIT_FOR_CONDITION(...)
#define PROFILE_SCOPE_AUTO(CATEGORY)
static Task * AllocateTask(Task *parentTask, DELEGATE< void, Task & > &&func, bool allowedInIdle) noexcept
eastl::fixed_vector< CallbackEntry, 1<< 9, true > _taskCallbacks
bool enqueue(Task &task, TaskPriority priority, const DELEGATE< void > &onCompletionFunction)
void waitForTask(const Task &task)
std::atomic_size_t _activeThreads
const string _threadNamePrefix
std::condition_variable _taskFinishedCV
moodycamel::ConcurrentQueue< U32 > _threadedCallbackBuffer
DELEGATE< void, const std::thread::id & > _threadCreateCbk
void executeOneTask(bool isIdleCall)
size_t flushCallbackQueue()
Returns the number of callbacks processed.
static constexpr bool IsBlocking
void wait() const noexcept
void waitForAllTasks(bool flushCallbacks)
bool deque(bool isIdleCall, PoolTask &taskOut)
std::atomic_uint _runningTaskCount
void taskCompleted(Task &task)
TaskPool(std::string_view workerName)
SharedMutex _taskCallbacksLock
bool addTask(PoolTask &&job)
bool runRealTime(Task &task, const DELEGATE< void > &onCompletionFunction)
void taskStarted(Task &task)
constexpr U32 MAX_POOLED_TASKS
How many tasks should we keep in a per-thread pool to avoid using new/delete (must be power of two)
constexpr Optick::Category::Type Threading
void OnThreadStart(std::string_view threadName)
bool isMainThread() noexcept
constexpr T MillisecondsToMicroseconds(U a) noexcept
Str StringFormat(const char *fmt, Args &&...args)
NO_DESTROY thread_local Task g_taskAllocator[Config::MAX_POOLED_TASKS]
std::atomic_uint g_taskIDCounter
thread_local U64 g_allocatedTasks
Handle console commands that start with a forward slash.
DELEGATE_STD< Ret, Args... > DELEGATE
std::lock_guard< mutex > LockGuard
DELEGATE_STD< bool, bool > PoolTask
void SetThreadName(std::string_view threadName) noexcept
bool Finished(const Task &task) noexcept
std::unique_lock< mutex > UniqueLock
void efficient_clear(eastl::fixed_vector< T, nodeCount, bEnableOverflow, OverflowAllocator > &fixed_vector)
void Start(Task &task, TaskPool &pool, TaskPriority priority=TaskPriority::DONT_CARE, const DELEGATE< void > &onCompletionFunction={})
void Parallel_For(TaskPool &pool, const ParallelForDescriptor &descriptor, const DELEGATE< void, const Task *, U32, U32 > &cbk)
Project const SceneEntry & entry
bool _allowPoolIdle
If true, we'll inform the thread pool to execute other tasks while waiting for the all async tasks to...
U32 _partitionSize
How many elements should we process per async task.
bool _useCurrentThread
If true, we'll process a for partition on the calling thread.
bool _waitForFinish
If this is false, the Parallel_For call won't block the current thread.
TaskPriority _priority
Each async task will start with the same priority specified here.
U32 _iterCount
For loop iteration count.
bool _allowRunInIdle
If true, async tasks can be invoked from other task's idle callbacks.
DELEGATE< void, Task & > _callback
std::atomic_ushort _unfinishedJobs