Operator
基本特点
Caffe2中大多数我们所接触的operator都是class Operator的子类。
而Operator则是上系列中我们提及的class OperatorBase的子类。下面我们将一一过一下它新加的一些主要接口及其涵义。
它与OperatorBase不同,是Device context相关的一个模板类。因此与OperatorBase不同,它包含了一个名为context的属性。
// Operator is the class that you usually want to derive, if your operator will
// run on different devices. You should then implement the RunOnDevice()
// function.
template <class Context>
class Operator : public OperatorBase {
public:
explicit Operator(const OperatorDef& operator_def, Workspace* ws)
: OperatorBase(operator_def, ws), context_(operator_def.device_option()) {
// In the constructor, we switch to the device so that the child class
// constructors will run on that device.
context_.SwitchToDevice(0);
}
~Operator() noexcept override {}
.......
........
const Context* getContext() const {
return &context_;
}
protected:
void RecordEvent(const char* err_msg = nullptr) final {
if (event_) {
context_.Record(event_.get(), err_msg);
}
}
Context context_;
};
而像一般的输入、输出基本operator功能,则都通过直接委托父类OperatorBase来完成,如下所示:
inline const Tensor& Input(
int idx,
DeviceType type = Context::GetDeviceType()) {
return OperatorBase::template Input<Tensor>(idx, type);
}
inline Tensor* Output(int idx, at::IntList dims, at::TensorOptions options) {
if (options.device_opt() == c10::nullopt) {
return OperatorBase::OutputTensor(
idx, dims, options.device(context_.device()));
}
return OperatorBase::OutputTensor(idx, dims, options);
}
inline Tensor* Output(int idx, DeviceType type = Context::GetDeviceType()) {
return OperatorBase::template Output<Tensor>(idx, type);
}
Event相关async处理
对于Event等operator异步执行相关的处理,它跟OperatorBase并无太多不同,但同时也将event处理跟具体的device context更加紧密地绑定起来了。因此如果你的async operator
是Operator的子类,那么将可直接使用它提供的一些Event处理函数来进行异步操作即可,但若你使用OperatorBase作为父类,那么还得考虑所使用的operator的具体device种类,并在
进行event处理时考虑传device context参数。
void WaitEvent(const Event& ev, int stream_id = -1) final {
if (stream_id >= 0) {
context_.SwitchToDevice(stream_id);
}
context_.WaitEvent(ev);
}
void WaitEvents(const std::vector<const Event*>& events, int stream_id = -1)
final {
if (stream_id >= 0) {
context_.SwitchToDevice(stream_id);
}
for (const auto& ev : events) {
context_.WaitEvent(*ev);
}
}
Run及RunAsync
若说operator里面最为核心及用户接触最频繁的两个函数,那么肯定非Run及RunAsync莫属。
当然它们都是包了真正子类Operator里面定义的RunOnDevice函数。只是Run用来以sync的方式来执行一个op,而RunAsync则是以async的方式来执行它。
下面是op sync执行的方式。
// The run function of Operator switches to the device, and then carries out
// the actual computation with RunOnDevice(). You should implement RunOnDevice
// instead of Run().
// Note: Run does not update operator's event and can be used only with
// non-async executors that do not rely on events
bool Run(int stream_id = 0) final {
try {
StartAllObservers();
context_.SwitchToDevice(stream_id);
bool result = RunOnDevice();
if (!result) {
this->RecordLastFailedOpNetPosition();
}
context_.FinishDeviceComputation(); // throws on error
StopAllObservers();
return result;
} catch (EnforceNotMet& err) {
if (has_debug_def()) {
err.AppendMessage(
"Error from operator: \n" + ProtoDebugString(debug_def()));
AddRelatedBlobInfo(&err);
}
this->RecordLastFailedOpNetPosition();
StopAllObservers();
throw;
} catch (...) {
this->RecordLastFailedOpNetPosition();
StopAllObservers();
throw;
}
}
下面则是op async执行的具体函数RunAsync。可见我们在operator里面定义的一些event相关函数,大多都是在这里被使用的。
bool RunAsync(int stream_id = 0) final {
try {
StartAllObservers();
context_.SwitchToDevice(stream_id);
auto result = RunOnDevice();
if (result) {
if (HasAsyncPart()) {
RecordEvent();
} else {
// Manually set CPU operator's event status to finished,
// unless this is an async CPU operator
SetEventFinished();
}
} else {
SetEventFinished(getErrorMsg().c_str());
this->RecordLastFailedOpNetPosition();
}
StopAllObservers();
return result;
} catch (EnforceNotMet& err) {
if (has_debug_def()) {
err.AppendMessage(
"Error from operator: \n" + ProtoDebugString(debug_def()));
AddRelatedBlobInfo(&err);
}
SetEventFinishedWithException(err.what());
this->RecordLastFailedOpNetPosition();
StopAllObservers();
throw;
} catch (const std::exception& err) {
SetEventFinishedWithException(err.what());
this->RecordLastFailedOpNetPosition();
StopAllObservers();
throw;
} catch (...) {
SetEventFinishedWithException(getErrorMsg().c_str());
this->RecordLastFailedOpNetPosition();
StopAllObservers();
throw;
}
}
真正干活的RunOnDevice函数在这里是个纯虚函数,并不做啥事,接口而已。
virtual bool RunOnDevice() = 0;
Operator async属性及async执行
Operator async执行有两个概念需要领悟清楚。其一HasAsyncPart,它指的是我们常规意义上理解的async执行,即不等待操作真正执行完毕而是立即就返回一个handle,将来
需要时再去check看操作是否真正完成。另二则是AsyncScheduling,它指的是是否我们在执行的op支持不待其input ready(即父op执行完成)就被schedule到pool或stream中以
avalable的状态去执行(这一设计显然是受CUDA programming model影响而来的),当然也不可能真正的不顾input是否就绪,只是将同步的责任由framework移交给了CUDA而已。
// Events of operators that don't have async parts are automatically set
// to finished state by RunAsync.
// Defaulting to the value from context (true for CUDA, false for CPU).
// Override in case of async CPU operators
// Async CPU operators are expected to catch all exceptions in async parts
// and set Event to finished/failed state with Event::SetFinished or
// SetFinishedWithException call.
bool HasAsyncPart() const override {
return context_.HasAsyncPartDefault();
}
// Returns whether operator's RunOnDevice schedules async on device part and
// can be run without waiting for parent operator's async part to be finished
// on the same device.
// Note: when true, RunOnDevice must not access the content of the input blobs
// as they might not be computed yet
// Note: when true, operator's device needs to support async scheduling:
// - supports concept of streams: async ops scheduled on the same stream are
// guaranteed to be executed in the same order they were scheduled
// - provides non-blocking cross device/cross stream synchronization
// primitives
//
// By default, assuming an op with an async part can be scheduled
// asynchronously if device supports async scheduling
bool SupportsAsyncScheduling() const override {
return HasAsyncPart() && context_.SupportsAsyncScheduling();
}
一般,我们以sync的方式执行完成一个op后,就需要以使用一个barrier来保证其真正执行完成,这里亦是参考了CUDA 异步编程模型里的思想。这framework简直就是CUDA
编程模型的一个wrapper啊!呵呵,这也是让其它CPU/ASIC等各种厂商很讨厌的地方,所谓的nVidia CUDA生态的护城河。。
void SyncDeviceBarrierForObservers() override {
context_.FinishDeviceComputation();
}
Operator相关的utilities
下面是不同Device context operator注册相关的一些utilities。这一套作法跟之前Caffe里面Layer/Net/Solver等工厂模式很是类似,毕竟是出自一个人的手笔嘛。。
以下为CPU operator注册的一些utilities,其它像CUDA/HIP/IDEEP等都比较类似。
// The operator registry. Since we are not expecting a great number of devices,
// we will simply have an if-then type command and allocate the actual
// generation to device-specific registerers.
// Note that although we have CUDA and CUDNN here, the registerers themselves do
// not depend on specific cuda or cudnn libraries. This means that we will be
// able to compile it even when there is no cuda available - we simply do not
// link any cuda or cudnn operators.
C10_DECLARE_REGISTRY(
CPUOperatorRegistry,
OperatorBase,
const OperatorDef&,
Workspace*);
#define REGISTER_CPU_OPERATOR_CREATOR(key, ...) \
C10_REGISTER_CREATOR(CPUOperatorRegistry, key, __VA_ARGS__)
#define REGISTER_CPU_OPERATOR(name, ...) \
C10_IMPORT void CAFFE2_PLEASE_ADD_OPERATOR_SCHEMA_FOR_##name(); \
static void CAFFE2_UNUSED CAFFE_ANONYMOUS_VARIABLE_CPU##name() { \
CAFFE2_PLEASE_ADD_OPERATOR_SCHEMA_FOR_##name(); \
} \
C10_REGISTER_CLASS(CPUOperatorRegistry, name, __VA_ARGS__)
#define REGISTER_CPU_OPERATOR_STR(str_name, ...) \
C10_REGISTER_TYPED_CLASS(CPUOperatorRegistry, str_name, __VA_ARGS__)
#define REGISTER_CPU_OPERATOR_WITH_ENGINE(name, engine, ...) \
C10_REGISTER_CLASS(CPUOperatorRegistry, name##_ENGINE_##engine, __VA_ARGS__)