JUC并发编程知识学习及查漏补缺

下载OpenJDK源码

并发包中很多类底层都是native方法,为了更好的理解需要去阅读jdk的源代码(c++)

jdk8-b120下载地址

Thread启动原理

线程启动调用的是线程对象的start方法,而start方法中调用了一个native的start0方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}

openjdk写的JNI一般是一一对应的,Thread.java对应的就是Thread.c

start0对应的是JVM_StartThread

jvm.cpp中搜索JVM_StartThread,发现最终调用Thread::start

1
2
3
4
5
6
7
8
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;
// ...

Thread::start(native_thread);

JVM_END

thread.cpp文件中找到方法,底层是调用操作系统层去开启线程

1
2
3
4
5
6
7
8
9
10
void Thread::start(Thread* thread) {
trace("start", thread);
if (!DisableStartThread) {
if (thread->is_Java_thread()) {
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
java_lang_Thread::RUNNABLE);
}
os::start_thread(thread);
}
}

一锁两并三程

一锁

synchronized

两并

  • 并发:一个处理器同时处理多个任务
  • 并行:多个处理器同时处理多个任务

三程

  • 进程:系统中运行的一个应用程序就是一个进程,每个进程都有自己的内存空间和系统资源
  • 线程:在同一个进程内会有1个或多个线程,是大多数操作系统进行时序调度的基本单元
  • 管程:监视器(Monitor),也就是平时说的锁,是一种同步机制

Future

Future接口定义了操作异步任务一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等

一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时费力的复杂业务

FutureTask

三个特点:多线程、有返回值、异步任务

常用的多线程实现方式,如继承Thread、实现Runnable接口都没有返回值;实现Callable接口可以有返回值,但是又没法放到Thread的构造方法中,而FutureTask类实现了Runnable接口又包含一个注入Callable对象的构造方法,通过实现Callable接口再经过FutureTask包装后来创建Thread就可以解决此问题。

使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new BuyWaterThread());
Thread thread = new Thread(futureTask);
thread.start();
// 阻塞等待线程执行完成,获取返回值
String s = futureTask.get();
log.info("喝到{}了", s);
}
}
class BuyWaterThread implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
return "水";
}
}

Future优缺点

优点

future+线程池异步多线程任务配合,能显著提升程序的执行效率

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@SneakyThrows
public static void main(String[] args) {
List<Integer> taskDurations = Arrays.asList(500, 300, 200);
TimeInterval timer = DateUtil.timer();
m1(taskDurations, timer); // 多任务依次执行耗时 1021 ms
m2(taskDurations, timer); // 多任务异步执行耗时 535 ms
}
private static void m1(List<Integer> taskDurations, TimeInterval timer) throws InterruptedException {
for (Integer taskDuration : taskDurations) {
Thread.sleep(taskDuration);
}
log.info("多任务依次执行耗时 {} ms", timer.intervalRestart());
}
@SneakyThrows
private static void m2(List<Integer> taskDurations, TimeInterval timer) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
List<FutureTask<String>> tasks = new ArrayList<>();
for (Integer taskDuration : taskDurations) {
FutureTask<String> task = new FutureTask<>(() -> {
try {
Thread.sleep(taskDuration);
} catch (InterruptedException e) {
e.printStackTrace();
}
return taskDuration.toString();
});
tasks.add(task);
threadPool.submit(task);
}
threadPool.shutdown();
for (FutureTask<String> task : tasks) {
log.info("{}", task.get());
}
log.info("多任务异步执行耗时 {} ms", timer.intervalRestart());
}
缺点
  • get方法阻塞
    • 调用get(long timeout, TimeUnit unit)方法,设置超时时间,超时后会抛出异常,一定程度上缓解阻塞
    • 轮询调用isDone方法,判断任务状态是否为完成,轮询会耗费无谓的CPU资源;若想要异步获取结果,通常还是以此方式,尽量不要阻塞

CompletableFuture

对于Future的缺点,希望进行异步处理是可以传入回调函数,在Future结束时自动调用该回调函数,这样就不用阻塞,也不用去轮询判断了

JDK8设计出CompletableFuture,提供了一种观察者模式类似的机制

CompletionStage接口

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

创建对象

