Upsert api写s3的流程源码分析

2024-03-05 23:52
文章标签 分析 源码 流程 api s3 upsert

本文主要是介绍Upsert api写s3的流程源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Upsert api写s3的流程

milvus版本:v2.3.2

实现:先insert再delete,并限制不能修改主键列。

整体架构:

在这里插入图片描述

Upsert 的数据流向

在这里插入图片描述

upsert写入s3的流程

upsert先insert,再delete。从proxy的execute()方法可以看出。

func (it *upsertTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")defer sp.End()log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)if err != nil {return err}// 创建msgPackmsgPack := &msgstream.MsgPack{BeginTs: it.BeginTs(),EndTs:   it.EndTs(),}// 添加insertMsgPackerr = it.insertExecute(ctx, msgPack)if err != nil {log.Warn("Fail to insertExecute", zap.Error(err))return err}// 添加deleteMsgPackerr = it.deleteExecute(ctx, msgPack)if err != nil {log.Warn("Fail to deleteExecute", zap.Error(err))return err}tr.RecordSpan()// 发送数据至mqerr = stream.Produce(msgPack)if err != nil {it.result.Status = merr.Status(err)return err}sendMsgDur := tr.RecordSpan()metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))totalDur := tr.ElapseSpan()log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),zap.Duration("total duration", totalDur))return nil
}

将insertmsg和deletemsg加入msgPack,然后datanode进行消费。

insert和delete流程分别可以参考对应写入s3的流程。产生insertlog和deletelog。

// Operate handles input messages, implementing flowgrpah.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {......// 遍历msMsgfor _, msg := range msMsg.TsMessages() {switch msg.Type() {case commonpb.MsgType_DropCollection:......case commonpb.MsgType_DropPartition:......// 处理insert消息case commonpb.MsgType_Insert:......// 处理delete消息case commonpb.MsgType_Delete:......}}......
}

s3文件

upsert结合了insert和delete操作。因此在s3对应的文件也即insert和delete对应的文件。

主要涉及delta_log和stats_log。

insert:

files/insert_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}(not flushed)

delete:

files/delta_log/{collID}/{partID}/{segmentID}/{logID}
files/stats_log/{collID}/{partID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collID}/{partID}/{segmentID}/{fieldID}/{logID}(not flushed)

这篇关于Upsert api写s3的流程源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/778227

相关文章

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Linux流媒体服务器部署流程

《Linux流媒体服务器部署流程》文章详细介绍了流媒体服务器的部署步骤,包括更新系统、安装依赖组件、编译安装Nginx和RTMP模块、配置Nginx和FFmpeg,以及测试流媒体服务器的搭建... 目录流媒体服务器部署部署安装1.更新系统2.安装依赖组件3.解压4.编译安装(添加RTMP和openssl模块

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

浅析如何使用Swagger生成带权限控制的API文档

《浅析如何使用Swagger生成带权限控制的API文档》当涉及到权限控制时,如何生成既安全又详细的API文档就成了一个关键问题,所以这篇文章小编就来和大家好好聊聊如何用Swagger来生成带有... 目录准备工作配置 Swagger权限控制给 API 加上权限注解查看文档注意事项在咱们的开发工作里,API

一分钟带你上手Python调用DeepSeek的API

《一分钟带你上手Python调用DeepSeek的API》最近DeepSeek非常火,作为一枚对前言技术非常关注的程序员来说,自然都想对接DeepSeek的API来体验一把,下面小编就来为大家介绍一下... 目录前言免费体验API-Key申请首次调用API基本概念最小单元推理模型智能体自定义界面总结前言最

JAVA调用Deepseek的api完成基本对话简单代码示例

《JAVA调用Deepseek的api完成基本对话简单代码示例》:本文主要介绍JAVA调用Deepseek的api完成基本对话的相关资料,文中详细讲解了如何获取DeepSeekAPI密钥、添加H... 获取API密钥首先,从DeepSeek平台获取API密钥,用于身份验证。添加HTTP客户端依赖使用Jav

springboot启动流程过程

《springboot启动流程过程》SpringBoot简化了Spring框架的使用,通过创建`SpringApplication`对象,判断应用类型并设置初始化器和监听器,在`run`方法中,读取配... 目录springboot启动流程springboot程序启动入口1.创建SpringApplicat