Official documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/
Python 3.5 or later is required.
Installing PyFlink
python -m pip install apache-flink
If the download is too slow the install will time out; switch to a different pip mirror.
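For example, pointing pip at the Tsinghua mirror (any nearby mirror works just as well):

python -m pip install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple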
Code
Table API approach
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
# Connectors live in the descriptors module; check there to see which parameters each one needs
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

# Write and register a UDF (not used for now)
# t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
# add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
# t_env.register_function("add", add)

# Read from Kafka
properties = {"zookeeper.connect": "172.17.0.2:2181", "bootstrap.servers": "172.17.0.2:9092", "group.id": "flink-test-cxy"}
t_env.connect(Kafka().properties(properties).version("universal").topic("mytesttopic").start_from_latest()) \
    .with_format(Json()) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

# Read from a CSV file instead (alternative source, kept for reference)
# t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt')) \
#     .with_format(OldCsv()
#                  .field('a', DataTypes.BIGINT())
#                  .field('b', DataTypes.BIGINT())) \
#     .with_schema(Schema()
#                  .field('a', DataTypes.BIGINT())
#                  .field('b', DataTypes.BIGINT())) \
#     .create_temporary_table('mySource')

# Write to CSV
t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/pyflink_test/tar.txt')) \
    .with_format(OldCsv().field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema().field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

# Sum fields a and b of each Kafka record and insert the result into the sink
t_env.from_path('mySource') \
    .select("(a+b)") \
    .insert_into('mySink')

t_env.execute("job_test")
Debugging
Open a Kafka producer and send some data.
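For example, with the console producer script that ships with Kafka (broker address matching the config above), each typed line is one JSON record:

# a sketch, run from the Kafka installation directory
bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092 --topic mytesttopic
> {"a": 1, "b": 2}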
Result
Data received in tar.txt:
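For instance, sending {"a": 1, "b": 2} from the producer should append the line 3 to tar.txt, since the job writes a+b for each record.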
Using Table SQL
For example, the source can be created with sql_update instead of the descriptor API:
t_env.sql_update("""
    CREATE TABLE mySource (
        a BIGINT,
        b BIGINT
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'mytesttopic',
        'connector.properties.zookeeper.connect' = '172.17.0.2:2181',
        'connector.properties.bootstrap.servers' = '172.17.0.2:9092',
        'connector.properties.group.id' = 'flink-test-cxy',
        'connector.startup-mode' = 'latest-offset',
        'format.type' = 'json'
    )
""")
Using a UDF
Creating and registering
# Write and register the UDF
# Let the Python UDF workers use Flink's managed memory
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add", add)
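Equivalently, udf can be used as a decorator, which reads better once the function body grows beyond a lambda; a minimal sketch:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    # same behavior as the lambda version above
    return i + j

t_env.register_function("add", add)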
Usage
t_env.sql_update("insert into mySink select add(a,b) from mySource")
Another example
kafka2mysql
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

t_env.sql_update("""
    CREATE TABLE mySource (
        a BIGINT,
        b BIGINT
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'mytesttopic',
        'connector.properties.zookeeper.connect' = '172.17.0.2:2181',
        'connector.properties.bootstrap.servers' = '172.17.0.2:9092',
        'connector.properties.group.id' = 'flink-test-cxy',
        'connector.startup-mode' = 'latest-offset',
        'format.type' = 'json'
    )
""")

t_env.sql_update("""
    CREATE TABLE mysqlsink (
        id BIGINT,
        game_id VARCHAR
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://x.x.x.x:3306/flinksql?useSSL=false',
        'connector.username' = 'x',
        'connector.password' = 'x',
        'connector.table' = 'mysqlsink',
        'connector.driver' = 'com.mysql.cj.jdbc.Driver',
        'connector.write.flush.interval' = '5s',
        'connector.write.flush.max-rows' = '1'
    )
""")

t_env.sql_update("insert into mysqlsink select a, cast(b as varchar) b from mySource")
t_env.execute("job")
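One pitfall: the JDBC connector and the MySQL driver are not bundled with PyFlink, so the flink-connector-jdbc and mysql-connector-java jars must be visible to the job, for example by dropping them into Flink's lib/ directory or declaring them via pipeline.jars. The mysqlsink table also has to exist in MySQL beforehand, since the JDBC sink does not create it. A sketch, with the jar paths and versions being placeholders to adapt:

# Placeholder paths; point these at the real jar locations
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-connector-jdbc_2.11-1.11.0.jar;"
    "file:///path/to/mysql-connector-java.jar")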