以下为个人学习笔记整理。参考 PhysX SDK 3.4.0 文档,部分代码可能来源于更高版本。
# PhysX——Task Management 篇
任务管理器是 PhysX 管理 CPU 和 GPU 计算资源的调度管理器。在保证每个 Task 按部就班执行的情况下尽可能的提高运行效率,在 Cloth 和 Simulate 等场景有广泛的应用。因此本篇也作为 Simulate 的前篇来着重介绍一下「任务管理器」的实现逻辑。由于调度逻辑被拆分成了 GPU 和 CPU 两部分,本篇主要还是介绍 CPU 的调度规则。
为了能够更好理解,文章内会使用部分 Simulate 代码作为示例,来介绍 Task Management 如何运转,一些变量和类无需太过在意,先来看看 PxTaskMgr 定义:
任务管理的基本单位就是 Task,任务管理的本质就是如何指定不同的 CPU / GPU 去执行不同 Task,根据各 Task 的依赖关系来控制执行顺序,PxTaskMgr 由于历史原因,保留了两种 Task 的实现机制:PxTask(绿色)、PxLightCpuTask(蓝色)。
# Task
PxBaseTask 是所有 Task 的基类,在 PhysX 3.0 之前 APEX 基本上通过 PxTask 的派生类作为任务单元定制化逻辑,在 PhysX 3.0 以后,基本上所有 Task 都用 PxLightCpuTask 实现。两者都是通过「引用计数」来控制调用时机,当引用计数为 0 的情况下,Task 就会被 submit 给 CPU / GPU 执行。
# PxTask
PxTask 是早期的实现,所有 PxTask 会存储在 shdfnd::Array<PxTaskTableRow>
(PxTaskMgr::mTaskTable) 中,PxTaskTableRow 会记录该 PxTask 的前置 / 后置依赖链表的首地址,以及该任务当前的引用计数和执行状态,而整个依赖关系会被存储在 shdfnd::Array<PxTaskDepTableRow>
(PxTaskMgr::mDepTable) 中。
- PxTask 支持两种创建方式:匿名 / 命名,两者本质区在于命名任务可以通过名字进行查找。
- 当 PxTask 被创建时,会获得 1 个引用计数,通过调用
finishBefore
和startAfter
添加前置 / 后置依赖的情况下,也可以增加依赖方的引用计数。 - 在调用
startSimulation
会对所有 PxTask 的引用计数减 1,如果引用计数为 0 且满足依赖的情况下 PxTask 将会被dispatchTask
。 - 当任务执行完毕以后 (
taskCompleted
),会通过resolveRow
清理 PxTaskTableRow 中的记录,并更新其他任务依赖信息,dispatchTask
满足条件的任务。
由于是多线程,每个 PxTask 的读写操作都需要加锁,这个锁是 PxTaskMgr 级别的,因此所有访问 PxTask 的操作都将产生竞争,并依赖 PxTaskMgr 提供的锁进行各种原子操作,对于多线程同时操作 PxTask 的来说,这点非常有必要,但会带来更多的性能开销。
# PxLightCpuTask
PxLightCpuTask 是个更轻量级的任务,整体实现也相对比较简单。舍弃了匿名 / 命名规则,数据也不集中存储在 PxTaskMgr,而是在 PxLightCpuTask 中。
PxLightCpuTask 没有提供多依赖的特性。换句话说,每个 PxLightCpuTask 只能依赖 / 被依赖至多一个任务 (多分支变为单分支),这个特性使得 PxLightCpuTask 所有读写操作不需要加锁 (只可能被主线程和执行线程操作,且两者是互斥的),性能 UpMax。为此,还必须约束 PxLightCpuTask 运行过程中不能操作其他的 PxLightCpuTask。
PxLightCpuTask 由于执行不依赖 PxTaskMgr,因此调用会比较简单:
- 通过
setContinuation
设置执行的前置依赖和 TaskMgr,当然也可以不设置。 - 通过
removeReference
主动减少引用计数触发任务的 submit。
题外话:为了支持 PxTask 的多依赖等特性还额外定义了 FanoutTask,并且调整锁的级别从 PxTaskMgr 变为了 FanoutTask,这里就不再展开。
# CpuDispatcher
CpuDispatcher 负责 submit 后最终的派发工作,如果在没有设置多线程情况下,CpuDispatcher 的派发的 Task 将在主线程执行。
下面来聊聊在多线程情况下的执行流程,先看看 CpuDispatcher 的初始化:
DefaultCpuDispatcher::DefaultCpuDispatcher(uint32_t numThreads, uint32_t* affinityMasks) | |
: mQueueEntryPool(TASK_QUEUE_ENTRY_POOL_SIZE), mNumThreads(numThreads), mShuttingDown(false) | |
{ | |
uint32_t defaultAffinityMask = 0; | |
// 创建 work 线程 | |
mWorkerThreads = reinterpret_cast<CpuWorkerThread*>(PX_ALLOC(numThreads * sizeof(CpuWorkerThread), PX_DEBUG_EXP("CpuWorkerThread"))); | |
if (mWorkerThreads) | |
{ | |
for (uint32_t i = 0; i < numThreads; ++i) | |
{ | |
PX_PLACEMENT_NEW(mWorkerThreads + i, CpuWorkerThread)(); // 创建 thread 对象 | |
mWorkerThreads[i].initialize(this); // 绑定 thread 和 dispacther | |
} | |
for (uint32_t i = 0; i < numThreads; ++i) | |
{ | |
mWorkerThreads[i].start(shdfnd::Thread::getDefaultStackSize()); // 启动线程 | |
// 设置线程掩码,对于掩码的介绍: | |
// On Windows, Linux, PS4, XboxOne and Switch platforms, each set mask bit represents | |
// the index of a logical processor that the OS may schedule thread execution on | |
if (affinityMasks) | |
{ | |
mWorkerThreads[i].setAffinityMask(affinityMasks[i]); | |
} | |
else | |
{ | |
mWorkerThreads[i].setAffinityMask(defaultAffinityMask); | |
} | |
char threadName[32]; | |
shdfnd::snprintf(threadName, 32, "PxWorker%02d", i); | |
mWorkerThreads[i].setName(threadName); | |
} | |
} | |
else | |
{ | |
mNumThreads = 0; | |
} | |
} |
# CpuWorkerThread
CpuWorkerThread 继承自 Thread,但 Thread 源码看不到,官方提供了接口说明 ——ThreadImpl,感兴趣的可以了解一下。
CpuWorkerThread 有两个关键接口:tryAcceptJobToLocalQueue、execute
# tryAcceptJobToLocalQueue
该接口在 Dispatcher 的 submitTask 中被调用。会把已经注册到该线程的 Task 提交到本地的工作队列 (LocalJobList)
void DefaultCpuDispatcher::submitTask(PxBaseTask& task) | |
{ | |
shdfnd::Thread::Id currentThread = shdfnd::Thread::getId(); | |
// TODO: Could use TLS to make this more efficient | |
for (uint32_t i = 0; i < mNumThreads; ++i) | |
if (mWorkerThreads[i].tryAcceptJobToLocalQueue(task, currentThread)) | |
{ | |
return mWorkReady.set(); | |
} | |
// 主线程中 | |
SharedQueueEntry* entry = mQueueEntryPool.getEntry(&task); | |
if (entry) | |
{ | |
mJobList.push(*entry); | |
mWorkReady.set(); | |
} | |
} | |
// CpuWorkerThread 中 | |
bool CpuWorkerThread::tryAcceptJobToLocalQueue(PxBaseTask& task, shdfnd::Thread::Id taskSubmitionThread) | |
{ | |
//submit task 必须在当前线程进行操作 | |
if (taskSubmitionThread == mThreadId) | |
{ | |
SharedQueueEntry* entry = mQueueEntryPool.getEntry(&task); | |
if (entry) | |
{ | |
mLocalJobList.push(*entry); | |
return true; | |
} | |
else | |
{ | |
return false; | |
} | |
} | |
return false; | |
} |
这里会有两种情况:
- 当前线程是主线程情况下,没有匹配的 CpuWorkerThread,因此会直接插入到 DefaultCpuDispatcher,因为 DefaultCpuDispatcher 只会在主线程创建。
- 当前线程是工作线程情况下:找到对应的工作线程 ID,并添加到工作线程的本地 JobList。
# execute
执行操作分为以下几个步骤:
- 【step.1】当前工作线程如果收到退出信号则直接退出。
- 【step.2】线程唤醒时,先重置唤醒标记。(避免中途异常退出导致该线程一直处于唤醒
- 【step.3】从本地工作队列获取一个 Task,如果为空继续获取,知道工作队列为空或者取到一个可执行 Task。
- 【step.4】执行 Task,并调用 release,触发后续任务的 submit,可能在当前工作线程继续执行或把操作权限移交主线程。
- 【step.5】如果没有可执行的 Task,将挂起当前工作线程。
相关线程消息及接口说明见 SyncImpl。
pthread_create(&getThread(this)->thread, &attr, PxThreadStart, this); | |
// 线程唤醒后执行 execute | |
void* PxThreadStart(void* arg) | |
{ | |
_ThreadImpl* impl = getThread(reinterpret_cast<ThreadImpl*>(arg)); | |
impl->state = _PxThreadStarted; | |
// run setTid in thread's context | |
setTid(*impl); | |
// then run either the passed in function or execute from the derived class (Runnable). | |
if(impl->fn) | |
(*impl->fn)(impl->arg); | |
else if(impl->arg) | |
(reinterpret_cast<Runnable*>(impl->arg))->execute(); | |
return 0; | |
} | |
//execute 实现: | |
void Ext::CpuWorkerThread::execute() | |
{ | |
mThreadId = getId(); | |
while (!quitIsSignalled()) | |
{ | |
mOwner->resetWakeSignal(); | |
PxBaseTask* task = TaskQueueHelper::fetchTask(mLocalJobList, mQueueEntryPool); | |
if(!task) | |
task = mOwner->fetchNextTask(); | |
if (task) | |
{ | |
mOwner->runTask(*task); | |
task->release(); | |
} | |
else | |
{ | |
mOwner->waitForWork(); | |
} | |
} | |
quit(); | |
} |
# 总结
本篇主要介绍了任务管理器如果进行任务调度,以及任务是如何排布执行顺序,巧妙的通过引用计数建立任务间的依赖关系。
并介绍了 CpuDispatcher 是如何调度任务的执行,以及 WorkThread 的实现细节。