一般通过核心的四个静态方法来创建

  • runAsync(Runnable runnable):无返回值
  • runAsync(Runnable runnable, Executor executor):无返回值,使用线程池
  • supplyAsync(Supplier<U> supplier):有返回值
  • supplyAsync(Supplier<U> supplier, Executor executor):有返回值,使用线程池
  • 若没有指定线程池,直接使用默认的ForkJoinPool.commonPool()作为它的线程池去执行异步代码

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static void m1() {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try {
CompletableFuture.supplyAsync(() -> {
log.info("in {}", Thread.currentThread().getName());
int num = ThreadLocalRandom.current().nextInt(10);
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException e) {
e.printStackTrace();
}
//num = num / 0;
return num;
}, threadPool).whenComplete((v, e) -> {
if (e == null) {
log.info("get value {}", v);
} else {
log.error("发生异常 {}", e.getMessage(), e);
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
System.out.println("主线程执行");
}

并行查询图书价格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static List<WebMall> webMalls = Arrays.asList(
new WebMall("jd"),
new WebMall("taobao"),
new WebMall("pdd")
);
private static List<String> getPriceByCompletableFuture(String bookName) {
TimeInterval timer = DateUtil.timer();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
List<String> prices = null;
// List<WebMall> -> List<CompletableFuture> -> List<String>
try {
prices = webMalls.stream()
.map(w -> CompletableFuture.supplyAsync(() -> {
return String.format("%s in %s price is %.2f", bookName, w.getName(), w.queryBookPrice(bookName));
}, threadPool))
.collect(Collectors.toList())
.stream()
.map(c -> c.join())
.collect(Collectors.toList());
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
prices.stream().forEach(System.out::println);
log.info("耗时 {} ms", timer.intervalRestart());
return prices;
}

常用方法

获取结果和触发计算
方法 说明
get() 需要抛出异常
get(long timeout, TimeUnit unit) 设置等待超时时间,超时后抛出异常
join() 不需要排除异常
getNow(T valueIfAbsent) 立即获取结果,不阻塞;调用时若没有没有处理完成,则返回传入的valueIfAbsent,否则返回处理结果
complete(T value) 调用时若没有没有处理完成,则返回true,并打断get/join方法立即返回括号值,否则返回false
对计算结果进行处理
方法 说明
thenApply 对计算结果存在依赖关系,这两个线程串行化;使用同一个线程;出现异常后,不再进行下一步
thenApplyAsync 与thenApply的区别在于,从线程池中取一个线程
handle 与thenApply的区别在于,出现异常后仍然会执行下一步
handleAsync 与thenApplyAsync的区别在于,出现异常后仍然会执行下一步

后缀为Async的处理方法都是从线程池中取线程,若没有传入自定义线程池,都用默认线程池ForkJoinPool,下面不再赘述

有可能处理太快,系统优化切换原则,直接使用main线程处理

对计算结果进行消费

接收任务的处理结果,并消费处理,无返回结果

方法 说明
thenRun(Runnable action) 不管上一步的结果
thenAccept(Consumer action) 处理上一步的结果,没有返回值
thenApply(Function fn) 处理上一步的结果,有返回值
对计算速度进行选用
  • applyToEither:哪个任务先处理完,用哪个任务的处理结果
对计算结果进行合并
  • thenCombine:将两个任务的处理结果一起交给thenCombine处理

优点

  • 异步任务结束时,会自动回调某个对象的方法
  • 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法

乐观锁和悲观锁

乐观锁

乐观锁认为自己在使用数据的时候不会有其他线程修改数据,所以不会添加锁,不加锁的特点能够使其读操作的性能大幅提升,适合读多的场景

Java中使用无锁编程来实现,只是在更新数据的时候去判断,在本次更新操作之前有没有其他线程更新了数据,若没有被更新过,则当前线程将自己修改的数据成功写入,否则根据不同的实现方法执行不同的操作,如放弃修改、重试枪锁等

常见的实现方式有:

  • 版本号机制Version
  • 最常用的CAS算法,Java原子类中的递增操作就通过CAS自旋实现

悲观锁

悲观锁认为自己在使用数据的时候一定有其他线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改;比较严格,更耗性能

synchronized关键字Lock的实现类都是悲观锁

适合于写操作多的场景,先加锁确保写操作时数据正确

synchronized原理解析

同步代码块

示例代码

1
2
3
4
5
public void m1() {
synchronized (this) {
System.out.println("m1");
}
}

编译为字节码后使用 javap -c xxx.class命令查看信息

可见同步代码块使用monitorentermonitorexit指令实现

一般情况下是1个monitorenter和2个monitorexit指令,但是当代码块内手动抛出异常时会只有1个monitorenter和1个monitorexit

1
2
3
4
5
6
public void m1() {
synchronized (this) {
System.out.println("m1");
throw new RuntimeException();
}
}

同步方法

示例代码

1
2
3
public synchronized void m2() {
System.out.println("m1");
}

编译为字节码后使用 javap -v xxx.class命令查看更详细信息

会加一个ACC_SYNCHRONIZED标志,程序执行时判断有此标志时,执行线程会先持有monitor锁,然后再执行方法,最后在方法完成(正常或异常)时释放monitor锁

静态同步方法

示例代码

1
2
3
public synchronized static void m3() {
System.out.println("m1");
}

ACC_STATIC标志将同步方法与静态同步方法区分开

monitor

monitor被称为管程或者监视器,是一种程序结构,结构内的多个子程序(对象或模块)形成的多个工作线程互斥访问共享资源。Java虚拟机调用方法时,将会检查方法的ACC_SYNCHRONIZED访问标志是否被设置,若设置了,执行线程就要求先成功持有管程,然后才能执行方法,当方法实现完成时释放管程。在方法执行期间,执行线程持有了管程,其他任何线程都无法再获取到统一个管程。

Java的monitor在源码中采用ObjectMonitor实现

ObjectMonitor.java -> objectMonitor.cpp -> objectMonitor.hpp

每个对象天生都带一个对象监视器,每一个被锁住的对象都会和monitor关联起来,所有任何一个对象都可以成为一个锁

公平锁和非公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁,类似排队,先来的排前面,这是公平的;

非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,后申请的线程肯能比先申请的线程优先获得锁,类似于插队,在高并发环境下,可能造成优先级翻转或者线程饥饿情况(某个线程一直拿不到锁)

new ReentrantLock()是非公平锁,new ReentrantLock(true)是公平锁

为什么默认非公平锁

  • 恢复挂起的线程到真正获取锁是有时间差的,非公平锁能更加充分地利用CPU时间片,尽量减少CPU空闲状态时间

  • 线程的切换会造成额外的开销

公平锁和非公平锁的使用场景

  • 为了更高的吞吐量,使用非公平锁比较合适
  • 否则就用公平锁

可重入锁

可重入锁又被称为递归锁,是指在同一个线程在外层获取锁的时候,再进入线程的内层方法会自动获取锁,不会因为之前已经获取过没释放而阻塞;在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的

ReentrantLocksynchronized关键字都是可重入锁,可重入锁的一个优点是可一定程度避免死锁

隐式锁

synchronized关键字所使用的锁

显式锁

ReentrantLock,使用时lock与unlock要一一配对

synchronized的重入实现原理

每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针

当执行monitorenter时,若目标锁对象的计数器为0,说明它没有被其他线程所持有,Java虚拟机将该锁对象的持有线程指针指向当前线程,并将计数器加1

当目标锁对象的计数器不为0时,若锁对象的持有线程为当前线程,Java虚拟机将计数器加1,否则当前线程需要等待锁对象的持有线程释放该锁(计数器变为0)

当执行monitorexit时,Java虚拟机将锁对象的计数器减1,为0表示锁已被释放

死锁及排查

死锁产生的原因

  • 系统资源不足
  • 进程运行顺序不合适
  • 资源分配不当

死锁的排查

jps -l命令查看java进程

jstack 进程号查看进程堆栈信息

或者使用jconsole可视化的方式查看

LockSupport与线程中断

中断机制

一个线程不应该由其他线程来强制中断或停止,应该由线程自己自行停止;

Java提供一种用于停止线程的协商机制–中断,也即中断标识协商机制,Java没有给中断增加任何语法,中断的过程完全需要程序员自己实现。若要中断一个线程,需要手动调用该线程的interrupt方法,该方法也仅仅是将线程对象的中断标识设成true。需要自己写代码不断检测当前线程的标识位,若为true,表示别的线程请求中断当前线程,收到请求后怎么处理需要自己写代码实现。

中断相关API方法
方法 说明
void interrupt() 将线程的中断标识位设置为true,发起一个协商而不会立即停止线程
static boolean interrupted() 判断线程是否被中断并清除当前中断状态;做了两件事:1、返回当前线程的中断状态,测试当前线程是否已被中断 2、将当前线程的中断标识位重置为false,清除线程的中断状态
boolean isInterrupted() 判断当前线程是否被中断(通过检查中断标识位)
如何停止中断运行中的线程
  • 通过volatile变量实现,volatile关键字保证变量在多线程间的可见性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    static volatile boolean keepRunning = true;
    private static void interruptByVolatile() {
    new Thread(() -> {
    for (;;) {
    if (!keepRunning) {
    log.info("{} 暂停运行", Thread.currentThread().getName());
    break;
    }
    }
    }).start();
    new Thread(() -> {
    try {
    Thread.sleep(TimeUnit.SECONDS.toMillis(5));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("{} 发出停止请求", Thread.currentThread().getName());
    keepRunning = false;
    }).start();
    }
  • 通过AtomicBoolean变量实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    static AtomicBoolean keepRunning2 = new AtomicBoolean(true);
    private static void interruptByAtomic() {
    new Thread(() -> {
    for (;;) {
    if (!keepRunning2.get()) {
    log.info("{} 暂停运行", Thread.currentThread().getName());
    break;
    }
    }
    }).start();
    new Thread(() -> {
    try {
    Thread.sleep(TimeUnit.SECONDS.toMillis(5));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("{} 发出停止请求", Thread.currentThread().getName());
    keepRunning2.set(false);
    }).start();
    }
  • 通过线程的中断标识位实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private static void interruptByThreadInterrupt() {
    Thread t1 = new Thread(() -> {
    for (; ; ) {
    if (Thread.currentThread().isInterrupted()) {
    log.info("{} 暂停运行", Thread.currentThread().getName());
    break;
    }
    }
    });
    t1.start();
    new Thread(() -> {
    try {
    Thread.sleep(TimeUnit.SECONDS.toMillis(5));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("{} 发出停止请求", Thread.currentThread().getName());
    t1.interrupt();
    }).start();
    }
源码解读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // 调用native方法设置中断标识位
b.interrupt(this);
return;
}
}
interrupt0();
}
private native void interrupt0();

public boolean isInterrupted() {
return isInterrupted(false); // 调用native方法判断是否中断,不清除标识位
}
private native boolean isInterrupted(boolean ClearInterrupted);

public static boolean interrupted() {
return currentThread().isInterrupted(true); // 清除标识位
}

调用interrupt方法时:

如果该线程阻塞的调用wait()wait(long) ,或wait(long, int)的方法Object类,或的join() , join(long)join(long, int)sleep(long) ,或sleep(long, int) ,这个类的方法,那么它的中断状态将被清除,并且将收到InterruptedException

中断不活动的线程不会产生任何影响。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static void interruptSleep() {
Thread t1 = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
log.info("{} 中断", Thread.currentThread().getName());
break;
}
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
} catch (InterruptedException e) {
log.error("发生 InterruptedException");
// 此处需手动终止循环或者调用线程中断,否则就死循环了
// 因为发生InterruptedException时线程的中断状态会被清除,即标识位变为false
Thread.currentThread().interrupt();
}
}
});
t1.start();
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.interrupt();
}

LockSupport

LockSupport用于创建锁和其他同步类的基本线程阻塞原语;

该类与使用它的每个线程关联一个许可证(在Semaphore类的意义上)。 如果许可证可用,将立即返回park ,并在此过程中消费; 否则可能会阻止。 如果尚未提供许可,则致电unpark获得许可。 (与Semaphores不同,许可证不会累积。最多只有一个,即一个park对应一个unpark,不能在调用park后且调用unpark前再次调用park。)

主要有parkunpark方法,作用分别是阻塞线程和解除阻塞线程

