侧边栏壁纸
  • 累计撰写 57 篇文章
  • 累计创建 10 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

14、并发

yilee
2023-04-04 / 0 评论 / 0 点赞 / 60 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
本文最后更新于2024-05-31,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

十四、并发

14.1 什么是线程

  1. 使用线程给其他任务提供机会
    • 不要调用Thread 类或Runnable 对象的run 方法。直接调用run 方法, 只会执行同一个线程中的任务, 而不会启动新线程。应该调用 Thread.start 方法。这个方法将创建一个执行run方法的新线程。

14.2 中断线程

  1. 中断
    • 当对一个线程调用interrupt 方法时,线程的中断状态将被置位。这是每一个线程都具有的boolean 标志。每个线程都应该不时地检査这个标志, 以判断线程是否被中断。当在一个被阻塞的线程(调用sleep 或wait ) 上调用interrupt 方法时, 阻塞调用将会被Interrupted Exception 异常中断。

14.3 线程状态

  • 线程可以有如下6 种状态,要确定一个线程的当前状态, 可调用getState 方法。
    • New ( 新创建)
    • Runnable (可运行)
    • Blocked ( 被阻塞)
    • Waiting ( 等待)
    • Timed waiting (计时等待)
    • Terminated ( 被终止)
  1. 新创建线程

    • 当用new 操作符创建一个新线程时, 如newThread®, 该线程还没有开始运行,它的状态是new
  2. 可运行线程

    • 调用start 方法,线程转化为runnable 状态。一个可运行的线桿可能正在运行也可能没有运行, 这取决于操作系统给线程提供运行的时间。因此,在任何给定时刻,一个可运行的线程可能正在运行也可能没有运行
  3. 被阻塞线程和等待线程

    • 当线程处于被阻塞或等待状态时, 它暂时不活动。它不运行任何代码且消耗最少的资源。直到线程调度器重新激活它。
      • 当一个线程试图获取一个内部的对象锁,而该锁被其他线程持有, 则该线程进人阻塞状态当所有其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变成非阻塞状态。
      • 当线程等待另一个线程通知调度器一个条件时, 它自己进入等待状态。
      • 有几个方法有一个超时参数。调用它们导致线程进人计时等待( timed waiting ) 状态。这一状态将一直保持到超时期满或者接收到适当的通知。带有超时参数的方法有Thread.sleep 和Object.wait、Thread.join、Lock,tryLock 以及Condition.await 的计时版。
    • 线程转态转移

    08004f36f70941a79543d5b22b9fe3c0

  4. 被终止的线程

    • 主要包含两个方面
      • 因为run 方法正常退出而自然死亡。
      • 因为一个没有捕获的异常终止了nm 方法而意外死亡。
    • 可以调用线程的 stop 方法杀死一个线程。该方法抛出ThreadDeath 错误对象,由此杀死线程。但是,stop 方法已过时, 不要在自己的代码中调用这个方法。

14.4 线程属性

  1. 线程优先级

    • 在Java 程序设计语言中, 每一个线程有一个优先级。一个线程继承它的父线程的优先级。使用setPriority可以修改线程优先级,可修改为MIN_PRIORITY (在Thread 类中定义为1 ) 与MAX_PRIORITY (定义为10 ) 之间的任何值。NORM_PRIORITY 被定义为5。

      每当线程调度器有机会选择新线程时, 它首先选择具有较高优先级的线程

  2. 守护线程

    • 调用t .setDaemon(true) 将线程转换为守护线程(daemon thread)。守护线程的唯一用途是为其他线程提供服务。

      守护线程应该永远不去访问固有资源, 如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。

  3. 未捕获异常处理器

    • 线程的run 方法不能抛出任何受查异常,但非受査异常会导致线程终止。

    • 在线程死亡之前, 异常被传递到一个用于未捕获异常的处理器。处理器必须是属于一个实现Thread.UncaughtExceptionHandler 接口的类,当然默认处理器为空,即不处理异常。

      @FunctionalInterface
      public interface UncaughtExceptionHandler {
          void uncaughtException(Thread t, Throwable e);
      }
      
    • 使用setUncaughtExceptionHandler 方法为任何线程安装一个处理器。也可以用Thread类的静态方法setDefaultUncaughtExceptionHandler 为所有线程安装一个默认的处理器。但若不为独立的线程安装处理器,此时的处理器就是该线程的ThreadGroup 对象。不要在自己的程序中使用线程组。

    • ThreadGroup 类实现Thread.UncaughtExceptionHandler 接口。其功能如下:

      ​ 1 ) 如果该线程组有父线程组, 那么父线程组的uncaughtException 方法被调用。
      ​ 2 ) 否则, 如果Thread.getDefaultExceptionHandler 方法返回一个非空的处理器, 则调用该处理器。
      ​ 3 ) 否则, 如果Throwable 是ThreadDeath 的一个实例, 什么都不做。
      ​ 4 ) 否则, 线程的名字以及Throwable 的栈轨迹被输出到System.err 上。

