Flink流处理API大合集:掌握所有flink流处理技术,看这一篇就够了

article/2025/9/14 12:12:21
大家好,我是百思不得小赵。

创作时间:2022 年 5 月 18 日
博客主页: 🔍点此进入博客主页
—— 新时代的农民工 🙊
—— 换一种思维逻辑去看待这个世界 👀
今天是加入CSDN的第1172天。觉得有帮助麻烦👏点赞、🍀评论、❤️收藏


目录

    • 前言
    • 一、构建流执行环境(Environment)
    • 二、加载数据源(Source)
    • 三、转换算子(Transform)
    • 四、数据输出(Sink)
    • 五、数据类型、UDF 函数、富函数


注:本文内容为纯干货,字数较多,建议先点赞收藏慢慢学习研读!

前言

在之前的文章中有提到过,一个flink应用程序开发的步骤大致为五个步骤:构建执行环境、获取数据源、操作数据源、输出到外部系统、触发程序执行。由这五个模块组成了一个flink任务,接下来围绕着每个模块对应的API进行梳理。
以下所有的代码案例都已收录在本人的Gitee仓库,有需要的同学点击链接直接获取:
Gitee地址:https://gitee.com/xiaoZcode/flink_test
在这里插入图片描述

一、构建流执行环境(Environment)

getExecutionEnvironment()

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境。它会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

代码如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

createLocalEnvironment()

返回本地执行环境,需要在调用时指定默认的并行度。

代码如下:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

createRemoteEnvironment()

返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager的 IP 和端口号,并指定要在集群中运行的 Jar 包。

代码如下:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//xxx.jar");

二、加载数据源(Source)

案例场景:

工业物联网的背景下,收集传感器的温度值,将收集到不同传感器的温度值进行计算分析操作。
注:以下代码都围绕此场景进行编写,获取更完整源代码请移步文章开头部分。

创建传感器对象:SensorReading

public class SensorReading {private String id;private Long timestamp;private Double temperature;public SensorReading() {}public SensorReading(String id, Long timestamp, Double temperature) {this.id = id;this.timestamp = timestamp;this.temperature = temperature;}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}public Double getTemperature() {return temperature;}public void setTemperature(Double temperature) {this.temperature = temperature;}@Overridepublic String toString() {return "SensorReading{" +"id='" + id + '\'' +", timestamp=" + timestamp +", temperature=" + temperature +'}';}
}

从集合读取数据

public class SourceTest1_Collection {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为 1env.setParallelism(1);//从集合中读取数据DataStream<SensorReading> dataStream  = env.fromCollection(Arrays.asList(new SensorReading("sensor_1", 1547718199L, 35.8),new SensorReading("sensor_2", 1547718199L, 35.0),new SensorReading("sensor_3", 1547718199L, 38.8),new SensorReading("sensor_4", 1547718199L, 39.8)));DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 789);//打印输出dataStream.print("data");integerDataStream.print("int");//执行程序env.execute();}
}

从文件读取数据

从文件中获取数据源的核心代码部分:

DataStream<String> dataStream = env.readTextFile("xxx ");
public class SourceTest2_File {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<String> dataStream = env.readTextFile("sensor.txt");dataStream.print();env.execute();}
}

从Kafka读取数据

首先需要引入Kafka的以来到工程中

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version>
</dependency>
public class SourceTest3_Kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties=new Properties();properties.setProperty("bootstrap.servers","localhost:9092");properties.setProperty("group.id","consumer-group");properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset","latest");DataStream<String> dataStream=env.addSource(new FlinkKafkaConsumer011<String>("sensor",new SimpleStringSchema(),properties));dataStream.print();env.execute();}
}

自定义数据源Source

除了从集合、文件以及Kafka中获取数据外,还给我们提供了一个自定义source的方式,需要传入sourceFunction函数。核心代码如下:

DataStream<SensorReading> dataStream = env.addSource( new MySensor());
public class SourceTest4_UDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());dataStream.print();env.execute();}// 实现自定义数据源public static class MySensorSource implements SourceFunction<SensorReading>{// 定义一个标记位,控制数据产生private boolean running = true;@Overridepublic void run(SourceContext<SensorReading> ctv) throws Exception {// 随机数Random random=new Random();//设置10个初始温度HashMap<String, Double> sensorTempMap = new HashMap<>();for (int i = 0; i < 10; i++) {sensorTempMap.put("sensor_"+(i+1), 60 + random.nextGaussian() * 20); // 正态分布}while (running){for (String sensorId: sensorTempMap.keySet()) {Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();sensorTempMap.put(sensorId,newTemp);ctv.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));}Thread.sleep(1000);}}@Overridepublic void cancel() {running=false;}}
}

