ThreadPoolExecutor配合CountDownLatch实现线程等待同步返回值
一个需求,需要重复某个调用,然后返回结果。
此时重复调用这个动作,可以用线程池进行并发操作,而等待返回结果,需要用到CountDownLatch类。
CountDownLatch是JAVA的一个计数器,核心方法就是
countDown():计数减一
await(long timeout, TimeUnit unit):等待计数清零,可以定义超时时间
在说CountDownLatch前,必须要先提一下AQS。AQS全称抽象队列同步器(AbstractQuenedSynchronizer),它是一个可以用来实现线程同步的基础框架。当然,它不是我们理解的Spring这种框架,它是一个类,类名就是AbstractQuenedSynchronizer,如果我们想要实现一个能够完成线程同步的锁或者类似的同步组件,就可以在使用AQS来实现,因为它封装了线程同步的方式,我们在自己的类中使用它,就可以很方便的实现一个我们自己的锁。
CountDownLatch的被称为门栓,可以将它看成是门上的锁,它会给门上多把锁,只有每一把锁都解开,才能通过。对于线程来说,CountDownLatch会阻塞线程的运行,只有当CountDownLatc内部记录的值减小为0,线程才能继续向前执行。
CountDownLatch底层通过AQS实现,AQS的一般使用方式就是以内部类的形式继承它,CountDownLatch就是这么使用它的。在CountDownLatch内部有一个内部类Sync,继承自AQS,并重写了AQS加锁解锁的方法,并通过Sync的对象,调用AQS的方法,阻塞线程的运行。我们知道,创建一个CountDownLatch对象时,需要传入一个整数值count,只有当count被减小为0时线程才能通过await方法,否则将被await阻塞。这里实际上是这样的:当线程运行到await方法时,需要去获取锁(锁由AQS实现),若count不为0,则线程就会获取锁失败,被阻塞;若count为0,则就能顺利通过。CountDownLatch是一次性的,因为没有方法可以增加count的值,也就是说,一旦count被减小为0,则之后就一直是0了,也就再也不能阻塞线程了。
具体例子如下:
定义一个业务线程类
public class ThreadCountDown extends Thread{ //线程安全 public static ConcurrentHashMap<Integer,String> map = new ConcurrentHashMap<Integer,String>();; private ThreadDTO dto; public ThreadDTO getDto() { return dto; } public void setDto(ThreadDTO dto) { this.dto = dto; } public ThreadCountDown(ThreadDTO dto){ this.dto = dto; } @Override public void run(){ try { Thread.sleep(2000); System.out.println(dto.getIndex()); dto.setName("name"+dto.getIndex()); map.put(dto.getIndex(),"name"+dto.getIndex()); }catch (Exception e){ }finally { dto.getCountDownLatch().countDown(); } } }
主方法与线程类的数据传输通过ThreadDTO来进行
public class ThreadDTO { private String name; private String path; private int index; private CountDownLatch countDownLatch; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPath() { return path; } public void setPath(String path) { this.path = path; } public int getIndex() { return index; } public void setIndex(int index) { this.index = index; } public CountDownLatch getCountDownLatch() { return countDownLatch; } public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } }
主方法:创建线程池ThreadPoolExecutor,然后执行submit(Runnable task, T result)方法
public class ThreadCountDownTest { /* corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去; maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量; keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁; unit:keepAliveTime的单位 workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种; threadFactory:线程工厂,用于创建线程,一般用默认即可; handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务 */ private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10,15,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy()); public static void main(String[] args) { do1(); do2(); } public static void do1(){ System.out.println("start"); int count = 100; CountDownLatch countDownLatch = new CountDownLatch(count); ArrayList<Future<ThreadDTO>> list = new ArrayList<Future<ThreadDTO>>(); for(int i=0; i < count; i ++){ ThreadDTO dto = new ThreadDTO(); dto.setIndex(i); dto.setCountDownLatch(countDownLatch); ThreadCountDown cd = new ThreadCountDown(dto); Future<ThreadDTO> f = executor.submit(cd,dto); list.add(f); } try { //设置超时时间 countDownLatch.await(300,TimeUnit.SECONDS); }catch (Exception e){ } System.out.println("end"); for(int i=0; i < list.size(); i ++){ try { System.out.println(list.get(i).get().getName()); }catch (Exception e){ } } } public static void do2(){ System.out.println("start"); int count = 100; CountDownLatch countDownLatch = new CountDownLatch(count); ArrayList<ThreadDTO> list = new ArrayList<ThreadDTO>(); for(int i=0; i < count; i ++){ ThreadDTO dto = new ThreadDTO(); dto.setIndex(i); dto.setCountDownLatch(countDownLatch); ThreadCountDown cd = new ThreadCountDown(dto); executor.submit(cd); list.add(dto); } try { //设置超时时间 countDownLatch.await(300,TimeUnit.SECONDS); }catch (Exception e){ } System.out.println("end"); //等线程池返回后,list中就有数据了,提前get的话就会等带线程执行,并发就失效了 for(int i=0; i < list.size(); i ++){ System.out.println(list.get(i).getName()); } } }
其中do1和do2大同小异,区别在于do1调用submit(Runnable task, T result),传入result,do2调用submit(Runnable task)方法,无result,但数据会被dto带出来。
public <T> Future<T> submit(Runnable task, T result)方法,在调用future.get()时,会阻断线程,导致get方法一直在等待线程的返回,并发就失效了,需要等带所有线程结束后,即在countDown.awiit后,才可调用future.get()。
同理,public <T> Future<T> submit(Runnable task)方法,线程的入参dto,在线程没执行完时,dto.get()不会阻断线程,但是获取的值是空的,所以也需要等待所有线程完成后,再来循环dto的list。