分布式流式计算框架vortex使用介绍

2024-04-15 18:18

本文主要是介绍分布式流式计算框架vortex使用介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

vortex是一款轻量级的分布式流式计算框架。vortex中文意为旋涡,代表着数据流不断地流入这个旋涡然后被平稳地输出。
vortex属于内存计算型的流式框架,适用于高可用,高并发,实时计算的业务场景。
vortex是基于SpringBoot框架之上开发的,它依赖微服务分布式协作框架tridenter实现集群特性,vortex微服务内嵌了独立的TCP服务器(默认通过Netty4实现),vortex微服务集群中的应用程序通过tridenter多播功能相互发现并建立长连接,实现高可用,去中心化和负载均衡,使得整个Spring应用程序集群具备实时计算的能力。

vortex项目一共包含3部分:

  1. vortex-common
    vortex框架的agent端jar包
  2. vortex-spring-boot-starter
    vortex框架的核心jar包,添加到SpringBoot应用使其成为vortex服务端
  3. vortex-metrics
    基于vortex的分布式时序计算框架,它是vortex重要的独立子项目

服务端安装

<dependency><groupId>com.github.paganini2008.atlantis</groupId><artifactId>vortex-spring-boot-starter</artifactId><version>1.0-RC2</version>
</dependency>

agent端安装

<dependency><groupId>com.github.paganini2008.atlantis</groupId><artifactId>vortex-common</artifactId><version>1.0-RC2</version>
</dependency>

目前基于vortex框架的开源项目有两个:

  1. 分布式微服务监控系统 Jellyfish
  2. 分布式时序计算框架 Vortex Metrics
  3. 分布式网络爬虫Greenfinger

如何在你的应用中使用vortex的API
前面说过,vortex服务端接收数据,vortex agent端发送数据,vortex提供了HTTP和TCP两种协议来接收和发送外部数据。

  1. vortex服务端要实现Handler接口实现定制,比如:
@Slf4j
public class TestHandler implements Handler{@Overridepublic void onData(Tuple tuple) {log.info(tuple.toString());}}
  1. 而agent端通过TransportClient实现类来发送数据

下面以jellyfish中日志收集模块为例,参考源码:
服务端:

public class Slf4jHandler implements Handler {private static final String TOPIC_NAME = "slf4j";@Autowiredprivate IdGenerator idGenerator;@Autowiredprivate LogEntryService logEntryService;@Value("${atlantis.framework.jellyfish.handler.interferedCharacter:}")private String interferedCharacterRegex;@Overridepublic void onData(Tuple tuple) {LogEntry logEntry = new LogEntry();logEntry.setId(idGenerator.generateId());logEntry.setClusterName(tuple.getField("clusterName", String.class));logEntry.setApplicationName(tuple.getField("applicationName", String.class));logEntry.setHost(tuple.getField("host", String.class));logEntry.setIdentifier(tuple.getField("identifier", String.class));logEntry.setLoggerName(tuple.getField("loggerName", String.class));logEntry.setMessage(tuple.getField("message", String.class));logEntry.setLevel(tuple.getField("level", String.class));logEntry.setReason(tuple.getField("reason", String.class));logEntry.setMarker(tuple.getField("marker", String.class));logEntry.setCreateTime(tuple.getField("timestamp", Long.class));if (StringUtils.isNotBlank(interferedCharacterRegex)) {logEntry.setMessage(logEntry.getMessage().replaceAll(interferedCharacterRegex, ""));logEntry.setReason(logEntry.getReason().replaceAll(interferedCharacterRegex, ""));}logEntryService.bulkSaveLogEntries(logEntry);}@Overridepublic String getTopic() {return TOPIC_NAME;}}

