AI大模型编写多线程并发框架(六十五):发布和应用

2024-09-03 22:20

本文主要是介绍AI大模型编写多线程并发框架(六十五):发布和应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

系列文章目录


文章目录

  • 系列文章目录
  • 前言
    • 一、项目背景
    • 二、第十三轮对话-优化传参
    • 三、第十四轮对话-释放资源
    • 四、完善所有单元测试
    • 五、验证通过
    • 六、发布
    • 七、参考文章


前言

在这个充满技术创新的时代,AI大模型正成为开发者们的新宠。它们可以帮助我们完成从简单的问答到复杂的编程任务,所以AI编程将会是未来的主流方向,利用AI大模型的能力,本系列文章将介绍从零到一用AI大模型编写一个多线程并发框架。

一、项目背景

经过前面系列文章,我们基本完成这个多线程并发框架的搭建,目前只剩下一些小细节需要稍微处理一下,就基本可以发布上线。

本多线程框架使用示例如下:源码地址
1、引入依赖。

<dependency><groupId>io.github.vipjoey</groupId><artifactId>mmc-juc</artifactId><version>1.1</version>
</dependency>

2、使用示例。


// 创建一个MmcTaskExecutor实例,用于执行单次长耗时任务
// 下面是创建一个计算从1加到100的任务,总共100个任务,采用fork分治算法,阈值为10,总共任务为100 / 10 * 2 = 20个大任务,执行速率约为10/s
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder().taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList())) // 设置任务源.taskProcessor(x -> x.stream().reduce(0, Integer::sum)) // 设置任务处理方法.taskMerger(Integer::sum) // 设置结果处理方法(可选).threshold(10) // 设置任务处理阈值(可选).taskName("mmcTaskExample") // 设置任务名称.rateLimiter(10, 20)  // 设置速率限制,容量为10,每秒产生令牌为20,休眠时间为10ms.forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4.build();// 同步执行并打印结果        
System.out.println("result: " + mmcTaskExecutor.execute());// 任务执行过程监控
[mmcTaskExample] Tasks submitted. Total tasks: 100
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 20, remaining tasks: 80
[mmcTaskExample] Task started. Completed tasks: 26, remaining tasks: 74
[mmcTaskExample] Task started. Completed tasks: 33, remaining tasks: 67
[mmcTaskExample] Task started. Completed tasks: 40, remaining tasks: 60
[mmcTaskExample] Task started. Completed tasks: 46, remaining tasks: 54
[mmcTaskExample] Task started. Completed tasks: 52, remaining tasks: 48
[mmcTaskExample] Task started. Completed tasks: 58, remaining tasks: 42
[mmcTaskExample] Task started. Completed tasks: 64, remaining tasks: 36
[mmcTaskExample] Task started. Completed tasks: 70, remaining tasks: 30
[mmcTaskExample] Task started. Completed tasks: 76, remaining tasks: 24
[mmcTaskExample] Task started. Completed tasks: 82, remaining tasks: 18
[mmcTaskExample] Task started. Completed tasks: 88, remaining tasks: 12
[mmcTaskExample] Task started. Completed tasks: 94, remaining tasks: 6
[mmcTaskExample] Task started. Completed tasks: 100, remaining tasks: 0
[mmcTaskExample] Tasks completed. Elapsed time: 1107 ms 
result: 5050

二、第十三轮对话-优化传参

1、上文提到,可以多次往MmcTaskExecutor提交任务,但明显每次构建MmcTask显得比较笨重,我们需要优化一下。
在这里插入图片描述

