首页>>后端>>java->Java多线程专题之Callable、Future与FutureTask(含源码分析)

Java多线程专题之Callable、Future与FutureTask(含源码分析)

时间:2023-12-07 本站 点击:0

前言

大家好,一直以来我都本着用最通俗的话理解核心的知识点, 我认为所有的难点都离不开 基础知识 的铺垫。目前正在出一个Java多线程专题长期系列教程,从入门到进阶, 篇幅会较多, 喜欢的话,给个关注❤️ ~

适合人群

有一定的Java基础

想学习或了解多线程开发

想提高自己的同学

大佬可以绕过 ~

背景

之前给大家讲了一些框架的使用,这些都属于业务层面的东西,你需要熟练掌握它并在项目中会运用它即可,但这些对自身技术的积累是远远不够的,如果你想要提高自己,对于语言本身你需要花更多的时间去挖掘而不是局限于框架的使用,所以之前为什么跟大家一直强调基础的重要性,框架可以千变万化,层出不穷,但是基础它是不变的,不管是学java还是前端或者是其它语言, 这一点大家还是需要认清的。

接下来的几期会专门讲多线程这一块,篇幅会较多,耐心看完你一定会有收获~

情景回顾

上期带大家学习了什么是进阶学习了Thread以及分析了它的一些源码,本期带大家学习Callable、Future与FutureTask的用法以及源码分析, 内容较多, 我们一起来看一下吧~

Callable & Future

之前我们通过Runnable,Thread就可以创建一个线程,但是它也有一个局限,就是没有返回值,有时候我们的需求需要结合多任务处理后的数据做一些事情,所以通过上边的方法就不好解决了。

下面我们看一下Callable

public interface Callable<V> {    /**     * Computes a result, or throws an exception if unable to do so.     *     * @return computed result     * @throws Exception if unable to compute a result     */    V call() throws Exception;}

首先它是一个接口,且还提供了泛型的支持,call方法有返回值, 那怎么使用它呢,肯定是要实现它

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}

运行一下实际输出

hellomain

发现返回的结果输出出去了,但是这里有个问题,这个main输出在hello之后,似乎好像没有开启一个线程,依然是同步执行的,是这样吗,我们看一下call内部的线程环境

 public String call() throws Exception {    System.out.println(Thread.currentThread());    Thread.sleep(3000);    return "hello";}

运行一下实际输出

Thread[main,5,main]hellomain

好家伙,还是main线程内部,并且线程还被阻塞了,原来new是开启不了线程的,只是单纯的实现了一下它的接口,我们姿势搞错了。其实它的源码上加了注释的,说通常会借助Excutors类使用,这个类是用来创建线程池的,这个我们后边讲,这里给大家演示一下

public static void main(String[] args) throws Exception {    CallableDemo demo = new CallableDemo();    // 创建线程池    ExecutorService executor = Executors.newCachedThreadPool();    // 提交任务    Future<String> future = executor.submit(demo);    System.out.println("main");}

实际输出:

mainThread[pool-1-thread-1,5,main]

发现是单独线程执行的,并且没有阻塞线程。我们发现这里也用到了Future,这个翻译过来时未来的意思,这里也就是结果发生在后边,它是一个异步情况, 那么我们如何获取到结果呢?

System.out.println(future.get());System.out.println("main");

实际输出:

Thread[pool-1-thread-1,5,main]hellomain

发现结果拿到了,但是运行的时候好像线程被阻塞了,我们可以发现get()会导致线程阻塞,举一反三,我想不阻塞的情况下拿到返回值,可以吗❓那有什么办法呢?开启单独的线程不就好了,那么在单独的线程可以拿到其它线程的值吗,我们来试一下

new Thread(() -> {    try {        System.out.println(future.get());    } catch (InterruptedException e) {        e.printStackTrace();    } catch (ExecutionException e) {        e.printStackTrace();    }}).start();System.out.println("main");

实际运行输出:

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}0

发现,这下就对了~

Future & FutureTask 源码解析

端起小板凳,这部分好好听,我们主要看下它的源码实现。我们上文使用到了 Future,我们看一下它的定义,发现它也是一个接口

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}1

还有一个接口叫做RunnableFuture,FutureTask是它的一个实现类,这个类帮我实现了很多好用的方法,因为我们自己实现的话是很麻烦的

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}2

之前的例子也可以用FutureTask改写成:

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}3

它继承了 Runnable, Future接口,我们之前调用的get方法就是其中之一,来一起看一下这个get是如何拿到值的,该部分源码来自FutureTask类实现

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}4

