Previous post: using map in Transformation
Approach 1: override the flatMap method
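Goal: read the strings typed into a `nc -lk 8888` session and print them to the console. Each input line is split on spaces and every word is printed on its own line. The word "error" is filtered out and never printed. Repeated words are not deduplicated: flatMap keeps no state between records, so a word that is typed twice is printed twice.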
package cn._51doit.flink.day02;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

/**
 * Underlying implementation of the flatMap Transformation [unbounded stream]
 */
public class FlatMapDemo3 {

    public static void main(String[] args) throws Exception {
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Local environment with the web UI, so the local parallelism can be inspected
        // (the web UI needs the flink-runtime-web dependency on the classpath)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Lines typed into `nc -lk 8888` on host "Master"
        DataStreamSource<String> lines = env.socketTextStream("Master", 8888);

        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            // Override flatMap: one input line may emit zero or more words
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // Split the input line into words
                String[] words = value.split(" ");
                for (String word : words) {
                    // Drop "error"; every other word (duplicates included) is emitted
                    if (!"error".equals(word)) {
                        out.collect(word);
                    }
                }
            }
        });

        words.print();
        env.execute();
    }

    // Not used by main() in this demo: a custom map operator that doubles each Integer
    public static class MyStreamMap extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
        // Override processElement
        @Override
        public void processElement(StreamRecord<Integer> element) throws Exception {
            // Input value
            Integer i = element.getValue();
            Integer j = i * 2;
            // Put the result into the record (this alone does not emit it)
            element.replace(j);
            // Emit the record
            output.collect(element);
        }
    }
}
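To check the splitting and filtering rule without starting a cluster or `nc`, the body of the anonymous `flatMap` can be exercised in plain Java. This is a minimal sketch: `SimpleCollector`, `ListCollector` and `WordSplitSketch` are hypothetical names, and `SimpleCollector` only imitates the two methods of Flink's `org.apache.flink.util.Collector` so that the snippet needs no Flink dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.util.Collector (same two methods),
// so the logic runs without Flink on the classpath.
interface SimpleCollector<T> {
    void collect(T record);
    void close();
}

// Buffers everything it receives into a list.
class ListCollector<T> implements SimpleCollector<T> {
    final List<T> items = new ArrayList<>();

    @Override
    public void collect(T record) {
        items.add(record);
    }

    @Override
    public void close() {
    }
}

class WordSplitSketch {
    // Same body as the flatMap(...) in FlatMapDemo3: one line in, 0..n words out.
    static void flatMap(String value, SimpleCollector<String> out) {
        for (String word : value.split(" ")) {
            if (!"error".equals(word)) {
                out.collect(word);
            }
        }
    }

    // Runs flatMap on a single line and returns everything it collected.
    static List<String> run(String line) {
        ListCollector<String> out = new ListCollector<>();
        flatMap(line, out);
        return out.items;
    }

    public static void main(String[] args) {
        System.out.println(run("hello flink error hello")); // [hello, flink, hello]
    }
}
```

The duplicate `hello` is emitted twice, which matches the stateless behavior of flatMap; dropping repeated words would require keeping state across records.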
Printed output:
Approach 2: override the processElement method
Code (unbounded stream):
package cn._51doit.flink.day02;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Underlying implementation of the flatMap Transformation
 * Approach 2: override the processElement method
 */
public class FlatMapDemo02 {

    public static void main(String[] args) throws Exception {
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Local environment with the web UI, so the local parallelism can be inspected
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> lines = env.socketTextStream("Master", 8888);

        // transform(operator name, output type, operator instance)
        SingleOutputStreamOperator<String> words = lines.transform(
                "MyFlatMap", TypeInformation.of(String.class), new MyStreamFlatMap());

        words.print();
        env.execute();
    }

    public static class MyStreamFlatMap extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        // Override processElement
        @Override
        public void processElement(StreamRecord<String> element) throws Exception {
            // Get one line of input
            String line = element.getValue();
            String[] words = line.split(" ");
            for (String word : words) {
                if (!"error".equals(word)) {
                    // Alternative: output.collect(new StreamRecord<>(word));
                    // that creates a new record per word but drops the input record's timestamp.
                    // replace() puts the word into the existing record and keeps its timestamp.
                    output.collect(element.replace(word));
                }
            }
        }
    }
}
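The difference between `output.collect(element.replace(word))` and the commented-out `new StreamRecord<>(word)` is easiest to see in a small model. The sketch below assumes a hypothetical `Record` class that imitates only what matters here from `StreamRecord` (a value, an optional timestamp, and a `replace` that swaps the value in place); it is not Flink's class, and `RecordReuseSketch` is likewise an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of StreamRecord: a value plus an optional timestamp.
// NOT Flink's class; it only mimics the replace() behavior.
class Record<T> {
    private Object value;
    private final long timestamp;
    private final boolean hasTimestamp;

    Record(T value) {
        this.value = value;
        this.timestamp = 0L;
        this.hasTimestamp = false;
    }

    Record(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = true;
    }

    @SuppressWarnings("unchecked")
    T getValue() {
        return (T) value;
    }

    boolean hasTimestamp() {
        return hasTimestamp;
    }

    long getTimestamp() {
        return timestamp;
    }

    // Mimics replace(X): swap the payload in place, keep the timestamp,
    // and return the same object re-typed to the new value type.
    @SuppressWarnings("unchecked")
    <X> Record<X> replace(X newValue) {
        this.value = newValue;
        return (Record<X>) this;
    }
}

class RecordReuseSketch {
    // Mirrors MyStreamFlatMap.processElement. "Emitting" formats the record as
    // value@timestamp immediately, the way a downstream collect() consumes it right away.
    static List<String> process(Record<String> element, boolean reuseInput) {
        List<String> emitted = new ArrayList<>();
        for (String word : element.getValue().split(" ")) {
            if (!"error".equals(word)) {
                Record<String> out = reuseInput ? element.replace(word) : new Record<>(word);
                String ts = out.hasTimestamp() ? String.valueOf(out.getTimestamp()) : "none";
                emitted.add(out.getValue() + "@" + ts);
            }
        }
        return emitted;
    }
}
```

With reuse, the input line's timestamp travels with every word; with a fresh record it is lost. Reuse only works because each emitted record is consumed before the next `replace()` overwrites it; Flink's built-in `TimestampedCollector` relies on the same kind of record reuse.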
Console output:
Source code analysis
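(1) Step into flatMap: the output type is not hard-coded. DataStream.flatMap(FlatMapFunction<T, R>) takes input type T and returns type R, and it derives the TypeInformation of R by reflection (TypeExtractor.getFlatMapReturnTypes) when the pipeline is built, then passes it on to flatMap(flatMapper, outputType).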
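(2) Before the type is extracted, the FlatMapFunction is passed through clean(), which runs Flink's closure cleaner and checks that the function, including any references it captures from the enclosing scope, can be serialized, since it has to be shipped to the cluster.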
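(3) StreamFlatMap extends AbstractUdfStreamOperator and implements OneInputStreamOperator. The generic parameters constrain not only the input and output types but also which kind of user function may be plugged in: AbstractUdfStreamOperator is declared as AbstractUdfStreamOperator<OUT, F extends Function>, and StreamFlatMap fixes F to FlatMapFunction<IN, OUT>.
(4) When data is processed, StreamFlatMap.processElement calls the user function: userFunction.flatMap(element.getValue(), collector). The collector passed in (a TimestampedCollector that carries the input record's timestamp) is what the user code calls to emit its output.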
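Item (1) can be illustrated with plain Java reflection. The sketch below uses a hypothetical `MyFlatMapFn` interface and reads the type arguments that an anonymous class declares for it, which is roughly the kind of information Flink's type extraction starts from; the real `TypeExtractor` handles far more cases than this.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical stand-in for FlatMapFunction<T, O>; only its generic signature matters here.
interface MyFlatMapFn<T, O> {
    void flatMap(T value, Consumer<O> out);
}

class TypeExtractionSketch {
    // Looks for "implements MyFlatMapFn<A, B>" on the function's class and returns [A, B].
    // An anonymous class keeps these type arguments in its class file, so reflection finds them.
    static Type[] declaredTypeArgs(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == MyFlatMapFn.class) {
                return ((ParameterizedType) t).getActualTypeArguments();
            }
        }
        // Nothing recoverable, which is typically the case for a lambda.
        return new Type[0];
    }

    public static void main(String[] args) {
        MyFlatMapFn<String, Integer> anon = new MyFlatMapFn<String, Integer>() {
            @Override
            public void flatMap(String value, Consumer<Integer> out) {
                out.accept(value.length());
            }
        };
        MyFlatMapFn<String, Integer> lambda = (value, out) -> out.accept(value.length());

        System.out.println(Arrays.toString(declaredTypeArgs(anon))); // [class java.lang.String, class java.lang.Integer]
        System.out.println(declaredTypeArgs(lambda).length);
    }
}
```

For the lambda the generic arguments are erased and cannot be read back this way. This is why Flink may ask for an explicit type hint via `returns(...)` when a lambda is used with `flatMap`.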
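For item (2), the effect of the serializability check can be reproduced with standard Java serialization. The names `WordFilter`, `JobBuilder` and `SerializabilityCheck` are hypothetical; this is a minimal sketch of the idea only. Flink's `ClosureCleaner` additionally tries to drop an unused reference to the enclosing instance before checking, which the sketch does not do.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical serializable function type (Flink's Function interface also extends Serializable).
interface WordFilter extends Serializable {
    boolean keep(String word);
}

// A deliberately NON-serializable enclosing class, e.g. a helper that builds the job.
class JobBuilder {
    // Non-final on purpose, so reading it really requires the enclosing instance.
    private String stopWord = "error";

    // The anonymous class reads the field stopWord, so it holds a hidden reference
    // to JobBuilder.this, which cannot be serialized.
    WordFilter capturingOuter() {
        return new WordFilter() {
            @Override
            public boolean keep(String word) {
                return !stopWord.equals(word);
            }
        };
    }

    // Created in a static method: no enclosing instance, only the String is captured.
    static WordFilter notCapturingOuter(final String stop) {
        return new WordFilter() {
            @Override
            public boolean keep(String word) {
                return !stop.equals(word);
            }
        };
    }
}

class SerializabilityCheck {
    // Try plain Java serialization and report whether it succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

An anonymous function that touches a field of a non-serializable outer object drags that object along with it, which is a common cause of "not serializable" errors when a job is submitted.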
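Items (3) and (4) describe how the pieces fit together. The following is a heavily simplified model, not Flink's code: every `Mini*` type is a hypothetical stand-in (for `Function`, `Collector`/`Output`, `FlatMapFunction`, `AbstractUdfStreamOperator`, `OneInputStreamOperator` and `StreamFlatMap` respectively), `MiniPipeline` is an illustrative driver, and timestamps, lifecycle methods and checked exceptions are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-ins (not Flink's API).
interface MiniFunction { }                                    // ~ Function

interface MiniOutput<T> {                                      // ~ Collector / Output
    void collect(T record);
}

interface MiniFlatMapFunction<IN, OUT> extends MiniFunction {  // ~ FlatMapFunction
    void flatMap(IN value, MiniOutput<OUT> out);
}

// ~ AbstractUdfStreamOperator<OUT, F extends Function>: stores the user function.
// The bound "F extends MiniFunction" restricts what kind of function can be plugged in.
abstract class MiniUdfOperator<OUT, F extends MiniFunction> {
    protected final F userFunction;
    protected MiniOutput<OUT> output;

    MiniUdfOperator(F userFunction) {
        this.userFunction = userFunction;
    }

    void setOutput(MiniOutput<OUT> output) {
        this.output = output;
    }
}

// ~ OneInputStreamOperator<IN, OUT>: one input, and processElement is mandatory.
interface MiniOneInputOperator<IN, OUT> {
    void processElement(IN value);
}

// ~ StreamFlatMap: F is pinned to MiniFlatMapFunction<IN, OUT>.
class MiniStreamFlatMap<IN, OUT>
        extends MiniUdfOperator<OUT, MiniFlatMapFunction<IN, OUT>>
        implements MiniOneInputOperator<IN, OUT> {

    MiniStreamFlatMap(MiniFlatMapFunction<IN, OUT> userFunction) {
        super(userFunction);
    }

    @Override
    public void processElement(IN value) {
        // Call the user's flatMap with the input value and a collector that forwards
        // every collected result to the downstream output.
        userFunction.flatMap(value, record -> output.collect(record));
    }
}

class MiniPipeline {
    static List<String> run(String... lines) {
        List<String> sink = new ArrayList<>();
        MiniStreamFlatMap<String, String> op = new MiniStreamFlatMap<String, String>((value, out) -> {
            for (String word : value.split(" ")) {
                if (!"error".equals(word)) {
                    out.collect(word);
                }
            }
        });
        op.setOutput(sink::add); // the "downstream" is just a list here
        for (String line : lines) {
            op.processElement(line);
        }
        return sink;
    }
}
```

`MiniStreamFlatMap` never inspects the words itself: it only forwards the value and a collector to `userFunction.flatMap`, which is the same division of responsibilities described in (4).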