三、转换算子(Transform)

获取到指定的数据源后,还要对数据源进行分析计算等操作,

基本转换算子:Map、flatMap、Filter
在这里插入图片描述

public class TransformTest1_Base {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<String> inputStream = env.readTextFile("sensor.txt");// 1. map 把String转换成长度生成DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) throws Exception {return value.length();}});//   2. flatmap 按逗号切分字段DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] fields=value.split(",");for (String field : fields){out.collect(field);}}});//    3. filter ,筛选sensor_1 开头对id对应的数据DataStream<String> filterStream=inputStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return value.startsWith("sensor_1");}});//    打印输出mapStream.print("map");flatMapStream.print("flatMap");filterStream.print("filter");// 执行程序env.execute();}
}

KeyBy、滚动聚合算子【sum()、min()、max()、minBy()、maxBy()】

  • KeyBy:DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。
  • 如上算子可以针对 KeyedStream 的每一个支流做聚合。

在这里插入图片描述

public class TransformTest2_RollingAggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<String> inputStream = env.readTextFile("sensor.txt");// 转换成SensorReading类型DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] fields=s.split(",");return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));}});// DataStream<SensorReading> dataStream = inputStream.map(line -> {//     String[] fields = line.split(",");//     return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));// });// 分组KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");// KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);//滚动聚合,取当前最大的温度值// DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");resultStream.print();env.execute();}
}

Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

public class TransformTest3_Reduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<String> inputStream = env.readTextFile("sensor.txt");// 转换成SensorReading类型DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] fields=s.split(",");return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));}});// 分组KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");// reduce 聚合,取最大的温度,以及当前最新对时间戳DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {@Overridepublic SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));}});resultStream.print();env.execute();}
}

分流【Split 、Select】、合流【Connect 、CoMap、union】

Split

DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream。

在这里插入图片描述
Select

SplitStream→DataStream:从一个 SplitStream 中获取一个或者多个DataStream。

在这里插入图片描述
Connect

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

在这里插入图片描述

CoMap、CoFlatMap

ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。

在这里插入图片描述

Union

DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。

在这里插入图片描述

DataStream<SensorReading> unionStream = xxxstream.union(xxx);

Connect 与 Union 区别:

  • Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap中再去调整成为一样的。
  • Connect 只能操作两个流,Union 可以操作多个。
public class TransformTest4_MultipleStreams {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<String> inputStream = env.readTextFile("sensor.txt");// 转换成SensorReading类型DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] fields=s.split(",");return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));}});// 1。分流 按照温度值30度为界进行分流SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {@Overridepublic Iterable<String> select(SensorReading value) {return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");}});// 通过条件选择对应流数据DataStream<SensorReading> highTempStream = splitStream.select("high");DataStream<SensorReading> lowTempStream = splitStream.select("low");DataStream<SensorReading> allTempStream = splitStream.select("high","low");highTempStream.print("high");lowTempStream.print("low");allTempStream.print("all");// 2。合流 connect,先将高温流转换为二元组,与低温流合并后,输出状态信息。DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(SensorReading value) throws Exception {return new Tuple2<>(value.getId(), value.getTemperature());}});// 只能是两条流进行合并,但是两条流的数据类型可以不一致ConnectedStreams<Tuple2<String, Double>, SensorReading> connectStream = warningStream.connect(lowTempStream);DataStream<Object> resultStream = connectStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {@Overridepublic Object map1(Tuple2<String, Double> value) throws Exception {return new Tuple3<>(value.f0, value.f1, "high temp warning");}@Overridepublic Object map2(SensorReading value) throws Exception {return new Tuple2<>(value.getId(), "normal");}});resultStream.print();// 3。union联合多条流 限制就是每条流数据类型必须一致DataStream<SensorReading> union = highTempStream.union(lowTempStream, allTempStream);union.print("union stream");env.execute();}
}

四、数据输出(Sink)

Flink官方提供了一部分框架的Sink,用户也可以自定义实现Sink。flink将任务进行输出的操作核心代码:stream.addSink(new MySink(xxxx))

