flatMap底层实现

article/2025/10/18 10:03:40

上篇:Transformation的map使用

第一种方式重写flatMap方法实现

实现需求:根据字符串在nc -lk 8888的窗口命令下输入的数据,在控制台打印输出发现:在同一行数据输入的单词字符串自动换行,按每个单词独立换行并且若输入有重复的单词时,它不会再次输出

package cn._51doit.flink.day02;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;/*** Transformation的flatMap底层实现[无界流]*/
public class FlatMapDemo3 {public static void main(String[] args) throws Exception {//    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//查看本地的并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> lines = env.socketTextStream("Master", 8888);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {//重写flatMap方法@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//输入数据String[] words = value.split(" ");for (String word : words) {if (!"error".equals(word)) {out.collect(word);}}}});words.print();env.execute();}public static class MyStreamMap extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer,Integer>{//重写processElement方法@Overridepublic void processElement(StreamRecord<Integer> element) throws Exception {//输入方法Integer i = element.getValue();Integer j = i * 2;//将要输出的数据放入到element【但是没有输出】element.replace(j);//输出数据output.collect(element);}}
}

打印输出

第二种方式重写processElement方法实现

编码:无界流

package cn._51doit.flink.day02;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;/*** Transformation的flatMap底层实现* 第二种方式重写processElement方法实现*/
public class FlatMapDemo02 {public static void main(String[] args) throws Exception {//    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//查看本地的并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> lines = env.socketTextStream("Master", 8888);SingleOutputStreamOperator<String> words = lines.transform("MyFlatMap",TypeInformation.of(String.class),new MyStreamFlatMap());words.print();env.execute();}public static class MyStreamFlatMap extends AbstractStreamOperator<String> implements OneInputStreamOperator<String,String>{//重写processElement方法@Overridepublic void processElement(StreamRecord<String> element) throws Exception {//拿到一行数据String lines = element.getValue();String[] words = lines.split(" ");for (String word : words){if (!word.equals("error")){//StreamRecord<String> record = new StreamRecord<>(word);output.collect(element.replace(word));}}}}
}

控制台打印输出:

源码解析

(1)点击flatMap进去查看,根据反射获取输出的类型,输入的是T类型,返回的R类型,没有把代码写死,运行时获取返回的类型

(2)在FlatMapFunction接口中的TypeInformation方法,它需要返回一个clean,需要进行对闭包引用类型序列化的检测

(3)StreamFlatMap实现了OneInputStreamOperator,而AbstractUdfStreamOperator不断可以约束你输入上面类型,还可以约束你输入哪些类型的方法,但是它一定是继承了AbstractUdfStreamOperator

(4)输入FlatMapFunction函数,在进行计算的时候,在processElement方法的userFunction一定会调flatMap函数,有输入的数据flatMap(element.getValue(),然后把collector函数传入进去做输出数据


http://chatgpt.dhexx.cn/article/V4ydHKL6.shtml

相关文章

java7 flatmap_flink学习之七-map、fliter、flatmap

看完了Flink的datasource、sink&#xff0c;也就把一头一尾给看完了&#xff0c;从数据流入到数据流出&#xff0c;缺少了中间的处理环节。 而flink的大头恰恰是只在这个中间环节&#xff0c;如下图&#xff1a; source-transform-sink-update.png 中间的处理环节比较复杂&…

Stream之flatMap

一、flatMap简介 flatMap:将小Stream转换为大Stream 二、示例转换要求 目标&#xff1a;将如下对象中的分类category提取出来&#xff0c;去重。其中如"哲学,爱情",需要解析为两个分类&#xff0c;["哲学","爱情"] [{"age":33,"…

flatmap使用

使用场景 适用于嵌套list数据结构&#xff0c;想把内部的list数据合并成一个list.。 举例如下&#xff1a; public class Test {public static void main(String[] args) {List<List<String>> list new ArrayList<>();List<String> list1 new Arr…

Java8中map与flatMap用法

目录 1 概述 2 map与flatMap 3 常用写法 1 概述 Java8中一些新特性在平时工作中经常会用到&#xff0c;但有时候总感觉不是很熟练&#xff0c;今天特意将这个Java8中的映射记录一下。 2 map与flatMap map---对集合中的元素逐个进行函数操作映射成另外一个 flatMap---接收一…

如何使用flatMap

1. 什么情况下用到flatMap 当使用map&#xff08;&#xff09;操作时&#xff0c;不是返回一个值&#xff0c;而是返回一个集合或者一个数组的时候&#xff0c;这时候就可以使用flatMap解决这个问题。举个例子&#xff0c;你有一个列表 [21,23,42]&#xff0c;然后你调用getPr…

【JavaScript中数组的flatMap方法的详细介绍】

在我们平时对数组进行操作的时候&#xff0c;通常map、forEach和filter方法比较常用。而flatMap方法用得相对少一些。当你掌握了flatMap方法的使用之后&#xff0c;我相信你一定会喜欢上它的&#xff01; 下面我们会通过以下三个问题展开对flatMap方法的讲解&#xff1a; 1. f…

Java8 - Streams flatMap()

文章目录 官方文档What is flatMap()?Why flat a Stream?Demo需求1&#xff1a;Find all books需求2&#xff1a;Order and LineItems需求3&#xff1a;Splits the line by spaces需求4&#xff1a; flatMap and primitive type 官方文档 https://docs.oracle.com/javase/8/…

JAVA8 中的flatmap

构建对象 class User{private String addr } 将多个User集合中的addr按照;分割合并成一个字符串list List<User> uList Lists.newArrayList();User u1 new User();u1.setAddr("a1;a2;a3;a4;a5");User u2 new User();u2.setAddr("b1;b2;b3;b4;b5&qu…

Unity resource style/Theme.AppCompat.Dialog (aka xxx:style/Theme.AppCompat.Dialog) not found

关于Unity 打包报错"resource style/Theme.AppCompat.Dialog (aka com.game.chipsmerge:style/Theme.AppCompat.Dialog) not found."的问题 解决方法: 在mainTemplate文件中添加依赖: implementation ‘com.android.support:appcompat-v7:28.0.0’ 或者自己去下载其…

android最新v7包下载,support v7 appcompat.jar包下载

android support v7 appcompat.jar包是一款非常实用的jar文件,是android开发中必备的一份文件,能够在低版本Android平台上开发一个应用程序,兼容性极强。感兴趣的朋友欢迎前来IT猫扑下载体验吧! android support v7 appcompat.jar包介绍 android-support-v7-appcompat.jar包…

解决 appcompat 1.1.0 导致 webview crash 的问题

Android SDK 太不让人省心了&#xff0c;正式版本居然也埋雷。 前段时间把 support 升级到了 androidx&#xff0c;appcompat 自动升级了新版本 androidx.appcompat:appcompat:1.1.0。 简单回归了下功能就发上线了&#xff0c;结果在在 5.1 的系统上发生了大规模的 crash&…

从AppCompat切换到MaterialComponents一些主题属性介绍

文章目录 前言主题属性颜色排版字体形状小部件ButtonsText FieldsCardsBottom Navigation 后话 前言 絮叨两句&#xff0c;感觉Component这个库有点傲娇&#xff0c;我碰到一个情景&#xff0c;使用Button&#xff0c;设置了background属性&#xff0c;当使用样式是AppCompat时…

Gradle编译问题(appcompat和material相关)

在使用Android Studio编译项目时&#xff0c;发现的编译问题。已解决&#xff0c;在此记录一下 问题1 Cant determine type for tag <macro name"m3_comp_bottom_app_bar_container_color">?attr/colorSurface</macro> 原因是androidx.appcompat:app…

Android报错之You need to use a Theme.AppCompat theme (or descendant) with this activity.

[TOC](Android报错之You need to use a Theme.AppCompat theme (or descendant) with this activity.) 一、报错如下 原因为&#xff1a;Activty继承自android.support.v7.app.AppCompatActivty,而不是android.app.Activty。 二、解决方法 看一下提示&#xff0c;就是要用Th…

appcompat_v7项目说明

一、appcompat_v7项目说明 今天来说一下appcompat_v7项目的问题&#xff0c;使用eclipse创建Android项目时&#xff0c;发现project列表中会多创建出一个appcompat_v7项目&#xff0c;这是我搭建最新的Android开发环境创建第一个Android测试项目后发现的&#xff0c;我在创建An…

Android Studio报错Could not find any version that matches com.android.support:appcompat-v7:33.+.

今天用AndroidStudio新建了一个项目&#xff0c;没想到新建项目就爆红了 而且Java代码有标红&#xff0c;cannot reslove symbol"v7" 解决方案&#xff1a; 1.打开build.gradle文件&#xff0c;找到dependencies下 implementation com.android.support:appcompat-…

Android关于Theme.AppCompat相关问题的深入分析

先来看这样一个错误&#xff1a; No resource found that matches the given name style/Theme.AppCompat.Light 对于这个错误&#xff0c;相信大部分Android开发者都遇到过&#xff0c;可能很多朋友通过百度或者Google已经解决了这个问题&#xff0c;但是网上大部分都只给出…

细说 AppCompat 主题引发的坑:You need to use a Theme.AppCompat theme with this activity!

一般来说按照文档的建议去做&#xff0c;出现问题的概率很低。但很多人的情况不同&#xff0c;每每会发生意外状况&#xff0c;就比如这次没有使用 AppCompat 主题引发的坑&#xff01; AppCompat 框架作为 Jetpack 集合的基石&#xff0c;非常重要。Android Studio 上创建的默…

Android Jetpack基础组件之AppCompat

1.简介 相比苹果封闭的IOS系统&#xff0c;Android系统的开放性带来了很多的优势。与此同时&#xff0c;也带来了严重的碎片化问题&#xff0c;包括硬件的碎片化和软件碎片化。这里&#xff0c;我们主要说的是软件方面。各Android设备厂商&#xff0c;受限于成本和技术原因&am…

AppCompat发布两年了,还没了解?

近日随笔 近期疫情日渐严峻&#xff0c;大家多多保重&#xff0c;出门记得戴口罩。希望河北&#xff0c;黑龙江能尽早控制住好局面迎来拐点&#xff0c;全国人民过个好年。 为了能够让低版本的Android系统能够运行新特性&#xff0c;AppCompat框架自Support时代就已推出。但随着…