这个state线程的状态值,这里很好理解,一个是阻塞方法awaitDone,一个是抛出结果report,我们重点看一下awaitDone的实现:

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}5

首先它是一个内部方法,timed指定是否定时等待,如果传true的话需要指定时间nanos

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}6

WaitNode q = null;它是一个链表结构 volatile 被用来修饰会被不同线程访问和修改的变量, 后边还会讲到,此处先有个印象

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}7

for (;;) {...},这是一个死循环,这里就是阻塞部分了,内部先会判断线程状态

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}8

这里为什么会移除呢,想想看,如果不移除,内部积累太多,每次都要遍历它,如果是有竞争的情况下,是不是很浪费。这里主要是避免不必要的高额开销

public class CallableTest {    public static class CallableDemo implements Callable<String> {        @Override        public String call() throws Exception {            return "hello";        }    }    public static void main(String[] args) throws Exception {        CallableDemo demo = new CallableDemo();        String result = demo.call();        System.out.println(result);        System.out.println("main");    }}9

这里为什么移除?因为完成了,我只要结果就好了,不需要在进一步判断了

hellomain0

如果处于COMPLETING,会让出cpu时间

hellomain1

这个很好理解,节点不存在就创建一个

hellomain2

如果有新任务进来,会新建一个节点,然后利用CAS操作放入waiter链表的头部,这里是一个原子性操作,CAS的概念我们后边给大家讲,这里一切都是为了安全

compareAndSwap是个原子方法,原理是CAS,即将内存中的值与期望值进行比较,如果相等,就将内存中的值修改成新值并返回true。

hellomain3

这里判断消亡时间,如果超时了,移除节点,并返回线程状态,LockSupport使线程阻塞,有的同学可能会问,for不是已经阻塞了吗❓那为啥还调用LockSupport,这里其实是线程优化,想想你一直for循环一直判断是不是也会产生开销,加上LockSupport避免不要的操作,其实for的整个过程是实现了自旋锁的操作。

阻塞了不就没法执行了吗,park加锁方法还有一个对应的unpark相当于释放,但此处没有看到这个方法,那么它在哪个地方呢❓我们大体应该可以猜到,它应该是在执行阶段,还记得RunnableFuture接口下的run方法吗?下面我们看一下它的实现

hellomain4

下面我们重点看一下这个set方法

hellomain5

UNSAFE类是一个很特殊的类,它的内部几乎都是native方法,它可以使得我们能够操作内存空间来获得更高的性能,但一般我们很少使用它,因为它不被gc控制,使用不当jvm可能都会挂了。我们重点关注一下 finishCompletion这个方法

hellomain6

我们可以看到在这个内部它是调了一个unpark方法的,可以看出之前awaitDone()方法内部的线程阻塞在这个地方被唤醒了, 再回回过头看awaitDone()方法,就明白为啥要调用park方法了,因为线程没有达到大于COMPLETING状态,它会一直for

最后一个就是report了,返回值

hellomain7

于是我们的get就拿到返回值了

FutureTask 状态

这里给大家补充一下FutureTask的状态值

hellomain8

state可能的状态转变路径如下:

NEW -> COMPLETING -> NORMAL

NEW -> COMPLETING -> EXCEPTIONAL

NEW -> CANCELLED

NEW -> INTERRUPTING -> INTERRUPTED

结束语

本期到这里就结束了, 总结一下,本节主要讲了Callable、Future与FutureTask的常用方法,以及从问题触发,带大家分析了一下FutureTask的源码,这里大家要好好理解,不要去背,想要告诉大家的是学习要带着问题, 看源码一定要大胆猜测,冷静分析 ~

下期预告

之前提到过线程组的概念,下期就带大家学习线程组和线程的优先级。关注公众号加群,一起学习进步。关注我,不迷路, 下期不见不散 ~

更文时间

工作日(周一 ? 周五)

周末不更 ☀️

节假日不定时更

往期内容

Java多线程专题之线程与进程概述

Java多线程专题之线程类和接口入门

Java多线程专题之进阶学习Thread(含源码分析)

我的博客(阅读体验较佳)

写给初学者的Java基础教程

一文带你快速学习Java集合类

花几分钟快速了解一下泛型与枚举

Java注解与反射入门到进阶

JavaIO教程从入门到进阶

项目源码(源码已更新 欢迎star⭐️)

java-thread-all

地址: https://github.com/qiuChengleiy/java-thread-all.git

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

springboot-all

地址: https://github.com/qiuChengleiy/springboot-all.git

SpringBoot系列教程合集

一起来学SpringCloud合集

原文:https://juejin.cn/post/7103785317173297189


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/18680.html