14.5 同步

  1. 锁对象

    • Java SE 5.0 引入了ReentrantLock 类

      myLock.lock(); // a ReentrantLock object
      try {
      	//critical section
      } finally {
          myLock.unlock(); // make sure the lock is unlocked even if an exception is thrown
      }
      // 把解锁操作括在finally 子句之内是至关重要的。如果在临界区的代码抛出异常,锁必须被释放。
      
  2. 条件对象

    • Lock 和Condition 接口为程序设计人员提供了高度的锁定控制。Condition由ReentrantLock实例化对象创建。

    • Condition.await 方法添加一个线程到等待集中,Condition.(signal/signalAll)方法解除等待线程的阻塞状态。

      当一个线程拥有某个条件的锁时, 它仅仅可以在该条件上调用await、signalAll 或signal 方法。

  3. synchronized 关键字

    • 关锁和条件的关键

      • 锁用来保护代码片段, 任何时刻只能有一个线程执行被保护的代码。
      • 锁可以管理试图进入被保护代码段的线程。
      • 锁可以拥有一个或多个相关的条件对象。
      • 每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程。
    • 使用synchronized 关键词,使用该关键词相当于每一个对象有一个内部锁, 并且该锁有一个内部条件,以下代码等价

      public synchronized void method(){
      	// method body
      }
      // 等价于
      public void method(){
          this.intrinsidock.lock();
          try {
              //method body
          } finally {
              this.intrinsidock.unlock();
          }
      }
      
    • 内部对象锁只有一个相关条件。wait 方法添加一个线程到等待集中,notifyAll/notify 方法解除等待线程的阻塞状态。即调用wait 或notityAll 等价于intrinsicCondition.await(); intrinsicCondition.signalAIl();

      wait、notifyAll 以及notify 方法是Object 类的final 方法。

      Condition 方法必须被命名为await、signalAll 和signal 以便它们不会与那些方法发生冲突。

    • 内部锁和条件的局限

      • 不能中断一个正在试图获得锁的线程。
      • 试图获得锁时不能设定超时。
      • 每个锁仅有单一的条件, 可能是不够的。在代码中应该使用哪一种? Lock 和Condition对象还是同步方法?下面是一些建议:
      • 最好既不使用Lock/Condition也不使用synchronized关键字。在许多情况下你可以使用java.util.concurrent包中的一种机制,它会为你处理所有的加锁。
      • 如果synchronized关键字适合你的程序,那么请尽量使用它,这样可以减少编写的代码数量,减少出错的几率。
      • 如果特别需要Lock/Condition结构提供的独有特性时,才使用Lock/Condition。
  4. 同步阻塞

    • 实例,获得Obj 的锁。

      synchronized (obj) { // this is the syntax for a synchronized block
      	//critical section
      }
      
    • 有时程序员使用一个对象的锁来实现额外的原子操作, 实际上称为客户端锁定(clientside locking)考虑Vector 类, 一个列表, 它的方法是同步的。 假定在Vector 中存储银行余额。这里有一个transfer(转移余额)方法的原始实现:

      public void transfer(Vector<Double> accounts, int from, int to, int amount) {// Error
          accounts.set(from, accounts.get(from) - amount);
          accounts.set(to, accounts.get(to) + amount);
      }
      
    • Vector 类的get 和set 方法是同步的, 但完全可能在get后被中断,则一个线程可能在相同的存储位置存人不同的值。因此下方可以修复这个问题,但是它完全依赖于这样一个事实, Vector 类对自己的所有可修改方法都使用内部锁。

      public void transfer(Vector<Double> accounts, int from, int to, int amount) {// Error
          synchronized (accounts){
              accounts.set(from, accounts.get(from) - amount);
              accounts.set(to, accounts.get(to) + amount);
          }
      }
      

      客户端锁定是非常脆弱的,通常不推荐使用。

  5. 监视器概念

    • 监视器的特性
      • 监视器是只包含私有域的类。
      • 每个监视器类的对象有一个相关的锁。
      • 使用该锁对所有的方法进行加锁。换句话说,如果客户端调用obj.methd()(),那么obj对象的锁是在方法调用开始时自动获得,并且当方法返回时自动释放该锁。因为所有的域是私有的,这样的安排可以确保一个线程在对对象操作时,没有其他线程能访问该域。
      • 该锁可以有任意多个相关条件。
  6. Volatile 域

  1. Volatile 变量不能提供原子性。例如, 方法 public void flipDoneO { done = !done; } // not atomic 不能确保翻转域中的值。不能保证读取、翻转和写入不被中断。

  2. 除非使用锁或volatile 修饰符, 否则无法从多个线程安全地读取一个域。

  1. final 变置
  • 还有一种情况可以安全地访问一个共享域, 即这个域声明为final 时。考虑以下声明:final Map<String, Double> accounts = new HashKap<>(); 其他线程会在构造函数完成构造之后才看到这个accounts 变量。当然,对这个映射表的操作并不是线程安全的。
  1. 原子性

    • 假设对共享变量除了赋值之外并不完成其他操作, 那么可以将这些共享变量声明为volatile

    • java.util.concurrent.atomic 包中有很多类使用了很高效的机器级指令(而不是使用锁) 来保证其他操作的原子性。如AtomicInteger、AtomicIntegerArray、AtomicIntegerFieldUpdater、AtomicLongArray、AtomicLongFieldUpdater、AtomicReference、AtomicReferenceArray 和AtomicReferenceFieldUpdater 等等。

    • 举例AtomicLong提供的方法:

      long incrementAndGet() // 以原子方式将AtomicLong 自增, 并返回自增后的值。
      long decrementAndGet()
      boolean compareAndSet(oldValue, newValue)
      // 比较大小并设置最大值,未设置返回false
      // 可以通过循环尝试更新java SE 8可简化如下
      largest.updateAndGet(x -> Math.max(x, observed))  <等=价>  largest.accumulateAndCet(observed, Math::max);
      // accumulateAndGet方法利用一个二元操作符来合并原子值和所提供的参数。
      // getAndUpdate 和getAndAccumulate 方法可以返回原值。
      
    • 如果有大量线程要访问相同的原子值, 性能会大幅下降, 因为乐观更新需要太多次重试。Java SE 8 提供了LongAdder 和LongAccumulator 类来解决这个问题。LongAdder 调用increment 让计数器自增,或者调用add 来增加一个量, 或者调用sum 来获取总和。

    • 其中以下两个调用是等价的

      LongAdder adder = new LongAdder()
      // 等价
      LongAccumulator adder = new LongAccumulator(Long::sum, 0);
      

      LongAccumulator的操作必须满足结合律和交换律。这说明, 最终结果必须独立于所结合的中间值的顺序。

      DoubleAdder 和DoubleAccumulator 也采用同样的方式, 只不过处理的是double 值。

  2. 死锁

    • Java 编程语言中没有任何东西可以避免或打破这种死锁现象。必须仔细设计程序, 以确保不会出现死锁。
  3. 线程局部变量

    • 有时可能要避免共享变量, 使用ThreadLocal 辅助类为各个线程提供各自的实例。

      public static final SimpleDateFormat dateFormat = new SimpleDateForniat("yyyy-MM-dd");
      // 多线程执行下面方法, dateFormat 内部数据结构可能会被并发的访问所破坏。
      String dateStamp = dateFormat.format(new Date());
      
      // 修改全局变量如下可避免错误
      public static final ThreadLocal<SimpleDateFormat> dateFormat = ThreadLocal.withInitial(() -> new 
      SimpleDateFormat("yyyy-MM-dd"));
      // 多线程执行下面方法,
      String dateStamp = dateFormat.get().format(new Date()) ;
      
    • java…util.Random 类是线程安全的。但是如果多个线程需要等待一个共享的随机数生成器, 这会很低效。可以使用ThreadLocal 辅助类为各个线程提供一个单独的生成器, 不过Java SE 7 还另外提供了一个便利类。只需要做以下调用:

      int random = ThreadLocalRandom.current().nextInt(upperBound)
      

      ThreadLocalRandom.current() 调用会返回特定于当前线程的Random 类实例。

  4. 锁测试与超时

    • 线程在调用lock 方法来获得另一个线程所持有的锁的时候,很可能发生阻塞。应该更加谨慎地申请锁。tryLock 方法试图申请一个锁, 在成功获得锁后返回true, 否则, 立即返回false, 而且线程可以立即离开去做其他事情。例:myLock.tryLock(100, TineUnit.MILLISECONDS)
    • TimeUnit 是一枚举类型,可以取的值包括SECONDS、MILLISECONDS, MICROSECONDS和NANOSECONDS。
    • lock 方法不能被中断。但若调用带有用超时参数的tryLock, 那么如果线程在等待期间被中断,将抛出InterruptedException 异常。这是一个非常有用的特性,因为允许程序打破死锁。也可以调用locklnterruptibly 方法。它就相当于一个超时设为无限的tryLock 方法。
    • 在等待一个条件时, 也可以提供一个超时:myCondition.await(100, TineUniBILLISECONDS))
  5. 读/ 写锁(ReentrantReadWriteLock

    • 使用读/ 写锁

      1. 构造一个ReentrantReadWriteLock 对象:

        private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock():
        
      2. 抽取读锁和写锁:

        private Lock readLock = rwl.readLock();
        private Lock writeLock = rwl.writeLock();
        
      3. 对所有的获取方法加读锁:

        public double getTotalBalance() {
            readLock.lock();
            try { ... }
            finally { readLock.unlock(); }
        }
        
      4. 对所有的修改方法加写锁:

        public void transfer(..){
            writeLock.lock();
            try { ... }
            finally { writeLock.unlock(); }
        }
        
  6. 为什么弃用 stopsuspend 方法

    • stop 和suspend 方法有一些共同点: 都试图控制一个给定线程的行为。stop 方法天生就不安全,经验证明suspend 方法会经常导致死锁。
    • stop 方法, 该方法终止所有未结束的方法, 包括run 方法。当线程被终止,立即释放被它锁住的所有对象的锁。这会导致对象处于不一致的状态。
    • suspend 方法, 该方法不会破坏对象。但是,如果用suspend 挂起一个持有一个锁的线程, 那么,该锁在恢复之前是不可用的。如果调用suspend 方法的线程试图获得同一个锁, 那么程序死锁: 被挂起的线程等着被恢复, 而将其挂起的线程等待获得锁。

14.6 阻塞队列

  • 对于许多线程问题, 可以通过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插人元素, 消费者线程则取出它们。使用队列,可以安全地从一个线程向另一个线程传递数据。阻塞队列方法如下:

    2ca69ac9b76e4b6db16b132acdefe38c

    poll和peek 方法返回空来指示失败。因此,向这些队列中插入null 值是非法的。

  • 还有带有超时的offer 方法和poll 方法的变体。

    boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);
    // 尝试在100 毫秒的时间内在队列的尾部插人一个元素。如果成功返回true ; 否则, 达到超时时,返回false。
    Object head = q.poll(100, TimeUnit.MILLISECONDS)
    // 尝试用100 毫秒的时间移除队列的头元素;如果成功返回头元素,否则,达到在超时时,返回null。
    
  • java.util.concurrent 包提供了阻塞队列的几个变种。

    • LinkedBlockingQueue 的容量是没有上边界的,但是,也可以选择指定最大容量。LinkedBlockingDeque 是一个双端的版本。

    • **ArrayBlockingQueue **在构造时需要指定容量,并且有一个可选的参数来指定是否需要公平性。若设置了公平参数, 则那么等待了最长时间的线程会优先得到处理。通常,公平性会降低性能, 只有在确实非常需要时才使用它。

    • PriorityBlockingQueue 是一个带优先级的队列,元素按照它们的优先级顺序被移出。该队列是没有容量上限, 但是,如果队列是空的, 取元素的操作会阻塞。

    • **DelayQueue **包含实现Delayed 接口的对象

      interface Delayed extends Comparable<Delayed> {
      	long getDelay(TimeUnit unit);
      }
      

      getDelay 方法返回对象的残留延迟。负值表示延迟已经结束。元素只有在延迟用完的情况下才能从DelayQueue 移除。还必须实现compareTo 方法。DelayQueue 使用该方法对元素进行排序。

    • **LinkedTransferQueue ** Java SE 7 增加了一个TranSferQueue 接口,允许生产者线程等待, 直到消费者准备就绪可以接收一个元素。如如果生产者调用 q.transfer(iteni);,这个调用会阻塞, 直到另一个线程将元素( item) 删除。LinkedTransferQueue 类实现了这个接口。

