以下为个人学习笔记整理。参考 PhysX SDK 3.4.0 文档,部分代码可能来源于更高版本。

# PhysX——Task Management 篇

任务管理器是 PhysX 管理 CPU 和 GPU 计算资源的调度管理器。在保证每个 Task 按部就班执行的情况下尽可能的提高运行效率,在 Cloth 和 Simulate 等场景有广泛的应用。因此本篇也作为 Simulate 的前篇来着重介绍一下「任务管理器」的实现逻辑。由于调度逻辑被拆分成了 GPU 和 CPU 两部分,本篇主要还是介绍 CPU 的调度规则。

为了能够更好理解,文章内会使用部分 Simulate 代码作为示例,来介绍 Task Management 如何运转,一些变量和类无需太过在意,先来看看 PxTaskMgr 定义:

类图

任务管理的基本单位就是 Task,任务管理的本质就是如何指定不同的 CPU / GPU 去执行不同 Task,根据各 Task 的依赖关系来控制执行顺序,PxTaskMgr 由于历史原因,保留了两种 Task 的实现机制:PxTask(绿色)、PxLightCpuTask(蓝色)。

# Task

image-20221021170949427

PxBaseTask 是所有 Task 的基类,在 PhysX 3.0 之前 APEX 基本上通过 PxTask 的派生类作为任务单元定制化逻辑,在 PhysX 3.0 以后,基本上所有 Task 都用 PxLightCpuTask 实现。两者都是通过「引用计数」来控制调用时机,当引用计数为 0 的情况下,Task 就会被 submit 给 CPU / GPU 执行。

# PxTask

PxTask 是早期的实现,所有 PxTask 会存储在 shdfnd::Array<PxTaskTableRow> (PxTaskMgr::mTaskTable) 中,PxTaskTableRow 会记录该 PxTask 的前置 / 后置依赖链表的首地址,以及该任务当前的引用计数和执行状态,而整个依赖关系会被存储在 shdfnd::Array<PxTaskDepTableRow> (PxTaskMgr::mDepTable) 中。

  • PxTask 支持两种创建方式:匿名 / 命名,两者本质区在于命名任务可以通过名字进行查找。
  • 当 PxTask 被创建时,会获得 1 个引用计数,通过调用 finishBeforestartAfter 添加前置 / 后置依赖的情况下,也可以增加依赖方的引用计数。
  • 在调用 startSimulation 会对所有 PxTask 的引用计数减 1,如果引用计数为 0 且满足依赖的情况下 PxTask 将会被 dispatchTask
  • 当任务执行完毕以后 ( taskCompleted ),会通过 resolveRow 清理 PxTaskTableRow 中的记录,并更新其他任务依赖信息, dispatchTask 满足条件的任务。

image-20221021192950414

由于是多线程,每个 PxTask 的读写操作都需要加锁,这个锁是 PxTaskMgr 级别的,因此所有访问 PxTask 的操作都将产生竞争,并依赖 PxTaskMgr 提供的锁进行各种原子操作,对于多线程同时操作 PxTask 的来说,这点非常有必要,但会带来更多的性能开销。

# PxLightCpuTask

PxLightCpuTask 是个更轻量级的任务,整体实现也相对比较简单。舍弃了匿名 / 命名规则,数据也不集中存储在 PxTaskMgr,而是在 PxLightCpuTask 中。

PxLightCpuTask 没有提供多依赖的特性。换句话说,每个 PxLightCpuTask 只能依赖 / 被依赖至多一个任务 (多分支变为单分支),这个特性使得 PxLightCpuTask 所有读写操作不需要加锁 (只可能被主线程和执行线程操作,且两者是互斥的),性能 UpMax。为此,还必须约束 PxLightCpuTask 运行过程中不能操作其他的 PxLightCpuTask。

PxLightCpuTask 由于执行不依赖 PxTaskMgr,因此调用会比较简单:

  • 通过 setContinuation 设置执行的前置依赖和 TaskMgr,当然也可以不设置。
  • 通过 removeReference 主动减少引用计数触发任务的 submit。

image-20221021195806514

题外话:为了支持 PxTask 的多依赖等特性还额外定义了 FanoutTask,并且调整锁的级别从 PxTaskMgr 变为了 FanoutTask,这里就不再展开。

# CpuDispatcher

CpuDispatcher 负责 submit 后最终的派发工作,如果在没有设置多线程情况下,CpuDispatcher 的派发的 Task 将在主线程执行。

下面来聊聊在多线程情况下的执行流程,先看看 CpuDispatcher 的初始化:

