Flink学习——DataStream API

article/2025/9/16 19:41:49

        一个flink程序,其实就是对DataStream的各种转换。具体可以分成以下几个部分:

  • 获取执行环境(Execution Environment)
  • 读取数据源(Source)
  • 定义基于数据的转换操作(Transformations)
  • 定义计算结果的输出位置(Sink)
  • 触发程序执行(Execute)

一、执行环境(Execution Environment) 

        flink 程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。

        不同的环境代码的提交运行过程会有所不同。这就要求我们再提交作业执行计算时,必须获取当前 flink 的运行环境,从而建立起与flink框架之间的联系。只有获取了环境上下文信息,才能将具体的任务调度到不同的 TaskManager 执行。

1.1 创建执行环境

        执行环境是 StreamExecutionEnvironment 类的对象。创建执行环境的方式,就是调用这个类的静态方法。

1.1.1 getExecutionEnvironment

        getExecutionEnvironment 方法会根据当前运行的方式,自行决定该返回什么样的运行环境。如果程序是独立运行的,就返回一个本地执行环境;如果创建了jar包然后从命令行调用后提交到集群执行,那么久返回集群的执行环境。

val env = StreamExecutionEnvironment.getExecutionEnvironment

1.1.2 createLocalEnvironment

        返回一个本体执行环境。可以在调用时传入一个参数,指定默认的并行度。如果传入,默认就是本地的CPU核心数。

val localEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()

1.1.3 createRemoteEnvironment

        返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定在集群中运行的jar包。

        获取执行环境后,还可以对执行环境进行灵活的设置。如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制等。

