단순히 문서 해석과 궁금한 점을 덧붙여 작성하였습니다.
시리즈는 아래와 같습니다.
Framework Concepts
Framework Concepts - Calculators
각 Calculator는 그래프의 노드입니다. 우리는 어떻게 새로운 calculator를 만드는지, 어떻게 calculator를 초기화하고, 어떻게 그것의 calculations, input과 output streams, timestamps, options 들을 수행하는지 설명합니다. 그래프의 각 노드들은 Calculator 로써 구현되어있습니다. graph execution의 bulk는 해당 calulators 안에서 일어납니다. calculator는 아마도 0 혹은 더 많은 input stream 과/혹은 side packets 를 받을 수도 있고, 0 혹은 더 많은 output streams 과/혹은 side packets를 생산할 수도 있습니다.
CalculatorBase
Calculator는 CalculatorBase 클래스의 새로운 sub-class를 정의함에 있어 생성됩니다. CalculatorBase 클래스는 많은 메소드들을 구현하는 것과 미디어파이프와 함께 새로운 sub-class 를 등록합니다. 최소한, 새로운 calculator는 반드시 아래 네 개의 메소드를 구현해야합니다.
GetContract()
Calculator 작성자들은 GetContract() 안에서 calculator의 inputs과 outputs 의 예상되는 타입을 특정할 수 있습니다. 그래프가 초기화될 때, 프레임워크는 만약 연결된 inputs과 outputs 의 packet types 이 특정된 정보와 맞는지 확인하기 위해서 static method 를 호출합니다.
Open()
그래프가 시작된 이후, 프레임워크는 Open()을 호출합니다. 이 input side packets는 이 상황에서 calculator로 사용할 수 있습니다. Open()는 노드 설정 명령들을 해석하고(Graphs 문서를 보세요) calculator의 per-graph-run state를 준비합니다. 이 함수는 아마도 또한 calculator outputs를 위한 packets을 작성합니다. Open() 중에 에러는 그래프 실행을 종료할 수 있습니다.
Process()
inputs과 함께 calculator를 위해서, 프레임워크는 Process()를 반복적으로 언제든지 최소 하나의 input stream이 packet을 사용가능할 때 호출합니다. 프레임워크는 모든 inputs은 같은 timestamp를 가지고 있음(더 많은 정보를 위해서는 Synchronization을 보세요)을 기본적으로 보증합니다. 여러 Process() 호출들은 병렬 실행이 끝날 때 동시에 발생할 수 있습니다. 만약 Process() 중에 에러가 발생한다면, 프레임워크는 Close() 를 호출하고 그래프 실행은 종료됩니다.
Close()
Process() 종료를 위한 모든 호출 이후 혹은 모든 input streams가 닫힐 때, 프레임워크는 Close()를 호출합니다. 이 함수는 만약 Open()이 취소되고 그리고 진행되고 심지어 만약 그래프 실행이 에러로 인해 종료 되었을 때 항상 호출됩니다. Close() 중에는 어떤 input streams 를 거쳐 가능한 inputs이 없습니다만, 아직도 input side packets에 접근하고, 그래서 outputs를 아마도 출력할 수 있습니다. Close() 가 리턴된 이후, calculator는 죽은 노드로 여겨집니다. calculator object는 그래프가 실행을 끝낸 뒤에 금방 소멸됩니다.
CalculatorBase.h 의 일부 코드입니다.
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static absl::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual absl::Status Open(CalculatorContext* cc) {
return absl::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual absl::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual absl::Status Close(CalculatorContext* cc) {
return absl::OkStatus();
}
...
};
Life of a Calculator
Mediapipe graph의 초기화 중에, 프레임워크는 어떤 종류의 packets이 예상되는지 결정하기 위해 Getcontract() static method 를 호출합니다.
프레임워크는 전체 calculator를 각 그래프 실행을 위해(예를 들어 비디오 혹은 이미지 마다 한번씩) 구성하고 소멸합니다. 그래프 실행을 지나서 변함없이 남겨지는 비싸고 큰 오브젝트는 input side packets로써 공급되어야 합니다. 그래서 calculations는 이후 실행시 반복되지 않도록 합니다.
초기화 이후, 각 그래프의 실행을 위해, 아래의 시퀀스가 발생합니다:
- Open()
- Process() (반복적으로)
- Close()
프레임워크는 Open()을 calculator를 초기화하기 위해 호출합니다. Open()은 어떤 옵션과 calculator의 per-graph-run state를 셋업을 해석해야합니다. Open()은 아마도 input side packets을 포함하고 calculator outputs를 위해 packets을 출력합니다. 만약 적절하다면, 그것은 input streams의 잠재적인 packet 버퍼링을 줄이기 위해 SetOffSet()을 호출합니다.
만약 에러가 Open() 혹은 Process() 중에 발생한다면,(그들 중 하나가 ok가 아닌 상태를 반환하며 가리키면서) 그래프 실행은 더 이상의 calculator의 함수 호출 없이 종료되고, calculator는 소멸됩니다.
inputs과 함께 calculator를 위해, 프레임워크는 언제든지 최소 하나의 input이 사용가능한 packet 을 가지고 있을 때마다 Process()를 호출합니다. 프레임워크는 inputs 모두는 같은 timestamp를 가지고 있다고 보증합니다, 그 timestamps는 각자 Process()로 호출되면서 증가되고 모든 패킷이 전송됩니다. 그 결과, 일부 inputs는 Process()가 호출될 때 어떤 packets도 가지고 있지 않을 수 있습니다. packet이 누락되는 input은 빈 packet(timestamp 없이)으로 생성하는 것으로 나타납니다.
프레임워크느 모든 Process() 호출 후 Close()를 호출합니다. 모든 inputs는 모두 사용되지만, Close()는 input side packets에 접근할 수 있고 아마도 outputs을 출력할 수 있습니다. Close의 리턴 이후, calculator는 소멸됩니다.
inputs이 없는 Calculators 들은 sources라고 불려집니다. source calculator는 Ok 상태를 반환하는 한 Process()를 계속 호출합니다. source calculator는 stop status를 리턴함으로써 모두 사용되었음을 가리킵니다.(다시 말해서, MediaPipe::tool::StatusStop)
Identifying inputs and outputs
calculator를 위한 public interface는 input streams 과 output streams의 집합으로 구성됩니다. CalculatorGraphConfiguration안에서, 일부 calculators로부터의 outputs들은 named streams을 사용하는 다른 calculators inputs과 연결되어 있습니다. stream names 들은 주로 lowercase이고, 반면에 input과 output tags는 주로 UPPERCASE 입니다. 아래의 예제에서, VIDEO 태그명과 같이 있는 output 은 video_stream 이라고 명명된 stream을 사용하는 VIDEO_IN 태그명과 같이 있는 input과 연결되어 있습니다.
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
input과 output streams는 index number, tag name, 혹은 tag name과 index number의 조합에 의해 구별할 수 있습니다. 당신은 아래 예제에서 input 과 output identifiers의 몇몇 예를 볼 수 있습니다. SomAudioVideoCalculator 는 그것의 video output 을 tag로써 식별하고 그것의 audio outputs는 tag와 index의 조합으로써 식별합니다. tag VIDEO와 함께있는 input은 video_stream 이라고 명명한 stream 에 연결되어있습니다. tag AUDIO와 함께있는 outputs 과 0과 1의 index들은 audio_left와 audio_right라는 이름의 streams에 연결되어 있습니다. SomeAudioCalculator는 index 만으로 (tag가 필요없습니다) 그것의 audio inputs을 식별합니다.
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
calculator 구현에서, inputs과 outputs은 또한 tag name 과 index number에 의해 식별됩니다. 아래의 함수에서 input과 output은 식별됩니다:
- index number 로써: input stream은 index 0으로써 간단하게 식별됩니다.
- tag name 으로써: video output stream은 tag name VIDEO 로 식별됩니다.
- tag name 과 index number 로써: output audio streams 들은 tag name AUDIO 와 index numbers 0과 1의 조합으로 식별됩니다.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return absl::OkStatus();
}
Processing
Process()는 non_source node 에서 불려지고 모든 것이 잘 갔다고 알려주기 위해 absl::OkStatus()를, 혹은 에러를 나타내기 위해 다른 상태 코드를 반드시 리턴해야 합니다.
만약 non-source calculator가 tool::StatusStop()을 리턴할 경우, 그러면 이 신호는 그래프가 초기에 취소되었다는 것입니다. 이 경우, 모든 source calculators 와 graph input streams는 닫혀집니다.(그리고 남아있는 Packets 는 그래프를 통해 전파됩니다.)
그래프의 source node 는 absl::OkStatus()를 리턴할 때까지 Process() 호출을 계속합니다. 생성될 더 이상 데이터가 없다는 것을 알려주기 위해서 tool::StatusStop()을 리턴합니다. 어떤 다른 상태는 에러가 발생함을 나타냅니다.
Close()는 성공을 알리기 위해 absl::OkStatus()를 리턴합니다. 어떤 다른 상태는 실패를 나타냅니다.
여기에 기본적인 Process() 함수가 있습니다. 이것은 그것의 input data를 요청하기 위해 Input() 함수(만약 calculator가 하나의 input을 가지고 있을 때만 유일하게 사용될 수 있는)에서 사용됩니다. 그런 다음, output packet을 위해 필요한 메모리 할당을 하기 위해 std::unique_ptr 로 사용되고 calculations를 수행합니다. output stream에 추가할 때 포인터를 해제하면 끝납니다.
absl::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return absl::OkStatus();
}
Calculator options
Calculatos 는 (1) input stream packets (2) input side packets 과 (3) calculator options 을 통해 파라미터들을 처리하는 것을 허용합니다. Calculator options, 특정짓자면, CalculatorGraphConfiguration.Node 메시지의 node_options 필드에서 문자대로 값이 보여집니다.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
node_options 필드는 proto3 문법을 허용합니다. 대체적으로, calculator options 는 proto2 문법을 사용한 options 필드에서 서술될 수 있습니다.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
모든 calculators가 calculator options 를 허용하는 것은 아닙니다. options 를 허용하기 위해, calculator는 보통 새로운 protobuf message type을 그것의 options을 나타내기 위해 정의합니다, PacketClonerCalculatorOptions와 같은 것입니다. calculator는 그리고 CalculatorBase::Open 메소드 안에 그 protobuf message, 아마도 또한 CalculatorBase::Getcontract 함수 혹은 그것의 CalculatorBase::Process 함수를 읽습니다. 보통, 새로운 protobuf message type은 ".proto" 파일과 mediapipe_proto_library() 빌드 규정을 사용함으로써 protobuf schema 로 정의될 것입니다.
mediapipe_proto_library(
name = "packet_cloner_calculator_proto",
srcs = ["packet_cloner_calculator.proto"],
visibility = ["//visibility:public"],
deps = [
"//mediapipe/framework:calculator_options_proto",
"//mediapipe/framework:calculator_proto",
],
)
Example calculator
이번 섹션은 PacketClonerCalculator의 구현에 대해서 설명합니다. PacketClonerCalculator는 간단한 job과 연관도어 있고, 많은 calculator graphs에서 사용됩니다. PacketClonerCalculator 는 단순히 그것의 대부분 최근 input packets의 복사본을 요구대로 생성합니다.
PacketClonerCalculator는 data packets이 도착하는 것 이 완벽하게 정렬되지 않은 timestamps에서 유용합니다. 우리가 도청기가 설치되어있는 방을 가지고 있다고 가정해봅시다, 빛 센서와 비디오 카메라는 감각적인 데이터를 모읍니다. 각 센서들은 독립적으로 구성되고 간헐적으로 데이터를 수집합니다. 각 센서의 출력이 다음과 같다고 해봅시다:
- 도청기 = 방의 소리의 데시벨의 강도 (숫자)
- 빛 센서 = 방의 밝기 (숫자)
- 비디오 카메라 = 방의 RGB 이미지 프레임 (이미지 프레임)
우리의 간단한 perception pipeline은 이 3개 센서들로부터 언제든지 우리가 마지막으로 수집된 마이크의 음량 데이터와 빛센서의 밝기 데이터와 동기화된 카메라로부터 이미지 프레임 데이터를 가지고 있을 때 감각적인 데이터를 처리하기위해 설계되었습니다. MediaPipe와 이것을 하기 위해, 우리의 perception pipeline은 3개의 inpput streams 를 가집니다:
- room_mic_signal - 이 input stream 에서 데이터의 각 packet 은 timestamp와 함께 방에서 얼마나 큰 소리인지를 나타내는integer 데이터입니다.
- room_lightening_sensor - 이 input stream 에서 데이터의 각 packet은 timestamp와 함께 얼마나 조명된 방의 밝기를 나타내는 integer 데이터입니다.
- room_video_tick_signal - 이 input stream 에서 데이터의 각 packet은 timestamp와 함께 방에서 카메라로부터 수집된 비디오를 나타내는 비디오 데이터의 imageframe 입니다.
아래는 PacketClonerCalculator 의 구현입니다. 당신은 GetContract(), Open(), 그리고 Process() 메소드를 인스턴스 변수인 가장 최근 input packets을 들고 있는 current_ 만큼 잘 볼 수 있을 것입니다.
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return absl::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
일반적으로, calculator는 .cc 파일만을 가지고 있습니다. .h 는 필요하지 않습니다. 왜냐하면 mediapipe 는 calculators가 알려지기 위해 등록을 사용하기 때문입니다. 당신이 calculator class를 정의하고 난 후, 매크로 선언문 REGISTER_CALCULATOR(calculator_claass_name) 으로 등록합니다.
아래는 3개의 input streams, 하나의 node(PacketClonerCalculator)와 2개의 output streams를 가지고 있는 간단한 MediaPipe 그래프입니다.
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
아래의 다이어그램은 어떻게 PacketClonerCalculator가 그것의 input packets(위)의 연속을 기반으로 한 그것의 output packets(아래) 를 정의하는지를 보여줍니다.
References
https://google.github.io/mediapipe/framework_concepts/calculators.html
'개발 > mediapipe' 카테고리의 다른 글
python - face mesh (0) | 2022.03.05 |
---|---|
mediapipe hello world error on m1 (0) | 2022.03.03 |
Framework Concepts - Graphs (0) | 2022.03.03 |
mediapipe install on iOS (2) | 2022.03.03 |
Windows 에서 media pipe 설치하기 (0) | 2022.03.01 |