分类
Go

Go Stream 仿照 Java8 的流造的轮子

Java 8 的 Stream 接口处理集合数据转换特别好用,之前写过《自己实现 Java8 的 Stream 流(串行版)》,现在工作语言主要使用 Go 了,所以用 Go 也实现了一遍,可以使用 go get github.com/youthlin/stream 引入。先看个使用示例吧:

// example_test.go

func ExampleStream_Filter() {
	stream.Of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).
		Filter(func(e types.T) bool {
			// 没有范型只能通过 e.(int) 的形式强制转换
			return e.(int)%3 == 0
		}).
		ForEach(func(e types.T) {
			fmt.Println(e)
		})
	// Output:
	// 0
	// 3
	// 6
	// 9
}
func ExampleStream_Map() {
	stream.IntRange(0, 5).
		Map(func(t types.T) types.R {
			return fmt.Sprintf("<%d>", t)
		}).
		ForEach(func(t types.T) {
			fmt.Printf("%v", t)
		})
	// Output:
	// <0><1><2><3><4>
}

然后看 Go 的接口,如下:

// stream.go

type Stream interface {
	// stateless operate 无状态操作

	Filter(types.Predicate) Stream         // 过滤
	Map(types.Function) Stream             // 转换
	FlatMap(func(t types.T) Stream) Stream // 打平
	Peek(types.Consumer) Stream            // peek 每个元素

	// stateful operate 有状态操作

	Distinct(types.IntFunction) Stream // 去重
	Sorted(types.Comparator) Stream    // 排序
	Limit(int64) Stream                // 限制个数
	Skip(int64) Stream                 // 跳过个数

	// terminal operate 终止操作

	// 遍历
	ForEach(types.Consumer)
	// return []T 转为切片
	ToSlice() []types.T
	// return []X which X is the type of some
	ToElementSlice(some types.T) types.R
	// return []X which X is same as the `typ` representation
	ToSliceOf(typ reflect.Type) types.R
	// 测试是否所有元素满足条件
	AllMatch(types.Predicate) bool
	// 测试是否没有元素满足条件
	NoneMatch(types.Predicate) bool
	// 测试是否有任意元素满足条件
	AnyMatch(types.Predicate) bool
	// Reduce return optional.Empty if no element. calculate result by (T, T) -> T from first element
	Reduce(accumulator types.BinaryOperator) optional.Optional
	// type of initValue is same as element.  (T, T) -> T
	ReduceFrom(initValue types.T, accumulator types.BinaryOperator) types.T
	// type of initValue is different from element. (R, T) -> R
	ReduceWith(initValue types.R, accumulator func(types.R, types.T) types.R) types.R
	FindFirst() optional.Optional
	// 返回元素个数
	Count() int64
}

由于 Go 目前还没有范型,所以上述代码中的 T, R, 实际都是 interface{}. 不直接使用 interface{} 是因为使用 T, R 可以更明确表示转换前后类型可以不一样,而且等 Go2 支持范型了,再改动接口也可能会更方便,另外也是因为看惯了 Java 的 T 范型😝

// types/type.go

type (
	// 任意类型
	T              interface{}
	// 另一元素类型
	R              interface{}
	// 另一元素类型
	U              interface{}
	// Function 是一个转换函数,将一个元素转为另一个类型
	Function       func(T) R
	// IntFunction 是一个转换函数,将一个元素转为一个整数
	IntFunction    func(T) int
	// Predicate 是一个断言,测试一个元素是否满足条件
	Predicate      func(T) bool
	// UnaryOperator 是一个参数类型和返回类型相同的转换函数
	UnaryOperator  func(T) T
	// Consumer 是一个消费函数,消费一个元素并且没有返回
	Consumer       func(T)
	// Supplier 是一个生成函数,每次调用会返回一个元素
	Supplier       func() T
	// BiFunction 是一个接收 2 个元素返回一个元素的函数
	BiFunction     func(T, U) R
	// BinaryOperator 是一个接收 2 个相同类型的入参,返回类型也相同的 BiFunction
	BinaryOperator func(T, T) T
	// Comparator 是一个比较器,如果第一个元素小于第二个元素返回负值,大于返回正值,否则返回 0
	Comparator     func(T, T) int 
)

介绍 stream 的实现之前,我们先引入迭代器 iterator 接口,因为这个才是 stream 的数据源。接口和实现都很简单,就不具体写了:

// iterator.go

type iterator interface {
	GetSizeIfKnown() int64
	HasNext() bool
	Next() types.T
}

然后就是 stream 的真正实现了,接口注释中也写了, stream 的操作分为无状态操作、有状态操作和终止操作。其中终止操作才会触发数据源迭代数据,像 Filter 等无状态操作和 Sorted 等有状态操作,都是仅记录本次操作要执行的动作,最后有终止操作才会触发之前注册的一系列动作,依次执行。

// impl.go

//            head filter   map   for-each
//            +--+    +---+    +--+
//     nil <- |  | <- |   | <- |  | <- terminalStage
//            +--+    +---+    +--+
//
//                +-filter----------------+
//  source -->    |                       |
//                |       +-map-----------+
//                |       |               |
//                |       |    +-for-each-+
//                |       |    |          | terminalStage
//                +-------+----+----------+
//
//               <----- wrapped stage ----->
type stream struct {
	source iterator // 数据源
	prev   *stream  // 前一个流
	wrap   func(stage) stage
}

如上述 stream 结构体的注释所示,它实际是整个数据流的一个节点。数据流的头节点通常由工厂方法生成,保存了数据源,且没有前驱节点:

// factory.go
func Of(elements ...types.T) Stream {
	return newHead(it(elements...))
}


// iterator.go
func it(elements ...types.T) iterator {
	return &sliceIterator{
		elements: elements,
		current:  0,
	}
}

// impl.go
// newHead 构造头节点
func newHead(source iterator) *stream {
	return &stream{source: source}
}

// newNode 构造中间节点
func newNode(prev *stream, wrap func(down stage) stage) *stream {
	return &stream{
		source: prev.source,
		prev:   prev,
		wrap:   wrap,
	}
}

生成了头节点后,之后的每个非终止操作,都会生成一个中间节点,比如 Filter 操作会调用 newNode.

这时引入了新的接口 stage:

// stage.go

// stage 记录一个操作
// Begin 用于操作开始,参数是元素的个数,如果个数不确定,则是 unknownSize
// Accept 接收每个元素
// CanFinish 用于判断是否可以提前结束
// End 是收尾动作
type stage interface {
	Begin(size int64)
	Accept(types.T)
	CanFinish() bool
	End()
}
type chainedStage struct {
	*baseStage
}
type terminalStage struct {
	*baseStage
}

我们先看 terminalStage, 它是由终止操作生成的,比如 ForEach 操作会调用 newTerminalStage 生成一个 terminalStage 并且传给 terminal 方法:

// impl.go
func (s *stream) ForEach(consumer types.Consumer) {
	s.terminal(newTerminalStage(consumer))
}

// terminal 终止操作调用。触发包装各项操作,开始元素遍历
func (s *stream) terminal(ts *terminalStage) {
	stage := s.wrapStage(ts)
	source := s.source
	stage.Begin(source.GetSizeIfKnown())
	for source.HasNext() && !stage.CanFinish() {
		stage.Accept(source.Next())
	}
	stage.End()
}

terminal 方法会首先将每个 stage 串起来,生成一个 wrappedStage, 然后触发数据源迭代器往里面灌数据,就可以啦。现在我们来看每个 stage 是怎么串起来的呢?

// impl.go

func (s *stream) Filter(test types.Predicate) Stream {
	return newNode(s, func(down stage) stage {
		return newChainedStage(down, begin(func(int64) {
			down.Begin(unknownSize) // 过滤后个数不确定
		}), action(func(t types.T) {
			if test(t) {
				down.Accept(t)
			}
		}))
	})
}

// stage.go
func newChainedStage(down stage, opt ...option) *chainedStage {
	s := defaultChainedStage(down)
	for _, o := range opt {
		o(s.baseStage)
	}
	return s
}

func defaultChainedStage(down stage) *chainedStage {
	return &chainedStage{
		baseStage: &baseStage{
			begin:     down.Begin,
			action:    down.Accept,
			canFinish: down.CanFinish,
			end:       down.End,
		},
	}
}

答案是通过 streamwrap 字段 func wrap(stage) stage, 然后在 wrapStage 方法中会调用这个 wrap 函数:

// impl.go

// wrapStage 将所有操作"包装"为一个操作。
// 从终止操作开始往前(因为 wrap 的参数是 downStage)包装
func (s *stream) wrapStage(terminalStage stage) stage {
	stage := terminalStage
	for i := s; i.prev != nil; i = i.prev {
		stage = i.wrap(stage)
	}
	return stage
}

之所以中间操作生成的叫做 chainedStage, 是因为它需要引用一个 downStage, 把本次操作之后的数据传给下游。比如 Filter 操作,会把符合断言的数据传递到下游、 Map 操作会把经过函数转换的数据传递给下游。

以上就是整个 stream 的实现原理啦。接下来可以看下没有详细说明的 optional(<- GitHub 代码连接),也是从 Java 中抄来的概念。

最后可以看下《Go 中的 Options 模式》,上面的 stage 接口的两个实现就使用了这个模式。


“Go Stream 仿照 Java8 的流造的轮子”上的1条回复

厉害了,java刚转go,正好试试
后续考虑实现parallel版本的吗

回复 回复时对方会收到邮件通知

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

[/鼓掌] [/难过] [/调皮] [/白眼] [/疑问] [/流泪] [/流汗] [/撇嘴] [/抠鼻] [/惊讶] [/微笑] [/得意] [/大兵] [/坏笑] [/呲牙] [/吓到] [/可爱] [/发怒] [/发呆] [/偷笑] [/亲亲]