Kafka
引入Kafka依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version>
</dependency>
public class SinkTest1_Kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");// 转换成SensorReading类型DataStream<String> dataStream=inputStream.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {String[] fields=s.split(",");return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2])).toString();}});//输出到外部系统dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092","sinktest",new SimpleStringSchema()));env.execute();}
}

Redis
引入Redis依赖:

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>
public class SinkTest2_Redis {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");// 转换成SensorReading类型DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] fields=s.split(",");return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));}});// jedis配置FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();dataStream.addSink(new RedisSink<>(config,new MyRedisMapper()));env.execute();}// 自定义RedisMapperpublic static class MyRedisMapper implements RedisMapper<SensorReading>{//自定义保存数据到Redis的命令,存成hash表Hset@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");}@Overridepublic String getKeyFromData(SensorReading data) {return data.getId();}@Overridepublic String getValueFromData(SensorReading data) {return data.getTemperature().toString();}}}

Elasticsearch
引入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.1</version>
</dependency>
public class SinkTest3_ES {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env;env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");// 转换成SensorReading类型DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {public SensorReading map(String s) throws Exception {String[] fields=s.split(",");return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));}});// 定义ES的链接配置ArrayList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("localhost",9200));dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts,new MyEsSinkFunction()).build());env.execute();}//实现自定义的ES写入操作public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {@Overridepublic void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {// 定义写入的数据sourceHashMap<String, String> dataSource = new HashMap<>();dataSource.put("id",element.getId());dataSource.put("temp",element.getTemperature().toString());dataSource.put("ts",element.getTimestamp().toString());// 创建请求作为向ES发起的写入命令IndexRequest indexRequest = Requests.indexRequest().index("sensor").type("readingdata").source(dataSource);// 用indexer发送请求indexer.add(indexRequest);}}
}

自定义Sink(JDBC)
引入依赖:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version>
</dependency>
public class SinkTest4_JDBC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从文件读取数据DataStream<String> inputStream = env.readTextFile("sensor.txt");// 转换成SensorReading类型DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] fields=s.split(",");return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));}});dataStream.addSink(new MyJDBCSink());env.execute();}// 实现自定义SinkFunctionpublic static class MyJDBCSink extends RichSinkFunction<SensorReading> {//声明连接和预编译Connection connection=null;PreparedStatement insert=null;PreparedStatement update=null;@Overridepublic void open(Configuration parameters) throws Exception {connection= DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456");insert=connection.prepareStatement("insert into sensor_temp (id,temp) values (?,?)");update=connection.prepareStatement("update sensor_temp set temp = ? where id = ? ");}// 每来一条数据,调用链接,执行sql@Overridepublic void invoke(SensorReading value, Context context) throws Exception {// 直接执行更新update.setDouble(1,value.getTemperature());update.setString(2,value.getId());update.execute();if (update.getUpdateCount() == 0){insert.setString(1,value.getId());insert.setDouble(2,value.getTemperature());insert.execute();}}// 关闭连接流@Overridepublic void close() throws Exception {connection.close();insert.close();update.close();}}
}

五、数据类型、UDF 函数、富函数

Flink支持的数据类型

  • Flink 支持所有的 Java 和 Scala 基础数据类型,Int, Double, Long, String等
DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);
  • Java 和 Scala 元组(Tuples)
DataStream<Tuple2<String, Integer>> personStream = env.fromElements(new Tuple2("Adam", 17),new Tuple2("Sarah", 23) );
personStream.filter(p -> p.f1 > 18);
  • Flink 对 Java 和 Scala 中的一些特殊目的的类型也都是支持的,比如 Java 的
    ArrayList,HashMap,Enum 等等

UDF 函数

Flink 暴露了所有 udf 函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction 等等。

富函数(Rich Functions)

“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。RichMapFunction、RichFlatMapFunction、RichFilterFunction

Rich Function 有一个生命周期的概念。典型的生命周期方法有:

  • open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前open()会被调用。
  • close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
  • getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函 数执行的并行度,任务的名字,以及state 状态。
