Java ReentrantLock 源码学习

(1) 什么是ReentrantLock

ReentrantLock是Java里的一种可重入锁。 也可以用来达到互斥的效果。
根据创建时的传参,可以分为公平锁和非公平锁。

(2) 为什么要用ReentrantLock

ReentrantLock和synchroized异同点

(3) ReentrantLock怎么用

public class ReentrantLockDemo {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            // 业务逻辑处理
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
Exception in thread "main" java.lang.IllegalMonitorStateException
	at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
	at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
	at cn.wkq.java.juc.reentrantlock.ReentrantLockDemo.main(ReentrantLockDemo.java:29)

(4) ReentrantLock源码解读

(4.1) ReentrantLock结构

ReentrantLock包含抽象内部类Sync、内部类 NonfairSync、FairSync
Sync 继承 AbstractQueuedSynchronizer
AbstractQueuedSynchronizer内部有同步状态state、等待队列头结点head、等待队列尾结点tail

(4.1) 加锁流程

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

ReentrantLock $ NonfairSync

/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        // 使用CAS尝试将state状态变量设置为1
        if (compareAndSetState(0, 1))
            // 设置当前拥有独占访问权限的线程。
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 以独占模式获取,忽略中断。
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

AbstractQueuedSynchronizer::acquire

/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 中断当前线程
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

ReentrantLock $ Sync::nonfairTryAcquire

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 * 执行不公平的 tryLock。 tryAcquire 在子类中实现,但两者都需要对 trylock 方法进行非公平尝试。
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 通过CAS尝试将state同步状态变量从0设置为1 获取同步锁
        if (compareAndSetState(0, acquires)) {
            // 将独占锁的拥有者设置为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果拥有独占锁的的线程是当前线程的话,表示当前线程需要重复获取锁(重入锁)
    else if (current == getExclusiveOwnerThread()) {
        // 当前同步状态state变量值加1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 写入state同步状态变量值,由于使用volatile修饰,单独的读写操作具有原子性
        setState(nextc);
        return true;
    }
    return false;
}
/**
 * Creates and enqueues node for current thread and given mode.
 * 为当前线程和给定模式创建和排队节点。
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        //cas尝试设置尾结点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 把当前节点放到尾结点
    enq(node);
    return node;
}


/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 初始化头结点 并通过cas设置头结点 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // 把当前节点设置成尾结点 如果失败一直循环
            if (compareAndSetTail(t, node)) {
                t.next = node;
                // 设置尾结点成功,返回
                return t;
            }
        }
    }
}
/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 * 以独占不间断模式获取已在队列中的线程。 由条件等待方法以及获取使用。
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    // 标志cancelAcquire()方法是否执行
    boolean failed = true;
    try {
        // 标记等待过程中是否中断过
        boolean interrupted = false;
        // 开始自旋,要么获取锁,要么中断
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            /**
             * 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点) 
             * 了解等待队列的数据结构和机制更容易看懂
             *
             * 如果当前节点的前驱结点已经是同步队列的头结点了,说明了两点内容:
             * 1、其前驱结点已经获取到了同步锁了,并且锁还没释放
             * 2、其前驱结点已经获取到了同步锁了,但是锁已经释放了
             * 
             * 然后使用tryAcquire()方法去尝试获取同步锁,如果前驱结点已经释放了锁,那么就会获取成功,
             * 否则同步锁获取失败,继续循环
             */
            if (p == head && tryAcquire(arg)) {
                // 获取锁成功,头指针移动到当前node
                setHead(node);
                // 然后将当前节点的前驱结点的后继结点置为null,帮助进行垃圾回收   为什么要设置成null?
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /**
             * 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。 
             * 
             * shouldParkAfterFailedAcquire()是对当前节点的前驱结点的状态进行判断,
             * 以及去针对各种状态做出相应处理; 
             * 如果前驱结点p的状态为SIGNAL的话,就返回true。
             * 
             * parkAndCheckInterrupt()主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
             * 
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        /**
         * 如果for(;;)循环中出现异常,并且failed=false没有执行的话,cancelAcquire方法
         * 就会将当前线程的状态置为 node.CANCELLED 已取消状态,并且将当前节点node移出
         * 同步队列。
         */
        if (failed)
            cancelAcquire(node);
    }
}


/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
/**
 * Convenience method to interrupt current thread.
 */
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

(4.2) 释放锁流程

   
/**
 * Attempts to release this lock.
 *
 * <p>If the current thread is the holder of this lock then the hold
 * count is decremented.  If the hold count is now zero then the lock
 * is released.  If the current thread is not the holder of this
 * lock then {@link IllegalMonitorStateException} is thrown.
 *
 * @throws IllegalMonitorStateException if the current thread does not
 *         hold this lock
 */
public void unlock() {
    sync.release(1);
}
/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    // 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
    if (tryRelease(arg)) {
        // 获取头结点
        Node h = head;
        // 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

这里的判断条件为什么是h != null && h.waitStatus != 0?

h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。

// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
    // 减少可重入次数
    int c = getState() - releases;
    // 当前线程不是持有锁的线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    // 获取头结点waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 获取当前节点的下一个节点
    Node s = node.next;
    // 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}

参考资料

[1] AQS之ReentrantLock源码解析
[2] 从ReentrantLock的实现看AQS的原理及应用
[3] 不可不说的Java“锁”事