1.11.0 pyflink使用例子

article/2025/10/5 11:14:04

官网链接

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/

python版本要求3.5及以上

安装pyflink

python -m pip install apache-flink

如果安装下载太慢会time-out , 换pip源

代码

table_api方式

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
#连接器在这descriptors里面,可以在这里面看需要什么参数
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json
from pyflink.table.udf import udfenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)#编写注册udf,暂时先不用
#t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
#add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
#t_env.register_function("add", add)#读kafka
properties = {"zookeeper.connect" : "172.17.0.2:2181", "bootstrap.servers" : "172.17.0.2:9092", "group.id" : "flink-test-cxy"}
t_env.connect(Kafka().properties(properties).version("universal").topic("mytesttopic").start_from_latest()) \.with_format(Json()).with_schema(Schema() \.field('a', DataTypes.BIGINT()) \.field('b', DataTypes.BIGINT())) \.create_temporary_table('mySource')
#读csv
# t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt')) \
# 	.with_format(OldCsv()
# 				 .field('a', DataTypes.BIGINT())
# 				 .field('b', DataTypes.BIGINT())) \
# 	.with_schema(Schema()
# 				 .field('a', DataTypes.BIGINT())
# 				 .field('b', DataTypes.BIGINT())) \
# 	.create_temporary_table('mySource')#写入csv
t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/pyflink_test/tar.txt')) \.with_format(OldCsv().field('sum', DataTypes.BIGINT())) \.with_schema(Schema().field('sum', DataTypes.BIGINT())) \.create_temporary_table('mySink')#读取kafka数据中的a和b字段相加再乘以2 , 并插入sink
t_env.from_path('mySource')\.select("(a+b)") \.insert_into('mySink')t_env.execute("job_test")

调试

打开kafka producer ,输入数据

结果

tar.txt接收到的数据

使用table_sql

比如在创建source时,使用sql_update

t_env.sql_update("""CREATE TABLE mySource (                                       a bigint,                                                    b bigint                                                ) WITH (                                                         'connector.type' = 'kafka',                                    'connector.version' = 'universal',                             'connector.topic' = 'mytesttopic',                             'connector.properties.zookeeper.connect' = '172.17.0.2:2181',  'connector.properties.bootstrap.servers' = '172.17.0.2:9092',  'connector.properties.group.id' = 'flink-test-cxy',            'connector.startup-mode' = 'latest-offset',                    'format.type' = 'json'                                         ) 
""")

使用udf

创建和注册

#编写注册udf
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add", add)

使用

t_env.sql_update("insert into mySink select add(a,b) from mySource")

另外例子

kafka2mysql

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json
from pyflink.table.udf import udfenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)t_env.sql_update("""CREATE TABLE mySource (                                       a bigint,                                                    b bigint                                                ) WITH (                                                         'connector.type' = 'kafka',                                    'connector.version' = 'universal',                             'connector.topic' = 'mytesttopic',                             'connector.properties.zookeeper.connect' = '172.17.0.2:2181',  'connector.properties.bootstrap.servers' = '172.17.0.2:9092',  'connector.properties.group.id' = 'flink-test-cxy',            'connector.startup-mode' = 'latest-offset',                    'format.type' = 'json'                                         ) 
""")
t_env.sql_update("""CREATE TABLE mysqlsink (id bigint, game_id varchar) with ('connector.type' = 'jdbc',  'connector.url' = 'jdbc:mysql://x.x.x.x:3306/flinksql?useSSL=false' ,'connector.username' = 'x' ,'connector.password' = 'x', 'connector.table' = 'mysqlsink' ,'connector.driver' = 'com.mysql.cj.jdbc.Driver' ,'connector.write.flush.interval' = '5s', 'connector.write.flush.max-rows' = '1' )
""")
t_env.sql_update("insert into mysqlsink select a , cast(b as varchar) b from mySource")
t_env.execute("job")

 


http://chatgpt.dhexx.cn/article/WwjCwI0q.shtml

相关文章

Pypeline:一种实现Anylogic与Python连接的新工具

Pypeline:一种实现Anylogic与Python连接的新工具 Pypeline背景Pypeline安装方法和环境配置要求Pypeline使用方法 Pypeline背景 Pypeline是由Anylogic官方团队推出的一个功能插件,旨在实现运行中的Anylogic模型和本地安装好的Python的链接。换句话说就是…

Python的连接符

python的连接符主要有 加号()、逗号(,)、空格( ) 、反斜线(\)、join()的方式、 加号() #注意,只能连接字符串,如果一个…

在 Python 中连接列表——如何连接列表

将两个或多个字符串、列表或其他数据结构组合成单个实体的过程在编程中称为串联。 串联产生一个新对象,其中包含原始对象的所有组件,并按串联顺序排列。 字符串上下文中的连接是指将一个字符串连接到另一个字符串的末尾以创建更长的字符串。例如&#xf…

win10磁盘管理_win10合并磁盘分区教程

Win10怎么合并磁盘分区?在首次安装系统时我们需要对硬盘进行分区,但是在系统正常使用时也是可以对硬盘进行合并等操作的。许多用户在使用电脑的过程中可能都会遇到磁盘空间不够用的情况,那么对于磁盘空间不足,我们可以通过合并磁盘分区来解决…

win10磁盘管理 磁盘分区和合并

一、磁盘分区管理 如果是只需要给单个磁盘分区进行划分的话,我们可以采用电脑自带的磁盘管理工具进行操作。具体的步骤如下: 第一步:右击开始菜单图表,选择磁盘管理打开 第二步:在新弹出的窗口的右下边可以看到磁盘&…

计算机硬盘合并怎么弄,如何将分区的硬盘合并为一个磁盘?

使用Windows磁盘管理将两个硬盘都转换为动态磁盘。请参阅Microsoft的介绍。要将基本磁盘升级为动态磁盘,请右键单击“我的电脑”,然后选择“管理”以打开计算机管理控制台。在计算机管理中,单击“磁盘管理”,右键单击要升级到动态…

Windows 10 创建 删除 合并磁盘分区

Windows 10 创建 删除 合并磁盘分区 右键我的电脑 管理 磁盘管理 选择一个空间比较大的磁盘分区 右键 压缩卷 自己选择一个合适的压缩范围,进行压缩 新建分区完成 右键新建分区 选择添加卷 看新建完成的F盘 新建分区就结束啦 然后是删除分区磁盘: …

计算机如何将两个磁盘合在一起,win10怎么把电脑自带的两个磁盘合并到一起

2019-10-06阅读(447) 合并分区时,要扩展的磁盘没有“扩展卷”选项,是因为整个硬盘上没有空白卷可扩展。所以,要合并分区,应该先准备出要合并的空间大小。方法步骤进入磁盘管理:按win+x选磁盘管理,进主界面2.如要将上图1G的恢复分区合并到C盘,应:直接删除该恢复分区,得…

Win10电脑如何合并磁盘分区

电脑空间不足怎么办?很多网友可能都被这个问题困扰过,其实我们可以通过管理磁盘分区来缓解空间不足的问题,把多余的空间合并起来就行了,现在来看看该如何操作吧。 更多系统教程尽在小白系统重装官网 系统:win10专业版…

Win7分割合并磁盘

如何在Windows7系统下对硬盘进行分区?很多时候我们出于某些原因需要在硬盘上划分一块新区域,说到这里,大家可能想到了很多第三方的硬盘分区工具,其实在Win7中不需要繁琐的操作和第三方软件就可以在系统下直接进行分区。 首先点击“…

虚拟服务器的磁盘合并,磁盘管理怎么合并分区

磁盘管理怎么合并分区 内容精选 换一换 DESS磁盘扩容成功后,需要在裸金属服务器的操作系统中对扩容部分的磁盘分配分区。已登录裸金属服务器,详细操作请参见《裸金属服务器用户指南》中章节“登录Windows裸金属服务器”。已挂载磁盘至裸金属服务器&#…

电脑磁盘怎么分区以及合并?

提示:当新装硬盘后,C盘不能进行合并时,不要强行合并(需要重做系统进行分区),否则会引起崩盘 目录 前言: 如何给磁盘分区? 如何将两个及以上的分区合并 前言: 分区更方…

win7磁盘合并步骤

姐妹篇:win7磁盘合并 本文告诉给大家分析win7系统磁盘合并的详细步骤。一、右键点击我的电脑,然后选择“管理”,然后选择磁盘管理。二、选择要合并的磁盘(这次我们要把F盘合并到E盘)三、右键单击要合并的磁盘&#xff…

Orace VM VirtualBox下Linux虚拟机磁盘空间不够处理方法,扩容vdi盘+gparted合并磁盘空间(超详细图文详解)

主要分成两个角度 ①清理磁盘空间 ②外界给Linux虚拟机扩容 对于①博主搜寻了很多方法,但是因为对于Linux系统很不熟悉不知道该删些什么东西来清理,尝试半天未果,遂选择 方法②外界给Linux虚拟机扩容 本人是mac/windows双系统,…

macbook删除windows后合并磁盘分区

macbook删除windows后合并磁盘分区 打开终端,列出卷 $ diskutil list 可以看到我给windows卷格式化后是 disk2 90.2G /dev/disk0 (internal, physical): #: TYPE NAME SIZE IDENTIFIER 0: GUID_part…

计算机如何将两个磁盘合在一起,Win7系统如何合并磁盘分区将两个或多个合并到一起...

Win7系统如何合并磁盘分区将两个或多个合并到一起 腾讯视频/爱奇艺/优酷/外卖 充值4折起 在重装雨林木风Win7系统之前,进行了磁盘分区,装完系统后才发现有一个磁盘只分到了17.4GB,于是就想到了合并磁盘分区。下面教大家雨林木风Win7系统下合并磁盘分区的方法,并将实战成果作…

移动硬盘如何合并磁盘

前几天小编刚换了固态硬盘,于是乎,机械硬盘就当移动硬盘用了,还买了一个感觉不错的硬盘盒,美滋滋,但是每次用的时候都会在电脑上显示好多盘,看着很是不舒服。怎么办呢?下面小编就介绍一种超级简…

虚拟机Win 7中重新 划分 分区 合并 磁盘

背景 热爱学习的我在学习如何重新给电脑的磁盘分区( √ )之前在系统安装的时候少划分了一个磁盘,导致只有一个系统C盘。( ) 前言 在开始磁盘分配前一定要记得 备份 好原有文件啊!!&#xff0…

磁盘管理删除受保护的“恢复分区”、或合并磁盘发现中间有恢复分区无法合并,怎么办?方法如下

使用电脑时,把C盘挪一点到D盘或者D盘挪一点到C盘时,发现有“恢复分区”导致不能合并;这时候我们可以把这个恢复分区给删掉(通常磁盘里不止一个恢复分区,删除一个碍事的不要紧,别全都删了就好) 删…

怎样合并磁盘分区?看这里~

如何合并磁盘分区? 很多电脑用户拿到新电脑就会马上给磁盘分区,有的忘记分区了。小编我就给自己的电脑分了好几个区,每次使用分区的时候非常难受,因为太多了不知道怎么选择,实际在用的就只有三个,其中一个…