本文主要是介绍关于Java中锁使用的一些问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 1. 开始
- 2. synchronized与Lock
- 3. Lock与Condition
- 4. LinkedBlockingQueue主要方法
- 5. 关于synchronized
- 5.1 示例说明
- 5.2 锁升级
- 6. 关于Java对象头
- 7. 回到开始
- 8. 资料参考
1. 开始
首先,我们来看一个比较长,容易劝退的例子,看看你能发现哪些问题,以及可以优化的地方。
如果没什么耐心,也可以跳过,代码实现的基本逻辑就是:统一向远程服务器发送日志数据。
import org.apache.commons.lang3.StringUtils;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class TaskHelper {private static SendDataTask sendDataTask;private static BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>(100000);public static void main(String[] args) {sendDataTask = new SendDataTask();Thread thread = new Thread(sendDataTask, "SendDataTask");thread.setDaemon(true);thread.start();}public void submit(String data) {if (StringUtils.isNotBlank(data)) {dataQueue.add(data);sendDataTask.signalQueue();}}private static class SendDataTask implements Runnable {private ReentrantLock lock = new ReentrantLock();private Condition notEmpty = lock.newCondition();private int checkPeriod = 10 * 1000;private volatile boolean stop = false;@Overridepublic void run() {while (!stop) {if (isEmpty()) {awaitQueue();}try {Thread.sleep(checkPeriod);} catch (InterruptedException e) {e.printStackTrace();}if (stop) {break;}check();}}public void check() {boolean timedOut = false;List<String> dataList = new ArrayList<>();for (; ; ) {if (timedOut) {return;}try {String data = dataQueue.poll(100, TimeUnit.MILLISECONDS);if (StringUtils.isBlank(data)) {timedOut = true;if (dataList.size() > 0) {sendData(dataList);}} else {dataList.add(data);if (dataList.size() > 100) {sendData(dataList);}}} catch (InterruptedException ignore) {}}}public void stop() {this.stop = true;//执行关闭逻辑}private boolean isEmpty() {return dataQueue.size() == 0;}private void awaitQueue() {boolean flag = lock.tryLock();if (flag) {try {notEmpty.await();} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}}public void signalQueue() {boolean flag = false;try {flag = lock.tryLock(100, TimeUnit.MILLISECONDS);if (flag)notEmpty.signalAll();} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();} finally {if (flag)lock.unlock();}}public void sendData(List<String> data) {System.out.println("数据处理业务逻辑");}}}
刚刚开始,我也想为什么不用Executors创建ThreadPoolExecutor来处理?
我想大概是:
- 不能丢数据,如果并发高,不好处理线程池的拒绝逻辑。
- 程序关闭的时候,需要保存未发送数据,避免丢数据
- 需要重试机制,处理网络异常情况
重试部分的逻辑我省略了,有兴趣可以了解一下下面这个库:
<dependency><groupId>com.github.lowzj</groupId><artifactId>java-retrying</artifactId><version>1.2</version>
</dependency>
但是真的有必要即使用LinkedBlockingQueue,有使用Lock吗?
这个问题,我们放在后面解决,我们先来看点相关知识。
2. synchronized与Lock
synchronized与Lock最主要的区别是:
- synchronized锁是非公平的,Lock锁是否公平可以设置
- synchronized锁是不可中断的,Lock锁是否响应中断可以设置
- synchronized没有获取锁超时机制,Lock锁可以设置锁超时时间
其实,JDK对synchronized通过偏向锁(CAS线程ID)、轻量级锁(自旋)等方式优化,其实性能并不比Lock差,具体可以看看后面synchronized。
个人觉得,除非你需要设置公平锁、需要锁响应中断,需要获取锁超时、需要读写分离锁等,其他时候尽量使用synchronized。
因为性能问题,考虑使用Lock大多数时候是不值得的,除非的确对性能有极端要求,并且做了大量对比测试。
因为Lock使用的方式相对复杂,要考虑做更多测试,否则,一个Bug带来的问题绝对比可能的一点点性能降低带来的危害要大。
3. Lock与Condition
synchronized通过Object的wait、notify、notifyAll实现等待通知机制,Lock则需要Condition配合。
看一个简单的例子:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class LockCondition {private final Lock lock = new ReentrantLock();private final Condition notFull = lock.newCondition();private final Condition notEmpty = lock.newCondition();private final Task[] tasks = new Task[100];private int addIndex;private int getIndex;private int count;public void addTask(Task task) throws InterruptedException {lock.lock();try {// 注意使用的是while,不是ifwhile (count == tasks.length) {//如果队列满了notFull.await();//notFull条件不满足了,需要等待}tasks[addIndex++] = task;if (addIndex == tasks.length){addIndex = 0;}count++;notEmpty.signal();//添加数据之后,非空条件满足了,就可以唤醒等在这个条件上的线程了} finally {lock.unlock();}}public Task getTask() throws InterruptedException {lock.lock();try {while (count == 0) {//如果队列为空notEmpty.await();//队列为空,非空条件不满足了,需要等待}Task task = tasks[getIndex++];if (getIndex == tasks.length) {getIndex = 0;}count--;notFull.signal();//数据被取出之后,notFull条件满足了,需要唤醒等待notFull条件的线程return task;} finally {lock.unlock();}}private static class Task{}
}
这个例子虽然非常粗糙,但是基本包含使用Lock和Condition的要素了:
- Condition是通过Lock获取
- await放在while循环中,而不是if
- await、signal都必须先获取到锁lock之后执行
- 在finally中释放锁:lock.unlock()
- 条件不满足的时候执行await
- 条件满足之后执行signal
想要更好的知道Lock的使用方式,JDK的BlockingQueue的相关实现类绝对是最好的教程实例,读读这些类的源码吧,例如LinkedBlockingQueue。
4. LinkedBlockingQueue主要方法
添加元素方法 | 说明 |
---|---|
add | 向队列添加元素,如果队列满,则抛出异常 |
put | 向队列添加元素,阻塞到添加成功 |
offer | 向队列添加元素,没有添加成功返回false,可以设置一个添加超时时间 |
获取元素方法 | 说明 |
---|---|
remove | 获取并删除队列头部元素,如果队列为空,则抛出异常 |
take | 从队列添获取元素,一直等待到获取成功 |
poll | 从队列添获取元素,没有返回null,可以设置一个获取超时时间 |
peek | 从队列添获取元素,没有返回null |
drainTo | 从队列添获取元素,一次可以获取多个,可以设置最大获取个数 |
JDK中的其他阻塞队列方法逻辑大多类似,基本就是分:
- 是否抛出异常(add remove)
- 阻塞(put take)
- 超时还是直接返回(offer poll)
5. 关于synchronized
5.1 示例说明
public class Synchronize {private Object lock = new Object();public static synchronized void doSomethingA(){}public static void doSomethingB(){synchronized (Synchronize.class){}}public synchronized void doSomethingC(){}public void doSomethingD(){synchronized (this){}}public void doSomethingE(){synchronized (lock){}}
}
如果是静态方法,锁对象就是类的class,也就是下面2个方法加锁方式是等价的:
public static synchronized void doSomethingA(){}public static void doSomethingB(){synchronized (Synchronize.class){}
}
如果是实例方法,所对象是实例本身,也就是下面2个方法加锁方式是基本等价的:
public synchronized void doSomethingC(){}public void doSomethingD(){synchronized (this){}
}
静态方法和实例方法使用的是不同的锁,所以他们之间不会相互影响。
如果某些方法需要减小锁的粒度,可以new一个新对象做为锁对象,来减少锁的争用。
5.2 锁升级
之所以有锁升级过程是因为JDK对synchronized关键字做了优化。
无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁
每个Java对象有一个对象头,以32位为例。
无锁时候:
对象的HashCode | 分代年龄 | 是否偏向锁 | 锁标志位 |
---|---|---|---|
25位 | 4位 | 1位 | 2位(01) |
偏向锁:
线程ID | Epoch(偏向锁时间) | 分代年龄 | 是否偏向锁 | 锁标志位 |
---|---|---|---|---|
23位 | 2位 | 4位 | 1位 | 2位(01) |
因为锁不仅不存在多线程竞争,而且总是由同一线程多次获得,偏向锁通过对线程ID的CAS操作来避免同步。
可以通过XX:-UseBiasedLocking=false设置偏向锁开启与关闭,可以通过XX:BiasedLockingStartupDelay=0参数设置偏向锁是否延迟开启,0表示关闭延迟开启。
轻量级锁:
轻量级锁考虑的是因为,大多数时候一个线程持有锁的时间非常短,在没有获取到锁的时候通过短暂的自旋来避免上下文切换开销。
自旋的逻辑大概就是通过while(true)空等,会消耗CPU。
举个生活中的例子,就像烧开水,开始等的时候发现水没有开,就直接回去了,后来你知道水马上要开了,再去检查的时候,你决定等几秒钟,以免刚回去可能还没坐下水就开了。
可以通过-XX:+UseSpinning参数来设置是否启用锁自旋,使用-XX:PreBlockSpin参数设置自旋次数
重量级锁:
重量级的锁就是synchronize最开始的实现方式,对象内部的监视器锁monitor,依赖于底层的操作系统的Mutex Lock实现。
6. 关于Java对象头
这里不细介绍了,可以使用open jdk的jol查看:
<dependency><groupId>org.openjdk.jol</groupId><artifactId>jol-core</artifactId><version>0.14</version>
</dependency>
import org.junit.Test;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.info.GraphLayout;public class JOLTest {private static class User{private Integer id;private String name;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public synchronized void doSomething(){System.out.println(ClassLayout.parseInstance(this).toPrintable());System.out.println("#################");}}@Testpublic void test(){User user = new User();user.setId(1);user.setName("tim");//查看对象内部信息System.out.println(ClassLayout.parseInstance(user).toPrintable());System.out.println("-------------");//查看对象外部信息System.out.println(GraphLayout.parseInstance(user).toPrintable());System.out.println("-------------");//查看对象占用空间总大小System.out.println(GraphLayout.parseInstance(user).totalSize());System.out.println("-------------");user.doSomething();}
}
可以使用-XX:+UseCompressedOops参数对比一下指针是否压缩的差别。
7. 回到开始
回到我们最开始的程序,看看有哪些问题?
其实,最重要的就2个问题:
- LinkedBlockingQueue队列已经实现了锁相关的逻辑,我们自己使用的时候没必要在加一次锁
- 拿着锁sleep,Thread.sleep()
其实,JDK已经为我们提供了足够多的工具,很多时候我们没有必要写太多相对底层的代码。
下面,提供一个简化版的示例,仅供参考:
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class MyTaskHelper {private static final BlockingQueue<String> DATA_QUEUE = new LinkedBlockingQueue<>(1000);private static volatile boolean stop = false;private static Thread thread;static {start();}public static void addTask(String task) throws InterruptedException {DATA_QUEUE.put(task);}public static synchronized void start(){if(stop){if(thread == null || !thread.isAlive()) {thread = new Thread(new SendDataTask(), "MyTaskHelper");thread.setDaemon(true);thread.start();}stop = false;}}public static void stop(){stop = true;System.out.println("处理未发送数据逻辑");}private static class SendDataTask implements Runnable {@Overridepublic void run() {LinkedList<String> datas = new LinkedList<>();while (!stop){datas.clear();DATA_QUEUE.drainTo(datas,100);System.out.println("发送日志数据逻辑 datas");}}}
}
8. 资料参考
OpenJDK对象头
这篇关于关于Java中锁使用的一些问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!