Flink Learning 3: Flink Streams & the Process Function API


1. Flink Multi-Stream Operations

1.1 split (deprecated)

split divided one data stream into several streams based on the data itself. It has been removed from the API (dropped in Flink 1.12); use side outputs instead, as shown in the next section.

1.2 Splitting a Stream (using side outputs)

public class _02_SplitStream {
    public static void main(String[] args) throws Exception {
        // get the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {
            /**
             * @param value the input element
             * @param ctx   the context
             * @param out   the collector for the main output
             * @throws Exception
             */
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx,
                                       Collector<Integer> out) throws Exception {
                if (value % 3 == 0) {
                    // side output
                    ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);
                }
                if (value % 3 == 1) {
                    // side output
                    ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);
                }
                // main output
                out.collect(value);
            }
        });
        DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0", TypeInformation.of(Integer.class)));
        DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1", TypeInformation.of(Integer.class)));
        output1.print();
        env.execute();
    }
}

1.3 connect

connect joins two DataStreams: DataStream + DataStream ==> ConnectedStreams.

Two DataStreams are connected into a new ConnectedStreams. Although the two streams are connected, they remain independent of each other; the biggest benefit of this method is that the two streams can share state.

Internally each stream still runs its own logic: in a CoMapFunction, map1 and map2 still process streamSource and streamSource2 respectively.

The element types of the two streams can differ.

public class _03_ConnectedStream {
    public static void main(String[] args) throws Exception {
        // get the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50);
        ConnectedStreams<Integer, Integer> connected = streamSource.connect(streamSource2);
        // MapFunction ==> CoMapFunction ; FlatMapFunction ==> CoFlatMapFunction
        SingleOutputStreamOperator<Object> mapped = connected.map(new CoMapFunction<Integer, Integer, Object>() {
            @Override
            public Object map1(Integer value) throws Exception {
                return value + 1;
            }

            @Override
            public Object map2(Integer value) throws Exception {
                return value * 10;
            }
        });
        mapped.print();
        env.execute();
    }
}

    streamSource   --->  map1
    streamSource2  --->  map2

1.4 union

union can merge multiple streams; the element types of the streams must be identical.


public class _04_UnionStream {
    public static void main(String[] args) throws Exception {
        // get the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50, 80, 1110);
        DataStream<Integer> unioned = streamSource.union(streamSource2);
        SingleOutputStreamOperator<String> union = unioned.map(new MapFunction<Integer, String>() {
            @Override
            public String map(Integer value) throws Exception {
                return "union" + value;
            }
        });
        union.print();
        env.execute();
    }
}

    streamSource   \
                    =====>  map
    streamSource2  /

1.5 coGroup

coGroup is essentially the lower-level operator that the join operator is built on.

It processes the streams with a bounded-batch mindset: for example, with a 5-second time window, the data inside each window is grouped and matched.

        <leftStream>.coGroup(<rightStream>).where(<KeySelector>).equalTo(<KeySelector>).window(<window>).apply(<processing logic>)


Each time window (say 5 or 10 seconds) forms one batch of data; when the window closes, the elements whose keys (selected by where and equalTo) are equal are grouped together and handed to the apply function.

public class _05_CoGroupStream {
    public static void main(String[] args) throws Exception {
        // get the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Person> name_idCard = env.socketTextStream("192.168.141.131", 8888).map(x -> {
            Person person = new Person();
            person.setName(x.split(",")[0]);
            person.setIdCard(x.split(",")[1]);
            return person;
        }).returns(TypeInformation.of(Person.class)).name("==idCard==");
        // name_idCard.print();
        DataStream<Person> name_addr = env.socketTextStream("192.168.141.131", 7777).map(x -> {
            Person person = new Person();
            person.setName(x.split(",")[0]);
            person.setAddr(x.split(",")[1]);
            return person;
        }).returns(TypeInformation.of(Person.class)).name("==addr==");
        // name_addr.print();
        DataStream<Person> dataStream = name_idCard.coGroup(name_addr)
                // key of the left stream
                .where(new KeySelector<Person, Object>() {
                    @Override
                    public Object getKey(Person value) throws Exception {
                        return value.getName();
                    }
                })
                // key of the right stream
                .equalTo(new KeySelector<Person, Object>() {
                    @Override
                    public Object getKey(Person value) throws Exception {
                        return value.getName();
                    }
                })
                // time window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // processing logic: left Person, right Person, output Person
                .apply(new CoGroupFunction<Person, Person, Person>() {
                    /**
                     * first  all elements of the first stream in this co-group
                     * second all elements of the second stream in this co-group
                     */
                    @Override
                    public void coGroup(Iterable<Person> first, Iterable<Person> second, Collector<Person> out)
                            throws Exception {
                        // left-join implementation
                        Iterator<Person> iterator = first.iterator();
                        while (iterator.hasNext()) {
                            Person next1 = iterator.next();
                            Iterator<Person> iterator1 = second.iterator();
                            boolean noDataFlag = true;
                            while (iterator1.hasNext()) {
                                Person result = new Person(next1);
                                Person next = iterator1.next();
                                result.setAddr(next.getAddr());
                                out.collect(result);
                                noDataFlag = false;
                            }
                            if (noDataFlag) {
                                out.collect(next1);
                            }
                        }
                    }
                });
        dataStream.print();
        env.execute();
    }
}

1.6 join

join relates two streams: you specify the join condition, and the post-join computation is carried out inside a window.

join is implemented on top of coGroup.

public class _06_JoinStream {
    public static void main(String[] args) throws Exception {
        // get the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Person data flattened into Tuple3: name, idCard, addr
        DataStream<Tuple3<String, String, String>> name_idCard = env.socketTextStream("192.168.141.131", 8888)
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], ""))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        DataStream<Tuple3<String, String, String>> name_addr = env.socketTextStream("192.168.141.131", 7777)
                .map(x -> Tuple3.of(x.split(",")[0], "", x.split(",")[1]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // name_addr.print();
        DataStream<Tuple3<String, String, String>> dataStream = name_idCard.join(name_addr)
                // key of the left stream: field f0
                .where(tp3 -> tp3.f0)
                // key of the right stream: field f0
                .equalTo(tp3 -> tp3.f0)
                // time window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                // processing logic: left tuple, right tuple, output tuple
                .apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple3<String, String, String>>() {
                    /**
                     * @param first  the matched element of the first input
                     * @param second the matched element of the second input
                     */
                    @Override
                    public Tuple3<String, String, String> join(Tuple3<String, String, String> first,
                                                               Tuple3<String, String, String> second) throws Exception {
                        return Tuple3.of(first.f0, first.f1, second.f2);
                    }
                });
        dataStream.print();
        env.execute();
    }
}

1.7 broadcast

    datastream1: userId | action | data            datastream2: userId | userName | userPhone
    window time1 ----------------------------      ----------------------------
                 12 | click | xxdssd               12 | aa | 131
                 13 | click | dasd                 13 | cc | 1331
                 14 | click | ad                   14 | dd | 1321
    window time2 ----------------------------      ----------------------------
                 12 | click | sfs
                 13 | click | sdfs
                 15 | click | ghf                  17 | dd | 1321
    window time3 ----------------------------      ----------------------------
                 14 | click | ghf
                 17 | click | ghf

    Note: the user base-info stream is dictionary data, so a windowed join is not a good fit here; broadcast is.
    broadcast is suited to joining against dictionary tables: the main-stream operator reads the broadcast state.

        main-stream operator  <<<----------------------------------  broadcast state

public class _07_BroadcastStream {
    public static void main(String[] args) throws Exception {
        // get the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // flattened as: userId | action | data
        DataStream<Tuple3<String, String, String>> operationInfo = env.socketTextStream("192.168.141.131", 8888)
                .map(x -> {
                    return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
                }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // flattened as: userId | userName | userPhone
        DataStream<Tuple3<String, String, String>> baseInfo = env.socketTextStream("192.168.141.131", 7777)
                .map(x -> {
                    return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
                }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // state descriptor
        MapStateDescriptor<String, Tuple3<String, String, String>> userBaseInfoStateDesc = new MapStateDescriptor<>(
                "user base info", TypeInformation.of(String.class),
                TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // turn the base-info stream into a broadcast stream
        BroadcastStream<Tuple3<String, String, String>> userBaseInfoBroadcast = baseInfo
                .broadcast(userBaseInfoStateDesc);
        // connect the behavior stream with the broadcast stream
        BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> connected =
                operationInfo.connect(userBaseInfoBroadcast);
        SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> processed =
                // processing logic after connecting
                // if connected comes from a keyedStream   ===> the parameter is a KeyedBroadcastProcessFunction
                // if connected is not from a keyedStream  ===> the parameter is a BroadcastProcessFunction
                connected.process(new BroadcastProcessFunction<
                        Tuple3<String, String, String>, // element type of the main stream
                        Tuple3<String, String, String>, // element type of the broadcast stream
                        Tuple5<String, String, String, String, String> // output type
                        >() {
                    /**
                     * Processes the main stream, one element at a time.
                     */
                    @Override
                    public void processElement(Tuple3<String, String, String> value, // element of the main stream
                            BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.ReadOnlyContext ctx, // context
                            Collector<Tuple5<String, String, String, String, String>> out // collector
                    ) throws Exception {
                        // the read-only view of the broadcast state; it holds the same data as the broadcastState
                        // obtained in processBroadcastElement, it is just read-only
                        // (the base data may not have arrived yet)
                        ReadOnlyBroadcastState<String, Tuple3<String, String, String>> broadcastStateReadOnly =
                                ctx.getBroadcastState(userBaseInfoStateDesc);
                        if (broadcastStateReadOnly == null) {
                            out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
                        } else {
                            Tuple3<String, String, String> baseInfo = broadcastStateReadOnly.get(value.f0);
                            // no base data for this user yet
                            if (baseInfo == null) {
                                out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
                            } else {
                                out.collect(Tuple5.of(value.f0, value.f1, value.f2, baseInfo.f1, baseInfo.f2));
                            }
                        }
                    }

                    /**
                     * Processes the broadcast stream: store each element in the broadcast state.
                     */
                    @Override
                    public void processBroadcastElement(Tuple3<String, String, String> value, // element of the broadcast stream
                            BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.Context ctx, // context
                            Collector<Tuple5<String, String, String, String, String>> out // collector
                    ) throws Exception {
                        // get the broadcast state from the context
                        BroadcastState<String, Tuple3<String, String, String>> broadcastState =
                                ctx.getBroadcastState(userBaseInfoStateDesc);
                        // key the state by user id, value is the base info
                        broadcastState.put(value.f0, value);
                    }
                });
        processed.print();
        env.execute();
    }
}

2. Flink Programming: Process Functions

2.1 Process Function Overview

Compared with map, flatMap and filter, a process function gives you much more freedom in how the data is handled: you get access to the element's context, and the processing logic and how results are emitted are entirely up to you.

In event-driven applications, the most frequently used API is the process function.

Note: different kinds of streams require different process function types.

Data stream conversions (the original figure is omitted here).


The process() methods of the different DataStream types take the following function types, covered one by one below.

2.2 ProcessFunction


public class _01_ProcessFunction {
    public static void main(String[] args) throws Exception {
        // get the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // flattened as: userId | action | data
        DataStreamSource<String> streamSource = env.fromElements("1,click,data1", "2,click1,data2", "10,flow,data1",
                "22,doubleclick,data22");
        DataStream<Tuple3<String, String, String>> operationInfo = streamSource.map(x -> {
            return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // ProcessFunction
        SingleOutputStreamOperator<String> processed = operationInfo
                .process(new ProcessFunction<Tuple3<String, String, String>, String>() {
                    // process one element
                    @Override
                    public void processElement(Tuple3<String, String, String> value,
                            ProcessFunction<Tuple3<String, String, String>, String>.Context ctx, Collector<String> out)
                            throws Exception {
                        // emit to the main output
                        out.collect(value.f0 + value.f1 + value.f2);
                        // emit to a side output
                        ctx.output(new OutputTag<Tuple3<String, String, String>>("adasd",
                                TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {})), value);
                    }

                    // the other lifecycle methods, runtime context, state, ... are also available
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }
                });
        processed.print();
        env.execute();
    }
}

2.3 KeyedProcessFunction

public class _02_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        // get the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // flattened as: userId | action | data
        DataStreamSource<String> streamSource = env.fromElements("1,click,data1", "2,click1,data2", "10,flow,data1",
                "22,doubleclick,data22", "2,doubleclick,data22");
        DataStream<Tuple3<String, String, String>> operationInfo = streamSource.map(x -> {
            return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // keyedStream
        KeyedStream<Tuple3<String, String, String>, String> keyedStream = operationInfo.keyBy(tp3 -> tp3.f0);
        // on a KeyedStream, process() takes a KeyedProcessFunction<key, input, output>
        SingleOutputStreamOperator<String> processed = keyedStream
                .process(new KeyedProcessFunction<String, Tuple3<String, String, String>, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, String> value,
                            KeyedProcessFunction<String, Tuple3<String, String, String>, String>.Context ctx,
                            Collector<String> out) throws Exception {
                        out.collect((value.f0 + value.f1 + value.f2).toUpperCase(Locale.ROOT));
                    }
                });
        processed.print();
        env.execute();
    }
}

2.4 ProcessWindowFunction
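ProcessWindowFunction is the process function for a keyed, windowed stream: keyBy, then window, then process. It receives the key, the window metadata (through the Context) and all elements of the window at once, unlike an incremental reduce/aggregate function. Below is a minimal sketch that counts events per user per 5-second window; the class name _03_ProcessWindowFunction and the reuse of the 192.168.141.131:8888 socket source from the earlier examples are illustrative assumptions, not code from the original notes (it uses org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction and org.apache.flink.streaming.api.windowing.windows.TimeWindow).

public class _03_ProcessWindowFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // flattened as: userId | action | data
        DataStream<Tuple3<String, String, String>> operationInfo = env.socketTextStream("192.168.141.131", 8888)
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        operationInfo
                .keyBy(tp3 -> tp3.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // ProcessWindowFunction<IN, OUT, KEY, WINDOW>: gets the key, the window and all window elements
                .process(new ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<Tuple3<String, String, String>> elements,
                                        Collector<String> out) throws Exception {
                        int count = 0;
                        for (Tuple3<String, String, String> e : elements) {
                            count++;
                        }
                        out.collect("key=" + key + " window=" + context.window() + " count=" + count);
                    }
                })
                .print();

        env.execute();
    }
}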

2.5 ProcessAllWindowFunction
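ProcessAllWindowFunction is the non-keyed counterpart: windowAll followed by process, so all data goes through a single, non-parallel window. A minimal sketch that sums the integers arriving in each 5-second window; the class name _04_ProcessAllWindowFunction and the socket source are illustrative assumptions (a socket source is used here because a processing-time window over a tiny bounded source may never fire before the job finishes).

public class _04_ProcessAllWindowFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // one integer per line
        DataStream<Integer> streamSource = env.socketTextStream("192.168.141.131", 8888)
                .map(x -> Integer.valueOf(x.trim()))
                .returns(TypeInformation.of(Integer.class));

        streamSource
                // windowAll: no keyBy, all data goes through one (parallelism-1) window
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Integer> elements, Collector<String> out)
                            throws Exception {
                        int sum = 0;
                        for (Integer e : elements) {
                            sum += e;
                        }
                        out.collect("window=" + context.window() + " sum=" + sum);
                    }
                })
                .print();

        env.execute();
    }
}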

2.6 CoProcessFunction
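CoProcessFunction is the process-function counterpart of the CoMapFunction from section 1.3: it runs on a ConnectedStreams and has one method per input stream (processElement1 / processElement2), but additionally exposes a Context (side outputs, timestamps, and timers when the connected streams are keyed). A minimal sketch based on the connect example above; the class name _05_CoProcessFunction is an illustrative assumption.

public class _05_CoProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50);

        // connect + process: like CoMapFunction in 1.3, but with access to the Context
        streamSource.connect(streamSource2)
                .process(new CoProcessFunction<Integer, Integer, String>() {
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
                        // elements of the first stream
                        out.collect("stream1:" + (value + 1));
                    }

                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
                        // elements of the second stream
                        out.collect("stream2:" + (value * 10));
                    }
                })
                .print();

        env.execute();
    }
}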

2.7 ProcessJoinFunction
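ProcessJoinFunction is used with an interval join: both streams are keyed, and each element of the left stream is joined with the right-stream elements whose event time lies within a given interval around it. An interval join requires event time, so this sketch assigns timestamps from a third field of the input; the class name _06_ProcessJoinFunction, the input format "id,value,eventTimeMillis" and the socket sources are illustrative assumptions.

public class _06_ProcessJoinFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // "id,idCard,eventTimeMillis"
        SingleOutputStreamOperator<Tuple3<String, String, Long>> left = env
                .socketTextStream("192.168.141.131", 8888)
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], Long.valueOf(x.split(",")[2])))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {}))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((e, ts) -> e.f2));

        // "id,addr,eventTimeMillis"
        SingleOutputStreamOperator<Tuple3<String, String, Long>> right = env
                .socketTextStream("192.168.141.131", 7777)
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], Long.valueOf(x.split(",")[2])))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {}))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((e, ts) -> e.f2));

        left.keyBy(tp -> tp.f0)
                .intervalJoin(right.keyBy(tp -> tp.f0))
                // join right elements whose event time is within [-5s, +5s] of the left element
                .between(Time.seconds(-5), Time.seconds(5))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> leftElement,
                                               Tuple3<String, String, Long> rightElement,
                                               Context ctx, Collector<String> out) throws Exception {
                        out.collect(leftElement.f0 + "," + leftElement.f1 + "," + rightElement.f1);
                    }
                })
                .print();

        env.execute();
    }
}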

2.8 BroadcastProcessFunction

See section 1.7 (the BroadcastProcessFunction example).

2.9 KeyedBroadcastProcessFunction
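When the main stream is keyed before being connected with the broadcast stream, process() takes a KeyedBroadcastProcessFunction instead of the BroadcastProcessFunction used in section 1.7; its ReadOnlyContext additionally exposes the current key and the timer service. Below is a minimal sketch, a keyed variant of the 1.7 example; the class name _07_KeyedBroadcastProcessFunction is an illustrative assumption.

public class _07_KeyedBroadcastProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // userId | action | data
        DataStream<Tuple3<String, String, String>> operationInfo = env.socketTextStream("192.168.141.131", 8888)
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // userId | userName | userPhone
        DataStream<Tuple3<String, String, String>> baseInfo = env.socketTextStream("192.168.141.131", 7777)
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        MapStateDescriptor<String, Tuple3<String, String, String>> userBaseInfoStateDesc = new MapStateDescriptor<>(
                "user base info", TypeInformation.of(String.class),
                TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        // the main stream is keyed first, so the process function is a KeyedBroadcastProcessFunction
        operationInfo.keyBy(tp3 -> tp3.f0)
                .connect(baseInfo.broadcast(userBaseInfoStateDesc))
                // KeyedBroadcastProcessFunction<key, main-stream type, broadcast-stream type, output type>
                .process(new KeyedBroadcastProcessFunction<String, Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>() {
                    @Override
                    public void processElement(Tuple3<String, String, String> value, ReadOnlyContext ctx,
                            Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        // look up the base info of the current key in the (read-only) broadcast state
                        Tuple3<String, String, String> base = ctx.getBroadcastState(userBaseInfoStateDesc)
                                .get(ctx.getCurrentKey());
                        if (base == null) {
                            out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
                        } else {
                            out.collect(Tuple5.of(value.f0, value.f1, value.f2, base.f1, base.f2));
                        }
                    }

                    @Override
                    public void processBroadcastElement(Tuple3<String, String, String> value, Context ctx,
                            Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        // store each broadcast element in the broadcast state, keyed by user id
                        ctx.getBroadcastState(userBaseInfoStateDesc).put(value.f0, value);
                    }
                })
                .print();

        env.execute();
    }
}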

3. Exercise

package demo.sff.flink.exercise;

import demo.sff.flink.source.Person;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;

/**
 * Create the streams.
 *
 * Stream 1: id | event | count
 *   1,event1,3
 *   2,event1,5
 *   3,event1,4
 *
 * Stream 2: id | gender | city
 *   1, male , beijin
 *   2, female, shanghai
 *
 * Requirements:
 * 1. Expand each Stream 1 record into `count` records: e.g. id=1 expands into 3 records
 *    (1,event1,random1) (1,event1,random2) (1,event1,random3); id=2 expands into 5 records.
 * 2. Join Stream 1 with Stream 2.
 * 3. Records that do not join go to a side output; the rest stay in the main stream.
 * 4. On the main stream, group by gender and take the maximum random number.
 * 5. Write the main stream to MySQL.
 * 6. Write the side output to Parquet.
 */
public class Test1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // create the streams; a socket source could replace these later.
        // stream2 must reach the broadcast state first, otherwise stream1 records cannot join
        DataStreamSource<String> stream1 = env.fromElements("1,event1,3", "2,event1,5", "3,event3,4");
        DataStreamSource<String> stream2 = env.fromElements("1,male,beijin", " 2,female,shanghai");
        DataStream<Tuple3<String, String, String>> streamOperator1 = stream1
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        DataStream<Tuple3<String, String, String>> streamOperator2 = stream2
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // requirement 1
        DataStream<Tuple3<String, String, String>> mapDataStream = streamOperator1
                .flatMap(new FlatMapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
                    @Override
                    public void flatMap(Tuple3<String, String, String> value,
                            Collector<Tuple3<String, String, String>> out) throws Exception {
                        Integer integer = Integer.valueOf(value.f2);
                        for (Integer i = 0; i < integer; i++) {
                            int r = new Random().nextInt(100);
                            out.collect(Tuple3.of(value.f0, value.f1, r + ""));
                        }
                    }
                }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // mapDataStream.print();
        // requirement 2: broadcast stream2
        MapStateDescriptor<String, Tuple3<String, String, String>> descriptor = new MapStateDescriptor<String, Tuple3<String, String, String>>(
                "userinfo", TypeInformation.of(String.class),
                TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        BroadcastStream<Tuple3<String, String, String>> tuple3BroadcastStream = streamOperator2.broadcast(descriptor);
        BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> tuple3BroadcastConnectedStream = mapDataStream
                .connect(tuple3BroadcastStream);
        SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> processed = tuple3BroadcastConnectedStream
                .process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>() {
                    @Override
                    public void processElement(Tuple3<String, String, String> value,
                            BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.ReadOnlyContext ctx,
                            Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        ReadOnlyBroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
                                .getBroadcastState(descriptor);
                        // requirement 3: unmatched records go to a side output, the rest stay in the main stream
                        if (broadcastState == null) {
                            // out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
                            ctx.output(new OutputTag<String>("nojoin", TypeInformation.of(String.class)),
                                    value.f0 + value.f1 + value.f2);
                        } else {
                            Tuple3<String, String, String> stringTuple3 = broadcastState.get(value.f0);
                            if (stringTuple3 == null) {
                                // out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
                                ctx.output(new OutputTag<String>("nojoin", TypeInformation.of(String.class)),
                                        value.f0 + value.f1 + value.f2);
                            } else {
                                out.collect(Tuple5.of(value.f0, value.f1, value.f2, stringTuple3.f1, stringTuple3.f2));
                            }
                        }
                    }

                    @Override
                    public void processBroadcastElement(Tuple3<String, String, String> value,
                            BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.Context ctx,
                            Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        BroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
                                .getBroadcastState(descriptor);
                        broadcastState.put(value.f0, value);
                    }
                }).returns(TypeInformation.of(new TypeHint<Tuple5<String, String, String, String, String>>() {}));
        // main stream
        processed.print();
        // side output
        DataStream<String> sideOutput = processed
                .getSideOutput(new OutputTag<String>("nojoin", TypeInformation.of(String.class)));
        // sideOutput.print();
        // requirement 4: on the main stream, group by gender and take the maximum random number
        SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> streamOperator = processed
                .keyBy(x -> x.f3)
                .map(new MapFunction<Tuple5<String, String, String, String, String>, Tuple5<String, String, Integer, String, String>>() {
                    @Override
                    public Tuple5<String, String, Integer, String, String> map(
                            Tuple5<String, String, String, String, String> value) throws Exception {
                        return Tuple5.of(value.f0, value.f1, Integer.valueOf(value.f2), value.f3, value.f4);
                    }
                }).returns(TypeInformation.of(new TypeHint<Tuple5<String, String, Integer, String, String>>() {}));
        SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> maxBy = streamOperator
                .keyBy(tp5 -> tp5.f3).maxBy(2);
        maxBy.print();
        // requirement 5: write the main stream to MySQL (not yet verified)
        String sql = " insert into testa values (?,?,?,?,?) on duplicate key update a=?,b=?,c=?,d=?,e=?  ";
        SinkFunction<Tuple5<String, String, Integer, String, String>> jdbcSink = JdbcSink.sink(sql,
                new JdbcStatementBuilder<Tuple5<String, String, Integer, String, String>>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement,
                            Tuple5<String, String, Integer, String, String> tuple5) throws SQLException {
                        // JDBC parameter indices are 1-based
                        preparedStatement.setString(1, tuple5.f0);
                        preparedStatement.setString(2, tuple5.f1);
                        preparedStatement.setInt(3, tuple5.f2);
                        preparedStatement.setString(4, tuple5.f3);
                        preparedStatement.setString(5, tuple5.f4);
                        preparedStatement.setString(6, tuple5.f0);
                        preparedStatement.setString(7, tuple5.f1);
                        preparedStatement.setInt(8, tuple5.f2);
                        preparedStatement.setString(9, tuple5.f3);
                        preparedStatement.setString(10, tuple5.f4);
                    }
                }, JdbcExecutionOptions.builder()
                        .withBatchSize(2) // insert in batches of two
                        .withMaxRetries(3) // retries on failed inserts
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withPassword("root") // jdbc connection info
                        .withUsername("root") // jdbc connection info
                        .withUrl("jdbc:mysql://192.168.141.131:3306/flinkdemo").build());
        streamOperator.addSink(jdbcSink);
        // requirement 6: write the side output to Parquet (not yet verified)
        ParquetWriterFactory<String> writerFactory = ParquetAvroWriters.forReflectRecord(String.class);
        FileSink<String> build = FileSink.forBulkFormat(new Path("d:/sink"), writerFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<String>()) // bucket assignment strategy
                .withBucketCheckInterval(5) // interval of the async bucket creation/check thread
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("flinkdemo") // file prefix
                        .withPartSuffix(".txt") // file suffix
                        .build()) // output file configuration
                .build();
        sideOutput.sinkTo(build);
        env.execute();
    }
}