val localEnvironment = StreamExecutionEnvironment.getExecutionEnvironment("host",    // JobManager主机名1234,    // 进程端口号"path/to/jarFile.jar"    // 提交给JobManager的JAR包)    

1.2 执行模式(Execution Mode)

// 批处理环境
// 1.12.0版本起,可以通过“执行模式: execution mode”实现切换
val batchEnv = ExecutionEnvironment.getExecutionEnvironment// 流处理环境
val env = StreamEnvironment.getExecutionEnvironment
  • 流执行模式(STREAMING)

        DataStream API经典模式,一般用于需要持续实时处理的无界数据流。默认情况下使用的就是流执行模式。

  • 批执行模式(BATCH)

        专门用于批处理的执行模式,这种模式下,flink处理作业的方式类似于MapReduce。对于不会持续计算的有界数据,这种模式处理会更方便。

配置方式:
1. 命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH2. 代码配置(不推荐)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
  • 自动模式(AUTOMATIC)

        根据数据源是否有界,来自动选择执行模式 

        总结:用 BATCH 处理批数据,用 Streaming 处理流数据。

1.3 触发程序执行

        写完输出(sink)操作之后不代表程序已经结束。这是因为main()方法被调用时,只定义了作业的每个执行操作,然后添加到数据流图中,这时候并没有真正的处理数据。

        Flink是事件驱动的,只有等数据到来,才会触发真正的计算,是懒执行/延迟执行。所以我们需要显式的调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果。

env.execute()

二、源算子(Source)

        flink可以从各种源获取数据,然后构建DataStream进行转换处理。数据的输入来源就是数据源,读取数据的算子就是源算子(Source)

2.1 准备工作

        我们可以定义一个样例类Event,字段如下:

字段名数据类型说明
idString用户id
timestampLong时间戳
temperatureDouble温度
//定义样例类  温度传感器case class SensorReading(id:String,timestamp:Long,temperature:Double)

2.2 从元素中读取数据 

从元素中读取数据
val stream1: DataStream[SensorReading] = env.fromElements(SensorReading("北京",1684201960L,23.5),SensorReading("南京",1684201960L,32.8)
)

2.3 从集合中读取数据

从集合中读取数据
val temp = List(SensorReading("北京",1684201960L,23.5),SensorReading("南京",1684201960L,32.8)
)
val stream2: DataStream[SensorReading] = env.fromCollection(temp)

2.4 从文件中读取数据

从文件中读取数据
val path = "F:\\Server\\flink\\resources\\sensor.txt"
val value: DataStream[String] = env.readTextFile(path)

2.5 从Socket读取数据

        socket并行度默认为1,且不够稳定,一般仅测试使用。

val parameterTool = ParameterTool.fromArgs(args)
val hostname = parameterTool .get("host")
val port = parameterTool .get("port")
val lineDataStream = env.socketTtextStream(hostname, port)

2.6 从Kafka读取数据

        kafka进行数据的收集与传输,flink进行分析与计算,这种架构目前已经称为很多企业的首选。但是Kafka与flink的连接比较复杂,flink内部没有提供预实现的方法,所以我们需要通过调用addSource()来传入一个 SourceFunction 的实现类。而同时,Flink官方提供了连接工具flink-connector-kafka 帮我们实现了一个消费者 FlinkKafkaConsumer ,用来读取Kafka数据的 SourceFunction。

1> 导入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>

2> 传入FlinkKafkaConsumer实例对象

object SourceTest {def main(args: Array[String]): Unit = {//    1. 创建环境val env = StreamExecutionEnvironment.getExecutionEnvironment//    2. 用Properties保存Kafka连接的相关配置val properties = new Properties()properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092")properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"sensorgroup1")//    3. 调用 env.addSource()val stream1 = env.addSource(new FlinkKafkaConsumer[String]("sensor",        // topicnew SimpleStringSchema(),    // 当前值的反序列化器properties       // prop))//    4. 输出stream1.print()//    5. 启动env.execute()}
}

2.7 读取自定义源算子

class MySensorSource() extends SourceFunction[SensorReading]{// 标志位
var running = true// run方法:不停循环,发送数据override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {// 1. 随机数生成器val random = new Random()// 2. 用标志位作为循环判断的条件,不断发送数据while (running){val i = random.nextInt()// 3. 调用sourceContext的方法向下游发送数据sourceContext.collect(SensorReading("生成:"+i,1,1))}Thread.sleep(500)}// cancel方法:定义标志位,用于run中断的控制override def cancel(): Unit = {running = false}}
val env = StreamExecutionEnvironment.getExecutionEnvironment读取自定义的数据源
val stream1 = env.addSource(new MySensorSource)stream1.print()env.execute()

三、转换算子(Transformation)

        数据源读入数据之后,可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。可以对一条数据流进行转换操作,也可以进行分流、合流等多流转换操作,从而组合成复杂的数据流拓扑。

3.1 基本转换算子

1. 映射 map

        一一映射,消费一个元素就产出一个元素。

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}object TransformMapTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.fromElements(Event("Mary","./home",1000L),Event("Bob","./cart",2000L))// 提取每次点击事件的用户名// 1. 使用匿名函数stream.map( _.user ).print("1")// 2. 实现 MapFUnction 接口stream.map(new MyMapFunction).print("2")env.execute()}class MyMapFunction extends MapFunction[Event, String]{override def map(t: Event): String = {t.user}}
}

2. 过滤 filter

        对数据流执行一个过滤,通过一个布尔表达表达式设置过滤条件。

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}object TransformFilterTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.fromElements(Event("Mary","./home",1000L),Event("Bob","./cart",2000L))// 提取每次点击事件的用户名// 1. 使用匿名函数stream.filter( _.user=="Mary" ).print("1")// 2. 实现 FilterFUnction 接口stream.filter(new MyFilterFunction).print("2")env.execute()}class MyFilterFunction extends FilterFunction[Event]{override def filter(t: Event): Boolean = t.user=="Bob"}
}

3. 扁平映射 flatMap

        将数据流中的整体(一般为集合类型)拆分成一个个的个体使用。消费一个元素,可以产生0个到多个元素。 先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.util.Collectorobject TransformFlatmapTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),Event("Bob", "./cart", 2000L),Event("Alice","./cart", 3000L))// 提取每次点击事件的用户名stream.flatMap( new MyFlatMap ).print()env.execute()}// 自定义实现FlatMapFunctionclass MyFlatMap extends FlatMapFunction[Event, String]{override def flatMap(t: Event, collector: Collector[String]): Unit = {// 如果当前数据是Mary的点击事件,那么就直接输出userif (t.user == "Mary"){collector.collect(t.user)}// 如果当前数据是Bob的,那么就输出user和urlelse if (t.user == "Bob"){collector.collect(t.user)collector.collect(t.url)}}}
}

3.2 聚合算子