public class TransformTest5_RichFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);//从文件读取数据DataStream<String> inputStream = env.readTextFile("sensor.txt");// 转换成SensorReading类型DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] fields=s.split(",");return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));}});DataStream<Tuple2<String,Integer>> resultStream=dataStream.map(new MyMapper());resultStream.print();env.execute();}public static class MyMapper0 implements MapFunction<SensorReading,Tuple2<String,Integer>>{@Overridepublic Tuple2<String, Integer> map(SensorReading value) throws Exception {return new Tuple2<>(value.getId(),value.getId().length());}}// 继承富函数public static class MyMapper extends RichMapFunction<SensorReading,Tuple2<String,Integer>>{@Overridepublic Tuple2<String, Integer> map(SensorReading value) throws Exception {// getRuntimeContext().getState()return new Tuple2<String,Integer>(value.getId(),getRuntimeContext().getIndexOfThisSubtask());}@Overridepublic void open(Configuration parameters) throws Exception {// 初始化工作,一般是定义状态,或者创建数据库链接System.out.println("open");// super.open(parameters);}@Overridepublic void close() throws Exception {// 关闭链接,收尾状态System.out.println("close");// super.close();}}
}

在这里插入图片描述


http://chatgpt.dhexx.cn/article/ik9t1bd3.shtml

相关文章

Python流处理

转自 &#xff1a;https://www.toutiao.com/a6589000256896107015/?tt_frommobile_qq&utm_campaignclient_share&timestamp1534156143&appnews_article&utm_sourcemobile_qq&iid40708017633&utm_mediumtoutiao_ios&group_id6589000256896107015 F…

Stream流式处理

Stream流的三类方法 获取Stream&#xff1a;流创建一条流水线,并把数据放到流水线上准备。 中间方法&#xff1a;流水线上的操作一次操作完毕之后,还可以继续进行其他操作。 终结方法&#xff1a;一个Stream流只能有一个终结方法是流水线上的最后一个操作。 生成Stream流的…

流数据处理与分析

环境 名称 版本 系统 Ubuntu 18.04.4 LTS 内存 7.5GiB 处理器 Intel Core i7-8565U CPU 1.80GHz *8 图形 Intel UHD Graphics&#xff08;Whiskey Lake 3*8 GT2&#xff09; GNOME 3.28.2 操作系统类型 64位 磁盘 251.0 GB Storm 2.1.0 Zookeeper…

流处理系统

文章目录 引言如何发送事件流流处理不可靠的时钟容错总结 引言 清楚数据的类型有助于我们设计一个性能更高&#xff0c;更有针对性的数据系统&#xff0c;比如在线系统&#xff0c;离线系统&#xff08;批处理&#xff09;。近实时系统(流处理)等等。比如说批处理系统&#xf…

流处理简介

一. 流式处理简介 在我接触到java8流式处理的时候&#xff0c;我的第一感觉是流式处理让集合操作变得简洁了许多&#xff0c;通常我们需要多行代码才能完成的操作&#xff0c;借助于流式处理可以在一行中实现。比如我们希望对一个包含整数的集合中筛选出所有的偶数&#xff0c;…

【节点流和处理流】

节点流和处理流 基本介绍 节点流可以从特定数据源读取数据&#xff0c;如FileReader、FileWriter处理流&#xff1a;是对一个已存在的流的连接和封装&#xff0c;通过所封装的流的功能调用实现数据读写。如BufferedReader.处理流的构造方法总是要带一个其他的流对象做参数。一…

流数据处理

流数据处理strom 在2011年Storm开源之前&#xff0c;由于Hadoop的火红&#xff0c;整个业界都在喋喋不休地谈论大数据。Hadoop的高吞吐&#xff0c;海量数据处理的能力使得人们可以方便地处理海量数据。但是&#xff0c;Hadoop的缺点也和它的优点同样鲜明——延迟大&#xff0…

一. 流式处理简介

https://www.cnblogs.com/shenlanzhizun/p/6027042.html Java技术学习 https://www.itkc8.com 一. 流式处理简介 在我接触到java8流式处理的时候&#xff0c;我的第一感觉是流式处理让集合操作变得简洁了许多&#xff0c;通常我们需要多行代码才能完成的操作&#xff0c;借助…

Kafka基础-流处理

1. 什么是流处理&#xff1f; 首先&#xff0c;让我们说一下什么是数据流&#xff08;也称为事件流&#xff09;&#xff1f;它是无边界数据集的抽象说法&#xff0c;无边界意味着无限且不断增长&#xff0c;因为随着时间的推移&#xff0c;新数据会不断地到来。 除了无边界的…

流处理基本介绍