14.7 线程安全的集合

  1. 高效的映射、集和队列

    • java.util.concurrent 包提供了映射、有序集和队列的高效实现:ConcurrentHashMap、ConcurrentSkipListMap > ConcurrentSkipListSet 和ConcurrentLinkedQueue。
    • 这些集合使用复杂的算法,通过允许并发地访问数据结构的不同部分来使竞争极小化。与大多数集合不同,size 方法不必在常量时间内操作。确定这样的集合当前的大小通常需要遍历。
    • 集合返回弱一致性( weakly consistent) 的迭代器。这意味着迭代器不一定能反映出它们被构造之后的所有的修改, 但是, 它们不会将同一个值返回两次,也不会拋出ConcurrentModificationException 异常。集合如果在迭代器构造之后发生改变,java.util 包中的迭代器将抛出一个ConcurrentModificationException 异常。
    • 在JavaSE 8 中,并发散列映射将桶组织为树, 而不是列表, 键类型实现了Comparable, 从而可以保证性能为0(log⑻)。
  2. 映射条目的原子更新

    • 原子更新,统计词频例子

      // chm is a ConcurrentHashMap<String, Long>
      // 以原子方式用一个新值替换原值,一直替换直到替换成功
      do{
          long oldValue = map.get(word);
          long newValue = oldValue = null ? 1 : oldValue + 1;
      } while (!chm.replace(word, oldValue, newValue));
      // 更简单的使用以下方法
      // chm is a ConcurrentHashMap<String, LongAdder>
      // putlfAbsent 返回映射的的值(可能是原来的值, 或者是新设置的值),
      chm.putlfAbsent(word, new LongAdder()).increment();
      
    • Java SE 8 提供了一些可以更方便地完成原子更新的方法。调用compute 方法时可以提供一个键和一个计算新值的函数。这个函数接收键和相关联的值(如果没有值,则为 null ), 它会计算新值。

      // chm is a ConcurrentHashMap<String, Long>
      chm.compute(word, (k, v) -> v = null ? 1: v + 1);
      

      注意: ConcurrentHashMap ConcurrentHashMap 中不允许有null 值。有很多方法都使用null 值来指示映射中某个给定的键不存在。

      • computelfPresent 已经有原值的情况下计算新值

      • computelfAbsent 没有原值的情况下计算新值。

        // chm is a ConcurrentHashMap<String, LongAdder>
        // LongAdder 构造器只在确实需要一个新的计数器时才会调用。
        chm.computelfAbsent(word, v -> new LongAdder()).increment();
        
    • 首次增加一个键时通常需要做些特殊的处理。利用merge 方法可以非常方便地做到这一点。这个方法有一个参数表示键不存在时使用的初始值。否则, 就会调用你提供的函数来结合原值与初始值。

      map.merge(word, 1L, (oldValue, newValue) -> oldValue + newValue);
      // 或者
      map.merge(word, 1L, Long:sum);
      

      注意:

      1. 如果传入compute 或merge 的函数返回null, 将从映射中删除现有的条目。

      2. 使用compute 或merge 时, 要记住你提供的函数不能做太多工作。这个函数运行时, 可能会阻塞对映射的其他更新。当然, 这个函数也不能更新映射的其他部分。

  3. 对并发散列映射的批操作

    • Java SE 8 为并发散列映射提供了批操作,即使有其他线程在处理映射,这些操作也能安全地执行。

    • 操作方式如下,其中每个操作都有4 个版本,分别是处理键、处理值、处理键和值、处理Map.Entry 对象。

      • 搜索(search) 为每个键或值提供一个函数,直到函数生成一个非null 的结果。然后搜索终止,返回这个函数的结果。
      • 归约(reduce) 组合所有键或值, 这里要使用所提供的一个累加函数。
      • forEach 为所有键或值提供一个函数。
    • 对于上述各个操作, 需要指定一个参数化阈值(parallelism threshold)。如果映射包含的元素多于这个阈值, 就会并行完成批操作。如果希望批操作在一个线程中运行, 可以使用阈值Long.MAX_VALUE。如果希望用尽可能多的线程运行批操作,可以使用阈值1。

    • 搜索(search) 方法

      U searchKeys(long threshold, BiFunction< ? super K , ? extends U> f)
      U searchVaiues(long threshold, BiFunction< ? super V, ? extends U> f)
      U search(long threshold, BiFunction< ? super K, ? super V,? extends U> f)
      U searchEntries(long threshold, BiFunction<Map.Entry<K, V>, ? extends U> f)
      // 找出第一个出现次数超过1000 次的单词如下
      String result = map.search(threshold, (k, v) -> v > 1000 ? k : null );
      
    • forEach方法

      // 只需要为各个映射条目提供一个消费者函数
      map.forEach(threshold, (k, v) -> System.out.println(k + " -> " + v));
      // 除上述条件以外还需一个转换器函数,其结果会传递到消费者,转换器可以用作为一个过滤器。只要转换器返回null, 这个值就会被悄无声息地跳过。
      map.forEach(1000,
              (k, v) -> k + " -> " + v,   // Transformer
              System.out::println);       // Consumer
      
    • 归约(reduce) 方法

      reduce 操作用一个累加函数组合其输入

      // 计算所有值总和
      Long sum = map.reduceValues(threshold, Long::sum);
      // 计算最长的键的长度,提供一个转换器函数,跳过不需要的数据
      Long sum = map.reduceKeys(threshold, String::length, Integer::max);
      // 统计多少个条目的值> 1000
      Long count = map.reduceValues(threshold,
              v -> v > 1000 ? 1L : null,
              Long::sum);
      

      注意:

      1. 如果映射为空, 或者所有条目都被过滤掉, reduce 操作会返回null。如果只有一个元素, 则返回其转换结果, 不会应用累加器。

      2. 对于int、long 和double 输出还有相应的特殊化操作, 分别有后缀Tolnt、ToLong 和ToDouble。需要把输入转换为一个基本类型值,并指定一个默认和一个累加器函数。映射为空时返回默认值。

        long sum = map.reduceValuesToLong(threshold,
                Long::longValue,  // Transformer to primitive type
                0,          // Default value for empty map
                Long::sum);       // Primitive type accumulator
        
  4. 并发集视图

    • 静态newKeySet 方法会生成一个Set, 这实际上是ConcurrentHashMap<K, Boolean〉的一个包装器。(所有映射值都为Boolean.TRUE, 不过因为只是要把它用作一个集, 所以并不关心具体的值。)如果原来有一个映射,keySet 方法可以生成这个映射的键集。这个集是可变的。如果删除这个集的元素,这个键(以及相应的值)会从映射中删除。

      Set<String> words = ConcurrentHashMap.newKeySet();
      
    • Java SE 8 为ConcurrentHashMap 增加了第二个keySet 方法,包含一个默认值,可以在为集增加元素时使用

      Set<String> words = map.keySet(1L);
      words.add("java");
      // 若"Java” 在words 中不存在, 现在它会有一个值10
      
  5. 写数组的拷贝

    CopyOnWriteArrayList 和CopyOnWriteArraySet 是线程安全的集合, 其中所有的修改线程对底层数组进行复制。

  6. 并行数组算法

    • 在Java SE 8 中, Arrays 类提供了大量并行化操作。静态Arrays.parallelSort 方法可以对一个基本类型值或对象的数组排序。

      String[] words = {"11", "55", "bgf"};
      Arrays.parallelSort(words);
      // 可以提供 Comparator
      Arrays.parallelSort(words, Comparator.comparing(String::length));
      // 可以提供一个范围的边界
      words.parallelSort(words.length/2, words.length);
      
    • parallelSetAll 方法会用由一个函数计算得到的值填充一个数组。这个函数接收元素索引,然后计算相应位置上的值。这个操作对于所有基本类型数组和对象数组都有相应的版本。

      Arrays.parallelSetAll(values, i ->i % 10);
      
    • parallelPrefix 用对应一个给定结合操作的前缀的累加结果替换各个数组元素。

      Arrays.parallelPrefix(values, (x, y) -> x * y)
      // 原数组 [1,2, 3, 4, ...]
      // 新数组 [1, 1x2, 1x2x3, lx2x3x4, ...]
      
  7. 较早的线程安全集合

    • 从Java 的初始版本开始,VectorHashtable类就提供了线程安全的动态数组和散列表的实现。现在这些类被弃用了, 取而代之的是AnayList 和HashMap 类。

    • 任何集合类都可以通过使用同步包装器( synchronizationwrapper) 变成线程安全的,结果集合的方法使用锁加以保护, 提供了线程安全访问。

      List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>());
      Map<K, V> synchHashMap = Collections.synchronizedMap(new HashMap<K, V>0);
      

      在另一个线程可能进行修改时要对集合进行迭代,仍然需要使用“ 客户端” 锁定:

      synchronized (synchHashMap) {
          Iterator<K> iter = synchHashMap.keySet().iterator() ;
          while (iter.hasNext()) . .
      }
      // 使用“ foreach” 循环必须使用同样的代码, 因为循环使用了迭代器。
      

      如果在迭代过程中, 别的线程修改集合,迭代器会失效, 抛出ConcurrentModificationException 异常。同步仍然是需要的, 因此并发的修改可以被可靠地检测出来。

    • **最好使用 java.util.concurrent 包中定义的集合, 不使用同步包装器中的。**有一个例外是经常被修改的数组列表。在那种情况下, 同步的ArrayList 可
      以胜过CopyOnWriteArrayList()