2、以下是AI的回复。

  • 【为了满足您的要求,我们将修改MmcTaskExecutor类的submitTask方法,以在提交任务时检查MmcTask的构造参数。如果MmcTask自身具有相应的参数,则使用它们,否则将MmcTaskExecutor的参数赋值给MmcTask。如果两者都为空,则抛出异常。在计算出最终的taskSource后,我们将实例化TaskRuntime。】
    // 检查并设置MmcTask的参数private void checkAndSet(MmcTask<T, R> mmcTask) {// 检查MmcTask的构造参数,如果为空,则使用MmcTaskExecutor的参数List<T> taskSource = mmcTask.getTaskSource() != null ? mmcTask.getTaskSource() : this.taskSource;MmcTaskProcessor<T, R> taskProcessor = mmcTask.getTaskProcessor() != null ? mmcTask.getTaskProcessor() : this.taskProcessor;MmcTaskMerger<R> taskMerger = mmcTask.getTaskMerger() != null ? mmcTask.getTaskMerger() : this.taskMerger;RateLimiter rateLimiter = mmcTask.getRateLimiter() != null ? mmcTask.getRateLimiter() : this.rateLimiter;MmcTaskListener taskListener = mmcTask.getTaskListener() != null ? mmcTask.getTaskListener() : this.taskListener;String taskName = mmcTask.getTaskName() != null ? mmcTask.getTaskName() : this.taskName;int threshold = mmcTask.getThreshold() > 0 ? mmcTask.getThreshold() : this.threshold;// 如果参数为空,则抛出异常Objects.requireNonNull(taskSource, "TaskSource cannot be null.");Objects.requireNonNull(taskProcessor, "TaskProcessor cannot be null.");Objects.requireNonNull(taskMerger, "TaskMerger cannot be null.");Objects.requireNonNull(rateLimiter, "RateLimiter cannot be null.");Objects.requireNonNull(taskListener, "TaskListener cannot be null.");Objects.requireNonNull(taskName, "TaskName cannot be null.");// 将最终参数赋值给MmcTaskTaskRuntime taskRuntime = new TaskRuntime(mmcTask.getTaskName(), mmcTask.getTaskSource().size());mmcTask.setTaskSource(taskSource);mmcTask.setTaskProcessor(taskProcessor);mmcTask.setTaskMerger(taskMerger);mmcTask.setRateLimiter(rateLimiter);mmcTask.setTaskListener(taskListener);mmcTask.setTaskName(taskName);mmcTask.setStart(0);mmcTask.setEnd(taskSource.size());mmcTask.setTaskRuntime(taskRuntime);mmcTask.setThreshold(threshold);}
  • 【接下来,我们需要修改MmcTaskExecutor类去初始化MmcTask。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量和方法// 同步执行并返回结果@Overridepublic R execute() {MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).threshold(threshold).rateLimiter(rateLimiter).taskListener(taskListener).taskName(taskName).build();return execute(mmcTask);}// 同步执行并返回结果@Overridepublic R execute(MmcTask<T, R> mmcTask) {// 检查MmcTask的构造参数,如果为空,则使用MmcTaskExecutor的参数checkAndSet(mmcTask);long startTime = System.currentTimeMillis();mmcTask.setTaskRuntime(mmcTask.getTaskRuntime());// 调用onTasksSubmitted方法taskListener.onTasksSubmitted(mmcTask.getTaskRuntime());R result = forkJoinPool.invoke(mmcTask);// 调用onTasksCompleted方法long elapsedTime = System.currentTimeMillis() - startTime;taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), elapsedTime);return result;}// 异步执行@Overridepublic void commit() {commit((result -> {}));}// 异步执行并获取结果@Overridepublic void commit(MmcTaskCallback<R> callback) {MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).threshold(threshold).rateLimiter(rateLimiter).taskListener(taskListener).taskName(taskName).build();commit(mmcTask, callback);}// 异步执行并获取结果@Overridepublic void commit(MmcTask<T, R> mmcTask, MmcTaskCallback<R> callback) {// 检查MmcTask的构造参数,如果为空,则使用MmcTaskExecutor的参数checkAndSet(mmcTask);long startTime = System.currentTimeMillis();// 调用onTasksSubmitted方法taskListener.onTasksSubmitted(mmcTask.getTaskRuntime());forkJoinPool.submit(() -> {R result = mmcTask.invoke();// 调用onTasksCompleted方法long elapsedTime = System.currentTimeMillis() - startTime;taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), elapsedTime);if (callback != null) {callback.onComplete(result);}});}
}

