Python流处理

article/2025/9/14 12:22:16

转自 :https://www.toutiao.com/a6589000256896107015/?tt_from=mobile_qq&utm_campaign=client_share&timestamp=1534156143&app=news_article&utm_source=mobile_qq&iid=40708017633&utm_medium=toutiao_ios&group_id=6589000256896107015

 

Python流处理

Faust是一个流处理库,将kafka流中的思想移植到Python中。

它被用于Robinhood去构建高性能的分布式系统和实时数据通道,每天处理数十亿的数据。

Faust同时提供流处理和事件处理,同类型的工具分享例如:Kafka Streams, Apache Spark/Storm/Samza/Flink

它不需要使用一个DSL,仅需要用到Python!这意味着你在做流处理的时候可以使用所有你喜欢的Python库:NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy等等。

由于需要使用新的async/await语法和变量类型注释方法,Faust需要使用Python3.6以上的版本。

这里有一个处理输入命令流的示例:

Python流处理

这个agent装饰器定义了一个“流处理器”,它本质上是一个Kafka topic,并且可以对接收到的每个事件做一些处理。

这个agent是一个async def的函数,因此它还可以异步执行其他操作,如web请求。

这个系统可以持久化状态,执行方式类似于数据库。表被命名成分布式的key/value储存,你可以使用常规的Python字典来做这件事。

在每台机器上的本地用c++编写的超快嵌入式数据库(被称为RocksDB)存储表。

表还可以存储可选的“窗口”聚合计数,以便跟踪“前一天的单击次数”或“前一个小时的单击次数”。与Kafka流一样,我们支持滚动、跳跃和滑动时间窗口,旧窗口可以过期以阻止数据填充。

为了提高可靠性,我们使用Kafka topic作为“预写日志”。当一个密钥被更改时,我们将其发布到更新的日志上。备用节点使用这个更新日志来保存数据的精确副本,并在任何节点发生故障时支持立即恢复。

对于用户来说,表只是一个字典,但是数据在重新启动和跨节点复制之间存在,所以在故障发生时其他节点可以自动接管。

您可以通过URL统计页面浏览数量:

Python流处理

发送到Kafka topic的数据是分区的,这意味着点击数将用URL的这种方式进行分片。因此,同一个URL的每个计数都会立刻被传递给同一个Faust worker实例。

Faust支持任何类型的流数据:字节、Unicode和序列化结构,同时也支持使用现代Python语法的“模型”来描述流中的keys和value是如何被序列化的。

Python流处理

Faust是静态类型的,使用mypy类型检查器,所以您在编写应用程序时可以充分利用静态类型的优势。

Faust源代码很小,组织良好,是学习Kafka流实现的好资源。

在引言页学习更多关于Faust的知识

去关于Faust,系统请求,安装指导,论坛资源等等,或者直接访问快速开始的教程。在一个编写流处理的应用中去查看关于Faust应用,然后通过使用者手册深入探讨。深层次的信息都根据不同主题在这个手册中进行说明

Faust是…

简介

Faust非常容易使用。在学习其他的流处理方法时,你总是需要从一个复杂的hello-world工程和相应的基础要求开始学习。Faust仅仅需要Kafka,剩下的就是只需要Python,如果你知道Python的话你就可以直接使用Faust去做流处理的工作了,并且它可以整合和他相关的一切。

这儿有一个简单的应用程序你可以做:源代码是Python的

Python流处理

您可能会被async和await这两个关键字吓到,但是您在使用Faust时不需要知道asyncio是如何工作的:只要模仿这些例子就可以得到您想要的结果。

示例应用程序启动两个任务:一个是处理流,另一个是向流发送事件的后台线程。在实际的应用程序中,您的系统将向Kafka topic发布事件,您的处理器可以从Kafka topic获取事件信息,并且只需要后台线程将数据输入到我们的示例中。

高可用性

Faust是高度可用的,并且可以在网络问题和服务器崩溃中生存下来。在节点失败的情况下,它可以自动恢复,并且表将接管备用节点。

分布式的

根据您的应用程序的需要启动更多实例。

快速

一个单内核的Faust worker实例已经可以每秒处理数万个事件,我们有理由相信,一旦我们能够支持一个更优化的Kafka客户端,吞吐量就会增加。

灵活性