线程等待和唤醒方式
  • 使用Objectwait方法让线程等待,使用notify方法唤醒线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private static void waitNotify() {
    Object obj = new Object();
    new Thread(() -> {
    synchronized (obj) {
    try {
    obj.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("{} 被唤醒", Thread.currentThread().getName());
    }
    }).start();
    try {
    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    new Thread(() -> {
    synchronized (obj) {
    obj.notify();
    }
    }).start();
    }
    • obj对象必须是有锁的才能调用wait和notify方法
    • wait和notify方法的调用顺序必须是先wait后notify
  • 使用JUC包中的Conditionawait方法让线程等待,使用signal方法唤醒线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    private static void awaitSignal() {
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    new Thread(() -> {
    lock.lock();
    try {
    condition.await();
    log.info("{} 被唤醒", Thread.currentThread().getName());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    lock.unlock();
    }
    }).start();
    try {
    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    new Thread(() -> {
    lock.lock();
    try {
    condition.signal();
    } finally {
    lock.unlock();
    }
    }).start();
    }
    • 必须要先获得锁才能调用await和signal方法
    • await和signal方法的调用顺序必须是先await后signal
  • 使用LockSupportpark方法让线程等待,使用unpark方法唤醒线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private static void parkUnPark() {
    Thread t1 = new Thread(() -> {
    LockSupport.park();
    log.info("{} 被唤醒", Thread.currentThread().getName());
    });
    t1.start();
    try {
    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    new Thread(() -> {
    LockSupport.unpark(t1);
    }).start();
    }
    • 方法调用没有锁的要求
    • 可以先唤醒后等待(先把许可证发给线程,线程中调用park后不会阻塞,直接通过)
    • 底层是调用Unsafe类的方法

Java内存模型JMM

Java内存模型(JMM,Java Memory Model)与JVM(Java虚拟机)不是一个概念!

CPU的运行并不是直接操作内存,而是先把内存里的数据读到CPU的缓存(二级或三级缓存),而内存的读写操作时就会造成不一致的问题。JVM规范中试图定义一种Java内存模型来屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的内存访问效果。

JMM是一种抽象的概念,仅仅描述一组约定或规范,这组规范定义了程序中各个变量的读写访问方式,并决定一个线程对共享变量的写入时机以及何如对另一个线程可见,关键技术点都是围绕多线程的原子性、可见性和有序性展开的。

三大特性

  • 原子性:一个操作不能被打断,要么全部执行完毕,要么不执行

  • 可见性:指当一个线程修改某一个变量时,其他线程是否能够立即知道变更

    • JMM规定了所有的变量都存储在主内存
    • 线程不能直接修改主内存中的变量,线程修改变量时会先创建一个主内存中变量的副本保存到线程自己的工作内存中,其他线程无法访问本线程工作内存中的变量副本,线程间变量的传递都需要通过主内存来完成
    • 除了volatile关键字能实现可见性之外,还有synchronized,Lock,final也是可以的
  • 有序性:一般习惯性认为代码总是从上到下有序执行,但其他底层为了提升性能,编译器和处理器通常会对指令序列进行重新排序(指令重排)。Java规范规定JVM线程内部维持顺序化语义,即只要程序的最终结果与它顺序执行的结果相等,那么指令的执行顺序可以与代码顺序不一致,此过程叫指令重排。

    • 优点是能够使机器指令更符合CPU的执行特性,最大限度的发挥机器性能
    • 缺点是指令重排可以保证串行语义一致,但无法保证多线程间的语义也一致,可能导致线程脏读出现

happens-before

happens-before即先行发生原则。如果一个操作执行的结果需要对另一个操作可见或代码重排序,那么这两个操作之间必须存在happens-before原则。

总原则
  • 若一个操作先行发生于另一个操作,那么第一操作的执行结果将对第二个操作可见,且第一个操作的执行顺序排在第二个操作之前
  • 两个操作之间存在happens-before关系,并不意味着一定要按照happens-before原则制定的顺序来执行;若重排序之后的执行结果与按照happens-before关系来执行的结果一致,那么这种重排序并不非法
八条规则
  • 次序规则:在一个线程内,按照代码顺序,写在前面的操作先行发生于写在后面的操作,也就是说前面一步操作的结果能被后续操作获取
  • 锁定规则:一个unlock操作先行发生于后面(时间上的先后)对同一个锁的lock操作
  • volatile变量规则:对一个volatile变量的写操作先行发生于后面(时间上的先后)对这个变量的读操作,也就是说前面的写对后面的读是可见的
  • 传递规则:若操作A先行发生于操作B,操作B先行发生于操作C,则可以得出操作A先行发生于操作C
  • 线程启动规则:Thread对象的start方法先行发生于此现场的每一个动作
  • 线程中断规则:对线程interrupt方法的调用先行发生于被中断线程的代码检测到中断事件的发生,也就是说先设置中断标识位,才能检测到中断事件
  • 线程终止规则:线程中的所有操作都先行发生于对此线程的终止检测,可以通过isAlive等方法检测线程是否已经终止执行
  • 对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize方法的开始

volatile与JMM

被volatile修饰的变量有2大特点,可见性有序性(禁止指令重排)

volatile的内存语义:

  • 当对一个volatile变量进行写操作时,JMM会把该线程对应的本地内存中的共享变量值立即刷新回主内存
  • 当对一个volatile变量进行读操作时,JMM会把该线程对应的本地内存中的共享变量值设置为无效,重新回到主内存中读取最新的共享变量值
  • 总结:写操作立即刷新到主内存,读操作是直接从主内存读取

volatile依靠内存屏障保证可见性和有序性

内存屏障

内存屏障是一类同步屏障指令,是CPU或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作,避免代码重排序。JMM的重排序规则会要求Java编译器在生成JVM指令时插入特定的内存屏障指令,通过这些内存屏障指令,volatile实现了JMM中的可见性和有序性(禁重排),但是volatile无法保障原子性

内存屏障之前的所有写操作都要回写到主内存

内存屏障之后的所有读操作都能获得内存屏障之前的所有写操作的最新结果(实现了可见性)

内存屏障分类

粗分为2种:

  • 读屏障(load barrier):在读指令之前插入读屏障,让工作内存中的缓存数据失效,重新回到主内存中获取最新数据
  • 写屏障(store barrier):在写指令之后插入写屏障,强制把工作内存(缓冲区)中的数据写回到主内存中

Unsafe类中关于屏障的代码:

1
2
3
4
5
6
// 读屏障
public native void loadFence();
// 写屏障
public native void storeFence();
// 全屏障,即读写屏障
public native void fullFence();

对应jdk源码中的Unsafe.java -> unsafe.cpp -> orderAccess.hpp

细分为4种:

屏障类型 指令示例 说明
LoadLoad Load1;LoadLoad;Load2 保证load1的读取操作在load2及后续读取操作之前执行
StoreStore Store1;StoreStore;Store2 在store2及其后的写操作执行前,保证store1的写操作已刷新到主内存
LoadStore Load1;LoadStore;Store2 在store2及其后的写操作执行前,保证load1的读操作已读取结束
StoreLoad Store1;StoreLoad;Load2 在store1的写操作已刷新到主内存之后,load2及其后的读操作才能执行

volatile变量规则

第一个操作 第二个操作:普通读写 第二个操作:volatile读 第二个操作:volatile写
普通读写 可以重排 可以重排 不可以重排
volatile读 不可以重排 不可以重排 不可以重排
volatile写 可以重排 不可以重排 不可以重排
  • volatile读之后的操作,都禁止重排序到volatile之前
  • volatile写之前的操作,都禁止重排序到volatile之后
  • volatile写之后volatile读,禁止重排序

volatile读插入内存屏障

读屏障:

  • 在volatile读操作后面插入一个LoadLoad屏障,禁止将此volatile读与后面的普通读重排序

  • 在volatile读操作后面插入一个LoadStore屏障,禁止将此volatile读与后面的普通写重排序

写屏障:

  • 在每个volatile写操作的前面插入一个StoreStore屏障,保证在volatile写之前,其前面锁头普通写操作已经刷新到主内存中
  • 在每个volatile写操作的后面插入一个StoreLoad屏障,避免volatile写与后面可能有的volatile读/写操作重排序

volatile变量读写过程

Java内存模型中定义的8种工作内存与主内存之间的原子操作:

read(读取) -> load(加载) -> use(使用)-> assign(赋值) -> store(存储) -> write(写入) -> lock(锁定)-> unlock(解锁)

  • read:作用于主内存,将变量的值从主内存传输到工作内存,主内存到工作内存

  • load:作用于工作内存,将read从主内存传输的变量值放入工作内存变量副本中,即数据加载

  • use:作用于工作内存,将工作内存变量副本的值传递给执行引擎,每当JVM遇到需要该变量的字节码指令时会执行该操作

  • assign:作用于工作内存,将从执行引擎接收到的值赋值给工作内存变量,每当JVM遇到一个给变量赋值字节码指令时会执行该操作

  • store:作用于工作内存,将赋值完毕的工作变量的值写回给主内存

  • write:作用于主内存,将store传输过来的变量值赋值给主内存中的变量

由于上述只能保证单条指令的原子性,针对多条指令的组合性原子保证,没有大面积加锁,所以,JVM提供了另外两个原子指令:

  • lock:作用于主内存,将一个变量标记为一个线程独占的状态,只是写时候加锁,就只是锁了写变量的过程。

  • unlock:作用于主内存,把一个处于锁定状态的变量释放,然后才能被其他线程占用

volatile的最佳实践

volatile变量不适合参与到依赖当前值的运算,如i=i+1,;i++等

依靠volatile的可见性特点,通常volatile用于保存某个状态的boolean值或int值

当volatile变量不符合以下两条时,需要加锁来保证原子性:

  • 运算结果并不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值
  • 变量不需要与其他的状态变量共同参与不变约束

最佳实践:

  • 单一赋值,但复合运算不可以(i++之类)
  • 状态标志,判断业务是否结束
  • 开销较低的读(读保证可见性不加锁,写需要加锁来保证原子性)
  • DCL双重校验锁(Double Check Lock)

CAS

CAS(Compare And Swap),实现并发算法时常用的一种技术

它包含三个操作数:内存位置、预期原值和更新值

在执行CAS操作时,将内存位置的值与预期原值比较:若相匹配,处理器将该内存位置的值更新为新值;若不匹配,处理将不做任何操作或重试(这种重试行为被称为自旋),多个线程同时执行CAS操作只有一个会成功

硬件级别保证

CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性

CAS是一条CPU的原子指令(cmpxchg),不会造成数据不一致问题,Unsafe提供的CAS方法(compareAndSwapInt等)底层实现即为CPU指令cmpxchg。CAS的原子性是CPU实现独占的,比起synchronized重量级锁,CAS的排他时间要短很多,所以在多线程情况下性能会比较好。

Unsafe类

Unsafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存。

java.util.concurrent.atomic包中类的方法基本都是调用Unsafe类的方法实现的

AtomicInteger类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
// 获取value变量值在内存中的偏移地址
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

// 使用volatile修改,保证变量的可见性
private volatile int value;

public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

Unsafe类:

1
2
3
4
5
6
7
8
9
10
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2); // 获取内存偏移位置上的原值
// 比较并更新,若不匹配就自旋
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}

AtomicReference

java.util.concurrent.atomic包下提供了AtomicReference类,可以将我们需要的任何对象包装进去称为原子类

CAS与自旋锁

自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式取尝试获取锁,当线程发现锁被占用时,会不断判断锁的状态,直到获取,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU

实现自旋锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static AtomicBoolean locked = new AtomicBoolean(false);
private static void lock() {
while (!locked.compareAndSet(false, true)) {
// 若是上锁状态就自旋
}
log.info("{} 上锁", Thread.currentThread().getName());
}
private static void unlock() {
while (!locked.compareAndSet(true, false)) {
// 若是未上锁状态就自旋
}
log.info("{} 解锁", Thread.currentThread().getName());
}
public static void main(String[] args) {
new Thread(() -> {
lock();
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
} catch (InterruptedException e) {
e.printStackTrace();
}
unlock();
}).start();
new Thread(() -> {
lock();
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
} catch (InterruptedException e) {
e.printStackTrace();
}
unlock();
}).start();
}

