自己实现 Java8 的 Stream 流(串行版)

先看使用方式

对应 Java8 的类

new class java8 class mean
Flow Stream
Visitor Spliterator 迭代器 多了个获取数量的方法
Stage Sink 表示一次操作 分为 begin/accept/end 三步

创建流 Flow

  • Flow.of(Collection<E>)
    从集合创建流
  • Flow.of(Visitor<E>)
    从自定义迭代器(Iterator 的子接口)创建流
  • Flow.of(E...)
    从数组创建流
  • Flow.of(E)
    创建包含单个元素的流
  • Flow.empty()
    创建空的流
  • Flow.iterate(E seed, UnaryOperator<E> f)
    以 seed 为种子,f 为迭代器创建流
  • Flow.generate(Supplier s)
    以 s 为数据源创建流

API 使用

无状态操作

  • filter
  • map
  • flatMap
  • peek

有状态操作

  • distinct
  • sorted
  • limit
  • skip

终止操作

  • forEach
  • toArray
  • reduce
  • collect
  • count
  • anyMatch
  • findFirst

实现细节

AbstractFlow<S, T> implements Flow<T> 是实现类。 每个无状态操作或有状态操作都不实际执行那个操作, 而是记录当前要执行的动作,然后返回一个新的流, 在终止操作调用时,再一次性遍历数据源,依次执行所有操作。

实现过程

先了解下 Java8 的 ReferencePipelineSinkjava.util.stream.AbstractPipeline.wrapAndCopyInto 尝试写出 filterforeach 的方法体

wrapDownstream 对应 java.util.stream.AbstractPipeline.opWrapSink.
Stage 对应 Sink 表示一个要执行的动作。包含三个方法

  • begin(long size) 操作开始,预计有 size 个元素(未知时为 -1),如 toArray 操作可以预先准备指定大小的数组容器
  • accept(T) 接收每个元素时调用
  • end() 遍历结束
  • canFinish() 是否可以提前结束遍历

如何将每个动作串起来:通过 downstream 这个表示下游操作的变量 —— 每个 Stage 将自己的动作执行完后,调用下游动作的相应方法。

终止方法调用时,才会开始遍历,看下终止方法:

解释

在终止方法调用时,也有一个 TerminalStage 表示终止操作, TerminalStage 比 Stage 多了个 startAndGet 方法,该方法表示开始遍历数据源,并返回最终结果。 分为两步,将整个动作串一次执行(start()),取结果(get())。

start 中首先将每个阶段的动作都包裹起来, 从终止操作的 TerminalStage 开始, 作为前一个阶段的下游依次往前包裹每个 Stage 即 wrapStage 和 每次需要重写的 wrapDownstream 方法。 然后调用包裹了所有操作的 stage 的 begin/accept/end 方法,执行整串动作。

遍历结束后,返回终止操作的结果 get() 即完成。

看下具体 filter 的实现:

直接返回一个新的流,包裹下游的方法实现为, begin 方法通知下游元素个数不确定, accept 方法先过一遍 foreach 传入的测试条件,符合条件才往下游发送。 end 方法没有重写,由于该操作不是短路操作 canFinish 也没有重写。

终止操作,看一个 foreach:

foreach 处于操作串的最下游, accept 方法直接调用传入的 Consumer 对每个元素进行消费 (正是 foreach 的语义)。 该操作不需要返回内容,所以 get() 放回 null,

还不理解?

flow.png

你也可以在 IDEA 中从 terminal 方法开始打断点, 代码中每个返回的抽象类都重写了 toString 方法, 有助于 debug 区分各个类。

代码地址:

推荐阅读:

点击量:7


发表评论

电子邮件地址不会被公开。 必填项已用*标注