Java 多线程并发工具包漫游指南

2021-03-22 From 程序之心 By 丁仪

线程安全是一个非常重要的技术点,在 Java 中提供了很多并发工具用于解决线程安全问题。本文的整理基于 Java 8,梳理常用的并发工具,包括队列、容器、线程池、锁、原子操作及同步工具。

阻塞队列

阻塞队列有 BlockingQueue、BlockingDueue、TransferQueue 三种类型,具有插入/取值操作(put/take)无法成功时阻塞当前线程的特点,也可以在指定的超时时间(offer/poll)后超时返回。阻塞队列的特点是:

  • 生产者可以持续向队列插入新的元素,直到队列满为止,队列满后生产者线程被阻塞;
  • 消费者可以持续从队列取出元素,直到队列空为止,队列空后消费者线程被阻塞。

BlockingQueue 是单向的阻塞队列,生产者只能在尾部放入新元素,消费者只能在头部获取已有元素。Java 提供了多种类型的BlockingQueue实现类:

  • ArrayBlockingQueue:基于数组实现的有界阻塞队列,创建后不能修改队列的大小;
  • LinkedBlockingQueue:基于链表实现的有界阻塞队列,默认大小为Integer.MAX_VALUE,有较好的吞吐量,但可预测性差。
  • PriorityBlockingQueue:具有优先级的无界阻塞队列,不允许插入null,所有元素都必须可比较(Comparable接口)。
  • SynchronousQueue:只有一个元素的同步队列。若队列中有元素插入操作将被阻塞,直到队列中的元素被其他线程取走。
  • DelayQueue:无界阻塞队列,每个元素都有一个延迟时间,在延迟时间之后才释放元素。

BlockingDueue 是双向的阻塞队列,头部/尾部都可以放入新元素,也都可以获取已有元素。Dueue 是“Double Ended Queue”的缩写。调用带有 first / last 后缀的 put / take 方法即可实现头部/尾部的插入/取值阻塞操作,同时带有后缀的 offer / poll 方法可以指定超时时间。Java 只提供了一种类型的BlockingDueue实现类:

  • LinkedBlockingDeque:基于双向链表实现的有界阻塞队列,默认大小为Integer.MAX_VALUE,有较好的吞吐量,但可预测性差。

TransferQueue 被定义为转移队列,继承了 BlockingQueue 接口,也是单向的阻塞队列。扩展的转移功能是指通过 transfer 可以转移一个新元素给到消费者,如果消费者恰好在等待就立即转给消费者,否则阻塞生产者直到该元素被取走。Java 提供了一个基于链表的实现类 LinkedTransferQueue。

并发容器

工具包提供了队列的并发实现类 ConcurrentLinkedQueue 和 ConcurrentLinkedDeque,两者都是无界非阻塞线程安全的队列。

ConcurrentMap接口继承了普通的Map接口,提供了线程安全和原子操作特性。Java 8 提供了实现类ConcurrentHashMap,ConcurrentHashMap不锁定整个Map,只锁定需要写入的部分,因此并发性能比HashTable要高很多。

ConcurrentNavigableMap接口继承了ConcurrentMap和NavigableMap接口,支持并发访问NavigableMap,还能让子Map具备并发访问的能力。NavigableMap是扩展的 SortedMap,具有了针对给定搜索目标返回最接近匹配项的导航方法。

Java 提供了实现类ConcurrentSkipListMap,并没有使用 lock 来保证线程的并发访问和修改,而是使用了非阻塞算法来保证并发访问,高并发时相对于TreeMap有明显的优势。

工具包提供了NavigableSet的并发实现类ConcurrentSkipListSet,是线程安全的有序集合,适用于高并发的场景,通过ConcurrentSkipListMap实现。

工具包提供了两个写时复制容器,即CopyOnWriteArrayList和CopyOnWriteArraySet。写时复制技术是一种优化策略,多个线程可以并发访问同一份数据,当有线程要修改时才进行复制然后修改。在Linux系统中,fork进程后,子进程先与父进程共享数据需要修改时才用写时复制得到自己的副本。在Java中,写时复制容器在修改数据后,把原来容器的引用指向新容器,来实现读写分离,在并发读写中不需要加锁。写时复制容器适用于读多写少的场景,在复制时会占用较多内存,能够保证最终一致性,但无法保证瞬时一致性。

线程