1. 什么是流处理 一种被设计来处理无穷数据集的数据处理系统引擎 2. 流处理的几个概念 1. 无穷数据&#xff08;Unbounded data&#xff09;&#xff1a;一种持续生成&#xff0c;本质上是无穷尽的数据集。它经常会被称为“流数据”。然而&#xff0c;用流和批次来定义…

Spark Streaming与流处理

一、流处理 1.1 静态数据处理 在流处理之前&#xff0c;数据通常存储在数据库&#xff0c;文件系统或其他形式的存储系统中。应用程序根据需要查询数据或计算数据。这就是传统的静态数据处理架构。Hadoop 采用 HDFS 进行数据存储&#xff0c;采用 MapReduce 进行数据查询或分…

什么是流处理

流处理正变得像数据处理一样流行。流处理已经超出了其原来的实时数据处理的范畴&#xff0c;它正在成为一种提供数据处理&#xff08;包括批处理&#xff09;&#xff0c;实时应用乃至分布式事务的新方法的技术。 1、什么是流处理&#xff1f; 流处理是不断合并新数据以计算结果…

嵌入式软件升级方法

一、U盘升级 1.在u盘根目录新建文件夹&#xff0c;命名为‘upgrade’ 2.将软件复制到upgrade文件夹中 3.将u盘插到嵌入式服务器usb口上&#xff0c;断电重启服务器 二、PC工具升级 1.打开PC工具&#xff0c;选中要升级的机器&#xff0c;点击‘素材管理’选项卡&#xff0c…

嵌入式软件架构设计之分层设计

关注、星标公众号&#xff0c;不错过精彩内容 整理&#xff1a;黄工 素材来源&#xff1a;网络 参考来源&#xff1a; https://blog.51cto.com/kenotu/1614390 在正规的项目开发中&#xff0c;项目往往是并行开发的&#xff0c;也就是说硬件设计、底层软件设计、应用软件设计等…

嵌入式系统软件层次结构

文章目录 嵌入式系统软件嵌入式系统软件的层次结构硬件抽象层 嵌入式操作系统嵌入式操作系统——WinCE嵌入式操作系统——VxWorks嵌入式操作系统——Linux嵌入式Linux OS的特点 嵌入式操作系统——uCOS嵌入式操作系统—— PalmOS其他嵌入式操作系统华为鸿蒙系统 嵌入式系统软件…

嵌入式软件开发必备知识体系

嵌入式软件开发学习路线 前言 本章节主要介绍嵌入式软件开发概念以及大致的学习知识点的范围 一、嵌入式软件是什么&#xff1f; 百度百科&#xff1a;嵌入式工程师是指具有C/C语言、汇编语言等基础&#xff0c;熟悉模拟电子技术等硬件知识&#xff0c;了解处理器体系结构&a…

嵌入式开发 | 嵌入式开发设计文档该怎么写?

关注星标公众号&#xff0c;不错过精彩内容 作者 | strongerHuang 微信公众号 | 嵌入式专栏 俗话说&#xff0c;不会写文档的工程师不是好的工程师&#xff01; 如果你只会写代码&#xff0c;而从不写文档&#xff0c;迟早有一天会“出事”。这不是危言耸听&#xff0c;现实生活…

简单嵌入式系统软件架构

本文为原创&#xff0c;以下链接有比较及时的更新&#xff1a; https://www.yuque.com/docs/share/334f4a3d-2974-49db-8f68-4db6601a0d21?# 《简单嵌入式系统》 引言 本文描述的内容&#xff0c;适用范围是简单嵌入式系统。举一些可能不恰当的例子&#xff0c;如手环、蓝牙…

嵌入式软件设计层级划分概念

嵌入式软件设计层级划分概念 设计过程中体会的细化更新部分&#xff1a; 层级描述备注应用层直接控制应用&#xff0c;比如led_light_on(),led_light_off() 器件层&#xff08;如果操作复杂可进一步划分为器件应用层和器件驱动层&#xff09;比如&#xff1a;实现led_light_on …

浅议嵌入式软件测试

近年来&#xff0c;随着嵌入式系统的功能和复杂性不断增加&#xff0c;其开发时间和成本也随之不断上升。对于安全关键领域的嵌入式系统和软件来说&#xff0c;其稳定性和可靠性往往需要通过大量的测试和验证来保证。 01.一般软件测试vs嵌入式软件测试 嵌入式软件测试针对嵌入…