CAS缺点

  • 循环时间长开销很大

  • 会出现ABA问题

    • 线程1从内存位置V取出A,同时线程2也从内存中取出A,并且线程2进行操作将值变为了B,然后又进行操作将V位置的数据变为了A,此时线程1进行CAS操作时发现内存中依然是A,CAS操作成功

    • 虽然线程1操作成功,但不代表这个过程没有问题

    • 要解决ABA问题可以使用版本号(version)/戳记(AtomicStampedReference)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      private static AtomicStampedReference<Integer> reference = new AtomicStampedReference<>(100, 0);
      public static void main(String[] args) {
      new Thread(() -> {
      Integer num = reference.getReference();
      int stamp = reference.getStamp();
      try {
      Thread.sleep(TimeUnit.SECONDS.toMillis(1));
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      reference.compareAndSet(num, 101, stamp, stamp + 1);
      try {
      Thread.sleep(TimeUnit.SECONDS.toMillis(1));
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      reference.compareAndSet(101, num, stamp + 1, stamp + 2);
      }).start();
      new Thread(() -> {
      Integer num = reference.getReference();
      int stamp = reference.getStamp();
      try {
      Thread.sleep(TimeUnit.SECONDS.toMillis(3));
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      log.info("compareAndSet {}", reference.compareAndSet(num, 102, stamp, stamp + 1));
      }).start();
      }

原子操作类

分类

基本类型原子类:

  • AtomicInteger
  • AtomicBoolean
  • AtomicLong

数组类型原子类:

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

引用类型原子类:

  • AtomicReference
  • AtomicStampedReference:携带版本号的引用类型原子类,可以解决ABA问题,记录修改次数
  • AtomicMarkableReference:原子更新时会更改标记位,标记是否修改过

对象属性修改原子类:

  • AtomicIntegerFieldUpdater:基于反射,对指定类的指定volatile int 字段进行原子更新

  • AtomicLongFieldUpdater:基于反射,对指定类的指定volatile long 字段进行原子更新

  • AtomicReferenceFieldUpdater:基于反射,对指定类的指定volatile 引用类型字段进行原子更新

  • 使用目的:以线程安全的方式操作非线程安全对象内的某些字段

  • 使用要求

    • 进行操作的属性需要求public volatile关键字修饰
    • 使用静态方法newUpdater创建一个更新器,并设置需要更新的类和属性
  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Data
    @AllArgsConstructor
    class Increment {
    public volatile int value;
    private String name;
    }
    public static void main(String[] args) throws InterruptedException {
    AtomicIntegerFieldUpdater<Increment> updater =
    AtomicIntegerFieldUpdater.newUpdater(Increment.class, "value");
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    Increment increment = new Increment(0, "自增");
    CountDownLatch count = new CountDownLatch(5);
    for (int i = 0; i < 5; i++) {
    threadPool.submit(() -> {
    try {
    for (int j = 0; j < 10000; j++) {
    updater.incrementAndGet(increment);
    }
    } finally {
    count.countDown();
    }
    });
    }
    threadPool.shutdown();
    count.await();
    log.info("{}", increment);
    }

原子操作增强类:

  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator:使用给定的累加器函数和初始值创建新实例
  • LongAdder:创建一个初始总和为0的新加法器;当多个线程更新用于收集统计信息但不用于细粒度同步控制的目的公共和时,此类通常优于AtomicLong,在低更新争用下,两个类具有相似特征,但在高争用情况下,本类的预期吞吐量明显更高,但代价是空间消耗更多

性能比较示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Getter
class LikeClick {
private long number = 0;
private AtomicLong atomicLong = new AtomicLong();
private LongAdder longAdder = new LongAdder();
private LongAccumulator longAccumulator = new LongAccumulator((a, b) -> a + b, 0);
public synchronized void clickBySynchronized() {
number++;
}
public void clickByAtomicLong() {
atomicLong.getAndIncrement();
}
public void clickByLongAdder() {
longAdder.increment();
}
public void clickByLongAccumulator() {
longAccumulator.accumulate(1);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
LikeClick likeClick = new LikeClick();
int threadNum = 50;
int clickNum = 1000000;
ExecutorService threadPool = Executors.newFixedThreadPool(50);
TimeInterval timer = DateUtil.timer();
CountDownLatch count1 = new CountDownLatch(threadNum);
for (int i = 0; i < 50; i++) {
threadPool.submit(() -> {
try {
for (int j = 0; j < clickNum; j++) {
likeClick.clickBySynchronized();
}
} finally {
count1.countDown();
}
});
}
count1.await();
log.info("clickBySynchronized 结果:{} 耗时{}ms", likeClick.getNumber(), timer.intervalRestart());
CountDownLatch count2 = new CountDownLatch(threadNum);
for (int i = 0; i < 50; i++) {
threadPool.submit(() -> {
try {
for (int j = 0; j < clickNum; j++) {
likeClick.clickByAtomicLong();
}
} finally {
count2.countDown();
}
});
}
count2.await();
log.info("clickByAtomicLong 结果:{} 耗时{}ms", likeClick.getAtomicLong(), timer.intervalRestart());
CountDownLatch count3 = new CountDownLatch(threadNum);
for (int i = 0; i < 50; i++) {
threadPool.submit(() -> {
try {
for (int j = 0; j < clickNum; j++) {
likeClick.clickByLongAdder();
}
} finally {
count3.countDown();
}
});
}
count3.await();
log.info("clickByLongAdder 结果:{} 耗时{}ms", likeClick.getLongAdder(), timer.intervalRestart());
CountDownLatch count4 = new CountDownLatch(threadNum);
for (int i = 0; i < 50; i++) {
threadPool.submit(() -> {
try {
for (int j = 0; j < clickNum; j++) {
likeClick.clickByLongAccumulator();
}
} finally {
count4.countDown();
}
});
}
count4.await();
log.info("clickByLongAccumulator 结果:{} 耗时{}ms", likeClick.getLongAccumulator(), timer.intervalRestart());
threadPool.shutdown();

结果:

LongAdder类原理解析

继承体系图:

Striped64类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
abstract class Striped64 extends Number {

/** Number of CPUS, to place bound on table size CPU数量,cells数组的最大长度 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
* Table of cells. When non-null, size is a power of 2.
* cells数组,长度是2的幂
*/
transient volatile Cell[] cells;

/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
* 基础value值,当并发较低时,只累加该值,主要用于没有竞争的情况,通过cas更新
*/
transient volatile long base;

/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* 在调整单元格大小和/或创建单元格时使用自旋锁(通过 CAS 锁定)
*/
transient volatile int cellsBusy;
}

Cell内部类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

LongAdder的基本思路就是分散热点,将value值分散个一个Cell数组中,不同线程会命中到数据的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,如此热点就被分散了,冲突的概率就会减小很多。当获取真正的long值时,将各个槽的变量值累加返回。

基本原理

LongAdder在无竞争的情况下和AtomicLong一样,对base变量进行cas操作。当出现竞争关系时采用分散热点的方式,用空间换时间,用一个cells数组,将一个value拆分进这个数组。多线程需要同时对value进行操作时,可以对线程id进行hash得到hash值,再根据hash值映射到这个cells数组的某个下标,再对该下标锁对应的值进行自增操作。当所有线程操作完毕,将cells数组的所有值和base都加起来作为最终结果。

源码解析
  • add方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public void add(long x) {
    // as为cells
    // b为base值
    // v为期望值
    // m为cells数组长度
    // a为当前线程对应cell
    Cell[] as; long b, v; int m; Cell a;
    // 进入if语句的条件:cells不为空或casBase操作失败(说明出现了竞争情况)
    if ((as = cells) != null || !casBase(b = base, b + x)) {
    // 判断是否出现冲突的标识符,出现冲突后可能要扩容
    boolean uncontended = true;
    // 进入if语句调用longAccumulate的条件
    // cells为空,创建一个长度为2的cells数组
    // 数组长度小于0,不太可能出现
    // 通过getProbe方法获取线程hash值,再计算得到线程对应cells数组的下标,获取线程对应数组中的cell,cell为空,说明当前线程还没有更新过cell,需要初始化cell
    // cell中进行cas操作出现冲突,可能需要扩容
    if (as == null || (m = as.length - 1) < 0 ||
    (a = as[getProbe() & m]) == null ||
    !(uncontended = a.cas(v = a.value, v + x)))
    // 调用Striped64中的longAccumulate方法
    longAccumulate(x, null, uncontended);
    }
    }
  • longAccumulate方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    final void longAccumulate(long x, LongBinaryOperator fn,
    boolean wasUncontended) {
    int h;
    // 随机数未初始化
    if ((h = getProbe()) == 0) {
    // 强制初始化随机数
    ThreadLocalRandom.current(); // force initialization
    // 获取当前线程hash值
    h = getProbe();
    wasUncontended = true;
    }
    // 是否需要扩容的标志
    boolean collide = false; // True if last slot nonempty
    for (;;) {
    Cell[] as; Cell a; int n; long v;
    // cells数组已被初始化
    if ((as = cells) != null && (n = as.length) > 0) {
    // 线程对应数组中的cell,cell为空,说明当前线程还没有更新过cell,需要初始化cell
    if ((a = as[(n - 1) & h]) == null) {
    if (cellsBusy == 0) { // Try to attach new Cell
    Cell r = new Cell(x); // Optimistically create
    // 没有加锁并尝试加锁
    if (cellsBusy == 0 && casCellsBusy()) {
    boolean created = false;
    try { // Recheck under lock
    Cell[] rs; int m, j;
    if ((rs = cells) != null &&
    (m = rs.length) > 0 &&
    rs[j = (m - 1) & h] == null) {
    // 将刚才new的cell放到数组对应下标
    rs[j] = r;
    created = true;
    }
    } finally {
    cellsBusy = 0;
    }
    if (created)
    break;
    continue; // Slot is now non-empty
    }
    }
    collide = false;
    }
    else if (!wasUncontended) // CAS already known to fail
    // 标记为没有竞争,重新获取hash值
    wasUncontended = true; // Continue after rehash
    // 调用cell的cas方法进行累加
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
    fn.applyAsLong(v, x))))
    break;
    // 当前数组长度大于等于CPU数时不再扩容
    else if (n >= NCPU || cells != as)
    collide = false; // At max size or stale
    else if (!collide)
    // 设置为允许扩容
    collide = true;
    // 没有加锁并尝试加锁
    else if (cellsBusy == 0 && casCellsBusy()) {
    try {
    if (cells == as) { // Expand table unless stale
    // 左移1位,即扩容为原长度的2倍
    Cell[] rs = new Cell[n << 1];
    for (int i = 0; i < n; ++i)
    rs[i] = as[i];
    cells = rs;
    }
    } finally {
    cellsBusy = 0;
    }
    collide = false;
    continue; // Retry with expanded table
    }
    // 重新计算hash值
    h = advanceProbe(h);
    }
    // cells没有加锁且没有初始化,尝试对它加锁,并初始化cells数组
    else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    boolean init = false;
    try { // Initialize table
    if (cells == as) {
    Cell[] rs = new Cell[2]; // 创建一个长度为2的数组
    rs[h & 1] = new Cell(x); // 将值放到线程对应下标位置
    cells = rs;
    init = true;
    }
    } finally {
    cellsBusy = 0;
    }
    if (init)
    break;
    }
    // cells正在进行初始化,则尝试直接在base上进行累加
    else if (casBase(v = base, ((fn == null) ? v + x :
    fn.applyAsLong(v, x))))
    break; // Fall back on using base
    }
    }

  • sum方法,只保证最终一致性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
    for (int i = 0; i < as.length; ++i) {
    if ((a = as[i]) != null)
    sum += a.value;
    }
    }
    return sum;
    }