Agent端, 你需要自己实现一个Agent端, 向vortex服务端不断发送数据,可参考jellyfish-slf4j的TransportClientAppenderBase.java源码:

    @Overrideprotected void append(ILoggingEvent eventObject) {if (transportClient == null) {return;}Tuple tuple = Tuple.newOne(GLOBAL_TOPIC_NAME);tuple.setField("clusterName", clusterName);tuple.setField("applicationName", applicationName);tuple.setField("host", host);tuple.setField("identifier", identifier);tuple.setField("loggerName", eventObject.getLoggerName());String msg = eventObject.getFormattedMessage();tuple.setField("message", msg);tuple.setField("level", eventObject.getLevel().toString());String reason = ThrowableProxyUtil.asString(eventObject.getThrowableProxy());tuple.setField("reason", reason);tuple.setField("marker", eventObject.getMarker() != null ? eventObject.getMarker().getName() : "");tuple.setField("timestamp", eventObject.getTimeStamp());Map<String, String> mdc = eventObject.getMDCPropertyMap();if (MapUtils.isNotEmpty(mdc)) {tuple.append(mdc);}transportClient.write(tuple);}

说明一下,vortex服务端和agent端的交互数据可以是Map, Tuple对象或json字符串,但最终都被包装成Tuple对象

具体使用,可参考vortex的源码: https://github.com/paganini2008/vortex.git

这篇关于分布式流式计算框架vortex使用介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/906558

相关文章

使用Python构建一个Hexo博客发布工具

《使用Python构建一个Hexo博客发布工具》虽然Hexo的命令行工具非常强大,但对于日常的博客撰写和发布过程,我总觉得缺少一个直观的图形界面来简化操作,下面我们就来看看如何使用Python构建一个... 目录引言Hexo博客系统简介设计需求技术选择代码实现主框架界面设计核心功能实现1. 发布文章2. 加

shell编程之函数与数组的使用详解

《shell编程之函数与数组的使用详解》:本文主要介绍shell编程之函数与数组的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录shell函数函数的用法俩个数求和系统资源监控并报警函数函数变量的作用范围函数的参数递归函数shell数组获取数组的长度读取某下的

使用Python开发一个带EPUB转换功能的Markdown编辑器

《使用Python开发一个带EPUB转换功能的Markdown编辑器》Markdown因其简单易用和强大的格式支持,成为了写作者、开发者及内容创作者的首选格式,本文将通过Python开发一个Markd... 目录应用概览代码结构与核心组件1. 初始化与布局 (__init__)2. 工具栏 (setup_t

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python虚拟环境终极(含PyCharm的使用教程)

《Python虚拟环境终极(含PyCharm的使用教程)》:本文主要介绍Python虚拟环境终极(含PyCharm的使用教程),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录一、为什么需要虚拟环境?二、虚拟环境创建方式对比三、命令行创建虚拟环境(venv)3.1 基础命令3

Python Transformer 库安装配置及使用方法

《PythonTransformer库安装配置及使用方法》HuggingFaceTransformers是自然语言处理(NLP)领域最流行的开源库之一,支持基于Transformer架构的预训练模... 目录python 中的 Transformer 库及使用方法一、库的概述二、安装与配置三、基础使用:Pi

关于pandas的read_csv方法使用解读

《关于pandas的read_csv方法使用解读》:本文主要介绍关于pandas的read_csv方法使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录pandas的read_csv方法解读read_csv中的参数基本参数通用解析参数空值处理相关参数时间处理相关

使用Node.js制作图片上传服务的详细教程

《使用Node.js制作图片上传服务的详细教程》在现代Web应用开发中,图片上传是一项常见且重要的功能,借助Node.js强大的生态系统,我们可以轻松搭建高效的图片上传服务,本文将深入探讨如何使用No... 目录准备工作搭建 Express 服务器配置 multer 进行图片上传处理图片上传请求完整代码示例

SpringBoot条件注解核心作用与使用场景详解

《SpringBoot条件注解核心作用与使用场景详解》SpringBoot的条件注解为开发者提供了强大的动态配置能力,理解其原理和适用场景是构建灵活、可扩展应用的关键,本文将系统梳理所有常用的条件注... 目录引言一、条件注解的核心机制二、SpringBoot内置条件注解详解1、@ConditionalOn

Python中使用正则表达式精准匹配IP地址的案例

《Python中使用正则表达式精准匹配IP地址的案例》Python的正则表达式(re模块)是完成这个任务的利器,但你知道怎么写才能准确匹配各种合法的IP地址吗,今天我们就来详细探讨这个问题,感兴趣的朋... 目录为什么需要IP正则表达式?IP地址的基本结构基础正则表达式写法精确匹配0-255的数字验证IP地