1. 按键分区 keyBy

        DataStream是没有直接进行聚合的API的。所以如果需要聚合,需要先进行分区操作。

        keyBy()通过指定键key,将一条流从逻辑上划分成不同的分区partitions,也就是并行处理的子任务,对应着任务槽task slots。基于不同的key,流中的数据将被分配到不同的分区中去,下一步算子将会在同一个slot中进行处理。

        对于相同的key,一定会被分到同一个分区。不同的key值可能会被分到同一个分区,也可能被分到不同分区。

 键选择器

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}object TransformAggTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)val stream: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),Event("Bob", "./cart", 2000L),Event("Alice","./cart", 3000L),Event("Mary", "./prod?id=1", 4000L),)// 1. 使用lambda表达式stream.keyBy( data => data.id )或stream.keyBy( _.id )// 2. 使用键选择器stream.keyBy( new MyKeySelector() env.execute()}class MyKeySelector() extends KeySelector[Event, String]{override def getKey(in: Event): String = in.user}
}

2. 简单聚合

        如sum()、min()、max()、minBy()。聚合方法在调用时,也需要传入参数。但不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。

        minBy()和min()类似,都是用于简单聚合的函数,求指定字段的最小值。

min()  只计算指定字段的最小值,其他字段回保留最初的第一个数据的值。

minBy()  会返回包含字段最小值的整条数据。

         对于聚合计算而言,先进行 keyBy( ),得到 keyedStream ,在进行聚合得到,得到 dataStream 类型。

stream.keyBy(_.user).max("timestamp").print()

3. 归约聚合 reduce

        与简单聚合类似,reduce()操作也会将KeyStream转换为DataStream。不会改变流的元素数据类型,所以输出类型和输入类型一致。

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}object TransformReduceTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),Event("Bob", "./cart", 2000L),Event("Alice", "./cart", 3000L),Event("Mary", "./prod?id=1", 4000L),)// reduce规约聚合// 提前当前最活跃用户stream.map( data => (data.user, 1L )).keyBy(_._1).reduce( new MySum() )  // 统计每个用户的活跃度.keyBy( data => true )  // 将所有数据按照同样的 key 分到同一个组中.reduce( (state,data)=> if(data._2>state._2) data else state ) // 选取当前最活跃的用户.print()env.execute()}class MySum extends ReduceFunction[(String, Long)]{override def reduce(t: (String, Long), t1: (String, Long)): (String, Long) = ( t._1, t._2 + t1._2 )}
}

3.3 用户自定义函数UDF

        Flink 的 DataStream API 编程风格其实是一致的:基本上都是基于 DataStream 调用一个方

法,表示要做一个转换操作;方法需要传入一个参数,这个参数都是需要实现一个接口。
        这些接口有一个共同特点:全部都以算子操作名称 + Function 命名。例如源算子需要实现SourceFunction 接口,map 算子需要实现 MapFunction 接口,reduce()算子需要实现 ReduceFunction 接口。我们不仅可以通过自定义函数类或者匿名类来实现接口,也可以直接传入 Lambda 表达式。这就是所谓的用户自定义函数(user-defined function,UDF)。

1. 函数类

——实现一个自定义的函数类

// 通过传入自定义FilterFunction实现过滤
val stream = clicks.filter( new FlinkFilter )// 自定义FilterFunction函数类
class FlinkFilter extends FilterFunction[Event]{override def filter(value: Event): Boolean = value.url.contains("home")
}

——使用匿名类

stream.filter( new FilterFunction[Event]{override def filter(t: Event): Boolean = t.url.contains("prod")
} )

 ——使用 lambda 表达式

stream.filter( _.url.contains("prod") )
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}object TransformUDFTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),Event("Bob", "./cart", 2000L),Event("Alice", "./cart", 3000L),Event("Mary", "./prod?id=1", 4000L),)// 测试UDF的用法,筛选url中包含某个关键字home的Event事件// 1. 实现一个自定义的函数类stream.filter( new MyFilterFunction("prod") ).print("1")// 2. 使用匿名类stream.filter( new FilterFunction[Event] {override def filter(t: Event): Boolean = t.url.contains("prod")}).print("2")// 3. 使用lambda表达式stream.filter(_.url.contains("prod")).print("3")env.execute()}class MyFilterFunction(keyWord: String) extends FilterFunction[Event]{override def filter(t: Event): Boolean = t.url.contains("keyWord")}
}

2. 富函数类

        富函数类是 DataStream API 的一个函数类的接口,所有的 flink 函数类都有其 rich 版本。

        与常规函数类不同主要在于:富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,可以实现更复杂的功能。几乎每一个算子都有对应的rich版本

        典型的生命周期有:

open()方法:

        Rich Function的 初始化方法,开启一个算子的生命周期。当一个算子的实际工作方法被调用之前,会先调用 open() 方法

close()方法:

        生命周期的最后一个调用方法。

        在下面的代码中,getRuntimeContext()的作用是获取运行时上下文信息。其提供了访问与运行时环境相关的属性和方法。

核心逻辑: 

// 自定义一个RichMapFunction,测试富函数类的功能
stream.map( new MyRichMap() )class MyRichMap() extends RichMapFunction[Evnet, Long]{override def open(parameters: Configuration): Unit = {println("索引号为:"+ getRuntimeContext.getIndefOfThisSubtask + "的任务开始")}override def map(in: Event): Long = {in.timestamp}override def close(): Unit = {println("索引号为:"+ getRuntimeContext.getIndefOfThisSubtask + "的任务结束")}
}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}object TransformRichFunctionTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),Event("Bob", "./cart", 2000L),Event("Alice", "./cart", 3000L),Event("Mary", "./prod?id=1", 4000L),)// 自定义一个 RichMapFunction,测试富函数类的功能stream.map( new MyRichMap() ).print()env.execute()}class MyRichMap() extends RichMapFunction[Event, Long]{override def open(parameters: Configuration): Unit =println("索引号为:" + getRuntimeContext.getIndexOfThisSubtask + "的任务开始")override def map(in: Event): Long = in.timestampoverride def close(): Unit = {println("索引号为:" + getRuntimeContext.getIndexOfThisSubtask + "的任务结束")}}
}

 调整并行度:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}object TransformRichFunctionTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(2)val stream: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),Event("Bob", "./cart", 2000L),Event("Alice", "./cart", 3000L),Event("Mary", "./prod?id=1", 4000L),)// 自定义一个 RichMapFunction,测试富函数类的功能stream.map( new MyRichMap() ).print()env.execute()}class MyRichMap() extends RichMapFunction[Event, Long]{override def open(parameters: Configuration): Unit =println("索引号为:" + getRuntimeContext.getIndexOfThisSubtask + "的任务开始")override def map(in: Event): Long = in.timestampoverride def close(): Unit = {println("索引号为:" + getRuntimeContext.getIndexOfThisSubtask + "的任务结束")}}
}

3.4 物理分区

1. 随机分区shuffle

        随即分区服从均匀分布,可以把流中的数据随机打乱,均匀地传递到下游任务分区。如下图所示。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)val stream = env.addSource( new ClickSource )
stream.shuffle.setParallelism(4)

2. 轮询分区 Round-Robin

        按照先后顺序将数据依次分发。通过调用DataStream的rebalance()方法,实现轮询重分区。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)val stream = env.addSource( new ClickSource )
stream.rebalance.setParallelism(4)

3. 重缩放分区 rescale

        重缩放分区与轮询分区类似,当调用rescale()方法时,底层也是使用轮询,但是只会讲数据轮询发送到下游并行任务的一部分中。

        如果理解成发牌,rebalance()是每个发牌人都面向所有人发牌;而rescale()是分成小团体,发牌人只给自己团体内的所有人轮流发牌。所以当下游任务数量是上有任务数量的整数倍时,rescale()的效率明显会更高。

object RescaleExample {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// 使用匿名类的方式自定义数据源,这里使用了并行数据源函数的富函数版本env.addSource(new RichParallelSourceFunction[Int] {override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {for (i <- 0 to 7) {// 将偶数发送到下游索引为 0 的并行子任务中去// 将奇数发送到下游索引为 1 的并行子任务中去if ((i + 1) % 2 == getRuntimeContext.getIndexOfThisSubtask) {sourceContext.collect(i + 1)}}}// 这里???是 Scala 中的占位符override def cancel(): Unit = ???}).setParallelism(2).rescale.print().setParallelism(4)env.execute()}
}

4. 广播 broadcast

        经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

object BroadcastTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// 读取数据源,并行度为1val stream = env.addSource(new ClientSource)// 经过广播后打印输出,并行度为4stream.broadcast.print("broadcast").setParallelism(4)env.execute()}
}

5. 全局分区 global

        通过.global()方法,将所有的输入流数据都发送到下游算子的第一个并行子任务中去。对程序压力很大,谨慎使用。

