Divide Framework 0.1
A free and open-source 3D Framework under heavy development
Loading...
Searching...
No Matches
TaskPool.cpp
Go to the documentation of this file.
1
2
4#include "Headers/TaskPool.h"
7
8#include <iostream>
9
10namespace Divide
11{
// File-local state shared by the task allocation code in this translation unit.
// NOTE(review): listing line 16 is not visible here; the page's cross-reference index places the
// thread_local g_taskAllocator[Config::MAX_POOLED_TASKS] array at that line.
12 namespace
13 {
// Source of task IDs: AllocateTask assigns fetch_add(1u) of this counter to a task whose _id is still 0.
14 std::atomic_uint g_taskIDCounter = 0u;
15
// Per-thread (thread_local) running counter; AllocateTask post-increments it and masks it with
// Config::MAX_POOLED_TASKS - 1u to pick the next g_taskAllocator slot to try.
17 thread_local U64 g_allocatedTasks = 0u;
18 }
19
20 TaskPool::TaskPool( const std::string_view workerName )
21 : _threadNamePrefix( workerName )
22 {
23 }
24
26 {
27 DIVIDE_ASSERT( _activeThreads.load() == 0u, "Task pool is still active! Threads should be joined before destroying the pool. Call TaskPool::shutdown() first");
28 }
29
// Starts threadCount worker threads for this pool.
//
// threadCount:       number of workers to spawn; 0 makes the call return false right after shutdown().
// onThreadCreateCbk: stored in _threadCreateCbk and, if set, invoked on every worker with that
//                    worker's std::this_thread::get_id() before it starts pulling tasks.
// Returns true once all std::thread objects have been emplaced, false when threadCount is 0.
//
// NOTE(review): listing line 64 (between the worker loop and the _activeThreads decrement) is not
// visible in this extract.
30 bool TaskPool::init( const size_t threadCount, const DELEGATE<void, const std::thread::id&>& onThreadCreateCbk)
31 {
// Always tear down any previous state first, even when the new thread count is 0.
32 shutdown();
33 if (threadCount == 0u)
34 {
35 return false;
36 }
37
38 _isRunning = true;
39 _threadCreateCbk = onThreadCreateCbk;
40 _threads.reserve( threadCount );
// The active-thread counter is set up front; every worker decrements it once when its loop exits.
41 _activeThreads.store(threadCount);
42
43 for ( U32 idx = 0u; idx < threadCount; ++idx )
44 {
45 _threads.emplace_back(
// Default capture by reference (this pool's members), worker index captured by value.
46 [&, idx]
47 {
// Worker name is "<prefix>_<index>"; it is handed to both the profiler and SetThreadName.
48 const auto threadName = Util::StringFormat( "{}_{}", _threadNamePrefix, idx );
49
50 Profiler::OnThreadStart( threadName );
51
52 SetThreadName( threadName );
53
54 if ( _threadCreateCbk )
55 {
56 _threadCreateCbk( std::this_thread::get_id() );
57 }
58
// Worker loop: keep executing tasks as non-idle calls until _isRunning is cleared.
59 while ( _isRunning )
60 {
61 executeOneTask( false );
62 }
63
65
66 _activeThreads.fetch_sub( 1u );
67 } );
68 }
69
70 return true;
71 }
72
74 {
75 wait();
76 join();
77 waitForAllTasks( true );
78 efficient_clear( _threads );
79 _taskCallbacks.resize(0);
81 }
82
// Queues a task for execution by the pool.
//
// task:                 captured BY REFERENCE in the queued lambda, so it must stay alive until that lambda has run.
// priority:             TaskPriority::REALTIME bypasses the queue and is forwarded to runRealTime().
// onCompletionFunction: optional; when set it is stored in _taskCallbacks keyed by task._id, and the
//                       task's id is pushed to _threadedCallbackBuffer after the task callback has run.
// Returns the result of runRealTime() for REALTIME tasks, otherwise the result of addTask().
//
// NOTE(review): listing lines 85, 95, 97 and 127 are not visible in this extract. Line 97 is presumably
// the loop header that introduces `entry` over _taskCallbacks, and line 127 the statement executed
// while waiting for child tasks - confirm against the real file.
83 bool TaskPool::enqueue( Task& task, const TaskPriority priority, const DELEGATE<void>& onCompletionFunction )
84 {
86 if (priority == TaskPriority::REALTIME)
87 {
88 return runRealTime( task, onCompletionFunction);
89 }
90
91 bool hasOnCompletionFunction = false;
92 if ( onCompletionFunction )
93 {
94 hasOnCompletionFunction = true;
96 bool found = false;
98 {
// An entry whose _taskID is U32_MAX is treated as free and gets reused for this task.
99 if ( entry._taskID == U32_MAX)
100 {
101 entry._taskID = task._id;
102 entry._cbk = onCompletionFunction;
103 found = true;
104 break;
105 }
106 }
107
// No reusable entry: append a new one.
108 if ( !found )
109 {
110 _taskCallbacks.emplace_back( onCompletionFunction, task._id );
111 }
112 }
113
114 //Returning false from a PoolTask lambda will just reschedule it for later execution again.
115 //This may leave the task in an infinite loop, always re-queuing!
116 const auto poolTask = [this, &task, hasOnCompletionFunction]( const bool isIdleCall )
117 {
// AllocateTask sets _unfinishedJobs to 1 for the task itself and adds 1 to the parent per child,
// so a value above 1 means child tasks are still pending.
118 while ( task._unfinishedJobs.load() > 1u )
119 {
120 if ( isIdleCall )
121 {
122 // Can't be run at this time. It will be executed again later!
123 return false;
124 }
125
126 // Else, we wait until our child tasks finish running. We also try and do some other work while waiting
128 }
129
130 // Can't run this task at the current moment. We're in an idle loop and the task needs express execution (e.g. render pass task)
131 if ( !task._runWhileIdle && isIdleCall )
132 {
133 return false;
134 }
135
136 taskStarted( task );
137
138 if ( task._callback ) [[likely]]
139 {
140 task._callback( task );
141 }
// Only the task id is published here; the stored completion callback itself is invoked from flushCallbackQueue().
142 if ( hasOnCompletionFunction )
143 {
144 _threadedCallbackBuffer.enqueue( task._id );
145 }
146
147 taskCompleted( task );
148
149 return true;
150 };
151
// NOTE(review): poolTask is declared const; if MOV expands to std::move this results in a copy, not a move - confirm.
152 return addTask( MOV( poolTask ) );
153 }
154
// Runs a task synchronously on the calling thread instead of queueing it (used by enqueue() for
// TaskPriority::REALTIME).
//
// task:                 the task to execute; its _callback is invoked directly if set.
// onCompletionFunction: optional; called directly on this thread after taskCompleted().
// Always returns true.
//
// NOTE(review): listing lines 157 and 163 are not visible in this extract, so what happens when
// flushCallbackQueue() returns 0 inside the wait loop cannot be confirmed from here.
155 bool TaskPool::runRealTime( Task& task, const DELEGATE<void>& onCompletionFunction )
156 {
158
// Wait until only the task's own job count is left, flushing pending completion callbacks meanwhile.
159 while ( task._unfinishedJobs.load() > 1u )
160 {
161 if ( flushCallbackQueue() == 0u)
162 {
164 }
165 }
166
167 taskStarted( task );
168
169 if ( task._callback ) [[likely]]
170 {
171 task._callback( task );
172 }
173
174 taskCompleted( task );
175
// Unlike the queued path, the completion callback is not routed through _threadedCallbackBuffer.
176 if ( onCompletionFunction )
177 {
178 onCompletionFunction();
179 }
180
181 return true;
182 }
183
// Blocks the calling thread until Finished( task ) reports true.
//
// NOTE(review): listing lines 186, 191 and 193 are not visible in this extract. The declaration of
// `lock` is on one of them (presumably a UniqueLock on _taskFinishedMutex - confirm against the real file).
184 void TaskPool::waitForTask( const Task& task )
185 {
187
188 using namespace std::chrono_literals;
189 while ( !Finished( task ) )
190 {
192
// Timed wait (2ms) with a predicate: wakes on notification or timeout, and the outer loop
// re-checks Finished( task ) either way.
194 _taskFinishedCV.wait_for( lock, 2ms, [&task]() noexcept
195 {
196 return Finished( task );
197 } );
198 }
199 }
200
202 {
204
206
207 constexpr I32 maxDequeueItems = 1 << 3;
208 U32 completedTaskIndices[maxDequeueItems];
209
210 size_t ret = 0u;
211 while ( true )
212 {
213 const size_t count = _threadedCallbackBuffer.try_dequeue_bulk( completedTaskIndices, maxDequeueItems );
214 if ( count == 0u )
215 {
216 break;
217 }
218
220 const size_t callbackCount = _taskCallbacks.size();
221 DIVIDE_ASSERT( callbackCount > 0u );
222
223 for ( size_t i = 0u; i < count; ++i )
224 {
225 const U32 idx = completedTaskIndices[i];
226 for ( size_t j = 0u; j < callbackCount; ++j )
227 {
229 if ( entry._taskID == idx)
230 {
231 if ( entry._cbk )
232 {
233 entry._cbk();
234 }
235 entry = {};
236 break;
237 }
238 }
239 }
240 ret += count;
241 }
242
243 return ret;
244 }
245
// Waits until no task is running in the pool.
//
// flushCallbacks: selects the branch at the end of the function; its body is not visible in this extract.
//
// NOTE(review): listing lines 248, 249, 253 and 262 are not visible here. The declaration of `lock`
// and the statement inside the flushCallbacks branch are among them (the latter presumably calls
// flushCallbackQueue() - confirm against the real file).
246 void TaskPool::waitForAllTasks( const bool flushCallbacks )
247 {
250
// Only wait on the condition variable when worker threads exist to finish tasks and signal it.
251 if ( _activeThreads.load() > 0u )
252 {
// Untimed wait: returns once the predicate sees _runningTaskCount at zero.
254 _taskFinishedCV.wait( lock, [this]() noexcept
255 {
256 return _runningTaskCount.load() == 0u;
257 } );
258 }
259
260 if ( flushCallbacks )
261 {
263 }
264 }
265
267 {
268 _runningTaskCount.fetch_add( 1 );
269 }
270
272 {
274
275 task._callback = {};
276
277 if ( task._parent != nullptr )
278 {
279 task._parent->_unfinishedJobs.fetch_sub(1);
280 }
281
282 task._unfinishedJobs.fetch_sub(1);
283
284 _runningTaskCount.fetch_sub(1);
285
287 _taskFinishedCV.notify_one();
288 }
289
// Hands out a Task from the calling thread's g_taskAllocator array and initialises it.
//
// parentTask:    optional parent; its _unfinishedJobs is incremented so it can tell a child is pending.
// func:          moved into the task's _callback.
// allowedInIdle: stored in the task's _runWhileIdle.
// Returns a pointer to the claimed task; never nullptr, because the loop only exits once a slot was claimed.
//
// NOTE(review): listing lines 292 and 312 are not visible in this extract (312 is the body of the
// retry-limit check inside the disabled branch).
290 Task* TaskPool::AllocateTask( Task* parentTask, DELEGATE<void, Task&>&& func, const bool allowedInIdle ) noexcept
291 {
293
294 if ( parentTask != nullptr )
295 {
296 parentTask->_unfinishedJobs.fetch_add( 1u );
297 }
298
// Only referenced by the `if constexpr ( false )` branch below.
299 constexpr U8 s_maxTaskRetry = 10u;
300
301 Task* task = nullptr;
302 U8 retryCount = 0u;
303 do
304 {
// Re-declared every iteration: a failed compare_exchange_strong overwrites `expected` with the current value.
305 U16 expected = 0u;
// Disabled variant (condition is the literal false): same slot search plus a retry counter.
306 if constexpr ( false )
307 {
308 const auto idx = g_allocatedTasks++ & Config::MAX_POOLED_TASKS - 1u;
309 Task& crtTask = g_taskAllocator[idx];
310 if ( idx == 0u && ++retryCount > s_maxTaskRetry )
311 {
313 }
314 if ( crtTask._unfinishedJobs.compare_exchange_strong( expected, 1u ) )
315 {
316 task = &crtTask;
317 }
318 }
319 else
320 {
// Binary `-` binds tighter than `&`, so the index is counter & (MAX_POOLED_TASKS - 1u): a wrap-around
// over the array, relying on MAX_POOLED_TASKS being a power of two.
321 Task& crtTask = g_taskAllocator[g_allocatedTasks++ & Config::MAX_POOLED_TASKS - 1u];
// A slot is free when its _unfinishedJobs is 0; claiming it atomically sets the count to 1.
322 if ( crtTask._unfinishedJobs.compare_exchange_strong( expected, 1u ) )
323 {
324 task = &crtTask;
325 }
326 }
327 }
// No retry bound in the active branch: keeps probing slots until one is claimed.
328 while ( task == nullptr );
329
// An id is assigned only while _id is still 0; a slot that already has a non-zero id keeps it.
330 if ( task->_id == 0u )
331 {
332 task->_id = g_taskIDCounter.fetch_add( 1u );
333 }
334 task->_parent = parentTask;
335 task->_runWhileIdle = allowedInIdle;
336 task->_callback = MOV( func );
337
338 return task;
339 }
340
342 {
344 executeOneTask( true );
345 }
346
348 {
349 _isRunning = false;
350
351 for ( std::thread& thread : _threads )
352 {
353 if (!thread.joinable())
354 {
355 continue;
356 }
357
358 thread.join();
359 }
360
361 WAIT_FOR_CONDITION( _activeThreads.load() == 0u );
362 }
363
364 void TaskPool::wait() const noexcept
365 {
366 if ( !_isRunning )
367 {
368 return;
369 }
370
371 while ( _runningTaskCount.load() > 0u )
372 {
373 // Busy wait
374 std::this_thread::yield();
375 }
376 }
377
379 {
380 return _queue.enqueue( MOV( job ) );
381 }
382
383 void TaskPool::executeOneTask( const bool isIdleCall )
384 {
385 PoolTask task = {};
386 if ( deque( isIdleCall, task ) &&
387 !task( isIdleCall ) )
388 {
389 addTask( MOV( task ) );
390 }
391 }
392
// Fetches the next queued job into taskOut.
//
// isIdleCall: when true a single non-blocking try_dequeue is made and its result returned.
// taskOut:    receives the dequeued job on success.
// Returns true when a job was stored in taskOut; false when an idle attempt found nothing, or when
// _isRunning became false while a non-idle caller was still waiting for work.
//
// NOTE(review): listing line 395 is not visible in this extract.
393 bool TaskPool::deque( const bool isIdleCall, PoolTask& taskOut )
394 {
396
397 if ( isIdleCall )
398 {
399 return _queue.try_dequeue( taskOut );
400 }
401
// IsBlocking is a compile-time constant, so only one of the two wait strategies below is compiled in.
402 if constexpr ( IsBlocking )
403 {
// Timed dequeue (the timeout argument is 2 ms converted to microseconds); each failed attempt
// re-checks _isRunning so the caller can leave the loop once the pool stops.
404 while( !_queue.wait_dequeue_timed( taskOut, Time::MillisecondsToMicroseconds( 2 ) ))
405 {
406 if (!_isRunning) [[unlikely]]
407 {
408 return false;
409 }
410 std::this_thread::yield();
411 }
412 }
413 else
414 {
// Polling variant: retry try_dequeue, yielding between attempts, until a job arrives or the pool stops.
415 while ( !_queue.try_dequeue( taskOut ) )
416 {
417 if ( !_isRunning ) [[unlikely]]
418 {
419 return false;
420 }
421 std::this_thread::yield();
422 }
423 }
424
425 return true;
426 }
427
429 {
431
432 if ( descriptor._iterCount == 0u ) [[unlikely]]
433 {
434 return;
435 }
436
437 const U32 crtPartitionSize = std::min( descriptor._partitionSize, descriptor._iterCount );
438 const U32 partitionCount = descriptor._iterCount / crtPartitionSize;
439 const U32 remainder = descriptor._iterCount % crtPartitionSize;
440 const U32 adjustedCount = descriptor._useCurrentThread ? partitionCount - 1u : partitionCount;
441
442 std::atomic_uint jobCount = adjustedCount + (remainder > 0u ? 1u : 0u);
443 for ( U32 i = 0u; i < adjustedCount; ++i )
444 {
445 const U32 start = i * crtPartitionSize;
446 const U32 end = start + crtPartitionSize;
447 Task* parallelJob = TaskPool::AllocateTask
448 (
449 nullptr,
450 [&cbk, &jobCount, start, end]( Task& parentTask )
451 {
452 cbk( &parentTask, start, end );
453 jobCount.fetch_sub( 1 );
454 },
455 descriptor._allowRunInIdle
456 );
457
458 Start( *parallelJob, pool, descriptor._priority );
459 }
460 if ( remainder > 0u )
461 {
462 const U32 count = descriptor._iterCount;
463 Task* parallelJob = TaskPool::AllocateTask
464 (
465 nullptr,
466 [&cbk, &jobCount, count, remainder]( Task& parentTask )
467 {
468 cbk( &parentTask, count - remainder, count );
469 jobCount.fetch_sub( 1 );
470 },
471 descriptor._allowRunInIdle
472 );
473
474 Start( *parallelJob, pool, descriptor._priority );
475 }
476
477 if ( descriptor._useCurrentThread )
478 {
479 const U32 start = adjustedCount * crtPartitionSize;
480 cbk( nullptr, start, start + crtPartitionSize );
481 }
482
483 if ( descriptor._waitForFinish )
484 {
485 if ( descriptor._allowPoolIdle )
486 {
487 while ( jobCount.load() > 0 )
488 {
489 pool.threadWaiting();
490 }
491 }
492 else
493 {
494 WAIT_FOR_CONDITION( jobCount.load() == 0u );
495 }
496 }
497 }
498} //namespace Divide
#define WAIT_FOR_CONDITION(...)
#define MOV(...)
#define DIVIDE_ASSERT(...)
#define NO_DESTROY
#define DIVIDE_UNEXPECTED_CALL()
#define PROFILE_SCOPE_AUTO(CATEGORY)
Definition: Profiler.h:87
static Task * AllocateTask(Task *parentTask, DELEGATE< void, Task & > &&func, bool allowedInIdle) noexcept
Definition: TaskPool.cpp:290
eastl::fixed_vector< CallbackEntry, 1<< 9, true > _taskCallbacks
Definition: TaskPool.h:125
bool enqueue(Task &task, TaskPriority priority, const DELEGATE< void > &onCompletionFunction)
Definition: TaskPool.cpp:83
void threadWaiting()
Definition: TaskPool.cpp:341
void waitForTask(const Task &task)
Definition: TaskPool.cpp:184
std::atomic_size_t _activeThreads
Definition: TaskPool.h:137
const string _threadNamePrefix
Definition: TaskPool.h:116
Mutex _taskFinishedMutex
Definition: TaskPool.h:132
std::condition_variable _taskFinishedCV
Definition: TaskPool.h:133
moodycamel::ConcurrentQueue< U32 > _threadedCallbackBuffer
Definition: TaskPool.h:127
DELEGATE< void, const std::thread::id & > _threadCreateCbk
Definition: TaskPool.h:134
void executeOneTask(bool isIdleCall)
Definition: TaskPool.cpp:383
size_t flushCallbackQueue()
Returns the number of callbacks processed.
Definition: TaskPool.cpp:201
static constexpr bool IsBlocking
Definition: TaskPool.h:64
void wait() const noexcept
Definition: TaskPool.cpp:364
QueueType _queue
Definition: TaskPool.h:130
void waitForAllTasks(bool flushCallbacks)
Definition: TaskPool.cpp:246
bool deque(bool isIdleCall, PoolTask &taskOut)
Definition: TaskPool.cpp:393
std::atomic_uint _runningTaskCount
Definition: TaskPool.h:136
void taskCompleted(Task &task)
Definition: TaskPool.cpp:271
TaskPool(std::string_view workerName)
Definition: TaskPool.cpp:20
SharedMutex _taskCallbacksLock
Definition: TaskPool.h:124
bool addTask(PoolTask &&job)
Definition: TaskPool.cpp:378
bool runRealTime(Task &task, const DELEGATE< void > &onCompletionFunction)
Definition: TaskPool.cpp:155
void taskStarted(Task &task)
Definition: TaskPool.cpp:266
constexpr U32 MAX_POOLED_TASKS
How many tasks should we keep in a per-thread pool to avoid using new/delete (must be power of two)
Definition: config.h:114
constexpr Optick::Category::Type Threading
Definition: Profiler.h:67
void OnThreadStart(std::string_view threadName)
Definition: Profiler.cpp:76
void OnThreadStop()
Definition: Profiler.cpp:83
bool isMainThread() noexcept
constexpr T MillisecondsToMicroseconds(U a) noexcept
Definition: MathHelper.inl:749
Str StringFormat(const char *fmt, Args &&...args)
NO_DESTROY thread_local Task g_taskAllocator[Config::MAX_POOLED_TASKS]
Definition: TaskPool.cpp:16
Handle console commands that start with a forward slash.
Definition: AIProcessor.cpp:7
DELEGATE_STD< Ret, Args... > DELEGATE
std::lock_guard< mutex > LockGuard
Definition: SharedMutex.h:55
DELEGATE_STD< bool, bool > PoolTask
Definition: TaskPool.h:58
void SetThreadName(std::string_view threadName) noexcept
int32_t I32
uint8_t U8
bool Finished(const Task &task) noexcept
Definition: Task.inl:38
std::unique_lock< mutex > UniqueLock
Definition: SharedMutex.h:52
TaskPriority
Definition: Task.h:41
@ REALTIME
not threaded
uint16_t U16
void efficient_clear(eastl::fixed_vector< T, nodeCount, bEnableOverflow, OverflowAllocator > &fixed_vector)
Definition: Vector.h:52
constexpr U32 U32_MAX
void Start(Task &task, TaskPool &pool, TaskPriority priority=TaskPriority::DONT_CARE, const DELEGATE< void > &onCompletionFunction={})
Definition: Task.cpp:9
void Parallel_For(TaskPool &pool, const ParallelForDescriptor &descriptor, const DELEGATE< void, const Task *, U32, U32 > &cbk)
Definition: TaskPool.cpp:428
uint32_t U32
Project const SceneEntry & entry
Definition: DefaultScene.h:41
uint64_t U64
bool _allowPoolIdle
If true, we'll inform the thread pool to execute other tasks while waiting for all the async tasks to...
Definition: TaskPool.h:53
U32 _partitionSize
How many elements should we process per async task.
Definition: TaskPool.h:45
bool _useCurrentThread
If true, we'll process a for partition on the calling thread.
Definition: TaskPool.h:51
bool _waitForFinish
If this is false, the Parallel_For call won't block the current thread.
Definition: TaskPool.h:49
TaskPriority _priority
Each async task will start with the same priority specified here.
Definition: TaskPool.h:47
U32 _iterCount
For loop iteration count.
Definition: TaskPool.h:43
bool _allowRunInIdle
If true, async tasks can be invoked from other task's idle callbacks.
Definition: TaskPool.h:55
DELEGATE< void, Task & > _callback
Definition: Task.h:49
Task * _parent
Definition: Task.h:50
U32 _id
Definition: Task.h:51
std::atomic_ushort _unfinishedJobs
Definition: Task.h:52
bool _runWhileIdle
Definition: Task.h:53
Definition: TaskPool.h:119