Faust就是Python,而流是一个无限的异步迭代器。如果您知道如何使用Python,那么您已经知道如何使用Faust,它可以与您喜欢的Python库一起使用,比如Django、Flask、SQLAlchemy、NTLK、NumPy、Scikit、TensorFlow等等。

安装

您可以通过Python包或从源文件中安装Faust

使用pip安装它:

Python流处理

绑定

Faust还定义了一组setuptools扩展,可以用来安装Faust,并且有一个给定特性的依赖关系。

您可以在您的需求中或在pip命令行中使用方括号来指定它们。使用逗号分隔多个包:

Python流处理

以下的绑定均是有效的:

商店

Python流处理

最优化

Python流处理

传感器

Python流处理

事件循环

Python流处理

调试

Python流处理

下载并从源文件中安装

下载最新的Faust版本的网址是:http: //pypi.python.org/pypi/faust

您可以这样安装它:

Python流处理

如果当前没有使用virtualenv,则必须以特权用户的身份执行最后一个命令。

使用开发版本

您可以使用以下pip命令安装Faust的最新版本:

Python流处理

常见问题

Faust可以在Django/Flask/etc上使用吗?

使用gevent

这种方法适用于任何可以与gevent一起工作的阻塞Python库。

使用gevent需要您安装aiogevent模块,您可以将其作为Faust的包进行安装:

Python流处理

然后要真正的使用gevent作为事件循环,您要么在faust程序中使用-L <faust --loop>

命令:

Python流处理

要么在你脚本的前面加入import mode.loop.gevent

Python流处理

记住:非常重要的一点是,它位于模块的最顶端,并且在导入库之前执行。

使用eventlet

这种方法适用于任何可以使用eventlet的阻塞Python库。

使用eventlet需要您安装aioeventlet模块,您可以将其安装为与Faust一起的捆绑包。

Python流处理

然后要实际使用eventlet作为事件循环,您要么在faust程序中使用-L <faust --loop>

命令:

Python流处理

要么在你脚本的前面加入import mode.loop.gevent

Python流处理

警告

非常重要的是,它位于模块的最顶端,并且在导入库之前执行。

Faust可以在Tornado上使用吗?

可以!使用tornado.platform.asyncio链接:http://www.tornadoweb.org/en/stable/asyncio.html

Faust可以在Twisted上使用吗?

可以!使用asyncio反应器实现:https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html

是否支持Python3.5或者更早的版本?

目前还没有支持Python 3.5的计划,但是欢迎您为这个项目做出贡献。

以下是实现这一目标所需的一些步骤

  • 源代码转换以重写变量注释到注释

示例,代码:

Python流处理

  • 重写异步函数的源代码转换

示例,代码:

Python流处理

必须重写:

Python流处理

你将支持Python2吗?

目前还没有支持Python 2的计划,但是欢迎您为项目做贡献(上面问题中的细节也与Python 2相关)。

在本地运行Faust应用程序时,我得到的打开文件的最大数量超过了RocksDB的错误。我该怎么解决这个问题呢

您可能需要增加打开文件的最大数量的限制。下面的文章解释了如何在OS X上这么做:https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan

资源

问题跟踪

如果你有任何意见,问题,或者烦恼,请记录下来作为我们的问题追踪报告:https://github.com/robinhood/faust/issues/

Wiki

https://wiki.github.com/robinhood/faust/

许可证

该软件在新的BSD许可下获得许可。有关完整的许可文本,请参阅顶部分发目录中的许可文件。

贡献

Faust的发展发生在GitHub: https://github.com/robinhood/faust

我们非常鼓励您参与Faust的发展。

请务必也阅读文件中对Faust的贡献部分。

编码规范

在项目代码库、问题跟踪器、聊天室和邮件列表中进行交互的每个人都应该遵循《Faust行为准则》。

作为这些项目的贡献者和维护者,为了培养开放和受欢迎的社区,我们承诺尊重所有通过报告问题、发布特性请求、更新文档、提交合并请求或补丁和其他活动的人。

我们致力于使参与这些项目的每个人都无骚扰体验,不论其经验水平、性别、性别认同和表现、性取向、残疾、个人外貌、体型、种族、种族、年龄、宗教或国籍。

参与者不良行为包括:

  • 性化的语言或意象的使用

  • 个人人身攻击

  • 恶意破坏或侮辱/侮辱性的评论

  • 公共或者私人的骚扰

  • 未经明确许可,发布他人的私人信息,如住址或电子地址

  • 其他不道德或不专业的行为。

