目录
原算子
准备工作,环境搭建
读取数据
从文件中读取数据
从集合中读取数据
从元素中读取数据
从source文件中读取数据
从kafka中读取数据
自定义source类型输出
转换算子
map转换
Filter转换
FlatMap转换
原算子
准备工作,环境搭建
为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个三元组(用户名,用户访问的 urrl,用户访问 url 的时间戳),所以在这里,我们可以创建一个类 Event,将用户行为包装成它的一个对象。
import java.sql.Timestamp;
/*
应用场景*/
public class Event {public String user;public String ur1;//用户访问的urlpublic Long timestape;//用户访问url的时间public Event(){};public Event(String user,String ur1,Long timestape){this.timestape=timestape;this.ur1=ur1;this.user=user;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", ur1='" + ur1 + '\'' +", timestape=" + new Timestamp(timestape) +'}';}
}
读取数据
要先创建读取数据的环境
//创建执行环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
从文件中读取数据
运用readTextFile的方法
1.创建一个文件clicks.txt
Mary, ./home, 1000
DOUDOU, ./cart, 2000
Bob, ./porp?id=100, 3000
DOUDOU, ./home, 4000
2.读取文件中的数据
//从文件中读取数据DataStreamSource<String> stream1=env.readTextFile("input/clicks.txt");stream1.print("1");env.execute();
从集合中读取数据
//从集合里读取数据ArrayList<Event> events=new ArrayList<>();events.add(new Event("DOUDOU","./home",1000L));DataStreamSource<Event> stream2=env.fromCollection(events);stream2.print("2");env.execute();
从元素中读取数据
//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));
从source文件中读取数据
首先要打开hadoop102的端口
//从socket文本流中读取数据DataStreamSource<String> stream4=env.socketTextStream("hadoop102",7777);stream4.print("4");
从kafka中读取数据
//从kafka中读取数据Properties properties=new Properties();properties.setProperty("bootstrap.servers","hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> kafkaStream=env.addSource(new FlinkKafkaConsumer<String>("clicks",new SimpleStringSchema(),properties));kafkaStream.print();env.execute();
开启zookeeper和kafka
zk.sh start
kf.sh start
创建用户
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
自定义source类型输出
1.创建一个实现SourceFunction<Event>的类,创造数据
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Calendar;
import java.util.Random;public class ClicksSource implements SourceFunction<Event> {//声明一个标志位private Boolean running=true;@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {//生成随机数据Random random=new Random();//自定义选取的数据集String[] users = {"Mary", "Alice", "Bob", "Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1","./prod?id=2"};//循环生成的数据while (running){String user=users[random.nextInt(users.length)];String ur1=urls[random.nextInt(urls.length)];Long timestap= Calendar.getInstance().getTimeInMillis();sourceContext.collect(new Event(user,ur1,timestap));Thread.sleep(1000L);}}@Overridepublic void cancel() {running=false;}
}
2.实现自定义source输出
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*
用户自定义source测试*/
public class SourceCustomTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Event> customStream=env.addSource(new ClicksSource());customStream.print();env.execute();}
}
转换算子
map转换
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransformMapTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//进行转换计算,提取user字段//1.使用自定义类实现MapFunction接口SingleOutputStreamOperator<String> result=stream3.map(new MyMapper());//2.使用匿名类实现SingleOutputStreamOperator<String> result2=stream3.map(new MapFunction<Event, String>() {@Overridepublic String map(Event event) throws Exception {return event.user;}});//3.传入Lambda表达式SingleOutputStreamOperator<String> relult3=stream3.map(data -> data.user);result2.print();result.print();relult3.print();env.execute();}//自定义MapFunctionpublic static class MyMapper implements MapFunction<Event,String>{@Overridepublic String map(Event event) throws Exception {return event.user;}}
}
Filter转换
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransformFilterTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//1.自定义类对象SingleOutputStreamOperator<Event> result1= stream3.filter(new MyFilter());//2.传入匿名类SingleOutputStreamOperator<Event> result2=stream3.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("DOUDOU");}});//输入Lambda表达式SingleOutputStreamOperator<Event> result3=stream3.filter(data ->data.user.equals("DOUDOU"));result3.print();result1.print();result2.print();env.execute();}//自定义对象public static class MyFilter implements FilterFunction<Event>{@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("DOUDOU");}}
}
FlatMap转换
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class TransformFlatMapTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//1.自定义stream3.flatMap(new MyFlatMap()).print();//2.传入Lamba表达式stream3.flatMap((Event value ,Collector<String> out) -> {if (value.user.equals("DOUDOU"))out.collect(value.ur1);else if (value.user.equals("DOUDOU")){out.collect(value.user);out.collect(value.ur1);out.collect(value.timestape.toString());}}) .returns(new TypeHint<String>() {}).print("2");env.execute();}//自定义类public static class MyFlatMap implements FlatMapFunction<Event,String>{@Overridepublic void flatMap(Event event, Collector<String> collector) throws Exception {collector.collect(event.user);collector.collect(event.ur1);collector.collect(event.timestape.toString());}}
}