流数据处理利器

article/2025/9/14 11:59:09

流处理 (Stream processing) 是一种计算机编程范式,其允许给定一个数据序列 (流处理数据源),一系列数据操作 (函数) 被应用到流中的每个元素。同时流处理工具可以显著提高程序员的开发效率,允许他们编写有效、干净和简洁的代码。

流数据处理在我们的日常工作中非常常见,举个例子,我们在业务开发中往往会记录许多业务日志,这些日志一般是先发送到 Kafka,然后再由 Job 消费 Kafaka 写到 elasticsearch,在进行日志流处理的过程中,往往还会对日志做一些处理,比如过滤无效的日志,做一些计算以及重新组合日志等等,示意图如下:

流处理工具 fx

go-zero是一个功能完备的微服务框架,框架中内置了很多非常实用的工具,其中就包含流数据处理工具fx,下面我们通过一个简单的例子来认识下该工具:

package mainimport ("fmt""os""os/signal""syscall""time""github.com/tal-tech/go-zero/core/fx"
)func main() {ch := make(chan int)go inputStream(ch)go outputStream(ch)c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)<-c
}func inputStream(ch chan int) {count := 0for {ch <- counttime.Sleep(time.Millisecond * 500)count++}
}func outputStream(ch chan int) {fx.From(func(source chan<- interface{}) {for c := range ch {source <- c}}).Walk(func(item interface{}, pipe chan<- interface{}) {count := item.(int)pipe <- count}).Filter(func(item interface{}) bool {itemInt := item.(int)if itemInt%2 == 0 {return true}return false}).ForEach(func(item interface{}) {fmt.Println(item)})
}

inputStream 函数模拟了流数据的产生,outputStream 函数模拟了流数据的处理过程,其中 From 函数为流的输入,Walk 函数并发的作用在每一个 item 上,Filter 函数对 item 进行过滤为 true 保留为 false 不保留,ForEach 函数遍历输出每一个 item 元素。

流数据处理中间操作

一个流的数据处理可能存在许多的中间操作,每个中间操作都可以作用在流上。就像流水线上的工人一样,每个工人操作完零件后都会返回处理完成的新零件,同理流处理中间操作完成后也会返回一个新的流。

fx 的流处理中间操作:

操作函数功能输入
Distinct去除重复的 itemKeyFunc,返回需要去重的 key
Filter过滤不满足条件的 itemFilterFunc,Option 控制并发量
Group对 item 进行分组KeyFunc,以 key 进行分组
Head取出前 n 个 item,返回新 streamint64 保留数量
Map对象转换MapFunc,Option 控制并发量
Merge合并 item 到 slice 并生成新 stream
Reverse反转 item
Sort对 item 进行排序LessFunc 实现排序算法
Tail与 Head 功能类似,取出后 n 个 item 组成新 streamint64 保留数量
Walk作用在每个 item 上WalkFunc,Option 控制并发量

下图展示了每个步骤和每个步骤的结果:

用法与原理分析

From

通过 From 函数构建流并返回 Stream,流数据通过 channel 进行存储:

// 例子
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {for _, v := range s {source <- v}
})// 源码
func From(generate GenerateFunc) Stream {source := make(chan interface{})go func() {defer close(source)// 构造流数据写入channelgenerate(source)}()return Range(source)
}

Filter

Filter 函数提供过滤 item 的功能,FilterFunc 定义过滤逻辑 true 保留 item,false 则不保留:

// 例子 保留偶数
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {for _, v := range s {source <- v}
}).Filter(func(item interface{}) bool {if item.(int)%2 == 0 {return true}return false
})// 源码
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {return p.Walk(func(item interface{}, pipe chan<- interface{}) {// 执行过滤函数true保留,false丢弃if fn(item) {pipe <- item}}, opts...)
}

Group

Group 对流数据进行分组,需定义分组的 key,数据分组后以 slice 存入 channel:


// 例子 按照首字符"g"或者"p"分组,没有则分到另一组
ss := []string{"golang", "google", "php", "python", "java", "c++"}
fx.From(func(source chan<- interface{}) {for _, s := range ss {source <- s}
}).Group(func(item interface{}) interface{} {if strings.HasPrefix(item.(string), "g") {return "g"} else if strings.HasPrefix(item.(string), "p") {return "p"}return ""
}).ForEach(func(item interface{}) {fmt.Println(item)
})// 源码
func (p Stream) Group(fn KeyFunc) Stream {// 定义分组存储mapgroups := make(map[interface{}][]interface{})for item := range p.source {// 用户自定义分组keykey := fn(item)// key相同分到一组groups[key] = append(groups[key], item)}source := make(chan interface{})go func() {for _, group := range groups {// 相同key的一组数据写入到channelsource <- group}close(source)}()return Range(source)
}

Reverse

reverse 可以对流中元素进行反转处理:

// 例子
fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {fmt.Println(item)
})// 源码
func (p Stream) Reverse() Stream {var items []interface{}// 获取流中数据for item := range p.source {items = append(items, item)}// 反转算法for i := len(items)/2 - 1; i >= 0; i-- {opp := len(items) - 1 - iitems[i], items[opp] = items[opp], items[i]}// 写入流return Just(items...)
}

Distinct

distinct 对流中元素进行去重,去重在业务开发中比较常用,经常需要对用户 id 等做去重操作:

// 例子
fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {return item
}).ForEach(func(item interface{}) {fmt.Println(item)
})
// 结果为 1,2,3,4,5,6// 源码
func (p Stream) Distinct(fn KeyFunc) Stream {source := make(chan interface{})threading.GoSafe(func() {defer close(source)// 通过key进行去重,相同key只保留一个keys := make(map[interface{}]lang.PlaceholderType)for item := range p.source {key := fn(item)// key存在则不保留if _, ok := keys[key]; !ok {source <- itemkeys[key] = lang.Placeholder}}})return Range(source)
}

Walk

Walk 函数并发的作用在流中每一个 item 上,可以通过 WithWorkers 设置并发数,默认并发数为 16,最小并发数为 1,如设置 unlimitedWorkers 为 true 则并发数无限制,但并发写入流中的数据由 defaultWorkers 限制,WalkFunc 中用户可以自定义后续写入流中的元素,可以不写入也可以写入多个元素:

// 例子
fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {newItem := strings.ToUpper(item.(string))pipe <- newItem
}).ForEach(func(item interface{}) {fmt.Println(item)
})// 源码
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {pipe := make(chan interface{}, option.workers)go func() {var wg sync.WaitGrouppool := make(chan lang.PlaceholderType, option.workers)for {// 控制并发数量pool <- lang.Placeholderitem, ok := <-p.sourceif !ok {<-poolbreak}wg.Add(1)go func() {defer func() {wg.Done()<-pool}()// 作用在每个元素上fn(item, pipe)}()}// 等待处理完成wg.Wait()close(pipe)}()return Range(pipe)
}

并发处理

fx 工具除了进行流数据处理以外还提供了函数并发功能,在微服务中实现某个功能往往需要依赖多个服务,并发的处理依赖可以有效的降低依赖耗时,提升服务的性能。

fx.Parallel(func() {userRPC() // 依赖1
}, func() {accountRPC() // 依赖2
}, func() {orderRPC() // 依赖3
})

注意 fx.Parallel 进行依赖并行处理的时候不会有 error 返回,如需有 error 返回或者有一个依赖报错需要立马结束依赖请求请使用MapReduce工具进行处理。

总结

本篇文章介绍了流处理的基本概念和 go-zero 中的流处理工具 fx,在实际的生产中流处理场景应用也非常多,希望本篇文章能给大家带来一定的启发,更好的应对工作中的流处理场景。

项目地址

https://github.com/tal-tech/go-zero

组件地址

https://github.com/tal-tech/go-zero/tree/master/core/fx

Example

https://github.com/tal-tech/go-zero/tree/master/example/fx

微信交流群


http://chatgpt.dhexx.cn/article/eHl1Ev8L.shtml

相关文章

流处理(Stream)和批处理(Batch)

1 流处理和批处理的概念 在程序计算当中&#xff0c;同一节点或者不同节点之间的数据的传递是实时传递还是延迟传递&#xff0c;这就引出了两个概念。其实在非大数据领域这两个概念所起的作用也是很有限&#xff0c;但是在大数据领域&#xff0c;处理上亿级别的时候&#xff0…

管道模式 流处理

&#xff08;一&#xff09;介绍 管道这个名字源于自来水厂的原水处理过程。原水要经过管道&#xff0c;一层层地过滤、沉淀、去杂质、消毒&#xff0c;到管道另一端形成纯净水。我们不应该把所有原水的过滤都放在一个管道中去提纯&#xff0c;而应该把处理过程进行划分&#…

Flink流处理API大合集:掌握所有flink流处理技术,看这一篇就够了

大家好&#xff0c;我是百思不得小赵。 创作时间&#xff1a;2022 年 5 月 18 日 博客主页&#xff1a; &#x1f50d;点此进入博客主页 —— 新时代的农民工 &#x1f64a; —— 换一种思维逻辑去看待这个世界 &#x1f440; 今天是加入CSDN的第1172天。觉得有帮助麻烦&#x…

Python流处理

转自 &#xff1a;https://www.toutiao.com/a6589000256896107015/?tt_frommobile_qq&utm_campaignclient_share&timestamp1534156143&appnews_article&utm_sourcemobile_qq&iid40708017633&utm_mediumtoutiao_ios&group_id6589000256896107015 F…

Stream流式处理

Stream流的三类方法 获取Stream&#xff1a;流创建一条流水线,并把数据放到流水线上准备。 中间方法&#xff1a;流水线上的操作一次操作完毕之后,还可以继续进行其他操作。 终结方法&#xff1a;一个Stream流只能有一个终结方法是流水线上的最后一个操作。 生成Stream流的…

流数据处理与分析

环境 名称 版本 系统 Ubuntu 18.04.4 LTS 内存 7.5GiB 处理器 Intel Core i7-8565U CPU 1.80GHz *8 图形 Intel UHD Graphics&#xff08;Whiskey Lake 3*8 GT2&#xff09; GNOME 3.28.2 操作系统类型 64位 磁盘 251.0 GB Storm 2.1.0 Zookeeper…

流处理系统

文章目录 引言如何发送事件流流处理不可靠的时钟容错总结 引言 清楚数据的类型有助于我们设计一个性能更高&#xff0c;更有针对性的数据系统&#xff0c;比如在线系统&#xff0c;离线系统&#xff08;批处理&#xff09;。近实时系统(流处理)等等。比如说批处理系统&#xf…

流处理简介

一. 流式处理简介 在我接触到java8流式处理的时候&#xff0c;我的第一感觉是流式处理让集合操作变得简洁了许多&#xff0c;通常我们需要多行代码才能完成的操作&#xff0c;借助于流式处理可以在一行中实现。比如我们希望对一个包含整数的集合中筛选出所有的偶数&#xff0c;…

【节点流和处理流】

节点流和处理流 基本介绍 节点流可以从特定数据源读取数据&#xff0c;如FileReader、FileWriter处理流&#xff1a;是对一个已存在的流的连接和封装&#xff0c;通过所封装的流的功能调用实现数据读写。如BufferedReader.处理流的构造方法总是要带一个其他的流对象做参数。一…

流数据处理

流数据处理strom 在2011年Storm开源之前&#xff0c;由于Hadoop的火红&#xff0c;整个业界都在喋喋不休地谈论大数据。Hadoop的高吞吐&#xff0c;海量数据处理的能力使得人们可以方便地处理海量数据。但是&#xff0c;Hadoop的缺点也和它的优点同样鲜明——延迟大&#xff0…

一. 流式处理简介

https://www.cnblogs.com/shenlanzhizun/p/6027042.html Java技术学习 https://www.itkc8.com 一. 流式处理简介 在我接触到java8流式处理的时候&#xff0c;我的第一感觉是流式处理让集合操作变得简洁了许多&#xff0c;通常我们需要多行代码才能完成的操作&#xff0c;借助…

Kafka基础-流处理

1. 什么是流处理&#xff1f; 首先&#xff0c;让我们说一下什么是数据流&#xff08;也称为事件流&#xff09;&#xff1f;它是无边界数据集的抽象说法&#xff0c;无边界意味着无限且不断增长&#xff0c;因为随着时间的推移&#xff0c;新数据会不断地到来。 除了无边界的…

流处理基本介绍

1. 什么是流处理 一种被设计来处理无穷数据集的数据处理系统引擎 2. 流处理的几个概念 1. 无穷数据&#xff08;Unbounded data&#xff09;&#xff1a;一种持续生成&#xff0c;本质上是无穷尽的数据集。它经常会被称为“流数据”。然而&#xff0c;用流和批次来定义…

Spark Streaming与流处理

一、流处理 1.1 静态数据处理 在流处理之前&#xff0c;数据通常存储在数据库&#xff0c;文件系统或其他形式的存储系统中。应用程序根据需要查询数据或计算数据。这就是传统的静态数据处理架构。Hadoop 采用 HDFS 进行数据存储&#xff0c;采用 MapReduce 进行数据查询或分…

什么是流处理

流处理正变得像数据处理一样流行。流处理已经超出了其原来的实时数据处理的范畴&#xff0c;它正在成为一种提供数据处理&#xff08;包括批处理&#xff09;&#xff0c;实时应用乃至分布式事务的新方法的技术。 1、什么是流处理&#xff1f; 流处理是不断合并新数据以计算结果…

嵌入式软件升级方法

一、U盘升级 1.在u盘根目录新建文件夹&#xff0c;命名为‘upgrade’ 2.将软件复制到upgrade文件夹中 3.将u盘插到嵌入式服务器usb口上&#xff0c;断电重启服务器 二、PC工具升级 1.打开PC工具&#xff0c;选中要升级的机器&#xff0c;点击‘素材管理’选项卡&#xff0c…

嵌入式软件架构设计之分层设计

关注、星标公众号&#xff0c;不错过精彩内容 整理&#xff1a;黄工 素材来源&#xff1a;网络 参考来源&#xff1a; https://blog.51cto.com/kenotu/1614390 在正规的项目开发中&#xff0c;项目往往是并行开发的&#xff0c;也就是说硬件设计、底层软件设计、应用软件设计等…

嵌入式系统软件层次结构

文章目录 嵌入式系统软件嵌入式系统软件的层次结构硬件抽象层 嵌入式操作系统嵌入式操作系统——WinCE嵌入式操作系统——VxWorks嵌入式操作系统——Linux嵌入式Linux OS的特点 嵌入式操作系统——uCOS嵌入式操作系统—— PalmOS其他嵌入式操作系统华为鸿蒙系统 嵌入式系统软件…

嵌入式软件开发必备知识体系

嵌入式软件开发学习路线 前言 本章节主要介绍嵌入式软件开发概念以及大致的学习知识点的范围 一、嵌入式软件是什么&#xff1f; 百度百科&#xff1a;嵌入式工程师是指具有C/C语言、汇编语言等基础&#xff0c;熟悉模拟电子技术等硬件知识&#xff0c;了解处理器体系结构&a…

嵌入式开发 | 嵌入式开发设计文档该怎么写?

关注星标公众号&#xff0c;不错过精彩内容 作者 | strongerHuang 微信公众号 | 嵌入式专栏 俗话说&#xff0c;不会写文档的工程师不是好的工程师&#xff01; 如果你只会写代码&#xff0c;而从不写文档&#xff0c;迟早有一天会“出事”。这不是危言耸听&#xff0c;现实生活…