ThreadLocal

该类提供线程局部变量。 这些变量与它们的正常对应物的不同之处在于,访问其中的每个线程(通过其get或set方法)具有其自己的,独立初始化的变量副本。 ThreadLocal实例通常是希望将状态与线程相关联的类中的私有静态字段(例如,用户ID或事务ID)。

由于每个线程都有自己的变量副本,所以避免了线程安全问题。

在线程使用完成后最好调用ThreadLocal对象的remove方法,去删除此线程局部变量的当前线程值,避免影响后续业务逻辑和造成内存泄露等问题

1
2
3
4
5
6
objectThreadLocal.set(userInfo);
try {
// ...
} finally {
objectThreadLocal.remove();
}

Thread、ThreadLocal、ThreadLocalMap关系

Thread类中包含ThreadLocal.ThreadLocalMap变量

ThreadLocalMap类是ThreadLocal类的静态内部类

ThreadLocalMap实际上就是一个以ThreadLocal实例为key,任意对象为value的Entry对象

源码分析

ThreadLocal的get方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public T get() {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取线程的threadLocals变量
ThreadLocalMap map = getMap(t);
if (map != null) {
// 获取map中的值
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 未获取到时设置初始值
return setInitialValue();
}

ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
ThreadLocalMap类
1
2
3
4
5
6
7
8
9
10
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}

ThreadLocalMap是以ThreadLocal为键的map,不过ThreadLocal是经过两层包装的

第一层使用WeakReference<ThreadLocal<?>>将ThreadLocal对象变为一个弱引用对象

第二层是定义了一个Entry类来拓展WeakReference<ThreadLocal<?>>