DefaultCpuDispatcher::DefaultCpuDispatcher(uint32_t numThreads, uint32_t* affinityMasks)
	: mQueueEntryPool(TASK_QUEUE_ENTRY_POOL_SIZE), mNumThreads(numThreads), mShuttingDown(false)
{
	uint32_t defaultAffinityMask = 0;
	// 创建 work 线程
	mWorkerThreads = reinterpret_cast<CpuWorkerThread*>(PX_ALLOC(numThreads * sizeof(CpuWorkerThread), PX_DEBUG_EXP("CpuWorkerThread")));
	if (mWorkerThreads)
	{
		for (uint32_t i = 0; i < numThreads; ++i)
		{
			PX_PLACEMENT_NEW(mWorkerThreads + i, CpuWorkerThread)();	// 创建 thread 对象
			mWorkerThreads[i].initialize(this);	// 绑定 thread 和 dispacther 
		}
		for (uint32_t i = 0; i < numThreads; ++i)
		{
			mWorkerThreads[i].start(shdfnd::Thread::getDefaultStackSize()); // 启动线程
             // 设置线程掩码,对于掩码的介绍:
             // On Windows, Linux, PS4, XboxOne and Switch platforms, each set mask bit represents
             // the index of a logical processor that the OS may schedule thread execution on
			if (affinityMasks)
			{
				mWorkerThreads[i].setAffinityMask(affinityMasks[i]);
			}
			else
			{
				mWorkerThreads[i].setAffinityMask(defaultAffinityMask);
			}
			char threadName[32];
			shdfnd::snprintf(threadName, 32, "PxWorker%02d", i);
			mWorkerThreads[i].setName(threadName);
		}
	}
	else
	{
		mNumThreads = 0;
	}
}

# CpuWorkerThread

CpuWorkerThread 继承自 Thread,但 Thread 源码看不到,官方提供了接口说明 ——ThreadImpl,感兴趣的可以了解一下。

CpuWorkerThread 有两个关键接口:tryAcceptJobToLocalQueueexecute

# tryAcceptJobToLocalQueue

该接口在 Dispatcher 的 submitTask 中被调用。会把已经注册到该线程的 Task 提交到本地的工作队列 (LocalJobList)

void DefaultCpuDispatcher::submitTask(PxBaseTask& task)
{
	shdfnd::Thread::Id currentThread = shdfnd::Thread::getId();
	// TODO: Could use TLS to make this more efficient
	for (uint32_t i = 0; i < mNumThreads; ++i)
		if (mWorkerThreads[i].tryAcceptJobToLocalQueue(task, currentThread))
		{
			return mWorkReady.set();
		}
    // 主线程中
	SharedQueueEntry* entry = mQueueEntryPool.getEntry(&task);
	if (entry)
	{
		mJobList.push(*entry);
		mWorkReady.set();
	}
}
// CpuWorkerThread 中
bool CpuWorkerThread::tryAcceptJobToLocalQueue(PxBaseTask& task, shdfnd::Thread::Id taskSubmitionThread)
{
    //submit task 必须在当前线程进行操作
	if (taskSubmitionThread == mThreadId)
	{
		SharedQueueEntry* entry = mQueueEntryPool.getEntry(&task);
		if (entry)
		{
			mLocalJobList.push(*entry);
			return true;
		}
		else
		{
			return false;
		}
	}
	return false;
}

这里会有两种情况:

  • 当前线程是主线程情况下,没有匹配的 CpuWorkerThread,因此会直接插入到 DefaultCpuDispatcher,因为 DefaultCpuDispatcher 只会在主线程创建。
  • 当前线程是工作线程情况下:找到对应的工作线程 ID,并添加到工作线程的本地 JobList。

# execute

执行操作分为以下几个步骤:

  • 【step.1】当前工作线程如果收到退出信号则直接退出。
  • 【step.2】线程唤醒时,先重置唤醒标记。(避免中途异常退出导致该线程一直处于唤醒
  • 【step.3】从本地工作队列获取一个 Task,如果为空继续获取,知道工作队列为空或者取到一个可执行 Task。
  • 【step.4】执行 Task,并调用 release,触发后续任务的 submit,可能在当前工作线程继续执行或把操作权限移交主线程。
  • 【step.5】如果没有可执行的 Task,将挂起当前工作线程。

相关线程消息及接口说明见 SyncImpl

pthread_create(&getThread(this)->thread, &attr, PxThreadStart, this);
// 线程唤醒后执行 execute
void* PxThreadStart(void* arg)
{
	_ThreadImpl* impl = getThread(reinterpret_cast<ThreadImpl*>(arg));
	impl->state = _PxThreadStarted;
	// run setTid in thread's context
	setTid(*impl);
	// then run either the passed in function or execute from the derived class (Runnable).
	if(impl->fn)
		(*impl->fn)(impl->arg);
	else if(impl->arg)
		(reinterpret_cast<Runnable*>(impl->arg))->execute();
	return 0;
}
//execute 实现:
void Ext::CpuWorkerThread::execute()
{
   mThreadId = getId();
   while (!quitIsSignalled())
    {
      mOwner->resetWakeSignal();
      PxBaseTask* task = TaskQueueHelper::fetchTask(mLocalJobList, mQueueEntryPool);
      if(!task)
         task = mOwner->fetchNextTask();
      
      if (task)
      {
         mOwner->runTask(*task);
         task->release();
      }
      else
      {
         mOwner->waitForWork();
      }
   }
   quit();
}

# 总结

本篇主要介绍了任务管理器如果进行任务调度,以及任务是如何排布执行顺序,巧妙的通过引用计数建立任务间的依赖关系。

并介绍了 CpuDispatcher 是如何调度任务的执行,以及 WorkThread 的实现细节。

更新于 阅读次数

请我[恰饭]~( ̄▽ ̄)~*

鑫酱(●'◡'●) 微信支付

微信支付

鑫酱(●'◡'●) 支付宝

支付宝