// Class structure and main members classWorkerThreadPool : public Object { GDCLASS(WorkerThreadPool, Object) // Use TaskID and GroupID as the task identifiers typedefint64_t TaskID; typedefint64_t GroupID; private: structTask; // Task Structure structGroup; // Task Group Structure structThreadData; // Thread Data // Core Data Stucture PagedAllocator<Task, false, TASKS_PAGE_SIZE> task_allocator; PagedAllocator<Group, false, GROUPS_PAGE_SIZE> group_allocator; SelfList<Task>::List low_priority_task_queue; // Low-priority queue SelfList<Task>::List task_queue; // High-priority queue BinaryMutex task_mutex; // Mutual exclusion lock to protect task queue TightLocalVector<ThreadData> threads; // Thread data array HashMap<Thread::ID, int> thread_ids; // Thread IDs to indices Map HashMap<TaskID, Task *> tasks; // All tasks map HashMap<GroupID, Group *> groups; // All task groups map };
// Memory management for tasks and groups:
// use the paged allocator with these page sizes to reduce memory fragmentation.
static const uint32_t TASKS_PAGE_SIZE = 1024; // Tasks per allocator page.
static const uint32_t GROUPS_PAGE_SIZE = 256; // Groups per allocator page.
// Obtain the Task from the distributor instead of creating one. Task *task = task_allocator.alloc(); // Return after use task_allocator.free(task);
// Collaborative wait: instead of blocking idly while p_task finishes, the
// calling pool thread keeps executing other queued tasks ("work stealing").
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
	while (true) {
		MutexLock lock(task_mutex);

		// Done waiting once the awaited task has completed.
		if (p_task->completed) {
			break;
		}

		SelfList<Task> *queue_head = task_queue.first();
		if (queue_head) {
			// There is pending work: steal it and run it outside the lock.
			Task *stolen = queue_head->self();
			task_queue.remove(queue_head);
			lock.unlock();
			_process_task(stolen);
		} else {
			// Nothing to steal: sleep on the condition variable until signaled.
			p_caller_pool_thread->awaited_task = p_task;
			p_caller_pool_thread->cond_var.wait(lock);
			p_caller_pool_thread->awaited_task = nullptr;
		}
	}
}
classJobSystemThreadPoolfinal : public JobSystemWithBarrier { private: // Task memory pool (fixed size) FixedSizeFreeList<Job> mJobs; // Array of worker threads Array<thread> mThreads; // Circular Queue (Lock-Free Design) staticconstexpr uint32 cQueueLength = 1024; atomic<Job *> mQueue[cQueueLength]; // The local head pointer of each thread + the global tail pointer atomic<uint> *mHeads = nullptr; // One head per thread atomic<uint> mTail = 0; // Global tail // Semaphores are used for thread wake-up. Semaphore mSemaphore; // Exit indicator atomic<bool> mQuit = false; };
屏障同步
Jolt的屏障在个人看来是Godot协作等待的物理特化版,适于物理阶段同步
屏障的结构如下,读写指针分别对齐到不同缓存行,避免多核间的缓存失效
classBarrierImpl : public Barrier { private: staticconstexpr uint cMaxJobs = 2048; // Maximum number of tasks (a power of 2) atomic<Job *> mJobs[cMaxJobs]; // Use circular buffer stores tasks // Read-write pointer (cache alignment in line to avoid false sharing) alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mJobReadIndex { 0 }; alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mJobWriteIndex { 0 }; // Synchronous Control atomic<int> mNumToAcquire { 0 }; // The required count of semaphores Semaphore mSemaphore; // Task Completion Semaphore atomic<bool> mInUse { false }; };