Pipeline流水线组件

2024-06-13 20:36
文章标签 组件 流水线 pipeline

本文主要是介绍Pipeline流水线组件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 1、新建pipeline流水线
    • 2、定义处理器
    • 3、定义处理器上下文
    • 4、pipeline流水线实现
    • 5、处理器抽象类实现
    • 6、pipeline流水线构建者
    • 7、具体处理器实现
    • 8、流水线测试
    • 9、运行结果

1、新建pipeline流水线

package com.summer.toolkit.model.chain;import java.util.List;
import java.util.concurrent.Executor;public interface Pipeline<T> {/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(Handler<T> handler);/*** 向pipeline中添加一个执行器** @param name    执行器名称* @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(String name, Handler<T> handler);/*** pipeline执行** @param list 数据集合* @return 返回值,执行完成返回true*/boolean execute(List<T> list);/*** pipeline并行执行** @param list     数据集合* @param executor 线程池* @return 返回值,执行完成返回true*/boolean parallelExecute(List<T> list, Executor executor);/*** pipeline执行** @param object 单个数据* @return 返回值,执行完成返回true*/boolean execute(T object);}

2、定义处理器

package com.summer.toolkit.model.chain;public interface Handler<T> {/*** 处理器处理方法** @param handlerContext 上下文* @param t              要处理的数据*/void doHandler(HandlerContext<T> handlerContext, T t);}

3、定义处理器上下文

package com.summer.toolkit.model.chain;import lombok.Data;@Data
public class HandlerContext<T> {/*** 执行器名称 */private String name;/*** 执行器 */private Handler<T> handler;/*** 链表的下一个节点,用来保存下一个执行器 */public HandlerContext<T> next;public HandlerContext(Handler<T> handler) {this.name = handler.getClass().getName();this.handler = handler;}public HandlerContext(String name, Handler<T> handler) {this.name = name;this.handler = handler;}/*** 调用该方法即调用上下文中处理器的执行方法** @param t 需要处理的数据*/public void handle(T t) {this.handler.doHandler(this, t);}/*** 执行下一个节点的处理器** @param t 待执行的数据*/public void runNext(T t) {if (this.next != null) {this.next.handle(t);}}
}

4、pipeline流水线实现