各种引用

  • Reference:强引用

    • 垃圾回收永远不会强引用的对象

    • 把一个对象赋给一个引用变量,这个引用变量就是一个强引用

    • 强引用是造成Java内存泄露的主要原因之一

    • 当一个对象不再使用,可以将引用赋值为null

      1
      2
      3
      4
      5
      MyObj myObj = new MyObj();
      log.info("before gc {}", myObj);
      myObj = null;
      System.gc();
      log.info("after gc {}", myObj);
  • SoftReference:软引用

    • 当系统内存充足时它不会被回收

    • 当系统内存不足时它会被回收

    • 需要用SoftReference类来实现

    • 软引用通常用在对内存敏感的程序中,如高速缓存就用到软引用

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      // 添加JVM参数 -Xmx10m -Xms10m
      SoftReference<MyObj> softReference = new SoftReference<>(new MyObj());
      System.gc();
      log.info("after gc {}", softReference.get());
      try {
      byte[] data = new byte[20 * 1024 * 1024]; // 20M对象
      } catch (Exception e){
      e.printStackTrace();
      } finally {
      log.info("after gc {}", softReference.get());
      }
  • WeakReference:弱引用

    • 用WeakReference类来实现,它比软引用生命周期更短

    • 对于弱引用的对象来说,只要一执行垃圾回收,不管JVM内存是否足够,都会回收该对象

      1
      2
      3
      4
      WeakReference<MyObj> weakReference = new WeakReference<>(new MyObj());
      log.info("before gc {}", weakReference.get());
      System.gc();
      log.info("after gc {}", weakReference.get());
  • PhantomReference:虚引用

    • 虚引用必须和ReferenceQueue(引用队列)联合使用

    • 若对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收

    • get方法总是返回null

    • 虚引用的主要作用是跟踪对象被垃圾回收的状态,仅仅提供了一种确保对象被finalize以后,做某些事情的通知机制

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      MyObj myObj = new MyObj();
      ReferenceQueue<MyObj> referenceQueue = new ReferenceQueue<>();
      PhantomReference<MyObj> phantomReference = new PhantomReference<>(myObj, referenceQueue);
      log.info("phantomReference.get() {}", phantomReference.get());
      List<byte[]> data = new ArrayList<>();
      new Thread(() -> {
      while (true) {
      data.add(new byte[1 * 1024 * 1024]);
      try {
      Thread.sleep(TimeUnit.SECONDS.toMillis(1));
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      log.info("add 1m to data, phantomReference.get() {}", phantomReference.get());
      }
      }).start();
      new Thread(() -> {
      while (true) {
      Reference<? extends MyObj> reference = referenceQueue.poll();
      if (reference != null) {
      log.info("虚对象被回收加入了引用队列 {}", reference.get());
      break;
      }
      }
      }).start();
  • 软引用和弱引用的适用场景

    • 图片缓存:程序需要读取大量本地图片,若每次都从硬盘读取会严重影响性能,若一次性全部加载到内存又可能造成内存溢出,此时可以用软引用来解决这个问题
      • 用一个Map保存图片路径和相应图片对象关联的软引用之间的映射关系,在内存不足时,JVM会自动回收这些缓存图片对象所占用的空间
      • Map<String, SoftReference<BufferedImage>> imgCache = new HashMap<>();
为什么用弱引用作为键

若键的引用是强引用,就会导致键指向的ThreadLocal对象以及值指向的对象不能被垃圾回收,造成内存泄漏

若键的引用是弱引用就大概率会减少内存泄漏的问题

垃圾回收后键的引用会指向null,这是就会出现一个键为null的Entry,并且无法访问到这个键对应的值,若当前线程迟迟不结束的话,这些键为null的Entry就会一直存在一条强引用链,无法回收,造成内存泄漏;若线程运行结束,就没有引用链可达了,在垃圾回收时就会被回收

在实际开发中,会使用线程池去复用线程,这样线程是不会结束的;可见弱引用不能100%保证内存不泄露,所以在不使用某个ThreadLocal对象后,手动调用remove方法来删除它

ThreadLocalMap的expungeStaleEntry方法

当键为null时调用set、get、remove方法最后都会调用expungeStaleEntry方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
// 当键的引用为null时,将值的引用指向null,方便垃圾回收
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}

最佳实践

  • 使用 ThreadLocal.withInitial(Supplier s)方式给一个初始值,防止出现NPE
  • 建议将ThreadLocal变量用static修饰
  • 使用完成手动调用remove,避免内存泄漏

Java内存对象布局和对象头

创建对象的过程

1
Object o = new Object();
  • Object.class在方法区
  • o引用在栈
  • new Object()在堆

在HotSpot虚拟机中,对象在堆内存中存储布局可以分为三个部分:

  • 对象头(Header)
  • 实例数据(Instance Data)
  • 对齐填充(Padding)

对象头

  • 对象头由对象标记(Mark Word)、类元信息(又叫类型指针)构成
  • 对象标记包含以下信息
    • 哈希码
    • GC标记
    • GC次数
    • 同步锁标记
    • 偏向锁持有者
  • 类元信息存储的是指向该对象类元数据的首地址(指向方法区中的地址)
对象标记
存储内容 标志位 状态
对象哈希码、对象分代年龄 01 未锁定
指向锁记录的指针 00 轻量级锁定
指向重量级锁的指针 10 膨胀(重量级锁定)
空,不需要记录信息 11 GC标记
偏向线程ID、偏向时间戳、对象分代年龄 01 可偏向

在64位系统中,对象标记占了8个字节,类型指针占了8个字节,对象头一共16字节

对象标记用于存储对象的hashcode、分代年龄和锁标志位等信息

这些信息都是与对象自身定义无关的数据,所以对象标记被设计成一个非固定数据结构的以便在极小的空间内存储尽量多的数据,它会根据对象的状态复用自己的存储空间,即在运行期间对象标记里存储的数据会随着锁标志位的变化而变化

类型指针

类型指针存储的是指向该对象类元数据的首地址

类型指针占了8个字节

实例数据

存储类的属性(Field)信息,包括父类的属性信息

对齐填充

虚拟机要求对象起始地址必须是8字节的整数倍,填充数据不是必须存在的,仅仅是为了字节对齐

1
2
3
4
class Custom { // 对象头16字节(忽略压缩指针的影响)+4字节+1字节=21字节,对齐填充到24字节
int id; // 4字节
boolean flag; // 1字节
}

使用JOL分析对象在JVM中的大小和布局

pom中引入依赖

1
2
3
4
5
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.9</version>
</dependency>

使用

1
2
3
4
5
6
7
8
// VM的细节详细情况
// System.out.println(VM.current().details());
// 所有对象分配的字节都是8的整数倍
// System.out.println(VM.current().objectAlignment());

// 对象的内存布局
Object o = new Object();
System.out.println(ClassLayout.parseInstance(o).toPrintable());

1
2
Custom c = new Custom();
System.out.println(ClassLayout.parseInstance(c).toPrintable());

类型指针为4字节与预期不一致是因为JVM默认开启了压缩指针

加上-XX:+PrintCommandLineFlags参数查看JVM默认添加的参数:

1
-XX:InitialHeapSize=267278208 -XX:MaxHeapSize=4276451328 -XX:+PrintCommandLineFlags -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:-UseLargePagesIndividualAllocation -XX:+UseParallelGC

其中-XX:+UseCompressedClassPointers为开启了压缩指针,+为开启,-为关闭

启动时加上-XX:-UseCompressedClassPointers参数关闭压缩指针

Synchronized与锁升级

锁的升级过程

用锁能实现数据的安全性,但是会带来性能下降

无锁能够基于线程并行提升程序性能,但会带来安全性下降

锁的升级顺序:

flowchart LR
	node1(无锁) --> node2(偏向锁) --> node3(轻量级锁) --> node4(重量级锁)

根据对象标记中标识位的不同表示不同锁

锁类型 偏向锁位 锁标志位
无锁 0 01
偏向锁 1 01
轻量级锁 00
重量级锁 10

对象标记中除了标识位还记录的内容:

  • 偏向锁:存储的是偏向的线程ID
  • 轻量级锁:存储的是指向线程栈中Lock Record的指针
  • 重量级锁:存储的是指向堆中的监视器对象的指针

Java5之前,使用synchronized这个重量级锁,当竞争激烈时,性能会下降。若要阻塞或唤醒一个线程就需要操作系统介入,需要在用户态和内核态之间频繁切换,这种切换会消耗大量系统资源。

在Java早期版本中,synchronized属于重量级锁,效率低,因为监视器锁是依赖底层操作系统的系统互斥量(Mutex Lock)来实现的,线程的挂起和恢复都需要转入内核态去完成。

Java6之后,为了减少获得锁和释放锁所带来的性能消耗,引入了轻量级锁和偏向锁

无锁

1
2
3
4
5
6
Object o = new Object();
// 对象没有调用前没有生成hashcode
System.out.println("10进制hashcode: " + o.hashCode());
System.out.println("16进制hashcode: " + Integer.toHexString(o.hashCode()));
System.out.println("2进制hashcode: " + Integer.toBinaryString(o.hashCode()));
System.out.println(ClassLayout.parseInstance(o).toPrintable());

偏向锁

单线程竞争

当线程第一次竞争到锁时,通过操作修改对象标记中的偏向线程ID、偏向模式

若不存在其他线程竞争,那么持有偏向锁的线程将永远不需要进行同步

即当一段同步代码一直被同一个线程多次访问后,由于只有一个线程那么该线程在后续访问时便会自动获得偏向锁

也就是说锁总是被第一个占用它的线程拥有,这个线程就是锁的偏向线程

流程分析

