Disruptor 实践:整合到现有的爬虫框架

2024-01-08 22:58

本文主要是介绍Disruptor 实践:整合到现有的爬虫框架,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

640?wx_fmt=jpeg


一. Disruptor

Disruptor 是一个高性能的异步处理框架。

Disruptor 是 LMAX 在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。其实 Disruptor 与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。

二. 实践

NetDiscovery 是基于 Vert.x、RxJava 2 等框架实现的爬虫框架。

NetDiscovery 默认的消息队列采用 JDK 的 ConcurrentLinkedQueue,由于爬虫框架各个组件都可以被替换,所以下面基于 Disruptor 实现爬虫的 Queue。

2.1 事件的封装

将爬虫的 request 封装成一个 RequestEvent,该事件会在 Disruptor 中传输。

 
  1. import com.cv4j.netdiscovery.core.domain.Request;

  2. import lombok.Data;

  3. /**

  4. * Created by tony on 2018/9/1.

  5. */

  6. @Data

  7. public class RequestEvent {

  8.    private Request request;

  9.    public String toString() {

  10.        return request.toString();

  11.    }

  12. }

2.2 发布事件

下面编写事件的发布,从 RingBuffer 中获取下一个可写入事件的序号,将爬虫要请求的 request 设置到 RequestEvent 事件中,最后将事件提交到 RingBuffer。

 
  1. import com.cv4j.netdiscovery.core.domain.Request;

  2. import com.lmax.disruptor.RingBuffer;

  3. import java.util.concurrent.atomic.AtomicInteger;

  4. /**

  5. * Created by tony on 2018/9/2.

  6. */

  7. public class Producer {

  8.    private final RingBuffer<RequestEvent> ringBuffer;

  9.    private AtomicInteger count = new AtomicInteger(0); // 计数器

  10.    public Producer(RingBuffer<RequestEvent> ringBuffer) {

  11.        this.ringBuffer = ringBuffer;

  12.    }

  13.    public void pushData(Request request){

  14.        long sequence = ringBuffer.next();

  15.        try{

  16.            RequestEvent event = ringBuffer.get(sequence);

  17.            event.setRequest(request);

  18.        }finally {

  19.            ringBuffer.publish(sequence);

  20.            count.incrementAndGet();

  21.        }

  22.    }

  23.    /**

  24.     * 发送到队列中到Request的数量

  25.     * @return

  26.     */

  27.    public int getCount() {

  28.        return count.get();

  29.    }

  30. }

2.3 消费事件

RequestEvent 设置了 request 之后,消费者需要处理具体的事件。下面的 Consumer 仅仅是记录消费者的线程名称以及 request。真正的“消费”还是需要从 DisruptorQueue 的 poll() 中获取 request ,然后在 Spider 中进行“消费”。

 
  1. import com.lmax.disruptor.WorkHandler;

  2. import lombok.extern.slf4j.Slf4j;

  3. import java.util.concurrent.atomic.AtomicInteger;

  4. /**

  5. * Created by tony on 2018/9/2.

  6. */

  7. @Slf4j

  8. public class Consumer implements WorkHandler<RequestEvent> {

  9.    @Override

  10.    public void onEvent(RequestEvent requestEvent) throws Exception {

  11.        log.info("consumer:" + Thread.currentThread().getName() + " requestEvent: value=" + requestEvent.toString());

  12.    }

  13. }

2.4 DisruptorQueue 的实现

Disruptor 支持单生产者单消费者、多生产者、多消费者、分组等方式。

在 NetDiscovery 中采用多生产者多消费者。

在 RingBuffer 创建时,ProducerType 使用 MULTI 类型表示多生产者。创建 RingBuffer 采用了 YieldingWaitStrategy 。YieldingWaitStrategy 是一种WaitStrategy,不同的 WaitStrategy 会有不同的性能。

YieldingWaitStrategy 性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

 
  1.        ringBuffer = RingBuffer.create(ProducerType.MULTI,

  2.                new EventFactory<RequestEvent>() {

  3.                    @Override

  4.                    public RequestEvent newInstance() {

  5.                        return new RequestEvent();

  6.                    }

  7.                },

  8.                ringBufferSize ,

  9.                new YieldingWaitStrategy());

EventProcessor 用于处理 Disruptor 中的事件。

