Java 8 的 Stream 接口处理集合数据转换特别好用,之前写过《自己实现 Java8 的 Stream 流(串行版)》,现在工作语言主要使用 Go 了,所以用 Go 也实现了一遍,可以使用 go get github.com/youthlin/stream
引入。先看个使用示例吧:
// example_test.go
func ExampleStream_Filter() {
stream.Of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).
Filter(func(e types.T) bool {
// 没有范型只能通过 e.(int) 的形式强制转换
return e.(int)%3 == 0
}).
ForEach(func(e types.T) {
fmt.Println(e)
})
// Output:
// 0
// 3
// 6
// 9
}
func ExampleStream_Map() {
stream.IntRange(0, 5).
Map(func(t types.T) types.R {
return fmt.Sprintf("<%d>", t)
}).
ForEach(func(t types.T) {
fmt.Printf("%v", t)
})
// Output:
// <0><1><2><3><4>
}
然后看 Go 的接口,如下:
// stream.go
type Stream interface {
// stateless operate 无状态操作
Filter(types.Predicate) Stream // 过滤
Map(types.Function) Stream // 转换
FlatMap(func(t types.T) Stream) Stream // 打平
Peek(types.Consumer) Stream // peek 每个元素
// stateful operate 有状态操作
Distinct(types.IntFunction) Stream // 去重
Sorted(types.Comparator) Stream // 排序
Limit(int64) Stream // 限制个数
Skip(int64) Stream // 跳过个数
// terminal operate 终止操作
// 遍历
ForEach(types.Consumer)
// return []T 转为切片
ToSlice() []types.T
// return []X which X is the type of some
ToElementSlice(some types.T) types.R
// return []X which X is same as the `typ` representation
ToSliceOf(typ reflect.Type) types.R
// 测试是否所有元素满足条件
AllMatch(types.Predicate) bool
// 测试是否没有元素满足条件
NoneMatch(types.Predicate) bool
// 测试是否有任意元素满足条件
AnyMatch(types.Predicate) bool
// Reduce return optional.Empty if no element. calculate result by (T, T) -> T from first element
Reduce(accumulator types.BinaryOperator) optional.Optional
// type of initValue is same as element. (T, T) -> T
ReduceFrom(initValue types.T, accumulator types.BinaryOperator) types.T
// type of initValue is different from element. (R, T) -> R
ReduceWith(initValue types.R, accumulator func(types.R, types.T) types.R) types.R
FindFirst() optional.Optional
// 返回元素个数
Count() int64
}
由于 Go 目前还没有范型,所以上述代码中的 T
, R
, 实际都是 interface{}
. 不直接使用 interface{}
是因为使用 T, R 可以更明确表示转换前后类型可以不一样,而且等 Go2 支持范型了,再改动接口也可能会更方便,另外也是因为看惯了 Java 的 T 范型😝
// types/type.go
type (
// 任意类型
T interface{}
// 另一元素类型
R interface{}
// 另一元素类型
U interface{}
// Function 是一个转换函数,将一个元素转为另一个类型
Function func(T) R
// IntFunction 是一个转换函数,将一个元素转为一个整数
IntFunction func(T) int
// Predicate 是一个断言,测试一个元素是否满足条件
Predicate func(T) bool
// UnaryOperator 是一个参数类型和返回类型相同的转换函数
UnaryOperator func(T) T
// Consumer 是一个消费函数,消费一个元素并且没有返回
Consumer func(T)
// Supplier 是一个生成函数,每次调用会返回一个元素
Supplier func() T
// BiFunction 是一个接收 2 个元素返回一个元素的函数
BiFunction func(T, U) R
// BinaryOperator 是一个接收 2 个相同类型的入参,返回类型也相同的 BiFunction
BinaryOperator func(T, T) T
// Comparator 是一个比较器,如果第一个元素小于第二个元素返回负值,大于返回正值,否则返回 0
Comparator func(T, T) int
)
介绍 stream 的实现之前,我们先引入迭代器 iterator
接口,因为这个才是 stream 的数据源。接口和实现都很简单,就不具体写了:
// iterator.go
type iterator interface {
GetSizeIfKnown() int64
HasNext() bool
Next() types.T
}
然后就是 stream 的真正实现了,接口注释中也写了, stream 的操作分为无状态操作、有状态操作和终止操作。其中终止操作才会触发数据源迭代数据,像 Filter
等无状态操作和 Sorted
等有状态操作,都是仅记录本次操作要执行的动作,最后有终止操作才会触发之前注册的一系列动作,依次执行。
// impl.go
// head filter map for-each
// +--+ +---+ +--+
// nil <- | | <- | | <- | | <- terminalStage
// +--+ +---+ +--+
//
// +-filter----------------+
// source --> | |
// | +-map-----------+
// | | |
// | | +-for-each-+
// | | | | terminalStage
// +-------+----+----------+
//
// <----- wrapped stage ----->
type stream struct {
source iterator // 数据源
prev *stream // 前一个流
wrap func(stage) stage
}
如上述 stream
结构体的注释所示,它实际是整个数据流的一个节点。数据流的头节点通常由工厂方法生成,保存了数据源,且没有前驱节点:
// factory.go
func Of(elements ...types.T) Stream {
return newHead(it(elements...))
}
// iterator.go
func it(elements ...types.T) iterator {
return &sliceIterator{
elements: elements,
current: 0,
}
}
// impl.go
// newHead 构造头节点
func newHead(source iterator) *stream {
return &stream{source: source}
}
// newNode 构造中间节点
func newNode(prev *stream, wrap func(down stage) stage) *stream {
return &stream{
source: prev.source,
prev: prev,
wrap: wrap,
}
}
生成了头节点后,之后的每个非终止操作,都会生成一个中间节点,比如 Filter
操作会调用 newNode
.
这时引入了新的接口 stage
:
// stage.go
// stage 记录一个操作
// Begin 用于操作开始,参数是元素的个数,如果个数不确定,则是 unknownSize
// Accept 接收每个元素
// CanFinish 用于判断是否可以提前结束
// End 是收尾动作
type stage interface {
Begin(size int64)
Accept(types.T)
CanFinish() bool
End()
}
type chainedStage struct {
*baseStage
}
type terminalStage struct {
*baseStage
}
我们先看 terminalStage
, 它是由终止操作生成的,比如 ForEach
操作会调用 newTerminalStage
生成一个 terminalStage
并且传给 terminal
方法:
// impl.go
func (s *stream) ForEach(consumer types.Consumer) {
s.terminal(newTerminalStage(consumer))
}
// terminal 终止操作调用。触发包装各项操作,开始元素遍历
func (s *stream) terminal(ts *terminalStage) {
stage := s.wrapStage(ts)
source := s.source
stage.Begin(source.GetSizeIfKnown())
for source.HasNext() && !stage.CanFinish() {
stage.Accept(source.Next())
}
stage.End()
}
terminal
方法会首先将每个 stage
串起来,生成一个 wrappedStage
, 然后触发数据源迭代器往里面灌数据,就可以啦。现在我们来看每个 stage 是怎么串起来的呢?
// impl.go
func (s *stream) Filter(test types.Predicate) Stream {
return newNode(s, func(down stage) stage {
return newChainedStage(down, begin(func(int64) {
down.Begin(unknownSize) // 过滤后个数不确定
}), action(func(t types.T) {
if test(t) {
down.Accept(t)
}
}))
})
}
// stage.go
func newChainedStage(down stage, opt ...option) *chainedStage {
s := defaultChainedStage(down)
for _, o := range opt {
o(s.baseStage)
}
return s
}
func defaultChainedStage(down stage) *chainedStage {
return &chainedStage{
baseStage: &baseStage{
begin: down.Begin,
action: down.Accept,
canFinish: down.CanFinish,
end: down.End,
},
}
}
答案是通过 stream
的 wrap
字段 func wrap(stage) stage
, 然后在 wrapStage
方法中会调用这个 wrap
函数:
// impl.go
// wrapStage 将所有操作"包装"为一个操作。
// 从终止操作开始往前(因为 wrap 的参数是 downStage)包装
func (s *stream) wrapStage(terminalStage stage) stage {
stage := terminalStage
for i := s; i.prev != nil; i = i.prev {
stage = i.wrap(stage)
}
return stage
}
之所以中间操作生成的叫做 chainedStage
, 是因为它需要引用一个 downStage
, 把本次操作之后的数据传给下游。比如 Filter
操作,会把符合断言的数据传递到下游、 Map
操作会把经过函数转换的数据传递给下游。
以上就是整个 stream
的实现原理啦。接下来可以看下没有详细说明的 optional
(<- GitHub 代码连接),也是从 Java 中抄来的概念。
最后可以看下《Go 中的 Options 模式》,上面的 stage
接口的两个实现就使用了这个模式。
声明
- 本作品采用署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。除非特别注明, 霖博客文章均为原创。
- 转载请保留本文(《Go Stream 仿照 Java8 的流造的轮子》)链接地址: https://youthlin.com/?p=1755
- 订阅本站:https://youthlin.com/feed/
“Go Stream 仿照 Java8 的流造的轮子”上的1条回复
厉害了,java刚转go,正好试试
后续考虑实现parallel版本的吗