6. 自定义分区 Custom

        当flink提供的所有分区策略都不能满足用户需求时,可以通过使用partitionCustom()方法来自定义分区策略。

partitionCustom()方法传参:

- 自定义分区器(Partitioner)对象

- 应用分区器的字段

object TransCustomPartitioner {def main(args: Array[String]): Unit = {val env =  StreamExecutionEnvironment.getExecutionEnvironmentenv.fromElements(1,2,3,4,5,6,7,8).partitionCustom(// 根据 key 的奇偶性计算出数据将被发送到哪个分区new Partitioner[Int] {override def partition(key: Int, numPartitions: Int) = key % 2},// 以自身作为keydata => data).print()env.execute()}
}

四、输出算子(Sink)

4.1 输出到外部系统

        与 source 类似,一般情况下sink算子的创建是通过调用 DataStream 的 addSink() 方法来实现的。

4.2 输出到文件

        flink有一些输出到文件的预实现方法,如writeAsText()、writeAsCsv()。但是对于大数据来说,这种方法过于简单,无法满足分布式的需求。StreamingFileSink支持行编码和批量编码,这两种不同的方式都有各自的构造器,可以直接调用StreamingFileSink的静态方法:

行编码:StreamingFileSink.forRowFormat( basePath, rowEncoder )

批量编码:StreamingFileSink.forBulkFormat( basePath, bulkWriterFactory )

stream.addSink( StreamFileSink.forRowFormat(new Path("F:\Server\flink\resources\out1.txt"),new SimpleStringEncoder[String]("UTF-8")
) )

4.3 输出到Kafka

        flink为Kafka提供了source和sink的连接器,我们可以用它方便地从Kafka读写数据。而且flink和Kafka的连接器提供了端到端的精确一次保证。

object SinkToKafka {def main(agrs: Array[String]): Unit = {// 1. 配置环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// 2. 编辑Kafka环境val properties = new Properties()properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092")properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"sensorgroup1")// 3. 读取文件val stream = env.readTextFile("input/clicks.csv")// 4. 数据处理后写入到Kafkastream.addSink( new FlinkKafkaProducer[String]("clicks",new SimpleStringSchema(),properties) )// 5. 执行env.execute()}}

4.4 输出到HBase

添加对应pom依赖

    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.3.5</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>2.3.5</version></dependency>

main函数中调用addSink方法

dataStream.addSink(new MyHbaseSink)

 实现MyHbaseSink方法

class MyHbaseSink extends RichSinkFunction[SensorReading] {var connection: Connection = _var mutator: BufferedMutator = _override def open(parameters: Configuration): Unit = {val configuration: conf.Configuration = HBaseConfiguration.create()configuration.set(HConstants.HBASE_DIR, "hdfs://192.168.78.20:9000/hbase")configuration.set(HConstants.ZOOKEEPER_QUORUM, "192.168.136.20")configuration.set(HConstants.CLIENT_PORT_STR, "2181")connection = ConnectionFactory.createConnection(configuration) val params: BufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("ha:test"))params.writeBufferSize(10*1024*1024)params.setWriteBufferPeriodicFlushTimeoutMs(5*1000L)mutator = connection.getBufferedMutator(params)}override def close() = {connection.close()}override def invoke(value: SensorReading, context: SinkFunction.Context) = {val put = new Put(Bytes.toBytes(value.id + value.temperature + value.timestamp))put.addColumn("sensor".getBytes(), "id".getBytes(), value.id.getBytes())put.addColumn("sensor".getBytes(), "timestamp".getBytes(), value.timestamp.toString.getBytes())put.addColumn("sensor".getBytes(), "temperature".getBytes(), value.temperature.toString.getBytes())mutator.mutate(put)mutator.flush()}}

4.5 输出到MySQL

添加依赖

    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency>

 main函数中调用addSink方法

dataStream.addSink(new MyJdbcSink)

实现MyJdbcSink方法 

class MyJdbcSink extends RichSinkFunction[SensorReading]{var connection: Connection = _var insertState: PreparedStatement = _var updateState: PreparedStatement = _override def open(parameters: Configuration): Unit = {connection = DriverManager.getConnection("jdbc:mysql://192.168.136.20:3306/kb21?useSSL=false","root","root")insertState = connection.prepareStatement("insert into sensor_temp(id, temp) value(?,?)")updateState = connection.prepareStatement("update sensor_temp set temp=? where id=?")}override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {updateState.setDouble(1,value.temperature)updateState.setString(2, value.id)val i: Int = updateState.executeUpdate()if (i==0){insertState.setString(1,value.id)insertState.setDouble(2,value.temperature)insertState.execute()}}override def close(): Unit = {insertState.close()updateState.close()connection.close()}}

4.6 自定义Sink

        与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkFunction抽象类。可以通过简单的调用DataStream的addSink()方法来自定义写入任何外部存储。比如hbase的连接。

        在实现SinkFunction的时候需要重写关键方法invoke(),在这个方法中我们可以实现将流里的数据发送出去的逻辑。创建连接以及关闭连接分别放在open()和close()方法中。这里不做赘述。


http://chatgpt.dhexx.cn/article/NWiLDkFw.shtml

相关文章

大数据开发-Flink-数据流DataStream和DataSet

文章目录 一、DataStream的三种流处理Api1.1 DataSource1.2 Transformation1.3 Sink 二、DataSet的常用Api2.1 DataSource2.2 Transformation2.3 Sink Flink主要用来处理数据流&#xff0c;所以从抽象上来看就是对数据流的处理&#xff0c;正如前面大数据开发-Flink-体系结构 &…

Flink DataStream API 介绍

Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-JKeWa22W2vWA4zBS .error-icon{fill:#552222;}#mermaid-svg-JKeWa22W2vWA4z…

DataStream API介绍与使用(一)

详细API参考官网 DataStream编程模型 在Flink整个系统架构中&#xff0c;对流计算的支持是其最重要的功能之一&#xff0c;Flink基于Google提出的DataFlow模型&#xff0c;实现了支持原生数据流处理的计算引擎。Flink中定义了DataStream API让用户灵活且高效地编写Flink流式应…

DataStream API(一)

Flink 有非常灵活的分层 API 设计&#xff0c;其中的核心层就是 DataStream/DataSet API。由于新版 本已经实现了流批一体&#xff0c; DataSet API 将被弃用&#xff0c;官方推荐统一使用 DataStream API 处理流数 据和批数据。由于内容较多&#xff0c;我们将会用几章的篇幅来…

DataStream(二)

目录 5.3.2 聚合算子&#xff08;Aggregation&#xff09; 5.3.3 用户自定义函数&#xff08;UDF&#xff09; 3. 扁平映射&#xff08;flatMap&#xff09; flatMap 操作又称为扁平映射&#xff0c;主要是将数据流中的整体&#xff08;一般是集合类型&#xff09;拆分成一个 …

Flink DataStream API

Flink DataStream API 编程指南 概览前言什么是DataStreamFlink程序剖析程序样例 Data SourcesDataStream Transformations算子数据流转换算子物理分区算子链和资源组 Data Sinks迭代执行参数 概览 前言 Flink中的DataStream程序是常规程序&#xff0c;可对数据流进行转换&am…

DataStream API(三)

目录 5.3.4 物理分区&#xff08;Physical Partitioning&#xff09; 5.4 输出算子&#xff08;Sink&#xff09; 5.4.1 连接到外部系统 5.4.2 输出到文件 5.4.3 输出到 Kafka 5.4.4 输出到 MySQL&#xff08;JDBC&#xff09; 5.4.5 自定义 Sink 输出 5.5 本章总结 5.3.…

流式数据采集和计算(十):Flink的DataStream学习笔记

Flink的DataStream学习笔记.. 1 Flink 基础.. 3 Flink特性.. 3 Flink和Spark对比.. 3 设计思路.. 3 状态管理.. 3 Flink 初探.. 4 设计架构.. 4 Flink on yarn. 5 流程分析.. 6 DataStream. 7 API程序结构.. 7 DataSource 8 Transformation. 9 Sink. 13 Time 14…

DataStream API(基础篇) 完整使用 (第五章)

DataStream API基础篇 一、执行环境&#xff08;Execution Environment&#xff09;1、创建执行环境1. getExecutionEnvironment2. createLocalEnvironment3. createRemoteEnvironment 二、执行模式(Execution Mode)1. BATCH模式的配置方法&#xff08;1&#xff09;通过命令行…

DataStream API 四 之 Flink DataStream编程

DataStream API 四 之 Flink DataStream编程 1.分布式流处理基本模型2.流应用开发步骤3.数据类型4. Connector5. Execution environment6. 参数传递7.配置并⾏度8.Watermark9.Checkpoint10.State11. Data Source11.111.2 自定义Source 12.Transformations13.Window13.1窗⼝处理…

Flink的DataStream介绍

1|0一&#xff1a;流式处理基本概念 流处理系统本身有很多自己的特点。一般来说&#xff0c;由于需要支持无限数据集的处理&#xff0c;流处理系统一般采用一种数据驱动的处理方式。它会提前设置一些算子&#xff0c;然后等到数据到达后对数据进行处理。 为了表达复杂的逻辑&am…

Flink DataStream API(基础版)

概述 DataStream&#xff08;数据流&#xff09;本身是 Flink 中一个用来表示数据集合的类&#xff08;Class&#xff09;&#xff0c;我们编写的 Flink 代码其实就是基于这种数据类型的处理&#xff0c;所以这套核心API 就以DataStream 命名。对于批处理和流处理&#xff0c;我…

node.js上开启服务,在同一局域网下的另一客户端访问

选择的服务是我之前做的案例&#xff1a;链接 1.在本机上开启服务&#xff1a; 2.本机上用浏览器访问验证无误&#xff1a; 3.运行cmd使用命令ipconfig查看本机ip地址 4.在另一台局域网下的机子&#xff0c;要求可以ping到。 浏览器访问ip地址&#xff1a;3000即可。&#…

前端 面试题

介绍项目 安全性 token 验证 处理令牌续期问题&#xff0c;在header中获取到新令牌时&#xff0c;替换老令牌&#xff0c;以达到用户无感刷新令牌 1、第一次登录的时候&#xff0c;前端调后端的登陆接口&#xff0c;发送用户名和密码 2、后端收到请求&#xff0c;验证用户名和…

基础知识---cmd命令行篇

1、echo&#xff08;输出&#xff09; > 覆盖 >>追加 2、dir&#xff08;展示当前目录的文件 .当前的目录 ..表示上一层目录&#xff09; 3、d&#xff1a;和cd&#xff1a;d为驱动器 cd展示当前目录的文件&#xff08; .为当前文件 ..为上一层目录的文件&#xf…

虚拟机NAT+静态IP+DNS

NAT模式下 虚拟机联网是通过物理机的VMware Nat服务&#xff08;电脑网络切换也无碍&#xff09;&#xff0c;禁用状态下ping不通物理机&#xff0c;也连不了网 物理机连接虚拟机的通过VMnet8虚拟网络适配器&#xff0c;禁用情况下ping不通虚拟机&#xff0c;Xshell工具也没法…

ACP考前错题总结(精华,已过ACP)

前言 证书和战绩镇楼&#xff0c;希望大家都可以拿到自己想要的Certificate。无论ACA-ACP-ACE亦或者GCP、AWS等等。 错题总结 镜像、本地磁盘部分 不建议基于本地服务器制作镜像上传到阿里云ECS并提供服务。 不支持写在座位数据盘使用的本地盘 镜像和快照&#xff1a;镜像可…

React+Native Unable to download JS Bundle解决办法

在配置ReactNative开发环境中&#xff0c;会遇到很多坑。 这个会困扰很多很多人。 在前序工作中&#xff0c;我们开启了8081端口&#xff0c;以供手机通过该端口下载相应的js。 而在命令行执行adb reverse tcp:8081 tcp:8081命令能解决一定的问题&#xff0c;但也有可能失效。…

网络协议 一 OSI参考模型、计算机通信基础 (集线器、网桥、交换机、路由器)

萌宅鹿网络系列 的基础上增强 目录 互联网&#xff08;internet&#xff09;为什么要学习网络协议客户端-服务器跨平台的原理&#xff08;Java、C&#xff09;网络互连模型&#xff08;OSI参考模型&#xff09;计算机之间的通信基础 计算机之间的连接方式 - 网线直连计算机之间…

物联网安全实践二

正文 一 实验目的及要求 物联网智能设备一般都提供WiFi接入&#xff0c;本实验是在WiFi密码破解基础上进一步对物联网智能设备配置服务开展安全性分析实验。比如智能物联网家居网关、智能家居中的智能插座等&#xff0c;一般都内置Web服务&#xff0c;方便本地登录Web网页开展…