EventProcessor 的实现类包括:BatchEventProcessor 用于单线程批量处理事件,WorkProcessor 用于多线程处理事件。

WorkerPool 管理着一组 WorkProcessor。创建完 ringBuffer 之后,创建 workerPool:

 
  1.        SequenceBarrier barriers = ringBuffer.newBarrier();

  2.        for (int i = 0; i < consumers.length; i++) {

  3.            consumers[i] = new Consumer();

  4.        }

  5.        workerPool = new WorkerPool<RequestEvent>(ringBuffer,

  6.                        barriers,

  7.                        new EventExceptionHandler(),

  8.                        consumers);

启动 workerPool:

 
  1.        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

  2.        workerPool.start(Executors.newFixedThreadPool(threadNum));

最后是 DisruptorQueue 完整的代码:

 
  1. import com.cv4j.netdiscovery.core.domain.Request;

  2. import com.cv4j.netdiscovery.core.queue.AbstractQueue;

  3. import com.lmax.disruptor.*;

  4. import com.lmax.disruptor.dsl.ProducerType;

  5. import lombok.extern.slf4j.Slf4j;

  6. import java.util.concurrent.Executors;

  7. import java.util.concurrent.atomic.AtomicInteger;

  8. /**

  9. * Created by tony on 2018/9/1.

  10. */

  11. @Slf4j

  12. public class DisruptorQueue extends AbstractQueue {

  13.    private RingBuffer<RequestEvent> ringBuffer;

  14.    private Consumer[] consumers = null;

  15.    private Producer producer = null;

  16.    private WorkerPool<RequestEvent> workerPool = null;

  17.    private int ringBufferSize = 1024*1024; // RingBuffer 大小,必须是 2 的 N 次方

  18.    private AtomicInteger consumerCount = new AtomicInteger(0);

  19.    private static final int CONSUME_NUM = 2;

  20.    private static final int THREAD_NUM = 4;

  21.    public DisruptorQueue() {

  22.        this(CONSUME_NUM,THREAD_NUM);

  23.    }

  24.    public DisruptorQueue(int consumerNum,int threadNum) {

  25.        consumers = new Consumer[consumerNum];

  26.        //创建ringBuffer

  27.        ringBuffer = RingBuffer.create(ProducerType.MULTI,

  28.                new EventFactory<RequestEvent>() {

  29.                    @Override

  30.                    public RequestEvent newInstance() {

  31.                        return new RequestEvent();

  32.                    }

  33.                },

  34.                ringBufferSize ,

  35.                new YieldingWaitStrategy());

  36.        SequenceBarrier barriers = ringBuffer.newBarrier();

  37.        for (int i = 0; i < consumers.length; i++) {

  38.            consumers[i] = new Consumer();

  39.        }

  40.        workerPool = new WorkerPool<RequestEvent>(ringBuffer,

  41.                        barriers,

  42.                        new EventExceptionHandler(),

  43.                        consumers);

  44.        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

  45.        workerPool.start(Executors.newFixedThreadPool(threadNum));

  46.        producer = new Producer(ringBuffer);

  47.    }

  48.    @Override

  49.    protected void pushWhenNoDuplicate(Request request) {

  50.        producer.pushData(request);

  51.        try {

  52.            Thread.sleep(100);

  53.        } catch (InterruptedException e) {

  54.            e.printStackTrace();

  55.        }

  56.    }

  57.    @Override

  58.    public Request poll(String spiderName) {

  59.        Request request = ringBuffer.get(ringBuffer.getCursor() - producer.getCount() +1).getRequest();

  60.        ringBuffer.next();

  61.        consumerCount.incrementAndGet();

  62.        return request;

  63.    }

  64.    @Override

  65.    public int getLeftRequests(String spiderName) {

  66.        return producer.getCount()-consumerCount.get();

  67.    }

  68.    public int getTotalRequests(String spiderName) {

  69.        return super.getTotalRequests(spiderName);

  70.    }

  71.    static class EventExceptionHandler implements ExceptionHandler {

  72.        public void handleEventException(Throwable ex, long sequence, Object event) {

  73.            log.debug("handleEventException:" + ex);

  74.        }

  75.        public void handleOnStartException(Throwable ex) {

  76.            log.debug("handleOnStartException:" + ex);

  77.        }

  78.        public void handleOnShutdownException(Throwable ex) {

  79.            log.debug("handleOnShutdownException:" + ex);

  80.        }

  81.    }

  82. }

