ThreadPoolExecutor配合CountDownLatch实现线程等待同步返回值

一个需求,需要重复某个调用,然后返回结果。

此时重复调用这个动作,可以用线程池进行并发操作,而等待返回结果,需要用到CountDownLatch类。

CountDownLatch是JAVA的一个计数器,核心方法就是

countDown():计数减一

await(long timeout, TimeUnit unit):等待计数清零,可以定义超时时间


在说CountDownLatch前,必须要先提一下AQS。AQS全称抽象队列同步器(AbstractQuenedSynchronizer),它是一个可以用来实现线程同步的基础框架。当然,它不是我们理解的Spring这种框架,它是一个类,类名就是AbstractQuenedSynchronizer,如果我们想要实现一个能够完成线程同步的锁或者类似的同步组件,就可以在使用AQS来实现,因为它封装了线程同步的方式,我们在自己的类中使用它,就可以很方便的实现一个我们自己的锁。


CountDownLatch的被称为门栓,可以将它看成是门上的锁,它会给门上多把锁,只有每一把锁都解开,才能通过。对于线程来说,CountDownLatch会阻塞线程的运行,只有当CountDownLatc内部记录的值减小为0,线程才能继续向前执行。


CountDownLatch底层通过AQS实现,AQS的一般使用方式就是以内部类的形式继承它,CountDownLatch就是这么使用它的。在CountDownLatch内部有一个内部类Sync,继承自AQS,并重写了AQS加锁解锁的方法,并通过Sync的对象,调用AQS的方法,阻塞线程的运行。我们知道,创建一个CountDownLatch对象时,需要传入一个整数值count,只有当count被减小为0时线程才能通过await方法,否则将被await阻塞。这里实际上是这样的:当线程运行到await方法时,需要去获取锁(锁由AQS实现),若count不为0,则线程就会获取锁失败,被阻塞;若count为0,则就能顺利通过。CountDownLatch是一次性的,因为没有方法可以增加count的值,也就是说,一旦count被减小为0,则之后就一直是0了,也就再也不能阻塞线程了。


具体例子如下:


定义一个业务线程类

public class ThreadCountDown extends Thread{
    //线程安全
    public static ConcurrentHashMap<Integer,String> map = new ConcurrentHashMap<Integer,String>();;
    private ThreadDTO dto;
    public ThreadDTO getDto() {
        return dto;
    }
    public void setDto(ThreadDTO dto) {
        this.dto = dto;
    }
    public ThreadCountDown(ThreadDTO dto){
        this.dto = dto;
    }
    @Override
    public void run(){
        try {
            Thread.sleep(2000);
            System.out.println(dto.getIndex());
            dto.setName("name"+dto.getIndex());
            map.put(dto.getIndex(),"name"+dto.getIndex());
        }catch (Exception e){
        }finally {
            dto.getCountDownLatch().countDown();
        }
    }
}


主方法与线程类的数据传输通过ThreadDTO来进行

public class ThreadDTO {
    private String name;
    private String path;
    private int index;
    private CountDownLatch countDownLatch;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getPath() {
        return path;
    }
    public void setPath(String path) {
        this.path = path;
    }
    public int getIndex() {
        return index;
    }
    public void setIndex(int index) {
        this.index = index;
    }
    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }
    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
}


主方法:创建线程池ThreadPoolExecutor,然后执行submit(Runnable task, T result)方法

public class ThreadCountDownTest {
    /*
    corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
    maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
    keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
    unit:keepAliveTime的单位
    workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
    threadFactory:线程工厂,用于创建线程,一般用默认即可;
    handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10,15,60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
    
    public static void main(String[] args) {
        do1();
        do2();
    }
    
    
    public static void do1(){
        System.out.println("start");
        int count = 100;
        CountDownLatch countDownLatch = new CountDownLatch(count);
        ArrayList<Future<ThreadDTO>> list = new ArrayList<Future<ThreadDTO>>();
        for(int i=0; i < count; i ++){
            ThreadDTO dto = new ThreadDTO();
            dto.setIndex(i);
            dto.setCountDownLatch(countDownLatch);
            ThreadCountDown cd = new ThreadCountDown(dto);
            
            Future<ThreadDTO> f = executor.submit(cd,dto);
            list.add(f);
            
        }
        try {
            //设置超时时间
            countDownLatch.await(300,TimeUnit.SECONDS);
        }catch (Exception e){
        }
        System.out.println("end");
        for(int i=0; i < list.size(); i ++){
            try {
                System.out.println(list.get(i).get().getName());
            }catch (Exception e){
            }
        }
    }
    
    
    public static void do2(){
        System.out.println("start");
        int count = 100;
        CountDownLatch countDownLatch = new CountDownLatch(count);
        ArrayList<ThreadDTO> list = new ArrayList<ThreadDTO>();
        for(int i=0; i < count; i ++){
            ThreadDTO dto = new ThreadDTO();
            dto.setIndex(i);
            dto.setCountDownLatch(countDownLatch);
            ThreadCountDown cd = new ThreadCountDown(dto);
            executor.submit(cd);
            list.add(dto);
        }
        try {
            //设置超时时间
            countDownLatch.await(300,TimeUnit.SECONDS);
        }catch (Exception e){
        }
        System.out.println("end");
        //等线程池返回后,list中就有数据了,提前get的话就会等带线程执行,并发就失效了
        for(int i=0; i < list.size(); i ++){
            System.out.println(list.get(i).getName());
        }
    }
}


其中do1和do2大同小异,区别在于do1调用submit(Runnable task, T result),传入result,do2调用submit(Runnable task)方法,无result,但数据会被dto带出来。


public <T> Future<T> submit(Runnable task, T result)方法,在调用future.get()时,会阻断线程,导致get方法一直在等待线程的返回,并发就失效了,需要等带所有线程结束后,即在countDown.awiit后,才可调用future.get()。

同理,public <T> Future<T> submit(Runnable task)方法,线程的入参dto,在线程没执行完时,dto.get()不会阻断线程,但是获取的值是空的,所以也需要等待所有线程完成后,再来循环dto的list。

{context}