The World

scribble

Ralph YY's Blog

15 Jul 2020
CompletableFuture Implement At Multiple Thread

Background

最近有这么一个问题:
1.数据库返回一个List的数据,要求对每个数据点结构进行过滤,满足要求的加入result。
2.每个数据点都是独立的,是从cache中获取的,相互间没有依赖关系。
3.如果result数量达到了上限比如250,那么就终止多线程,直接返回结果。

Thinking

多线程从Java1.5就有了,之前使用多线程就无脑的拷贝旧代码,比如Executor, Callable, Future啥的,但是这些都是老API了,应用上是有缺陷的,比如

List<Future<?>> futureList = executor.invokeAll(callableList);
for (Future<List<?>> future : futureList) {
    future.get();
}

如果invokeAll就会运行直到全部线程结束,中途无法终止。这样提前跳出多线程就无法实现。也可以将future写在循环内

for(int i=0;i<100;i++){
    Callable<A> callable = () -> new AnyFunction();
    Future future = executor.submit(profileGDFVDataCallable);
    future.get();
}

这样的话可以获取每个线程的结果,然后根据结果就可以call executorService.shutdownNow()直接中断,但是future.get()会阻塞主线程,所以这个实际上就是单线程在跑了。

如果解决呢,一种方式就是使用CompletionService抱在ExecutiveService之上,见reference[1]

public static void main(String[] args)  {
    Long start = System.currentTimeMillis();
    //开启3个线程
    ExecutorService exs = Executors.newFixedThreadPool(5);
    try {
        int taskCount = 10;
        //结果集
        List<Integer> list = new ArrayList<Integer>();
        //1.定义CompletionService
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs);  
        List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
        //2.添加任务
        for(int i=0;i<taskCount;i++){
            futureList.add(completionService.submit(new Task(i+1)));
        }

        //使用内部阻塞队列的take()
        for(int i=0;i<taskCount;i++){
            Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
            System.out.println("任务i=="+result+"完成!"+new Date());
            list.add(result);
        }
        System.out.println("list="+list);
        System.out.println("总耗时="+(System.currentTimeMillis()-start));
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        exs.shutdown();//关闭线程池
    }	 
}

但是感觉还是缺乏灵活性

CompletableFuture

在Java1.8有了CompletableFuture,功能上比较多,可以用lambda,可以catch exception,比较好用。
而且最重要的是使用CompletableFuture可以在多线程的同时还处理返回数据,也就是说get()不block,性能好又好用,所以在Java8后无脑用这个就行。

reference[1]的例子很好,这里再抄一个

public static void main(String[] args) {
    Long start = System.currentTimeMillis();
    //结果集
    List<String> list = new ArrayList<String>();
    List<String> list2 = new ArrayList<String>();
    //定长10线程池
    ExecutorService exs = Executors.newFixedThreadPool(10);
    List<CompletableFuture<String>> futureList = new ArrayList<>();
    final List<Integer> taskList = Lists.newArrayList(2,1,3,4,5,6,7,8,9,10);
    try {
        ////方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取
        //for(int i=0;i<taskList.size();i++){
        //    final int j=i;
        //    //异步执行
        //    CompletableFuture<String> future = CompletableFuture.supplyAsync(()->calc(taskList.get(j)), exs)
        //        //Integer转换字符串    thenAccept只接受不返回不影响结果
        //        .thenApply(e->Integer.toString(e))
        //        //如需获取任务完成先后顺序,此处代码即可
        //        .whenComplete((v, e) -> {
        //            System.out.println("任务"+v+"完成!result="+v+",异常 e="+e+","+new Date());
        //            list2.add(v);
        //        })
        //        ;
        //    futureList.add(future);
        //}
        ////流式获取结果:此处是根据任务添加顺序获取的结果
        //list = sequence(futureList).get();
		 
        //方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
        CompletableFuture[] cfs = taskList.stream().map(object-> CompletableFuture.supplyAsync(()->calc(object), exs)
            .thenApply(h->Integer.toString(h))
            //如需获取任务完成先后顺序,此处代码即可
            .whenComplete((v, e) -> {
                System.out.println("任务"+v+"完成!result="+v+",异常 e="+e+","+new Date());
                list2.add(v);
            })).toArray(CompletableFuture[]::new);
        //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
        CompletableFuture.allOf(cfs).join();
        System.out.println("任务完成先后顺序,结果list2="+list2+";任务提交顺序,结果list="+list+",耗时="+(System.currentTimeMillis()-start));
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        exs.shutdown();
    }
} 

下面是我最后使用的到达limit就跳出的例子:

//get test data in multi threading
List<TestData> resultList = new ArrayList<>();
//using the CompletableFuture instead of future to avoid the blocking the main thread
List<CompletableFuture<TestData>> futureList = new ArrayList<>();
for (int i=0;i<10000000;i++) {
    CompletableFuture<TestData> future = CompletableFuture.supplyAsync(() ->{
        //这里可以写上处理逻辑,也可以将某些logic写法stream lambda的表达式中去比如thenCombine()
        RawData rawData = TestDao.findRawData(i);
        if(rawData != null){
            return convertToRealData(rawData);
        }
        return null;
    }, executorService).exceptionally(e -> {
        System.out.println("Failed to get test Data", e.getCause());
        return null;
    });
    futureList.add(future);
}
if(!Util.isNullOrEmpty(futureList)){
    for(int i=0; i < futureList.size(); i++){
        CompletableFuture<TestData> future = futureList.get(i);
        TestData data = future.get(1, TimeUnit.MINUTES);
        if(data != null){
            if(resultList.size() < 250){
                resultList.add(data);
            } else {
                // jump out early when hit the company limit
                executorService.shutdownNow();
                break;
            }
        }
    }
}

唯一的缺点就是也要写不少代码,虽然CompletableFuture.supplyAsync处理比较方便,不过对于每个线程的异常还是需要单独处理,如果内部方法丢出Exception(不是RuntimeException)的话还要再去try catch,而不同直接用stream来处理。stream的调用过程中也要避免有错误,否则stream就被破坏了,结果未知呵呵。

Reference

1.CompletableFuture原理解析
2.多线程并发执行任务,取结果归集。终极总结:Future、FutureTask、CompletionService、CompletableFuture


Til next time,
at 00:00

scribble

comments powered by Disqus