其中,pushWhenNoDuplicate() 是将 request 发送到 ringBuffer 中。poll() 是从 ringBuffer 中取出对应的 request ,用于爬虫进行网络请求、解析请求等处理。

总结:

爬虫框架 github 地址:https://github.com/fengzhizi715/NetDiscovery

上述代码是比较经典的 Disruptor 多生产者多消费者的代码,亦可作为样板代码使用。

最后,在爬虫框架是面向接口编程的,所以替换其中的任意组件都比较方便。

该系列的相关文章:

从API到DSL —— 使用 Kotlin 特性为爬虫框架进一步封装

使用Kotlin Coroutines简单改造原有的爬虫框架

为爬虫框架构建Selenium模块、DSL模块(Kotlin实现)

基于Vert.x和RxJava 2构建通用的爬虫框架


关注【Java与Android技术栈】

更多精彩内容请关注扫码

640?wx_fmt=jpeg


这篇关于Disruptor 实践:整合到现有的爬虫框架的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/585124

相关文章

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

golang内存对齐的项目实践

《golang内存对齐的项目实践》本文主要介绍了golang内存对齐的项目实践,内存对齐不仅有助于提高内存访问效率,还确保了与硬件接口的兼容性,是Go语言编程中不可忽视的重要优化手段,下面就来介绍一下... 目录一、结构体中的字段顺序与内存对齐二、内存对齐的原理与规则三、调整结构体字段顺序优化内存对齐四、内

Spring Boot整合log4j2日志配置的详细教程

《SpringBoot整合log4j2日志配置的详细教程》:本文主要介绍SpringBoot项目中整合Log4j2日志框架的步骤和配置,包括常用日志框架的比较、配置参数介绍、Log4j2配置详解... 目录前言一、常用日志框架二、配置参数介绍1. 日志级别2. 输出形式3. 日志格式3.1 PatternL

修改若依框架Token的过期时间问题

《修改若依框架Token的过期时间问题》本文介绍了如何修改若依框架中Token的过期时间,通过修改`application.yml`文件中的配置来实现,默认单位为分钟,希望此经验对大家有所帮助,也欢迎... 目录修改若依框架Token的过期时间修改Token的过期时间关闭Token的过期时js间总结修改若依

SpringBoot整合DeepSeek实现AI对话功能

《SpringBoot整合DeepSeek实现AI对话功能》本文介绍了如何在SpringBoot项目中整合DeepSeekAPI和本地私有化部署DeepSeekR1模型,通过SpringAI框架简化了... 目录Spring AI版本依赖整合DeepSeek API key整合本地化部署的DeepSeek

C++实现封装的顺序表的操作与实践

《C++实现封装的顺序表的操作与实践》在程序设计中,顺序表是一种常见的线性数据结构,通常用于存储具有固定顺序的元素,与链表不同,顺序表中的元素是连续存储的,因此访问速度较快,但插入和删除操作的效率可能... 目录一、顺序表的基本概念二、顺序表类的设计1. 顺序表类的成员变量2. 构造函数和析构函数三、顺序表

Ollama整合open-webui的步骤及访问

《Ollama整合open-webui的步骤及访问》:本文主要介绍如何通过源码方式安装OpenWebUI,并详细说明了安装步骤、环境要求以及第一次使用时的账号注册和模型选择过程,需要的朋友可以参考... 目录安装环境要求步骤访问选择PjrIUE模型开始对话总结 安装官方安装地址:https://docs.

python实现简易SSL的项目实践

《python实现简易SSL的项目实践》本文主要介绍了python实现简易SSL的项目实践,包括CA.py、server.py和client.py三个模块,文中通过示例代码介绍的非常详细,对大家的学习... 目录运行环境运行前准备程序实现与流程说明运行截图代码CA.pyclient.pyserver.py参

使用C++实现单链表的操作与实践

《使用C++实现单链表的操作与实践》在程序设计中,链表是一种常见的数据结构,特别是在动态数据管理、频繁插入和删除元素的场景中,链表相比于数组,具有更高的灵活性和高效性,尤其是在需要频繁修改数据结构的应... 目录一、单链表的基本概念二、单链表类的设计1. 节点的定义2. 链表的类定义三、单链表的操作实现四、

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed