Mediapipe 在RK3399PRO上的初探(二)(自定义Calculator)

2024-06-16 06:58

本文主要是介绍Mediapipe 在RK3399PRO上的初探(二)(自定义Calculator),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

PS:要转载请注明出处,本人版权所有。

PS: 这个只是基于《我自己》的理解,

如果和你的原则及想法相冲突,请谅解,勿喷。

环境说明
  • Ubuntu 18.04
  • gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)
  • RK3399PRO 板卡

前言


  本文有一篇前置文章为《Mediapipe 在RK3399PRO上的初探(一)(编译、运行CPU和GPU Demo, RK OpenglES 填坑,编译bazel)》 https://blog.csdn.net/u011728480/article/details/115838306 ,本文是在前置文章上的一点点探索与发现。

  在前置文章中,我们介绍了一些编译和使用Mediapipe的注意事项,也初步跑起来了一点点demo,算是对这个框架有了一个初步的认知。但是前置文章也说了,了解这个框架的意义是替换我们小组现有的框架,而且能够支撑我们的板卡产品。于是我们还需要一个最最最最最最重要的功能就是“Custormer Calculator”,就是自定义计算节点,因为这个框架的核心就是计算节点。下文我们将会讲到这个框架的一些基本概念,这些概念都是来至于官方文档的机翻+我自己的理解。同时,我会给出一个我定义的计算节点的实例,算是给大家一个感性认知。





Mediapipe的一些概念(本小节基本来至于官方文档的机翻+我自己的理解,不感兴趣,请直接看下一小节)


  本小节内容,主要参考 https://github.com/google/mediapipe/tree/master/docs/framework_concepts 的几个介绍概念的md文件。
  MediaPipe 感觉中文直译为“媒体管道”,为啥会有这个名字呢?因为它把数据+处理组合成一个计算节点,然后把一个一个节点连接起来,用数据来驱动整个核心逻辑。如果大家对Caffe、ncnn类似的框架源码有一点点了解的话,就会觉得他们非常像,但是又不像。像是说,都是通过配置文件定义计算节点逻辑图,然后通过运算,得到我们想得到的逻辑图中节点的数据。不像的话,就是说的是Mediapipe的调度机制了,极大的增加了节点计算的并行功能,而那些框架是按照图节点的上下顺序进行执行的。
  上面这段话可能有点抽象,我想表达的就是 Mediapipe 就是把任何一个“操作”都可以变为一个Calculator,因为我们的每一个项目的逻辑抽象出来都是 Calculator0+Data0->Calculator1+Data1->Calculator2+Data2->… …,然后,Mediapipe 做的是基于这种calculator的调度和执行。这里我举个栗子:

  • 人脸图+检测算法=人脸检测结果,这是一个Calculator
  • RTSP流+解码模块=解码之后的图片,这是一个Calculator

好了,其他的就不过分的解读了,下面就使用MediaPipe的helloworld example( https://github.com/google/mediapipe/blob/master/mediapipe/examples/desktop/hello_world/hello_world.cc )为例,简单的说说以下几个概念。



Packet

  Packet,是mediapipe中的数据单元,它可以接收任意类型的数据。也是mediapipe中的数据流动单元。就是在mediapipe中,我们设计的Graph中,所有的逻辑流动都是通过packet流动来实现的。

  实例代码片段:

//MakePacket<std::string>("Hello World!") 创建一个packet,顺带说一句,我不喜欢这里的宏,不利于维护
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("in", MakePacket<std::string>("Hello World!").At(Timestamp(i))));//从packet中获取数据
mediapipe::Packet packet;
packet.Get<std::string>();


Graph

  Graph是由各个Calculator组成的,可以直接把Calculator理解为数据结构中图的节点。而Graph直接把他当做图就行了。Graph是我们定义的逻辑流程的具体载体,也就是说我们的业务逻辑是什么样子的,那么Graph里面就会有相应的逻辑流程。可以具备多输入输出。

  实例代码片段:

CalculatorGraph graph;


Caculator(Node)

  上面不是介绍了Graph和Packet嘛,这里的Calculator就是Graph里面的节点,也是处理Packet的具体单元。可以具备多输入输出。

  实例代码片段:

//比如mediapipe/calculators/core/pass_through_calculator.h里面的定义,这个Calculator被Helloworld这个例子使用,作用就是把输入的数据直接传递到输出,不做任何处理,类似NOP
class PassThroughCalculator : public CalculatorBase 


Stream

  Stream 就是 Caculator 之间的连接起来后,形成的一个数据流动路径。



Side packets

  Side packets 可以直接理解为一些静态的数据packet,在graph创建之后就不会改变的数据。

… …





自定义实现Calculator


  Talk is cheap, show me the code.

/** @Description: * @Author: Sky* @Date: * @LastEditors: Sky* @LastEditTime: * @Github: */
#include <cstdio>
#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/parse_text_proto.h"
#include "mediapipe/framework/port/status.h"//customer calculator
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/canonical_errors.h"class CustomerDataType{public:CustomerDataType(int i, float f, bool b, const std::string & str):val_i(i),val_f(f),val_b(b),s_str(str){}int val_i = 1;float val_f = 11.f;bool val_b = true;std::string s_str = "customer str.";
};namespace mediapipe {class MyStringProcessCalculator : public CalculatorBase {public:/*Calculator authors can specify the expected types of inputs and outputs of a calculator in GetContract(). When a graph is initialized, the framework calls a static method to verify if the packet types of the connected inputs and outputs match the information in this specification.*/static absl::Status GetContract(CalculatorContract* cc) {/*class InputStreamShard;typedef internal::Collection<InputStreamShard> InputStreamShardSet;class OutputStreamShard;typedef internal::Collection<OutputStreamShard> OutputStreamShardSet;*///cc->Inputs().NumEntries() returns the number of input streams// if (!cc->Inputs().TagMap()->SameAs(*cc->Outputs().TagMap())) {//   return absl::InvalidArgumentError("Input and output streams's TagMap can't be same.");// }//set stream// for (CollectionItemId id = cc->Inputs().BeginId(); id < cc->Inputs().EndId(); ++id) {//   cc->Inputs().Get(id).SetAny();//   cc->Outputs().Get(id).SetSameAs(&cc->Inputs().Get(id));// }cc->Inputs().Index(0).SetAny();cc->Inputs().Index(1).Set<CustomerDataType>();cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0));//set stream package// for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {//   cc->InputSidePackets().Get(id).SetAny();// }// cc->InputSidePackets().Index(0).SetAny();// cc->InputSidePackets().Index(1).Set<CustomerDataType>();//set customer data-typeif (cc->OutputSidePackets().NumEntries() != 0) {// if (!cc->InputSidePackets().TagMap()->SameAs(*cc->OutputSidePackets().TagMap())) {//   return absl::InvalidArgumentError("Input and output side packets's TagMap can't be same.");// }// for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {//   cc->OutputSidePackets().Get(id).SetSameAs(&cc->InputSidePackets().Get(id));// }cc->OutputSidePackets().Index(0).SetSameAs(&cc->InputSidePackets().Index(0));}      return absl::OkStatus();}absl::Status Open(CalculatorContext* cc) final {for (CollectionItemId id = cc->Inputs().BeginId();id < cc->Inputs().EndId(); ++id) {if (!cc->Inputs().Get(id).Header().IsEmpty()) {cc->Outputs().Get(id).SetHeader(cc->Inputs().Get(id).Header());}}if (cc->OutputSidePackets().NumEntries() != 0) {for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {cc->OutputSidePackets().Get(id).Set(cc->InputSidePackets().Get(id));}}// Sets this packet timestamp offset for Packets going to all outputs.// If you only want to set the offset for a single output stream then// use OutputStream::SetOffset() directly.cc->SetOffset(TimestampDiff(0));return absl::OkStatus();}absl::Status Process(CalculatorContext* cc) final {if (cc->Inputs().NumEntries() == 0) {return tool::StatusStop();}//get node input datamediapipe::Packet  _data0 = cc->Inputs().Index(0).Value();mediapipe::Packet  _data1 = cc->Inputs().Index(1).Value();//not safety.char _tmp_buf[1024];::memset(_tmp_buf, 0, 1024);snprintf(_tmp_buf, 1024, _data0.Get<std::string>().c_str(), _data1.Get<CustomerDataType>().val_i, _data1.Get<CustomerDataType>().val_f, _data1.Get<CustomerDataType>().val_b, _data1.Get<CustomerDataType>().s_str.c_str());std::string _out_data = _tmp_buf;cc->Outputs().Index(0).AddPacket(MakePacket<std::string>(_out_data).At(cc->InputTimestamp()));return absl::OkStatus();}absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); }};REGISTER_CALCULATOR(MyStringProcessCalculator);
}namespace mediapipe {absl::Status  RunMyGraph() {// Configures a simple graph, which concatenates 2 PassThroughCalculators.CalculatorGraphConfig config = ParseTextProtoOrDie<CalculatorGraphConfig>(R"(input_stream: "in"input_stream: "customer_in"output_stream: "out"node {calculator: "PassThroughCalculator"input_stream: "in"output_stream: "out1"}node {calculator: "MyStringProcessCalculator"input_stream: "out1"input_stream: "customer_in"output_stream: "out2"        }      node {calculator: "PassThroughCalculator"input_stream: "out2"output_stream: "out"})");LOG(INFO)<<"parse graph cfg-str done ... ...";CalculatorGraph graph;MP_RETURN_IF_ERROR(graph.Initialize(config));LOG(INFO)<<"init graph done ... ...";ASSIGN_OR_RETURN(OutputStreamPoller poller,graph.AddOutputStreamPoller("out"));LOG(INFO)<<"add out-node to output-streampoller done ... ...";MP_RETURN_IF_ERROR(graph.StartRun({}));LOG(INFO)<<"start run graph done ... ...";// Give 10 input packets that contains the same std::string "Hello World!".for (int i = 0; i < 10; ++i) {MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("in", MakePacket<std::string>("CustomerCalculator: val_i %d, val_f %f, val_b %d, val_str %s").At(Timestamp(i))));MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("customer_in", MakePacket<CustomerDataType>(i, i + 1.f, i%2==0, "s" + std::to_string(i)).At(Timestamp(i))));}// Close the input stream "in".MP_RETURN_IF_ERROR(graph.CloseInputStream("in"));MP_RETURN_IF_ERROR(graph.CloseInputStream("customer_in"));mediapipe::Packet packet;// Get the output packets std::string.while (poller.Next(&packet)) {LOG(INFO) << packet.Get<std::string>();}LOG(INFO)<<"RunGraph Done";return graph.WaitUntilDone();}}  // namespace mediapipeint main(int argc, char** argv) {gflags::ParseCommandLineFlags(&argc, &argv, true);FLAGS_minloglevel = 0;FLAGS_stderrthreshold = 0;FLAGS_alsologtostderr = 1;google::InitGoogleLogging(argv[0]);LOG(INFO) << "glog init success ... ...";absl::Status run_status = mediapipe::RunMyGraph();if (!run_status.ok())LOG(ERROR) << "Failed to run the graph: " << run_status.message();google::ShutdownGoogleLogging();return 0;
}

下面简单介绍这段代码。



自定义Calculator:MyStringProcessCalculator

   这里自定义了一个Calculator,主要作用就是传入snprintf的fmt字符串和fmt字符串所需要的数据。所以可以看到有两个输入,一个是string,一个是我自定义的data-type。输出是一个格式化之后的字符串,所以输出是string。

   自定义Calculator主要还是实现4个接口,分别是GetContract,Open,Process,Close。其中GetContract是Graph初始化的时候,检查Calculator用的。Open接口是在Graph开始后,对Calculator做一些初始化工作,例如设定一些Calculator初始状态等。Process是实际的Calculator功能。

namespace mediapipe {class MyStringProcessCalculator : public CalculatorBase {public:/*Calculator authors can specify the expected types of inputs and outputs of a calculator in GetContract(). When a graph is initialized, the framework calls a static method to verify if the packet types of the connected inputs and outputs match the information in this specification.*/static absl::Status GetContract(CalculatorContract* cc) {/*class InputStreamShard;typedef internal::Collection<InputStreamShard> InputStreamShardSet;class OutputStreamShard;typedef internal::Collection<OutputStreamShard> OutputStreamShardSet;*///cc->Inputs().NumEntries() returns the number of input streams// if (!cc->Inputs().TagMap()->SameAs(*cc->Outputs().TagMap())) {//   return absl::InvalidArgumentError("Input and output streams's TagMap can't be same.");// }//set stream// for (CollectionItemId id = cc->Inputs().BeginId(); id < cc->Inputs().EndId(); ++id) {//   cc->Inputs().Get(id).SetAny();//   cc->Outputs().Get(id).SetSameAs(&cc->Inputs().Get(id));// }cc->Inputs().Index(0).SetAny();cc->Inputs().Index(1).Set<CustomerDataType>();cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0));//set stream package// for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {//   cc->InputSidePackets().Get(id).SetAny();// }// cc->InputSidePackets().Index(0).SetAny();// cc->InputSidePackets().Index(1).Set<CustomerDataType>();//set customer data-typeif (cc->OutputSidePackets().NumEntries() != 0) {// if (!cc->InputSidePackets().TagMap()->SameAs(*cc->OutputSidePackets().TagMap())) {//   return absl::InvalidArgumentError("Input and output side packets's TagMap can't be same.");// }// for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {//   cc->OutputSidePackets().Get(id).SetSameAs(&cc->InputSidePackets().Get(id));// }cc->OutputSidePackets().Index(0).SetSameAs(&cc->InputSidePackets().Index(0));}      return absl::OkStatus();}absl::Status Open(CalculatorContext* cc) final {for (CollectionItemId id = cc->Inputs().BeginId();id < cc->Inputs().EndId(); ++id) {if (!cc->Inputs().Get(id).Header().IsEmpty()) {cc->Outputs().Get(id).SetHeader(cc->Inputs().Get(id).Header());}}if (cc->OutputSidePackets().NumEntries() != 0) {for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) {cc->OutputSidePackets().Get(id).Set(cc->InputSidePackets().Get(id));}}// Sets this packet timestamp offset for Packets going to all outputs.// If you only want to set the offset for a single output stream then// use OutputStream::SetOffset() directly.cc->SetOffset(TimestampDiff(0));return absl::OkStatus();}//这里是整个Calculator的核心,就是调用snprintfabsl::Status Process(CalculatorContext* cc) final {if (cc->Inputs().NumEntries() == 0) {return tool::StatusStop();}//get node input datamediapipe::Packet  _data0 = cc->Inputs().Index(0).Value();mediapipe::Packet  _data1 = cc->Inputs().Index(1).Value();//not safety.char _tmp_buf[1024];::memset(_tmp_buf, 0, 1024);snprintf(_tmp_buf, 1024, _data0.Get<std::string>().c_str(), _data1.Get<CustomerDataType>().val_i, _data1.Get<CustomerDataType>().val_f, _data1.Get<CustomerDataType>().val_b, _data1.Get<CustomerDataType>().s_str.c_str());std::string _out_data = _tmp_buf;cc->Outputs().Index(0).AddPacket(MakePacket<std::string>(_out_data).At(cc->InputTimestamp()));return absl::OkStatus();}absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); }};REGISTER_CALCULATOR(MyStringProcessCalculator);
}


然后开始编译运行得到结果

   编译。


# 注意,这里的--check_visibility=false 为了关闭bazel关于target之间的可见性检查,因为我的Calculator自定义放在我自己的目录的,有一个target对这个目录不可见,编译会报错。bazel build -c dbg --define MEDIAPIPE_DISABLE_GPU=1 --copt -DMESA_EGL_NO_X11_HEADERS --copt -DEGL_NO_X11 my_target --check_visibility=false --verbose_failures  --local_cpu_resources=1

   然后运行。得到如下图的结果。

result




后记


  好了,一个超级简单的自定义calculator已经实现了,相信你已经明白了吧。本系列也就此终结吧,以后随缘更新。




打赏、订阅、收藏、丢香蕉、硬币,请关注公众号(攻城狮的搬砖之路)
qrc_img

PS: 请尊重原创,不喜勿喷。

PS: 要转载请注明出处,本人版权所有。

PS: 有问题请留言,看到后我会第一时间回复。

这篇关于Mediapipe 在RK3399PRO上的初探(二)(自定义Calculator)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1065757

相关文章

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

Oracle type (自定义类型的使用)

oracle - type   type定义: oracle中自定义数据类型 oracle中有基本的数据类型,如number,varchar2,date,numeric,float....但有时候我们需要特殊的格式, 如将name定义为(firstname,lastname)的形式,我们想把这个作为一个表的一列看待,这时候就要我们自己定义一个数据类型 格式 :create or repla

HTML5自定义属性对象Dataset

原文转自HTML5自定义属性对象Dataset简介 一、html5 自定义属性介绍 之前翻译的“你必须知道的28个HTML5特征、窍门和技术”一文中对于HTML5中自定义合法属性data-已经做过些介绍,就是在HTML5中我们可以使用data-前缀设置我们需要的自定义属性,来进行一些数据的存放,例如我们要在一个文字按钮上存放相对应的id: <a href="javascript:" d

一步一步将PlantUML类图导出为自定义格式的XMI文件

一步一步将PlantUML类图导出为自定义格式的XMI文件 说明: 首次发表日期:2024-09-08PlantUML官网: https://plantuml.com/zh/PlantUML命令行文档: https://plantuml.com/zh/command-line#6a26f548831e6a8cPlantUML XMI文档: https://plantuml.com/zh/xmi

argodb自定义函数读取hdfs文件的注意点,避免FileSystem已关闭异常

一、问题描述 一位同学反馈,他写的argo存过中调用了一个自定义函数,函数会加载hdfs上的一个文件,但有些节点会报FileSystem closed异常,同时有时任务会成功,有时会失败。 二、问题分析 argodb的计算引擎是基于spark的定制化引擎,对于自定义函数的调用跟hive on spark的是一致的。udf要通过反射生成实例,然后迭代调用evaluate。通过代码分析,udf在

鸿蒙开发中实现自定义弹窗 (CustomDialog)

效果图 #思路 创建带有 @CustomDialog 修饰的组件 ,并且在组件内部定义controller: CustomDialogController 实例化CustomDialogController,加载组件,open()-> 打开对话框 , close() -> 关闭对话框 #定义弹窗 (CustomDialog)是什么? CustomDialog是自定义弹窗,可用于广告、中

mybatis框架基础以及自定义插件开发

文章目录 框架概览框架预览MyBatis框架的核心组件MyBatis框架的工作原理MyBatis框架的配置MyBatis框架的最佳实践 自定义插件开发1. 添加依赖2. 创建插件类3. 配置插件4. 启动类中注册插件5. 测试插件 参考文献 框架概览 MyBatis是一个优秀的持久层框架,它支持自定义SQL、存储过程以及高级映射,为开发者提供了极大的灵活性和便利性。以下是关于M

vue2实践:第一个非正规的自定义组件-动态表单对话框

前言 vue一个很重要的概念就是组件,作为一个没有经历过前几代前端开发的我来说,不太能理解它所带来的“进步”,但是,将它与后端c++、java类比,我感觉,组件就像是这些语言中的类和对象的概念,通过封装好的组件(类),可以通过挂载的方式,非常方便的调用其提供的功能,而不必重新写一遍实现逻辑。 我们常用的element UI就是由饿了么所提供的组件库,但是在项目开发中,我们可能还需要额外地定义一