private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//如果设置t.next = node;
//compareAndSetTail如果设置tail失败,则需要解除t的next关联,所以在compareAndSetTail设置成功后再设置t.next = node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
//分三种情况:
//1.如果node为tail,将pred设为tail,pred.next设为空
//2.如果node不为tail,且pred为signal时,不用unpark后续节点,
// 只需要设置pred.next = node.next,丢失中间所有cancell的节点
//3.pred为head节点,unpark后续节点
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
//导致unparkSuccessor只能从tail往前找waitStatus <= 0的节点
//从head节点往后找会出现闭环
node.next = node; // help GC
}
//比jdk1.5的实现要更加精确
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/*
1.如果前一个节点的等待状态waitStatus<0,也就是前面的节点还没有获
得到锁,那么返回true,表示当前节点(线程)就应该park()了。否则进行2。
2.如果前一个节点的等待状态waitStatus>0,也就是前一个节点
被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节
点的waitStatus<=0,进行4。否则进行3。
3.前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,
表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park()。
进行4。
4.返回false,表示线程不应该park()。
*/
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//如果nanosTimeout < spinForTimeoutThreshold
//自旋会比线程切换更有效率,否则才做park
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
//前任节点状态为cancelled,compareAndSetWaitStatus设置前任节点
//状态失败,表明前任节点此时已经cancelled,唤醒线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
分享到:
相关推荐
Java大神Doug Lea对AQS的解析:Most synchronizers (locks, barriers, etc.) in the J2SE1.5 java.util.concurrent package are constructed using a small framework based on class AbstractQueuedSynchronizer. ...
主要为大家详细介绍了Java并发系列之AbstractQueuedSynchronizer源码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
解析AbstractQueuedSynchronizer这个类中,锁的获取、释放的相关逻辑。
AbstractQueuedSynchronizer(AQS)详解.mp4 使用AQS重写自己的锁.mp4 重入锁原理与演示.mp4 读写锁认识与原理.mp4 细读ReentrantReadWriteLock源码.mp4 ReentrantReadWriteLock锁降级详解.mp4 线程安全性问题简单总结...
1.1 AQS源码解析 https://blog.csdn.net/qq_34125999/article/details/105343472 1.2 Sync /** * ReentrantLock 基础结构 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static ...
第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理00:18:04分钟 | 第26节细读...
第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理00:18:04分钟 | 第26节细读...
第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理00:18:04分钟 | 第26节细读...
第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理00:18:04分钟 | 第26节细读...
AbstractQueuedSynchronizer DERRANTCM 剑指offer 占小狼 ConcurrentHashMap skywang12345 数据结构 IAM四十二 Android动画总结 Carson_Ho Android基础 me115 图解设计模式 Piasy Android开源框架 朱祁林 https原理...
AbstractQueuedSynchronizer DERRANTCM 剑指offer 占小狼 ConcurrentHashMap skywang12345 数据结构 IAM四十二 Android动画总结 Carson_Ho Android基础 me115 图解设计模式 Piasy Android开源框架 朱祁林 https原理...
AbstractQueuedSynchronizer DERRANTCM 剑指offer 占小狼 ConcurrentHashMap skywang12345 数据结构 IAM四十二 Android动画总结 Carson_Ho Android基础 me115 图解设计模式 Piasy Android开源框架 朱祁林 https原理...
AQS是J.U.C包下AbstractQueuedSynchronizer抽象的队列式的同步器的简称,这是一个抽象类,它定义了一套多线程访问共享资源的同步器框架,J.U.C包下的许多同步类实现都依赖于它,比如ReentrantLock/Semaphore/...