工具包中Executor接口定义了执行器的基本功能,即execute方法,接收Runnable对象参数并执行Runnable中的操作。ExecutorService接口继承Executor接口后增加了关于执行器服务的定义,如关闭、立即关闭、检查关闭、等待终止、提交有返回值的任务、批量提交任务等。通过Executors的工厂方法获取ExecutorService的具体实现,目前Executors可以返回的实现类型如下:

  • FixedThreadPool:固定大小的线程池,创建时指定大小;
  • WorkStealingPool:拥有多个任务队列(以便减少连接数)的线程池;
  • SingleThreadExecutor:单线程执行器,顾名思义只有一个线程执行任务
  • CachedThreadPool:根据需要创建线程,可以重复利用已存在的线程来执行任务
  • SingleThreadScheduledExecutor:根据时间计划延迟创建单个工作线程或者周期性创建的单线程执行器;
  • ScheduledThreadPool:能够延后执行任务,或者按照固定的周期执行任务

如果希望在任务执行完成后得到任务的返回值,可以调用submit方法传入Callable任务,并通过返回的Future对象查看任务执行是否完成,并获取返回值。

任务分解与合并

ForkJoinPool 可以对任务进行递归分解与合并,从而充分发挥多线程的潜力。其中:

  • 分解任务,就是把大任务拆分成小任务,小任务再给到 ForkJoinPool 再次分解,直到不能分解;
  • 合并任务,则是小任务执行完成后,把结果给到 ForkJoinPool 再次合并,直到得到最终结果。

任务分割成子任务有一定开销,只有当给的任务过大,把它分割成几个子任务才有意义。

并发锁

Java 提供了读写锁和可重入锁。使用锁实现的同步机制很像synchronized块,但是比synchronized块更灵活。锁和synchronized的主要区别在于:

  • Synchronized块不能保证等待进入块的线程的访问顺序;
  • Synchronized块无法接收参数,不能在有超时时间限制的情况下尝试访问;
  • Synchronized块必须包含在单个方法中,而锁的lock和unlock操作可以在单独的方法中。

ReadWriteLock 读写锁接口,允许多个线程读取某个资源,但是一次只能有一个线程进行写操作。内部有读锁、写锁两个接口,分别保护读操作和写操作。实现类为ReentrantReadWriteLock。

ReentrantLock 可重入锁,具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock 将由最近成功获得锁,并且还没有释放该锁的线程所拥有。当锁没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁并返回。如果当前线程已经拥有该锁,此方法将立即返回。内部有一个计数器,拥有锁的线程每锁定一次,计数器加1,每释放一次计数器减1。

原子类型

工具包提供了一些可以用原子方式进行读写的变量类型,支持无锁线程安全的单变量编程。本质上,这些类都扩展了volatile的概念,使用一个volatile类型的变量来存储实际数据。工具包提供了4种类型的原子变量类型:

  • AtomicBoolean:可原子操作的布尔对象
  • AtomicInteger:可原子操作的整形对象
  • AtomicLong:可原子操作的长整形对象
  • AtomicReference:可原子操作的对象引用

在此基础上,工具包还提供了原子性的数组类型,包括AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。

原子变量可以实现线程安全的 i++、i--、++i、--i 等类型的操作。

线程同步

Java 提供了多个线程之间同步的工具,包括 CountDownLatch、CyclicBarrier、Exchanger、Semaphore 。

CountDownLatch 用于一个或者多个线程等待一系列指定操作的完成。初始化时,给定一个数量,每调用一次 countDown() 方法数量减一。其他线程调用await方法等待时,线程会阻塞到数量减到 0 才开始执行。

CyclicBarrier 是一种同步机制,它能够对处理一些算法线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。可以设置多个栅栏。实现多线程多次同步等待。

Exchanger 表示一种会合点,两个线程可以在这里交换对象。两个线程各自调用exchange方法进行交换,当线程A调用exchange()方法后,它会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。

Semaphore 是一种信号量,可以控制某个资源可被同时访问的个数。通过 acquire() 获取一个许可,如果没有就等待,调用 release() 释放一个许可。

总结

线程是Java开发常用的技术线程安全是多线程开发需要关注的重点。Java提供的工具包已经覆盖了非常多的场景。

本文来源:程序之心,转载请注明出处!

本文地址:https://chengxuzhixin.com/blog/article/200082.html

发表感想

© 2016 - 2022 chengxuzhixin.com All Rights Reserved.

浙ICP备2021034854号-1    浙公网安备 33011002016107号