在锁第一次被持有时记录下偏向线程ID,偏向线程就一直持有着锁(后续这个线程进入或退出同步锁的代码块时,不再需要加锁和释放锁操作,而是直接去检查锁的对象标记中是不是存放着自己的线程ID)

判断偏向锁的对象标记中的线程ID与当前线程ID相等时,表示当前线程是偏向锁的偏向线程,直接进入同步,无需每次加锁解锁都去CAS更新对象头,若始终只有一个线程使用锁,很明显偏向锁几乎没有额外开销,性能极高

当不相等时,表示发生了竞争,会尝试使用CAS来替换对象标记里的线程ID为当前线程ID

若竞争成功,对象标记里的线程ID更新为当前线程ID,锁不会升级,依然是偏向锁,只是偏向线程变更了

若竞争失败,这时可能需要升级为轻量锁,才能保证线程间公平竞争锁

偏向锁相关VM参数

使用命令java -XX:+PrintFlagsInitial | grep BiasedLock* 查看偏向锁相关参数

代码示例

添加VM参数-XX:BiasedLockingStartupDelay=0,将偏向锁启动延时时间设置为0

也可以不加这个参数,可以在代码里面手动延时超过4s

1
2
3
4
Object o = new Object();
synchronized (o) {
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}

偏向锁的撤销

偏向锁是一种等到竞争出现才会释放锁的机制

撤销需要等待全局安全点(该时间点上没有字节码正在执行),同时检查持有偏向锁的线程是否还在执行

若线程正在执行(处于同步代码块),它没执行完,其他线程来竞争,该偏向锁会被撤销并出现锁升级,此时轻量级锁由原持有偏向锁的线程持有,继续执行其同步代码块,而正在竞争的线程会进入自旋等待获得该轻量级锁

若线程执行完成(退出同步代码块),则将对象标记设置为无锁状态并撤销偏向锁,重新偏向

轻量级锁

多线程竞争,但是任意时刻最多只有一个线程竞争,即不存在锁竞争太过激烈的情况,也就是没有线程阻塞

本质就是自旋锁CAS

轻量级锁的加锁

JVM为每个线程在当前线程的栈帧中创建用于存储锁记录的空间,称为Displaced Mark Word

若线程获得锁时发现是轻量级锁,会把锁的对象标记复制到自己的DMW里面

然后线程尝试用CAS将锁的对象标记替换为指向锁记录的指针,若成功,则当前线程获得锁,若失败,说明在与其他线程竞争锁,当前线程就尝试使用自旋来获取锁

自旋CAS操作,不断尝试去获取锁,能不升级锁就不升级,尽量不要阻塞

轻量级锁的释放

在释放锁时,线程会使用CAS操作将Displaced Mark Word的内容复制回锁的对象标记里

若没有发生竞争,那么复制操作成功

若有其他线程因为自旋多次导致轻量级锁升级成了重量级锁,那么CAS操作会失败,此时会释放锁并唤醒被阻塞的线程

代码示例

添加VM参数-XX:-UseBiasedLocking,关闭偏向锁

1
2
3
4
5
6
Object o = new Object();
new Thread(() -> {
synchronized (o) {
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}).start();
自旋达到一定次数

会升级为重量级锁

  • Java6之前
    • 默认情况下自旋的次数超过10次(可以通过-XX:PreBlockSpin=10来修改)
    • 或者自旋线程数超过CPU核数一半
  • Java6之后
    • 自适应自旋锁(自旋的次数不固定)
    • 若自旋后竞争成功了,那么下次自旋的最大次数会增加
    • 若很少会自旋成功,那么下次会减少自旋的次数甚至不自旋,避免CPU空转
与偏向锁的区别
  • 偏向锁没有竞争
  • 竞争轻量级锁失败时,自旋尝试抢占锁
  • 轻量级锁每次退出同步块是都需要释放锁,而偏向锁是在竞争发生时才释放锁

重量级锁

通过monitorenter、monitorexit等指令来操作监视器对象实现

实例代码:

1
2
3
4
5
6
7
8
9
10
11
Object o = new Object();
new Thread(() -> {
synchronized (o) {
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}).start();
new Thread(() -> {
synchronized (o) {
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}).start();

总结

完整流程图

其他问题

无锁状态下,对象标记中记录了hashcode,为什么锁升级后就没了?

当一个对象已经计算过一致性哈希码后,它就再也无法进入偏向锁状态了,跳过偏向锁,直接升级为轻量级锁

而当一个对象处于偏向锁状态,又收到需要计算其一致性哈希码请求时,它的偏向状态会被立即撤销,并且锁膨胀为重量级锁

升级为轻量级锁时,JVM会在当前线程的栈帧中创建一个锁记录(Lock Record)空间,用于存储锁对象的对象标记拷贝,释放锁时将这些信息写回到对象头

在重量级锁的实现中,对象头指向了重量级锁的位置,代表重量级锁的ObjectMonitor类里有字段可以记录非加锁状态下的对象标记,其中自然可以存储原来的哈希码

其他小结

synchronized锁升级过程总结:先自旋,不行再阻塞

实际上是把之前悲观锁(重量级锁)变成在一定条件下使用偏向锁以及使用轻量级锁(自旋锁CAS)的形式

适用情况:

  • 偏向锁:适用于单线程情况,不存在锁竞争时
  • 轻量级锁:适用于竞争不激烈的情况,若同步方法/同步代码块执行时间很短的话,采用轻量级锁虽然会占用CPU资源但是相对比使用重量级锁还是更高效
  • 重量级锁:适用于竞争激烈的情况,若同步方法/同步代码块执行时间长,那么使用轻量级锁带来的性能消耗就比使用重量级锁更严重,这时就需要升级为重量级锁

JIT编译器对锁的优化

JIT一般被称为即时编译器(Just In Time Compiler)

锁消除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void m1() {
Object obj = new Object();
// 锁加的没有意义,JIT编译器会无视它
synchronized (obj) {
System.out.println(obj.hashCode());
}
}
public static void main(String[] args) {
LockClearDemo clearDemo = new LockClearDemo();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
clearDemo.m1();
}).start();
}
}
锁粗化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static Object o = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (o) {
System.out.println("1");
}
synchronized (o) {
System.out.println("2");
}
synchronized (o) {
System.out.println("3");
}
}).start();
}

JIT编译器会将多个首尾相连且相同锁的同步代码块合并

AQS

全称为AbstractQueuedSynchronizer,抽象的队列同步器

是用来实现锁或者其他同步器组件的公共基础部分的抽象实现

是重量级基础框架及整个JUC体系的基石,主要用于解决锁分配给谁的问题

整体就是一个抽象的FIFO队列(CLH队列的变体)来完成资源获取线程的排队工作,并通过一个volatile int类变量表示持有锁的状态

CountDownLatch、Semaphore、CyclicBarrier、ReentrantLock、ReentrantReadWriteLock等类的实现都与AQS有关

源码说明

AQS使用一个volatile int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 排他模式
static final Node EXCLUSIVE = null;
/** 线程已取消 */
static final int CANCELLED = 1;
/** 后继线程需要唤醒 */
static final int SIGNAL = -1;
/** 等待条件唤醒 */
static final int CONDITION = -2;
/** 下一个acquireShared将会无条件传播下去 */
static final int PROPAGATE = -3;
// 节点的等待状态,状态有上面4种
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 处于该节点的线程
volatile Thread thread;
// 指向下一个处于condition状态的节点
Node nextWaiter;
// ...
}

// 队列头结点
private transient volatile Node head;
// 队列尾结点
private transient volatile Node tail;
// 队列同步状态
private volatile int state;
// ...

}

ReentrantLock原理

ReentrantLock底层也是通过AQS实现的

1
2
3
4
5
6
7
// 使用无参构造 默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
lock和acquire
flowchart LR
	lock(lock) --> acquire(acquire) --> tryAcquire(tryAcquire)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// FairSync