项目维护人员有权利和责任删除、编辑或拒绝评论、提交、代码、wiki编辑、问题和其他与行为准则不一致的贡献。通过采用这一行为准则,项目维护者承诺在管理这个项目的每个方面都公平、一致地应用这些原则。不遵守或执行行为准则的项目维护者可能被永久地从项目团队中删除。

当个人代表项目或社区时,此行为准则适用于项目空间和公共空间。

可以通过创建一个问题或联系一个或多个项目负责人来举报虐待、骚扰或其他不可接受行为。

本行为守则改编自Contributor Covenant 1.2.0版本,可在http://contributor-covenant.org/version/1/2/0/查阅.


http://chatgpt.dhexx.cn/article/0RiDqQit.shtml

相关文章

Stream流式处理

Stream流的三类方法 获取Stream&#xff1a;流创建一条流水线,并把数据放到流水线上准备。 中间方法&#xff1a;流水线上的操作一次操作完毕之后,还可以继续进行其他操作。 终结方法&#xff1a;一个Stream流只能有一个终结方法是流水线上的最后一个操作。 生成Stream流的…

流数据处理与分析

环境 名称 版本 系统 Ubuntu 18.04.4 LTS 内存 7.5GiB 处理器 Intel Core i7-8565U CPU 1.80GHz *8 图形 Intel UHD Graphics&#xff08;Whiskey Lake 3*8 GT2&#xff09; GNOME 3.28.2 操作系统类型 64位 磁盘 251.0 GB Storm 2.1.0 Zookeeper…

流处理系统

文章目录 引言如何发送事件流流处理不可靠的时钟容错总结 引言 清楚数据的类型有助于我们设计一个性能更高&#xff0c;更有针对性的数据系统&#xff0c;比如在线系统&#xff0c;离线系统&#xff08;批处理&#xff09;。近实时系统(流处理)等等。比如说批处理系统&#xf…

流处理简介

一. 流式处理简介 在我接触到java8流式处理的时候&#xff0c;我的第一感觉是流式处理让集合操作变得简洁了许多&#xff0c;通常我们需要多行代码才能完成的操作&#xff0c;借助于流式处理可以在一行中实现。比如我们希望对一个包含整数的集合中筛选出所有的偶数&#xff0c;…

【节点流和处理流】

节点流和处理流 基本介绍 节点流可以从特定数据源读取数据&#xff0c;如FileReader、FileWriter处理流&#xff1a;是对一个已存在的流的连接和封装&#xff0c;通过所封装的流的功能调用实现数据读写。如BufferedReader.处理流的构造方法总是要带一个其他的流对象做参数。一…

流数据处理

流数据处理strom 在2011年Storm开源之前&#xff0c;由于Hadoop的火红&#xff0c;整个业界都在喋喋不休地谈论大数据。Hadoop的高吞吐&#xff0c;海量数据处理的能力使得人们可以方便地处理海量数据。但是&#xff0c;Hadoop的缺点也和它的优点同样鲜明——延迟大&#xff0…

一. 流式处理简介

https://www.cnblogs.com/shenlanzhizun/p/6027042.html Java技术学习 https://www.itkc8.com 一. 流式处理简介 在我接触到java8流式处理的时候&#xff0c;我的第一感觉是流式处理让集合操作变得简洁了许多&#xff0c;通常我们需要多行代码才能完成的操作&#xff0c;借助…

Kafka基础-流处理

1. 什么是流处理&#xff1f; 首先&#xff0c;让我们说一下什么是数据流&#xff08;也称为事件流&#xff09;&#xff1f;它是无边界数据集的抽象说法&#xff0c;无边界意味着无限且不断增长&#xff0c;因为随着时间的推移&#xff0c;新数据会不断地到来。 除了无边界的…

流处理基本介绍

1. 什么是流处理 一种被设计来处理无穷数据集的数据处理系统引擎 2. 流处理的几个概念 1. 无穷数据&#xff08;Unbounded data&#xff09;&#xff1a;一种持续生成&#xff0c;本质上是无穷尽的数据集。它经常会被称为“流数据”。然而&#xff0c;用流和批次来定义…

Spark Streaming与流处理