14.8 Callable 与Future

  • Runnable 封装一个异步运行的任务,可以把它想象成为一个没有参数和返回值的异步方法。

  • Callable 与Runnable 类似, 但是有返回值。Callable 接口是一个参数化的类型, 只有一个方法call。类型参数是返回值的类型

    public interface Callable<V> {
    	V cal1() throws Exception;
    }
    
  • Future 保存异步计算的结果。可以启动一个计算,将Future 对象交给某个线程,然后忘掉它。Future 对象的所有者在结果计算好之后就可以获得它。

    public interface Future<V> {
        // 运行该计算的线程被中断, 两个get方法都将拋出IntermptedException。
        V get() throws ...;
        // get方法的调用被阻塞, 直到计算完成。
        V get(long timeout, TimeUnit unit) throws ...;
        // get方法如果在计算完成之前, 方法的调用超时,拋出一个TimeoutException 异常
        void cancel (boolean mayInterrupt);
        // cancel 取消计算
        boolean isCancelled();
        // 是否已经取消
        boolean isDone();
        // 计算是否还在进行
    }
    
  • FutureTask 包装器是一种非常便利的机制, 可将Callable 转换成Future 和Runnable, 它同时实现二者的接口。

    Callable<Integer> myConiputation = ...;
    FutureTask<Integer> task = new FutureTask<Integer>(myConiputation);
    Thread t = new Thread(task); 	// it's a Runnable
    t.start();
    Integer result = task.get(); 	// it's a Future
    

    a3101d01f48b4c77980a0a308e4b6b4c

14.9 执行器

  • 如果程序中创建了大量的生命期很短的线程,应该使用线程池( thread pool )。一个线程池中包含许多准备运行的空闲线程。将Runnable 对象交给线程池, 就会有一个线程调用run 方法。当run 方法退出时, 线程不会死亡,而是在池中准备为下一个请求提供服务。

  • 另一个使用线程池的理由是减少并发线程的数目。创建大量线程会大大降低性能甚至使虚拟机崩溃。如果有一个会创建许多线程的算法, 应该使用一个线程数“ 固定的” 线程池以限制并发线程的总数。

  • 执行器( Executor) 类有许多静态工厂方法用来构建线程池

    ee858999db4a4b819b47c1f2bdb6696c

  1. 线程池

    • 以下3 个方法返回实现了ExecutorService 接口的ThreadPoolExecutor 类的对象。

      • newCachedThreadPool 方法构建了一个线程池, 对于每个任务, 如果有空闲线程可用, 立即让它执行任务, 如果没有可用的空闲线程, 则创建一个新线程。

      • newFixedThreadPool 方法构建一个具有固定大小的线程池。如果提交的任务数多于空闲的线程数, 那么把得不到服务的任务放置到队列中。当其他任务完成以后再运行它们。

      • newSingleThreadExecutor 是一个退化了的大小为1 的线程池由一个线程执行提交的任务, 一个接着一个。

    • 下面的方法之一将一个Runnable 对象或Callable 对象提交给ExecutorService,该池会在方便的时候尽早执行提交的任务。调用submit 时,会得到一个Future 对象, 可用来查询该任务的状态。

      // 其他方法正常调用,但get 方法在完成的时候只是简单地返回null。
      Future< ?> submit(Runnable task)
      // get 方法在完成的时候返回指定的 result 对象。
      Future<T> submit(Runnable task, T result)
      // get 阻塞并将在计算结果准备好的时候返回。
      Future<T> submit(Callable<T> task)
      
    • 当用完一个线程池的时候, 调用 shutdown。该方法启动该池的关闭序列。被关闭的执行器不再接受新的任务。当所有任务都完成以后,线程池中的线程死亡。另一种方法是调用shutdownNow。该池取消尚未开始的所有任务并试图中断正在运行的线程。

    • 使用连接池

      • 调用Executors 类中静态的方法newCachedThreadPool 或newFixedThreadPool。
      • 调用submit 提交Runnable 或Callable 对象。
      • 如果想要取消一个任务, 或如果提交Callable 对象, 那就要保存好返回的Future。
      • 当不再提交任何任务时,调用shutdown。
  2. 预定执行

    • ScheduledExecutorService 接口具有为预定执行( Scheduled Execution ) 或重复执行任务而设计的方法。它是一种允许使用线程池机制的java.util.Timer 的泛化。Executors 类的newScheduledThreadPool 和newSingleThreadScheduledExecutor 方法将返回实现了ScheduledExecutorService 接口的对象。
    • 可以预定Runnable 或Callable 在初始的延迟之后只运行一次。也可以预定一个Runnable对象周期性地运行。
  3. 控制任务组

    • 将一个执行器服务作为线程池使用, 以提高执行任务的效率。

      • 在执行器中使用shutdownNow 方法取消所有的任务。

      • invokeAny 方法提交所有对象到一个Callable 对象的集合中, 并返回某个已经完成了的任务的结果。

      • invokeAll 方法提交所有对象到一个Callable 对象的集合中,并返回一个Future 对象的列表,代表所有任务的解决方案。当计算结果可获得时, 可以像下面这样对结果进行。

        List<Callable<T>> tasks = ...;
        List<Future<T>> results = executor.invokeAll(tasks);
        for (Future<T> result : results)
            processFurther(result.get());
        
      • 这个方法的缺点是如果第一个任务恰巧花去了很多时间, 则可能不得不进行等待。将结果按可获得的顺序保存起来更有实际意义。可以用ExecutorCompletionService 来进行排列。

        List<Callable<T>> tasks = ...;
        ExecutorCompletionService<T> service = new ExecutorCompletionService<>(executor);
        for (Callable<T> task : tasks) service.submit(task);
        for (int i = 0; i < tasks.size();i++)
            processFurther(service.take().get());
        
  4. Fork-Join 框架

    • ForkJoin 从字面上看Fork是分岔的意思,Join是结合的意思,我们可以理解为将大任务拆分成小任务进行计算求解,最后将小任务的结果进行结合求出大任务的解,这些裂变出来的小任务,我们就可以交给不同的线程去进行计算,这也就是分布式计算的一种思想。

    • 要采用框架可用的一种方式完成这种递归计算, 需要提供一个扩展RecursiveTaskO 的类(如果计算会生成一个类型为T 的结果)或者提供一个扩展RecursiveActicm 的类(如果不生成任何结果)。再覆盖compute 方法来生成并调用子任务, 然后合并其结果。

      class Counter extends RecursiveTask<Integer> {
          ...
          protected Integer compute() {
              if (to - from < THRESHOLD) {
                  // solve problem directly
              } else {
                  int mid = (from + to) / 2;
                  Counter first = new Counter(values, from, mid, filter);
                  Counter second = new Counter(values, mid, to, filter);
                  invokeAll(first, second);
                  return first.join() + second.join();
              }
          }
      }
      

      上面例子中,invokeAll 方法接收到很多任务并阻塞, 直到所有这些任务都已经完成。join 方法将生成结果。我们对每个子任务应用了join, 并返回其总和。

  5. 可完成Future

    • CompletableFuture 是一个简单API, 不过有很多不同方法来组合可完成future。

    • 处理单个fiiture 的方法,这里了简写记法来表示复杂的函数式接口, 这里会把 Function< ? super T,U> 写为T -> U。当然这并不是真正的Java 类型。

      ec8310c830b04f489f3eb8754c127c4e

      例,从一个Web 页面抽取所有链接来建立一个网络爬虫:

      ConipletableFuture contents = readPage(url);
      CompletableFuture<List> links = contents.thenApply(Parser::getlinks) ;

      -> thenApply 方法不会阻塞。它会返回另一个future。第一个future完成时,其结果会提供给getLinks 方法, 这个方法的返回值就是最终的结果。

    • 组合多个future 的方法

      8d02807ce2704d848edaa6755c936c3e

      1. 前3 个方法并行运行一个CompletableFuture<T> 和一个CompletableFuture<U> 动作, 并组合结果。
      2. 接下来3 个方法并行运行两个CompletableFuture<T> 动作。一旦其中一个动作完成,就传递它的结果,并忽略另一个结果。
      3. 最后的静态allOf 和anyOf 方法取一组可完成fiiture ( 数目可变),并生成一个CompletableFuture<Void> , 它会在所有这些fiiture 都完成时或者其中任意一个fiiture 完成时结束。不会传递任何结果。

14.10 同步器

  • java.util.concurrent 包包含了几个能帮助人们管理相互合作的线程集的类。这些机制具有为线程之间的共用集结点模式 提供的“ 预置功能”

    e342da56c1954601a923a724084f6f81

  1. 信号量(Semaphore

    • 一个信号量管理许多的许可证,为了通过信号量, 线程通过调用 acquire 请求许可。其实没有实际的许可对象, 信号量仅维护一个计数。许可的数目是固定的, 由此限制了通过的线程数量。其他线程可以通过调用release 释放许可。而且,许可不是二必须由获取它的线程释放。事实上, 任何线程都可以释放任意数目的许可,这可能会增加许可数目以至于超出初始数目。

      应用场景:对于一组有限制都资源访问。比如餐厅有5个位置但同时有7个人要吃饭,则要控制7个人对餐位的并发实用。

      用法:定义Semaphore变量semaphore包含受限的资源个数,每个人要来用餐时先调用semaphore.acquire()方法获取一个餐位(若没有餐位,则阻塞等待),用完餐后调用semaphore.release()释放餐位给其它人用。

  2. 倒计时门栓(CountDownLatch

    • 一个倒计时门栓( CountDownLatch) 让一个线程集等待直到计数变为0。倒计时门栓是一次性的。一旦计数为0, 就不能再重用了。一个有用的特例是计数值为1 的门栓。实现一个只能通过一次的门。线程在门外等候直到另一个线程将计数器值置为0。

      应用场景:等待一组线程任务完成后在继续执行当前线程。

      用法:定义一个CountDownLatch变量latch,在当前线程中调用latch.await()方法,在要等待的一组线程中执行完后调用latch.countDown()方法,这样当该做线程都调用过latch.countDown()方法后就开始执行当前线程latch.await()后的方法。

  3. 障栅(CyclicBarrier

    • CyclicBarrier 类实现了一个集结点( rendezvous) 称为障栅( barrier )。考虑大量线程运行在一次计算的不同部分的情形。当所有部分都准备好时,需要把结果组合在一起。当一个线程完成了它的那部分任务后, 我们让它运行到障栅处。一旦所有的线程都到达了这个障栅,障栅就撤销, 线程就可以继续运行。

    • 障栅被称为是循环的( cyclic), 因为可以在所有等待线程被释放后被重用。在这一点上,有别于CountDownLatch, CountDownLatch 只能被使用一次。Phaser 类增加了更大的灵活性,允许改变不同阶段中参与线程的个数。

      应用场景:等待一组线程到达某个点后一起执行,该组线程达到指定点后可以再次循环执行。也可用于一组线程达达某个点后再执行某个方法。

      用法:定义一个CyclicBarrier变量barrier,线程达到某个约定点时调用barrier.await()方法,当该组所有线程都调用了barrier.await()方法后改组线程一起向下执行。

      CyclicBarrier和CountDownLatch的区别

      • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
      • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。
  4. 交换器

    • 当两个线程在同一个数据缓冲区的两个实例上工作的时候, 就可以使用交换器( Exchanger) 典型的情况是, 一个线程向缓冲区填人数据, 另一个线程消耗这些数据。当它们都完成以后,相互交换缓冲区。
  5. 同步队列

    • 同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用SynchronousQueue的put 方法时, 它会阻塞直到另一个线程调用take 方法为止, 反之亦然。与Exchanger 的情况不同, 数据仅仅沿一个方向传递,从生产者到消费者。即使SynchronousQueue 类实现了BlockingQueue 接口, 概念上讲, 它依然不是一个队列。它没有包含任何元素,它的size 方法总是返回0。

      应用场景:可用于实现简单的生产者消费者模型。

      用法:生产者线程put元素到队列,若队列满则组赛到队列有空间;消费者不断从队列take获取元素,若队列空则组赛道队列有元素。

API注释

14.1 什么是

java.Iang.Thread 1.0

static void sleep(long minis)
// 休眠给定的毫秒数。
// 参数:millis 休眠的毫秒数

14.1.1 使用线程给其他任务提供机会

java.Iang.Thread 1.0

Thread(Runnable target)
//构造一个新线程, 用于调用给定目标的nm() 方法。
void start()
// 启动这个线程, 将引发调用mn() 方法。这个方法将立即返回, 并且新线程将并发运行。
void run()
// 调用关联Runnable 的run 方法。

java.lang.Runnable 1.0

void run()
// 必须覆盖这个方法, 并在这个方法中提供所要执行的任务指令。

1 4 . 2 中断线程

java.Iang.Thread 1.0

void interrupts)
// 向线程发送中断请求。线程的中断状态将被设置为true。如果目前该线程被一个sleep调用阻塞,那么, InterruptedException 异常被抛出。
static boolean interrupted()
// 测试当前线程(即正在执行这一命令的线程)是否被中断。注意,这是一个静态方法。这一调用会产生副作用—它将当前线程的中断状态重置为false。
boolean isInterrupted()
// 测试线程是否被终止。不像静态的中断方法,这一调用不改变线程的中断状态。
static Thread currentThread()
// 返回代表当前执行线程的Thread 对象。

14.3.4 被终止的线程

java.iang.Thread 1.0

void join()
//等 待终止指定的线程。
void join(long mi11is)
// 等待指定的线程死亡或者经过指定的毫秒数。
Thread.State getState() 5.0
// 得到这一线程的状态;NEW、RUNNABLEBLOCKED、WAITINGHMED_WAmNG或TERMINATED之一。
******下方方法已过时,尽量不要使用****
void stop()
// 停止该线程。这一方法已过时。
void suspend()
// 暂停这一线程的执行。这一方法已过时。
void resume()
// 恢复线程。这一方法仅仅在调用suspendO 之后调用。这一方法已过时。

14.4.1 线程优先级

java.iang.Thread 1.0

void setPriority(int newPriority)
// 设置线程的优先级。优先级必须在Thread.MIN_PRIORITY 与Thread.MAX_PRIORITY之间。一般使用Thread.NORM_PRIORITY 优先级。
static int MIN_PRIORITY
// 线程的最小优先级。最小优先级的值为1。
static int NORM_PRIORITY
// 线程的默认优先级。默认优先级为5。
static int MAX_PRIORITY
// 线程的最高优先级。最高优先级的值为10。
static void yield()
// 导致当前执行线程处于让步状态。如果有其他的可运行线程具有至少与此线程同样高的优先级,那么这些线程接下来会被调度。注意,这是一个静态方法。

1 4 . 4.2 守护线程

java.iang.Thread 1.0

void setDaemon(boolean isDaemon)
// 标识该线程为守护线程或用户线程。这一方法必须在线程启动之前调用。

1 4 . 4.3 未捕获异常处理器

java.lang.Thread 1.0

static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) 5.0
static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler()) 5.0
// 设置或获取未捕获异常的默认处理器。
void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) 5.0
Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() 5.0
// 设置或获取未捕获异常的处理器。如果没有安装处理器, 则将线程组对象作为处理器。

java.Iang.Thread.UncaughtExceptionHandler 5.0

void UncaughtException(Thread t, Throwable e)
// 当一个线程因未捕获异常而终止, 按规定要将客户报告记录到日志中。
// 参数:t 由于未捕获异常而终止的线程
// 	    e 未捕获的异常对象

java.lang.ThreadGroup 1.0

void UncaughtException( Thread t, Throwable e)
// 如果有父线程组, 调用父线程组的这一方法; 或者, 如果Thread 类有默认处理器,调用该处理器, 否则, 输出栈轨迹到标准错误流上(但是, 如果e 是一个ThreadDeath对象, 栈轨迹是被禁用的。ThreadDeath 对象由stop 方法产生, 而该方法已经过时)。

14.5.3 锁对象

java.util.concurrent.locks.Lock 5.0

void lock( )
// 获取这个锁;如果锁同时被另一个线程拥有则发生阻塞。
void unlock( )
// 释放这个锁。

java,util.concurrent.locks.ReentrantLock 5.0

ReentrantLock()
// 构建一个可以被用来保护临界区的可重入锁。
ReentrantLock(boo1ean fair)
// 构建一个带有公平策略的锁。一个公平锁偏爱等待时间最长的线程。但是,这一公平的保证将大大降低性能。所以, 默认情况下, 锁没有被强制为公平的。
// 使用公平锁比使用常规锁要慢很多

14.5.4 条件对象

java.util.concurrent.locks.Lock 5.0

Condition newCondition( )
// 返回一个与该锁相关的条件对象。

java.util.concurrent.locks.Condition 5.0

void await()
// 将该线程放到条件的等待集中。
void signalA11()
// 解除该条件的等待集中的所有线程的阻塞状态。
void signal ()
// 从该条件的等待集中随机地选择一个线程, 解除其阻塞状态。

14.5.5 synchronized 关键字

java丨. ang.Object 1.0

void notifyAll()
// 解除那些在该对象上调用wait 方法的线程的阻塞状态。该方法只能在同步方法或同步块内部调用。如果当前线程不是对象锁的持有者,该方法拋出一个IllegalMonitorStateException异常。
void notify()
// 随机选择一个在该对象上调用wait 方法的线程, 解除其阻塞状态。该方法只能在一个同步方法或同步块中调用。如果当前线程不是对象锁的持有者, 该方法抛出一个IllegalMonitorStateException 异常。
void wait()
// 导致线程进人等待状态直到它被通知。该方法只能在一个同步方法中调用。如果当前线程不是对象锁的持有者,该方法拋出一个IllegalMonitorStateException 异常。
void wait(long mi11is)
void wait(long mi11is, int nanos)
// 导致线程进入等待状态直到它被通知或者经过指定的时间。这些方法只能在一个同步方法中调用。如果当前线程不是对象锁的持有者该方法拋出一个IllegalMonitorStateException异常。
// 参数: millis 毫秒数
//       nanos 纳秒数,<1 000 000

14.5.12 线程局部变量

java.lang.ThreadLocal 1.2

T get()
// 得到这个线程的当前值。如果是首次调用get, 会调用initialize 来得到这个值。
protected initialize()
// 应覆盖这个方法来提供一个初始值。默认情况下,这个方法返回mill。
void set(T t)
// 为这个线程设置一个新值。
void remove()
// 删除对应这个线程的值。
static <S> ThreadLocal <S> withInitial(Suppl ier< ? extends S> supplier) 8
// 创建一个线程局部变量, 其初始值通过调用给定的supplier 生成。

java.util.concurrent.ThreadLocalRandom 7

static ThreadLocalRandom current( )
// 返回特定于当前线程的Random 类实例。

14.5.13 锁测试与超时

java.util.concurrent.locks.Lock 5.0

boolean tryLock()
// 尝试获得锁而没有发生阻塞;如果成功返回真。这个方法会抢夺可用的锁, 即使该锁有公平加锁策略, 即便其他线程已经等待很久也是如此。
boolean tryLock(long time, TimeUnit unit)
// 尝试获得锁,阻塞时间不会超过给定的值;如果成功返回true。
void lockInterruptibly()
// 获得锁, 但是会不确定地发生阻塞。如果线程被中断, 抛出一个InterruptedException异常。

java.util.concurrent.locks.Condition 5.0

boolean await(long time , TimeUnit unit)
// 进人该条件的等待集, 直到线程从等待集中移出或等待了指定的时间之后才解除阻塞。如果因为等待时间到了而返回就返回false , 否则返回true。
void awaitUninterruptibly()
// 进人该条件的等待集, 直到线程从等待集移出才解除阻塞。如果线程被中断, 该方法不会抛出InterruptedException 异常。

14.5.14 读/ 写锁

java.util.concurrent.locks.ReentrantReadWriteLock 5.0

Lock readLock()
// 得到一个可以被多个读操作共用的读锁, 但会排斥所有写操作。
Lock writeLock()
// 得到一个写锁, 排斥所有其他的读操作和写操作。

1 4 . 6 阻塞队列

java.util.concurrent.ArrayBlockingQueue 5.0

ArrayBlockingQueue(int capacity)
ArrayBlockingQueue(int capacity, boolean fair )
// 构造一个带有指定的容量和公平性设置的阻塞队列。该队列用循环数组实现。

java.util.concurrent.LinkedBlockingDeque 6

LinkedBlockingQueue()
LinkedBlockingDeque()
// 构造一个无上限的阻塞队列或双向队列,用链表实现。
LinkedBlockingQueue(int capacity)
LinkedBlockingDeque(int capacity)
// 根据指定容量构建一个有限的阻塞队列或双向队列,用链表实现。

java.util.concurrent.DelayQueue<E extends Delayed〉5.0

DelayQueue( )
// 构造一个包含Delayed 元素的无界的阻塞时间有限的阻塞队列。只有那些延迟已经超过时间的元素可以从队列中移出。

java.util.concurrent.Delayed 5.0

long getDelay(TimeUnit unit )
// 得到该对象的延迟,用给定的时间单位进行度量。

java.util.concurrent.PriorityBlockingQueue 5.0

PriorityBlockingQueue()
PriorityBlockingQueue(int initialCapacity)
PriorityBlockingQueue(int initialCapacity, Comparator ? super E> comparator)
// 构造一个无边界阻塞优先队列,用堆实现。
// 参数: initialCapacity 优先队列的初始容量。默认值是11。
//       comparator 用来对元素进行比较的比较器, 如果没有指定, 则元素必须实现Comparable 接口。

java.util.concurrent.BlockingQueue 5.0

void put(E element )
// 添加元素, 在必要时阻塞
E take()
// 移除并返回头元素, 必要时阻塞。
boolean offer(E element, long time, TimeUnit unit)
// 添加给定的元素, 如果成功返回true, 如果必要时阻塞, 直至元素已经被添加或超时。
E poll(long time, TimeUnit unit)
// 移除并返回头元素, 必要时阻塞, 直至元素可用或超时用完。失败时返回null。

java.util.concurrent.BiockingDeque 6

void putFirst(E element)
void putLast(E element)
// 添加元素, 必要时阻塞。
E takeFirst()
E takeLast()
// 移除并返回头元素或尾元素, 必要时阻塞。
boolean offerFirst(E element, long time, TimeUnit unit)
boolean offerLast(E element, long time, TimeUnit unit)
// 添加给定的元素, 成功时返回true, 必要时阻塞直至元素被添加或超时。
E pollFirst(long time, TimeUnit unit)
E pollLast(long time, TimeUnit unit)
// 移动并返回头元素或尾元素,必要时阻塞, 直至元素可用或超时。失败时返回mill。

java.util.concurrent.Transfer Queue 7

void transfer(E element)
boolean tryTransfer(E element, long time, TimeUnit unit)
// 传输一个值, 或者尝试在给定的超时时间内传输这个值, 这个调用将阻塞,直到另一个线程将元素删除。第二个方法会在调用成功时返回true。

14.7 线程安全的集合

java.util.concurrent.ConcurrentLinkedQueue 5.0

ConcurrentLinkedQueue<E>()
// 构造一个可以被多线程安全访问的无边界非阻塞的队列。

java.util.concurrent.ConcurrentLinkedQueue 6

ConcurrentSkipListSet<E>()
ConcurrentSkipListSet<E>(Comparator< ? super E> comp)
// 构造一个可以被多线程安全访问的有序集。第一个构造器要求元素实现Comparable接口。

java.util.concurrent.ConcurrentSkipListMap<K, V> 6

ConcurrentHashMap<K, V>()
ConcurrentHashMap<K, V>(int initialCapacity)
ConcurrentHashMap<K, V>(int initialCapacity, float loadFactor, int concurrencyLevel)
// 构造一个可以被多线程安全访问的散列映射表。
// 参数: initialCapacity 集合的初始容量。默认值为16。
// 		loadFactor 控制调整: 如果每一个桶的平均负载超过这个因子,表的大小会被重新调整。默认值为0.75。
// 		concurrencyLevel 并发写者线程的估计数目。
ConcurrentSkipListMap<K, V>()
ConcurrentSkipListSet<K, V>(Comparator< ? super K> comp)
// 构造一个可以被多线程安全访问的有序的映像表。第一个构造器要求键实现Comparable 接口

14.7.7 较早的线程安全集合

java.util.CoIlections 1.2

static<E> Collection<E> synchronizedCollection(Collection<E> c)
static<E> List synchronizedList(List<E> c)
static<E> Set synchronizedSet(Set< E> c)
static<E> SortedSet synchronizedSortedSet(SortedSet <E> c)
static<K, V> Map<K, V> synchronizedMap(Map<K, V > c)
static<K, V> SortedMap<K, V> synchronizedSortedMap(SortedMap<K, V> c)
// 构建集合视图, 该集合的方法是同步的。

14.8 Callable 与Future

java.util.concurrent.Callable 5.0

V call()
// 运行一个将产生结果的任务。

java.util.concurrent.Future 5.0

V get()
V get(long time, TimeUnit unit)
// 获取结果, 如果没有结果可用, 则阻塞直到真正得到结果超过指定的时间为止。如果不成功, 第二个方法会拋出TimeoutException 异常。
boolean cancel(boolean mayInterrupt)
// 尝试取消这一任务的运行。如果任务已经开始, 并且maylnterrupt 参数值为true, 它就会被中断。如果成功执行了取消操作, 返回true。
boolean isCancelled()
// 如果任务在完成前被取消了, 则返回true。
boolean isDone()
// 如果任务结束,无论是正常结束、中途取消或发生异常, 都返回true。

java.util.concurrent.FutureTask 5.0

FutureTask(Cal1able<V> task)
FutureTask(Runnable task, V result)
// 构造一个既是Future<V> 又是Runnable 的对象。

14.9.1 线程池

java.util.concurrent.Executors 5.0

ExecutorService newCachedThreadPool()
// 返回一个带缓存的线程池, 该池在必要的时候创建线程, 在线程空闲60 秒之后终止线程。
ExecutorService newFixedThreadPool(int threads)
// 返回一个线程池, 该池中的线程数由参数指定。
ExecutorService newSingleThreadExecutor()
// 返回一个执行器, 它在一个单个的线程中依次执行各个任务。

java.util.concurrent.ExecutorService 5.0

Future<T> submit(Cal1able<T> task)
Future<T> submit(Runnable task, T result)
Future< ?> submit(Runnable task)
// 提交指定的任务去执行。
void shutdown()
// 关闭服务, 会先完成已经提交的任务而不再接收新的任务。

java.util.concurrent.ThreadPoolExecutor 5.0

int getLargestPoolSize()
// 返回线程池在该执行器生命周期中的最大尺寸。

14.9.2 预定执行

java.util.concurrent.Executors 5.0

ScheduledExecutorService newScheduledThreadPool(int threads)
// 返回一个线程池, 它使用给定的线程数来调度任务。
ScheduledExecutorService newSingleThreadScheduledExecutor( )
// 返回一个执行器, 它在一个单独线程中调度任务。

java.util.concurrent.ScheduledExecutorService 5.0

ScheduledFuture<V> schedule(Cal 1able<V> task, long time, Timellnit unit)
ScheduledFuture< ?> schedule(Runnable task, long time, TimeUnit unit)
// 预定在指定的时间之后执行任务。
ScheduledFuture< ? > scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit)
// 预定在初始的延迟结束后, 周期性地运行给定的任务, 周期长度是period。
ScheduledFuture< ?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)
// 预定在初始的延迟结束后周期性地运行给定的任务, 在一次调用完成和下一次调用开始之间有长度为delay 的延迟。

14.9.3 控制任务组

java.util.concurrent.ExecutorService 5.0

T invokeAny(Collection<Callable<T>> tasks)
T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit )
// 执行给定的任务, 返回其中一个任务的结果。第二个方法若发生超时, 抛出一个 Timeout Exception 异常。
List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
// 执行给定的任务, 返回所有任务的结果。第二个方法若发生超时, 拋出一个TimecmtException 异常。

java.util.concurrent.ExecutorCompletionService5.0

ExecutorCompletionService(Executor e)
// 构建一个执行器完成服务来收集给定执行器的结果。
Future<V> submit(Callable<T> task)
Future<V> submit(Runnable task, V result)
// 提交一个任务给底层的执行器。
Future<V> take()
// 移除下一个已完成的结果, 如果没有任何已完成的结果可用则阻塞。
Future<V> poll()
Future<V> poll(long time, TimeUnit unit)
// 移除下一个已完成的结果, 如果没有任何已完成结果可用则返回null。第二个方法将等待给定的时间。
0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区