final void lock() {
acquire(1);
}
// NonfairSync
final void lock() {
// 先CAS操作
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AQS
public final void acquire(int arg) {
// !tryAcquire 尝试获取资源失败
// addWaiter 将当前线程封装成Node节点,并添加到同步队列尾部
// acquireQueued 将获取资源失败的线程加入到同步队列中,并阻塞线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取队列同步器状态
int c = getState();
if (c == 0) {
// 没有等待的节点排在当前线程节点前面
// 且CAS操作成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 比公平锁少了一个判断前面是否有等待节点的条件
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
tryAcquire

以非公平锁为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 判断队列同步器状态是否为空
if (c == 0) {
// 竞争资源
if (compareAndSetState(0, acquires)) {
// 将同步器持有线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 判断同步器持有线程是否为当前线程
else if (current == getExclusiveOwnerThread()) {
// 同步器状态累加
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 竞争失败
return false;
}
addWaiter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
// 创建一个排他模式节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 前驱节点为同步器队列的队尾节点
Node pred = tail;
if (pred != null) {
// 将队尾结点设置为当前节点的前驱节点
node.prev = pred;
// 将当前节点设置为队尾结点
if (compareAndSetTail(pred, node)) {
// 将原队尾结点的后继节点设置为当前节点
pred.next = node;
return node;
}
}
// 队尾节点为空(表示队列未初始化)时执行
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
// 队尾节点引用
Node t = tail;
if (t == null) { // Must initialize
// 初始化队头节点
if (compareAndSetHead(new Node()))
// 将队尾节点页指向刚创建的队头节点
tail = head;
// 也就是说如论如何都会都有一个哨兵节点作为队头节点,只是占位,不存储任何信息
} else {
// 将队尾结点设置为当前节点的前驱节点
node.prev = t;
// 将当前节点设置为队尾结点
if (compareAndSetTail(t, node)) {
// 将原队尾结点的后继节点设置为当前节点
t.next = node;
return t;
}
}
}
}
acquireQueued
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 若前驱节点为队头节点就尝试竞争
if (p == head && tryAcquire(arg)) {
// 竞争到资源 将当前节点设置为队头节点
setHead(node);
// 原队头节点出队
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前驱节点不为队头节点或竞争失败
// 根据前驱节点的状态判断是否需要在竞争失败后阻塞当前线程
// 执行阻塞当前线程(LockSupport.park)并返回线程是否被中断(阻塞后被唤醒才返回)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 若失败,取消竞争
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前驱节点的状态改为signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 线程阻塞在这
LockSupport.park(this);
// 唤醒后返回线程中断状态并清除中断标志位
return Thread.interrupted();
}
unlock和release
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public void unlock() {
sync.release(1);
}
// AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 判断队头节点不为空且队头节点状态不为0
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// Sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 变更同步器状态
setState(c);
return free;
}
// AQS
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 将节点状态变更为0
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点
LockSupport.unpark(s.thread);
}
cancelAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
// 跳过取消状态的前驱节点(一直往前找到第一个状态不为取消的节点作为前驱节点)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 当前节点状态变更为取消状态
node.waitStatus = Node.CANCELLED;
// 若当前节点为队尾节点 将队尾节点设置为前驱节点
if (node == tail && compareAndSetTail(node, pred)) {
// 将前驱节点的后续节点设置为null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 前驱节点不为队头节点
// 前驱节点状态为signal 不然就把前驱节点状态变更为signal
// 前驱节点持有线程不为空(不为哨兵节点)
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 将前驱节点的后续节点设置为当前节点的后续节点
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
总结

ReentrantLock加锁过程可以分为三个阶段:

  • 尝试加锁
  • 加锁失败,线程入队列
  • 线程入队列后,进入阻塞状态

其他锁

flowchart LR
	node1(无锁) --> node2(独占锁) --> node3(读写锁) --> node4(邮戳锁)

独占锁包括synchronized关键字以及Lock接口实现类如ReentrantLock等锁,每次都只允许一个线程进行读写操作

ReentrantReadWriteLock

可重入读写锁,一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。即读写互斥,读读共享

适用于读多写少的场景

缺点:

  • 会造成写饥饿问题
  • 会出现锁降级(写锁降级为读锁)
    • 锁降级就是将写锁降级为读锁(类似于linux中的文件读写权限,写权限高于读权限),锁的严苛程度变强叫做升级,反之叫做降级
    • 同一个线程持有了写锁,在没有释放写锁的情况下,它还可以继续获得读锁,此时就发生了锁降级,降级为了读锁;若释放了写锁,则完全转换为读锁
    • 读锁无法升级为写锁,写锁必须在读锁释放后才能获取到
    • 目的是为了让当前线程感知到数据的变化,目的是保证数据可见性(当前线程写完之后马上要读取,不希望被别的线程拿到写锁)

代码示例:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Lock readLock = lock.readLock();
Lock writeLock = lock.writeLock();
writeLock.lock();
log.info("写入");
readLock.lock();
log.info("读取");
writeLock.unlock();
readLock.unlock();
}

StampedLock

邮戳锁(也叫票据锁),是JDK1.8中新增的一个读写锁,是对JDK1.5中的读写锁ReentrantReadWriteLock的优化

主要是缓解ReentrantReadWriteLock的写锁饥饿问题

StampedLock采取乐观获取锁后,其他线程尝试获取写锁时不会被阻塞

在获取乐观读锁后,还需要对结果进行校验

特点:

  • 所有获取锁的方法,都返回一个邮戳,邮戳为0表示获取失败,其余都表示成功
  • 所有释放锁的方法,都需要传入一个邮戳,这个邮戳必须是和成功获取锁时得到的邮戳一致
  • StampedLock是不可重入的(若一个线程已经持有了写锁,再去获取写锁就会造成死锁)
  • StampedLock有三种访问模式
    • Reading:悲观读模式,功能和ReentrantReadWriteLock的读锁类似
    • Writing:写模式,功能和ReentrantReadWriteLock的写锁类似
    • Optimistic Reading:乐观读模式,无锁机制,类似于数据库中的乐观锁,支持读写并发,很乐观地认为读取时没有其他线程修改,若发现被修改再实现升级为悲观读模式

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void tryOptimisticRead() {
long stamp = lock.tryOptimisticRead();
int num = number;
log.info("{} validate {}", Thread.currentThread().getName(), lock.validate(stamp));
for (int i = 0; i < 4; i++) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("{} validate {}", Thread.currentThread().getName(), lock.validate(stamp));
}
// 校验失败,升级为悲观读,重新读取
if (!lock.validate(stamp)) {
log.info("number has bean changed");
stamp = lock.readLock();
try {
num = number;
} finally {
lock.unlockRead(stamp);
}
}
log.info("{} read {}", Thread.currentThread().getName(), num);
}

public static void main(String[] args) {
StampedLockDemo demo = new StampedLockDemo();
new Thread(() -> {
demo.tryOptimisticRead();
}).start();
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
demo.write();
}).start();
}

输出:

使用注意事项
  • 不支持重入
  • 悲观读锁和写锁都不支持条件变量(Condition)
  • 使用StampedLock一定不要调用中断操作(interrupt)

ConcurrentHashMap

sizeCtl变量的含义

  • 0值:表示数组未初始化,且数组的初始容量为16
  • 正数:若数组未初始化,表示值为数组的初始容量,若数组已初始化,则表示数组的扩容阈值(数组的容量*0.75)
  • -1:表示数组正在进行初始化
  • 小于0且不是-1:表示数组正在扩容,-(n+1),表示此时有n个线程同时进行数组的扩容

putVal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 根据键对象的hashcode计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
// 不满足break条件就一直循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 初始化数组
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// hash值与数组长度-1取模,获得对应的下标,且当前数组下标位置没有元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// CAS操作,将new的节点放到数组下标位置
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 数组下标位置的节点正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 将数组下标位置的节点加锁
synchronized (f) {
// 再次判断是否是数组下标位置的元素
if (tabAt(tab, i) == f) {
// hash值大于等于0为链表节点
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// hash值相等即hashcode相等且equals相等,返回旧值,更新为value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 存在hash冲突,将节点挂到链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 是树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 将节点放到树结构中
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 链表长度大于等于8
if (binCount >= TREEIFY_THRESHOLD)
// 判断是否满足树化条件(数组长度是否大于等于64),满足就将链表结构树化
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 长度增加(逻辑与LongAdder相似,热点分散),判断是否需要扩容
addCount(1L, binCount);
return null;
}

initTable

初始化数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 若数组为null或长度为0进行初始化操作
while ((tab = table) == null || tab.length == 0) {
// 小于0表示当前正在进行初始化或者扩容操作
if ((sc = sizeCtl) < 0)
// 礼让其他线程
Thread.yield(); // lost initialization race; just spin
// CAS操作,将sizeCtl变量变更为-1状态
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 竞争成功,再次判断条件是否满足
if ((tab = table) == null || tab.length == 0) {
// 使用不为0的sizeCtl作为数组长度或者使用默认长度
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 设置阈值为0.75*当前长度
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

addCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// counterCells不为空或者CAS更新baseCount失败
// 逻辑与LongAdder的add()方法基本一致
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 逻辑与LongAdder的longAccumulate()方法基本一致
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 新baseCount值大于等于扩容阈值
// 数组不为空且长度小于允许的最大容量
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// 数组正在扩容
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// CAS操作,更新sizeCtl
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

JUC并发编程知识学习及查漏补缺
https://blog.kedr.cc/posts/3956405114/
作者
zhuweitung
发布于
2022年3月19日
许可协议