本文主要是介绍【业务功能篇 142】多线程池+Semaphore信号量 数据并行处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
- 简介
信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确、合理的使用公共资源。
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。
- 概念
Semaphore分为单值和多值两种,前者只能被一个线程获得,后者可以被若干个线程获得。以一个停车场运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。 在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。 更进一步,信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程(车辆)都会将该整数减一(通过它当然是为了使用资源),当该整数值为零时,所有试图通过它的线程都将处于等待状态。在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。 当一个线程调用Wait(等待)操作时,它要么通过然后将信号量减一,要么一直等下去,直到信号量大于一或超时。Release(释放)实际上是在信号量上执行加操作,对应于车辆离开停车场,该操作之所以叫做“释放”是因为加操作实际上是释放了由信号量守护的资源。
在java中,还可以设置该信号量是否采用公平模式,如果以公平方式执行,则线程将会按到达的顺序(FIFO)执行,如果是非公平,则可以后请求的有可能排在队列的头部。
JDK中定义如下: Semaphore(int permits, boolean fair) 创建具有给定的许可数和给定的公平设置的Semaphore。Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java并发库Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。比如在Windows下可以设置共享文件的最大客户端访问个数。Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中 的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
原文链接:https://blog.csdn.net/superviser3000/article/details/119893008
多线程池+Semaphore信号量 数据并行处理
/*** 多线程处理* synchronized 加锁访问请求并发高出现数据串改* @param list 数据* @param nSemaphore 信号量*/protected synchronized void threadPraseDate(List<DataF> list, int nThread) {if (CollectionUtils.isEmpty(list) || nThread <= 0 || CollectionUtils.isEmpty(list)) {return;}Semaphore semaphore = new Semaphore(nThread);//线程池 方法内用完直接回收ExecutorService executorService = new ThreadPoolExecutor(50, 50, 60L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2048), new ThreadFactoryBuilder().setNameFormat("Data1-%d").build(),new ThreadPoolExecutor.CallerRunsPolicy());for (DataF item : list) {executorService.execute(() -> {try {//获取信号量semaphore.acquire();//执行相关业务逻辑,逻辑复杂可以抽象出来execute(item);} catch (InterruptedException e) {e.printStackTrace();} finally {//释放信号量semaphore.release();}});}executorService.shutdown();}
这篇关于【业务功能篇 142】多线程池+Semaphore信号量 数据并行处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!