一、流处理 1.1 静态数据处理 在流处理之前&#xff0c;数据通常存储在数据库&#xff0c;文件系统或其他形式的存储系统中。应用程序根据需要查询数据或计算数据。这就是传统的静态数据处理架构。Hadoop 采用 HDFS 进行数据存储&#xff0c;采用 MapReduce 进行数据查询或分…

什么是流处理

流处理正变得像数据处理一样流行。流处理已经超出了其原来的实时数据处理的范畴&#xff0c;它正在成为一种提供数据处理&#xff08;包括批处理&#xff09;&#xff0c;实时应用乃至分布式事务的新方法的技术。 1、什么是流处理&#xff1f; 流处理是不断合并新数据以计算结果…

嵌入式软件升级方法

一、U盘升级 1.在u盘根目录新建文件夹&#xff0c;命名为‘upgrade’ 2.将软件复制到upgrade文件夹中 3.将u盘插到嵌入式服务器usb口上&#xff0c;断电重启服务器 二、PC工具升级 1.打开PC工具&#xff0c;选中要升级的机器&#xff0c;点击‘素材管理’选项卡&#xff0c…

嵌入式软件架构设计之分层设计

关注、星标公众号&#xff0c;不错过精彩内容 整理&#xff1a;黄工 素材来源&#xff1a;网络 参考来源&#xff1a; https://blog.51cto.com/kenotu/1614390 在正规的项目开发中&#xff0c;项目往往是并行开发的&#xff0c;也就是说硬件设计、底层软件设计、应用软件设计等…

嵌入式系统软件层次结构

文章目录 嵌入式系统软件嵌入式系统软件的层次结构硬件抽象层 嵌入式操作系统嵌入式操作系统——WinCE嵌入式操作系统——VxWorks嵌入式操作系统——Linux嵌入式Linux OS的特点 嵌入式操作系统——uCOS嵌入式操作系统—— PalmOS其他嵌入式操作系统华为鸿蒙系统 嵌入式系统软件…

嵌入式软件开发必备知识体系

嵌入式软件开发学习路线 前言 本章节主要介绍嵌入式软件开发概念以及大致的学习知识点的范围 一、嵌入式软件是什么&#xff1f; 百度百科&#xff1a;嵌入式工程师是指具有C/C语言、汇编语言等基础&#xff0c;熟悉模拟电子技术等硬件知识&#xff0c;了解处理器体系结构&a…

嵌入式开发 | 嵌入式开发设计文档该怎么写?

关注星标公众号&#xff0c;不错过精彩内容 作者 | strongerHuang 微信公众号 | 嵌入式专栏 俗话说&#xff0c;不会写文档的工程师不是好的工程师&#xff01; 如果你只会写代码&#xff0c;而从不写文档&#xff0c;迟早有一天会“出事”。这不是危言耸听&#xff0c;现实生活…

简单嵌入式系统软件架构

本文为原创&#xff0c;以下链接有比较及时的更新&#xff1a; https://www.yuque.com/docs/share/334f4a3d-2974-49db-8f68-4db6601a0d21?# 《简单嵌入式系统》 引言 本文描述的内容&#xff0c;适用范围是简单嵌入式系统。举一些可能不恰当的例子&#xff0c;如手环、蓝牙…

嵌入式软件设计层级划分概念

嵌入式软件设计层级划分概念 设计过程中体会的细化更新部分&#xff1a; 层级描述备注应用层直接控制应用&#xff0c;比如led_light_on(),led_light_off() 器件层&#xff08;如果操作复杂可进一步划分为器件应用层和器件驱动层&#xff09;比如&#xff1a;实现led_light_on …

浅议嵌入式软件测试

近年来&#xff0c;随着嵌入式系统的功能和复杂性不断增加&#xff0c;其开发时间和成本也随之不断上升。对于安全关键领域的嵌入式系统和软件来说&#xff0c;其稳定性和可靠性往往需要通过大量的测试和验证来保证。 01.一般软件测试vs嵌入式软件测试 嵌入式软件测试针对嵌入…

嵌入式程序设计思路

项目做的多了&#xff0c;深切地体会到架构的重要性。 俗话说&#xff0c;没有好的架构&#xff0c;移植和复用是件很痛苦的事&#xff0c;只能重复的造轮子。特别是嵌入式的代码&#xff0c;如果应用层中间穿插着驱动层的代码&#xff0c;维护起来是一件相当痛苦的事情。 这…