三、第十四轮对话-释放资源

1、增加关闭方法,释放所有资源。
在这里插入图片描述

2、以下是AI的回复。

  • 【为了满足您的要求,我们将为MmcTaskExecutor类添加一个名为shutdown的方法。该方法将关闭ForkJoinPool,以便不再接收新任务,并释放所有资源。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;public class MmcTaskExecutor<T, R> {private ForkJoinPool forkJoinPool;// ...省略其他成员变量和方法// 关闭任务容器,不再接收任务,释放所有资源public void shutdown() {forkJoinPool.shutdown();}
}
  • 【现在,您可以通过调用MmcTaskExecutor的shutdown方法来关闭任务容器。这将导致不再接收新任务,并释放所有资源。注意,在调用shutdown方法后,您将无法再提交新任务。如果您需要再次提交任务,请重新创建一个新的MmcTaskExecutor实例。】

四、完善所有单元测试

1、完善所有异步和同步提交任务的单元测试。

package com.mmc.juc;import static org.junit.Assert.assertEquals;import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;public class MmcTaskExecutorTest {@Testpublic void testMmcTaskExecutor() {// 创建一个任务源,包含从1到100的整数List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());// 创建一个任务处理器,将每个整数相加MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue).sum();// 创建一个任务合并器,将所有整数的和合并MmcTaskMerger<Integer> taskMerger = Integer::sum;// 创建一个任务监听器,打印任务执行过程中的相关信息MmcTaskListener taskListener = new DefaultMmcTaskListener();// 创建一个MmcTaskExecutor实例MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).taskListener(taskListener).rateLimiter(10, 10) // 每秒处理10个任务.taskName("testMmcTaskExecutor").build();// 执行任务并获取结果Integer result = mmcTaskExecutor.execute();// 验证结果是否正确(1到100的和等于5050)assertEquals(5050, result.intValue());}@Testpublic void testMmcTaskExecutorWithTask() {// 创建一个任务源,包含从1到100的整数List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());// 创建一个MmcTaskExecutor实例MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder().taskProcessor(x -> x.stream().reduce(0, Integer::sum)).taskMerger(Integer::sum).rateLimiter(new TokenBucket(10, 20)).taskListener(new DefaultMmcTaskListener()).build();Integer r = mmcTaskExecutor.execute(MmcTask.<Integer, Integer>builder().taskSource(taskSource).taskName("testMmcTaskExecutorWithTask").build());System.out.println("result: " + r);}@Testpublic void testCommit() throws InterruptedException {// 创建一个任务源,包含从1到100的整数List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());// 创建一个任务处理器,将每个整数相加MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue).sum();// 创建一个任务合并器,将所有整数的和合并MmcTaskMerger<Integer> taskMerger = Integer::sum;// 创建一个任务监听器,打印任务执行过程中的相关信息MmcTaskListener taskListener = new DefaultMmcTaskListener();// 创建一个MmcTaskExecutor实例MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).taskListener(taskListener).rateLimiter(10, 10) // 每秒处理10个任务.taskName("testCommit").build();mmcTaskExecutor.commit();TimeUnit.SECONDS.sleep(3); // 等待异步任务完成}@Testpublic void testCommitWithCallback() throws InterruptedException {// 创建一个任务源,包含从1到100的整数List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());// 创建一个任务处理器,将每个整数相加MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue).sum();// 创建一个任务合并器,将所有整数的和合并MmcTaskMerger<Integer> taskMerger = Integer::sum;// 创建一个任务监听器,打印任务执行过程中的相关信息MmcTaskListener taskListener = new DefaultMmcTaskListener();// 创建一个MmcTaskExecutor实例MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder().taskSource(taskSource).taskProcessor(taskProcessor).taskMerger(taskMerger).taskListener(taskListener).rateLimiter(10, 10) // 每秒处理10个任务.taskName("testCommitWithCallback").build();mmcTaskExecutor.commit(result -> {assertEquals(5050, result.intValue());});}@Testpublic void testCommitWithCustomMmcTaskAndCallback() throws InterruptedException {// 创建一个任务源,包含从1到100的整数List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());// 创建一个MmcTaskExecutor实例MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder().taskProcessor(x -> x.stream().reduce(0, Integer::sum)).taskMerger(Integer::sum).taskListener(new DefaultMmcTaskListener()).build();mmcTaskExecutor.commit(MmcTask.<Integer, Integer>builder().taskName("testCommitWithCustomMmcTaskAndCallback").rateLimiter(new TokenBucket(20, 20)).taskSource(taskSource).build(), result -> {assertEquals(5050, result.intValue());System.out.println("result: " + result);});}
}

五、验证通过

1、我们运行一下单元测试,看看效果。
在这里插入图片描述
很完美!整个框架运行正常!

六、发布

参考搭建大型分布式服务(四十三)SpringBoot 无代码侵入实现多Kafka数据源发布到Maven中央仓库:让世界看到你的作品! 这篇文章,将这个多线程并发框架发布到中央仓库。

<dependency><groupId>io.github.vipjoey</groupId><artifactId>mmc-juc</artifactId><version>1.1</version>
</dependency>

可能,有小伙伴有疑问,这个多线程框架不会只能用来计算1到100这些简单的任务吧?当然不是,下一篇,我们将整合到kafka框架,让它的执行速度飞起来!
《搭建大型分布式服务(四十四)SpringBoot 无代码侵入实现多Kafka数据源:单分区提升至十万级消费速度!》

七、参考文章

  • 《AI大模型编写多线程并发框架(六十一):从零开始搭建框架》
  • 《AI大模型编写多线程并发框架(六十二):限流和并发度优化》
  • 《AI大模型编写多线程并发框架(六十三):监听器优化·上》
  • 《AI大模型编写多线程并发框架(六十四):监听器优化·下》
  • 《AI大模型编写多线程并发框架(六十五):发布和应用》

加我加群(备注csdn)一起交流学习!更多干货下载、项目源码和大厂内推等着你

这篇关于AI大模型编写多线程并发框架(六十五):发布和应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1134205

相关文章

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

AI绘图怎么变现?想做点副业的小白必看!

在科技飞速发展的今天,AI绘图作为一种新兴技术,不仅改变了艺术创作的方式,也为创作者提供了多种变现途径。本文将详细探讨几种常见的AI绘图变现方式,帮助创作者更好地利用这一技术实现经济收益。 更多实操教程和AI绘画工具,可以扫描下方,免费获取 定制服务:个性化的创意商机 个性化定制 AI绘图技术能够根据用户需求生成个性化的头像、壁纸、插画等作品。例如,姓氏头像在电商平台上非常受欢迎,

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

从去中心化到智能化:Web3如何与AI共同塑造数字生态

在数字时代的演进中,Web3和人工智能(AI)正成为塑造未来互联网的两大核心力量。Web3的去中心化理念与AI的智能化技术,正相互交织,共同推动数字生态的变革。本文将探讨Web3与AI的融合如何改变数字世界,并展望这一新兴组合如何重塑我们的在线体验。 Web3的去中心化愿景 Web3代表了互联网的第三代发展,它基于去中心化的区块链技术,旨在创建一个开放、透明且用户主导的数字生态。不同于传统

AI一键生成 PPT

AI一键生成 PPT 操作步骤 作为一名打工人,是不是经常需要制作各种PPT来分享我的生活和想法。但是,你们知道,有时候灵感来了,时间却不够用了!😩直到我发现了Kimi AI——一个能够自动生成PPT的神奇助手!🌟 什么是Kimi? 一款月之暗面科技有限公司开发的AI办公工具,帮助用户快速生成高质量的演示文稿。 无论你是职场人士、学生还是教师,Kimi都能够为你的办公文

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G