package com.summer.toolkit.model.chain;import com.summer.toolkit.util.CollectionUtils;
import com.summer.toolkit.util.StringUtils;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;@Slf4j
public class DefaultPipeline<T> implements Pipeline<T> {/*** 默认pipeline中有一个处理器上下文的头结点* 头结点无处理逻辑,直接执行下一个节点的处理器*/HandlerContext<T> head = new HandlerContext<>(HandlerContext::runNext);@Overridepublic Pipeline<T> addLast(Handler<T> handler) {this.addLast(null, handler);return this;}@Overridepublic Pipeline<T> addLast(String name, Handler<T> handler) {if (handler == null) {log.warn("处理器为空,不进行添加!");return this;}if (StringUtils.isEmpty(name)) {name = handler.getClass().getName();}// 将处理器添加到处理器上下文的尾节点HandlerContext<T> context = head;while (context.next != null) {context = context.next;}context.next = new HandlerContext<T>(name, handler);return this;}@Overridepublic boolean execute(List<T> list) {List<Object> result = list.stream().peek(this::execute).collect(Collectors.toList());return true;}@Overridepublic boolean parallelExecute(List<T> list, Executor executor) {Map<String, List<T>> parts = this.split(list);List<CompletableFuture<Boolean>> results = new ArrayList<>();for (Map.Entry<String, List<T>> entry : parts.entrySet()) {CompletableFuture<Boolean> completableFuture = CompletableFuture// 提交任务.supplyAsync(() -> this.execute(entry.getValue()), executor)// 打印异常信息.exceptionally(e -> {log.error("并行处理数据时发生异常!{}", e.getMessage(), e);return Boolean.FALSE;});results.add(completableFuture);}CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();return true;}@Overridepublic boolean execute(T t) {this.head.handle(t);return true;}/*** 对集合进行分组拆分** @param list 集合* @return 返回值*/private Map<String, List<T>> split(List<T> list) {Map<String, List<T>> parts = new HashMap<>(8);if (CollectionUtils.isEmpty(list)) {return parts;}// 如果集合数量过少,则不进行分组int limit = 10;if (list.size() < limit) {String key = String.valueOf(0);parts.put(key, list);return parts;}// 固定分五个分组int group = 5;for (int i = 0, length = list.size(); i < length; i++) {int key = i % group;List<T> part = parts.computeIfAbsent(String.valueOf(key), k -> new ArrayList<>());T t = list.get(i);part.add(t);}return parts;}}

5、处理器抽象类实现

package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;@Slf4j
public abstract class AbstractHandler<T> implements Handler<T> {/*** 开始处理数据,通用方法** @param handlerContext 上下文* @param t              要处理的数据*/@Overridepublic void doHandler(HandlerContext<T> handlerContext, T t) {long start = System.currentTimeMillis();String threadName = Thread.currentThread().getName();String handlerName = handlerContext.getName();log.info("====={} 开始处理:{}=====", threadName, handlerName);try {// 此处处理异常,如果执行过程失败,则继续执行下一个handlerthis.handle(t);} catch (Throwable throwable) {log.error("====={} 处理异常:{},异常原因:{}=====", threadName, handlerName, throwable.getMessage(), throwable);this.handleException(t, throwable);}long end = System.currentTimeMillis();log.info("====={} 处理完成:{},耗时:{} 毫秒=====", threadName, handlerName, (end - start));// 处理完该上下文中的处理器逻辑后,调用上下文中的下一个执行器的执行方法handlerContext.runNext(t);}/*** 处理数据抽象方法,由子类实现具体细节** @param t 对象*/public abstract void handle(T t);/*** 处理数据抽象方法,由子类实现具体细节** @param t         对象* @param throwable 异常对象*/public void handleException(T t, Throwable throwable) {log.error("=====处理数据发生异常:{}", throwable.getMessage(), throwable);}}

6、pipeline流水线构建者

package com.summer.toolkit.model.chain;public class DefaultPipelineBuilder<T> {private final Pipeline<T> pipeline;public DefaultPipelineBuilder() {this.pipeline = new DefaultPipeline<>();}/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(Handler<T> handler) {pipeline.addLast(handler);return this;}/*** 向pipeline中添加一个执行器** @param name 执行器名称* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(String name, Handler<T> handler) {pipeline.addLast(name, handler);return this;}/*** 返回pipeline对象** @return 返回值*/public Pipeline<T> build() {return this.pipeline;}}

7、具体处理器实现

package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;import java.util.Objects;@Slf4j
public class StringHandler extends AbstractHandler<String> {@Overridepublic void handle(String s) {log.info("入参:{}", s);}@Overridepublic void handleException(String s, Throwable throwable) {if (Objects.nonNull(throwable)) {log.error("异常:{}", throwable.getMessage());}}}

8、流水线测试

package com.summer.toolkit.model;import com.summer.toolkit.model.chain.DefaultPipelineBuilder;
import com.summer.toolkit.model.chain.Pipeline;
import com.summer.toolkit.model.chain.StringHandler;public class Processor {public static void main(String[] args) {DefaultPipelineBuilder<String> builder = new DefaultPipelineBuilder<>();Pipeline<String> pipeline = builder.addLast("字符串信息", new StringHandler()).addLast("寄件人信息", new StringHandler()).addLast("收件人信息", new StringHandler()).build();pipeline.execute("1");}}

9、运行结果

20:03:00.285 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:字符串信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:字符串信息,耗时:5 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:寄件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:寄件人信息,耗时:0 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:收件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:收件人信息,耗时:0 毫秒=====

这篇关于Pipeline流水线组件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1058361

相关文章

vue解决子组件样式覆盖问题scoped deep

《vue解决子组件样式覆盖问题scopeddeep》文章主要介绍了在Vue项目中处理全局样式和局部样式的方法,包括使用scoped属性和深度选择器(/deep/)来覆盖子组件的样式,作者建议所有组件... 目录前言scoped分析deep分析使用总结所有组件必须加scoped父组件覆盖子组件使用deep前言

基于Qt Qml实现时间轴组件

《基于QtQml实现时间轴组件》时间轴组件是现代用户界面中常见的元素,用于按时间顺序展示事件,本文主要为大家详细介绍了如何使用Qml实现一个简单的时间轴组件,需要的可以参考下... 目录写在前面效果图组件概述实现细节1. 组件结构2. 属性定义3. 数据模型4. 事件项的添加和排序5. 事件项的渲染如何使用

JS常用组件收集

收集了一些平时遇到的前端比较优秀的组件,方便以后开发的时候查找!!! 函数工具: Lodash 页面固定: stickUp、jQuery.Pin 轮播: unslider、swiper 开关: switch 复选框: icheck 气泡: grumble 隐藏元素: Headroom

如何在页面调用utility bar并传递参数至lwc组件

1.在app的utility item中添加lwc组件: 2.调用utility bar api的方式有两种: 方法一,通过lwc调用: import {LightningElement,api ,wire } from 'lwc';import { publish, MessageContext } from 'lightning/messageService';import Ca

如何使用Ansible实现CI/CD流水线的自动化

如何使用Ansible实现CI/CD流水线的自动化 持续集成(CI)和持续交付(CD)是现代软件开发过程中的核心实践,它们帮助团队更快地交付高质量的软件。Ansible,作为一个强大的自动化工具,可以在CI/CD流水线中发挥关键作用。本文将详细介绍如何使用Ansible实现CI/CD流水线的自动化,包括设计流水线的结构、配置管理、自动化测试、部署、以及集成Ansible与CI/CD工具(如Jen

vue2 组件通信

props + emits props:用于接收父组件传递给子组件的数据。可以定义期望从父组件接收的数据结构和类型。‘子组件不可更改该数据’emits:用于定义组件可以向父组件发出的事件。这允许父组件监听子组件的事件并作出响应。(比如数据更新) props检查属性 属性名类型描述默认值typeFunction指定 prop 应该是什么类型,如 String, Number, Boolean,

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

火语言RPA流程组件介绍--浏览网页

🚩【组件功能】:浏览器打开指定网址或本地html文件 配置预览 配置说明 网址URL 支持T或# 默认FLOW输入项 输入需要打开的网址URL 超时时间 支持T或# 打开网页超时时间 执行后后等待时间(ms) 支持T或# 当前组件执行完成后继续等待的时间 UserAgent 支持T或# User Agent中文名为用户代理,简称 UA,它是一个特殊字符串头,使得服务器

Go并发模型:流水线模型

Go作为一个实用主义的编程语言,非常注重性能,在语言特性上天然支持并发,Go并发模型有多种模式,通过流水线模型系列文章,你会更好的使用Go的并发特性,提高的程序性能。 这篇文章主要介绍流水线模型的流水线概念,后面文章介绍流水线模型的FAN-IN和FAN-OUT,最后介绍下如何合理的关闭流水线的协程。 Golang的并发核心思路 Golang并发核心思路是关注数据流动。数据流动的过程交给cha

vue 父组件调用子组件的方法报错,“TypeError: Cannot read property ‘subDialogRef‘ of undefined“

vue 父组件调用子组件的方法报错,“TypeError: Cannot read property ‘subDialogRef’ of undefined” 最近用vue做的一个界面,引入了一个子组件,在父组件中调用子组件的方法时,报错提示: [Vue warn]: Error in v-on handler: “TypeError: Cannot read property ‘methods