java7 flatmap_flink学习之七-map、fliter、flatmap

article/2025/10/18 10:01:06

看完了Flink的datasource、sink,也就把一头一尾给看完了,从数据流入到数据流出,缺少了中间的处理环节。

而flink的大头恰恰是只在这个中间环节,如下图:

7bbf14996d39

source-transform-sink-update.png

中间的处理环节比较复杂,现在也就看了其中一部分,这里先开始讲其中最简单 也最常用的map、flatmap及filter。

map

flink中dataSourceStream和java8中的map很类似,都是用来做转换处理的,看下map的实现:

public SingleOutputStreamOperator map(MapFunction mapper) {

TypeInformation outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);

return this.transform("Map", outType, new StreamMap((MapFunction)this.clean(mapper)));

}

可以看到:

1、返回的是SingleOutputStreamOperator泛型,这是个基础的类型,好多DataStream的方法都返回它,比如map、flapmap、filter、process等

2、最终是调用transform方法来实现的,看下transfrom的实现:

@PublicEvolving

public SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator operator) {

this.transformation.getOutputType();

OneInputTransformation resultTransform = new OneInputTransformation(this.transformation, operatorName, operator, outTypeInfo, this.environment.getParallelism());

SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator(this.environment, resultTransform);

this.getExecutionEnvironment().addOperator(resultTransform);

return returnStream;

}

额,好像还不如不看,直接看怎么用吧!

@Slf4j

public class KafkaUrlSinkJob {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();

properties.put("bootstrap.servers", "localhost:9092");

properties.put("zookeeper.connect", "localhost:2181");

properties.put("group.id", "metric-group");

properties.put("auto.offset.reset", "latest");

properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

SingleOutputStreamOperator dataStreamSource = env.addSource(

new FlinkKafkaConsumer010(

"testjin",// topic

new SimpleStringSchema(),

properties

)

).setParallelism(1)

// map操作,转换,从一个数据流转换成另一个数据流,这里是从string-->UrlInfo

.map(string -> JSON.parseObject(string, UrlInfo.class))

}

可以看到,kafka中传递的是String类型,在这里通过map转换后,变SingleOutputStreamOperator 类型,否则就是SingleOutputStreamOperator 。

map方法不允许缺少数据,也就是原来多少条数据,处理后依然是多少条数据,只是用来做转换。

flatmap

flatmap,也就是将嵌套集合转换并平铺成非嵌套集合。看个例子,还是用上面的kafka datasource:

// 构造一个嵌套的数据

SingleOutputStreamOperator> listDataStreaamSource = dataStreamSource

.map(urlInfo -> {

List list = Lists.newArrayList();

list.add(urlInfo);

UrlInfo urlInfo1 = new UrlInfo();

urlInfo1.setUrl(urlInfo.getUrl() + "-copy");

urlInfo1.setHash(DigestUtils.md5Hex(urlInfo1.getUrl()));

list.add(urlInfo1);

return list;

}).returns(new ListTypeInfo(UrlInfo.class));

listDataStreaamSource.addSink(new PrintSinkFunction<>());

说明:

1、注意这里的returns方法,如果不指定,会在运行时报错

/*I think the short description of the error message is quite good, but let me expand it a bit.

In order to execute a program, Flink needs to know the type of the values that are processed because it needs to serialize and deserialize them. Flink's type system is based on TypeInformation which describes a data type. When you specify a function, Flink tries to infer the return type of that function. In case of the FlatMapFunction of your example the type of the objects that are passed to the Collector.

Unfortunately, some Lambda functions lose this information due to type erasure such that Flink cannot automatically infer the type. Therefore, you have to explicitly declare the return type.

如果直接上面这样转换,因为lambda表达式会丢失部分信息,会报如下异常:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.

*/

不过由于返回的是一个List,不可能直接用 List.class,没这种写法。而flink则

提供了更多选项,这里使用的是

public SingleOutputStreamOperator returns(TypeInformation typeInfo){}

这个构造函数,而ListTypeInfo则是继承TypeInfomation抽象类的一个List实现。

和上文的KafkaSender一起运行,会有如下结果:

kafkaSender:

2019-01-15 20:21:46.650 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799

2019-01-15 20:21:46.653 [main] INFO myflink.KafkaSender - send msg:{"domain":"so.com","id":0,"url":"http://so.com/1547554906650"}

KafkaUrlSinkJob

[UrlInfo(id=0, url=http://so.com/1547554906650, hash=null), UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)]

也就是一个UrlInfo 扩展成了 一个List

下面看看怎么使用flatmap

...

SingleOutputStreamOperator flatSource = listDataStreaamSource.flatMap(new FlatMapFunction, UrlInfo>() {

@Override

public void flatMap(List urlInfos, Collector collector) throws Exception {

urlInfos.parallelStream().forEach(urlInfo -> collector.collect(urlInfo));

}

});

flatSource.addSink(new PrintSinkFunction<>());

...

当然可以写成lambda表达式:(注意lambda表达式需要显式指定return type)

SingleOutputStreamOperator flatSource = listDataStreaamSource.flatMap(

(FlatMapFunction, UrlInfo>) (urlInfos, collector) ->

urlInfos.parallelStream().forEach(urlInfo -> collector.collect(urlInfo))).returns(UrlInfo.class);

看看打印出来的结果:

2> [UrlInfo(id=0, url=http://so.com/1547554906650, hash=null), UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)]

1> [UrlInfo(id=0, url=http://so.com/1547554903640, hash=null), UrlInfo(id=0, url=http://so.com/1547554903640-copy, hash=138f79ecc92744a65b03132959da2f73)]

1> UrlInfo(id=0, url=http://so.com/1547554903640-copy, hash=138f79ecc92744a65b03132959da2f73)

1> UrlInfo(id=0, url=http://so.com/1547554903640, hash=null)

2> UrlInfo(id=0, url=http://so.com/1547554906650, hash=null)

2> UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)

也就是说,flatmap方法最终返回的是一个collector,而这个collector只有一层,当输入数据有嵌套的情况下,可以将数据平铺处理。

当然,不只是针对嵌套集合,由于flatmap返回的数据条数并不会做限制,也就可以做一些扩展数据处理的情况,如下:

dataStream.flatMap((FlatMapFunction) (value, out) -> {

for (String word : value.split(" ")) {

out.collect(word);

}

});

这里就是将string使用空格切割后,组成一个新的dataStream.

filter

顾名思义,filter用于过滤数据,继续在上面代码的基础上写测试。为了避免干扰,将上面两个dataSourceStream.addSink注释掉,添加以下代码:

// 根据domain字段,过滤数据,只保留BAIDU的domain

SingleOutputStreamOperator filterSource = flatSource.filter(urlInfo -> {

if(StringUtils.equals(UrlInfo.BAIDU,urlInfo.getDomain())){

return true;

}

return false;

});

filterSource.addSink(new PrintSinkFunction<>());

这里排除别的domain数据,只保留BAIDU的数据,运行结果就不贴出来了,验证了filter的效果。


http://chatgpt.dhexx.cn/article/XhX46jb9.shtml

相关文章

Stream之flatMap

一、flatMap简介 flatMap:将小Stream转换为大Stream 二、示例转换要求 目标&#xff1a;将如下对象中的分类category提取出来&#xff0c;去重。其中如"哲学,爱情",需要解析为两个分类&#xff0c;["哲学","爱情"] [{"age":33,"…

flatmap使用

使用场景 适用于嵌套list数据结构&#xff0c;想把内部的list数据合并成一个list.。 举例如下&#xff1a; public class Test {public static void main(String[] args) {List<List<String>> list new ArrayList<>();List<String> list1 new Arr…

Java8中map与flatMap用法

目录 1 概述 2 map与flatMap 3 常用写法 1 概述 Java8中一些新特性在平时工作中经常会用到&#xff0c;但有时候总感觉不是很熟练&#xff0c;今天特意将这个Java8中的映射记录一下。 2 map与flatMap map---对集合中的元素逐个进行函数操作映射成另外一个 flatMap---接收一…

如何使用flatMap

1. 什么情况下用到flatMap 当使用map&#xff08;&#xff09;操作时&#xff0c;不是返回一个值&#xff0c;而是返回一个集合或者一个数组的时候&#xff0c;这时候就可以使用flatMap解决这个问题。举个例子&#xff0c;你有一个列表 [21,23,42]&#xff0c;然后你调用getPr…

【JavaScript中数组的flatMap方法的详细介绍】

在我们平时对数组进行操作的时候&#xff0c;通常map、forEach和filter方法比较常用。而flatMap方法用得相对少一些。当你掌握了flatMap方法的使用之后&#xff0c;我相信你一定会喜欢上它的&#xff01; 下面我们会通过以下三个问题展开对flatMap方法的讲解&#xff1a; 1. f…

Java8 - Streams flatMap()

文章目录 官方文档What is flatMap()?Why flat a Stream?Demo需求1&#xff1a;Find all books需求2&#xff1a;Order and LineItems需求3&#xff1a;Splits the line by spaces需求4&#xff1a; flatMap and primitive type 官方文档 https://docs.oracle.com/javase/8/…

JAVA8 中的flatmap

构建对象 class User{private String addr } 将多个User集合中的addr按照;分割合并成一个字符串list List<User> uList Lists.newArrayList();User u1 new User();u1.setAddr("a1;a2;a3;a4;a5");User u2 new User();u2.setAddr("b1;b2;b3;b4;b5&qu…

Unity resource style/Theme.AppCompat.Dialog (aka xxx:style/Theme.AppCompat.Dialog) not found

关于Unity 打包报错"resource style/Theme.AppCompat.Dialog (aka com.game.chipsmerge:style/Theme.AppCompat.Dialog) not found."的问题 解决方法: 在mainTemplate文件中添加依赖: implementation ‘com.android.support:appcompat-v7:28.0.0’ 或者自己去下载其…

android最新v7包下载,support v7 appcompat.jar包下载

android support v7 appcompat.jar包是一款非常实用的jar文件,是android开发中必备的一份文件,能够在低版本Android平台上开发一个应用程序,兼容性极强。感兴趣的朋友欢迎前来IT猫扑下载体验吧! android support v7 appcompat.jar包介绍 android-support-v7-appcompat.jar包…

解决 appcompat 1.1.0 导致 webview crash 的问题

Android SDK 太不让人省心了&#xff0c;正式版本居然也埋雷。 前段时间把 support 升级到了 androidx&#xff0c;appcompat 自动升级了新版本 androidx.appcompat:appcompat:1.1.0。 简单回归了下功能就发上线了&#xff0c;结果在在 5.1 的系统上发生了大规模的 crash&…

从AppCompat切换到MaterialComponents一些主题属性介绍

文章目录 前言主题属性颜色排版字体形状小部件ButtonsText FieldsCardsBottom Navigation 后话 前言 絮叨两句&#xff0c;感觉Component这个库有点傲娇&#xff0c;我碰到一个情景&#xff0c;使用Button&#xff0c;设置了background属性&#xff0c;当使用样式是AppCompat时…

Gradle编译问题(appcompat和material相关)

在使用Android Studio编译项目时&#xff0c;发现的编译问题。已解决&#xff0c;在此记录一下 问题1 Cant determine type for tag <macro name"m3_comp_bottom_app_bar_container_color">?attr/colorSurface</macro> 原因是androidx.appcompat:app…

Android报错之You need to use a Theme.AppCompat theme (or descendant) with this activity.

[TOC](Android报错之You need to use a Theme.AppCompat theme (or descendant) with this activity.) 一、报错如下 原因为&#xff1a;Activty继承自android.support.v7.app.AppCompatActivty,而不是android.app.Activty。 二、解决方法 看一下提示&#xff0c;就是要用Th…

appcompat_v7项目说明

一、appcompat_v7项目说明 今天来说一下appcompat_v7项目的问题&#xff0c;使用eclipse创建Android项目时&#xff0c;发现project列表中会多创建出一个appcompat_v7项目&#xff0c;这是我搭建最新的Android开发环境创建第一个Android测试项目后发现的&#xff0c;我在创建An…

Android Studio报错Could not find any version that matches com.android.support:appcompat-v7:33.+.

今天用AndroidStudio新建了一个项目&#xff0c;没想到新建项目就爆红了 而且Java代码有标红&#xff0c;cannot reslove symbol"v7" 解决方案&#xff1a; 1.打开build.gradle文件&#xff0c;找到dependencies下 implementation com.android.support:appcompat-…

Android关于Theme.AppCompat相关问题的深入分析

先来看这样一个错误&#xff1a; No resource found that matches the given name style/Theme.AppCompat.Light 对于这个错误&#xff0c;相信大部分Android开发者都遇到过&#xff0c;可能很多朋友通过百度或者Google已经解决了这个问题&#xff0c;但是网上大部分都只给出…

细说 AppCompat 主题引发的坑:You need to use a Theme.AppCompat theme with this activity!

一般来说按照文档的建议去做&#xff0c;出现问题的概率很低。但很多人的情况不同&#xff0c;每每会发生意外状况&#xff0c;就比如这次没有使用 AppCompat 主题引发的坑&#xff01; AppCompat 框架作为 Jetpack 集合的基石&#xff0c;非常重要。Android Studio 上创建的默…

Android Jetpack基础组件之AppCompat

1.简介 相比苹果封闭的IOS系统&#xff0c;Android系统的开放性带来了很多的优势。与此同时&#xff0c;也带来了严重的碎片化问题&#xff0c;包括硬件的碎片化和软件碎片化。这里&#xff0c;我们主要说的是软件方面。各Android设备厂商&#xff0c;受限于成本和技术原因&am…

AppCompat发布两年了,还没了解?

近日随笔 近期疫情日渐严峻&#xff0c;大家多多保重&#xff0c;出门记得戴口罩。希望河北&#xff0c;黑龙江能尽早控制住好局面迎来拐点&#xff0c;全国人民过个好年。 为了能够让低版本的Android系统能够运行新特性&#xff0c;AppCompat框架自Support时代就已推出。但随着…

AppCompat (AppCompatActivity) Jetpack

进入AppCompat章节后&#xff0c;我们发现它又被分为了4个部分&#xff0c;这4个部分被称为“key class”&#xff0c;也就是重点类&#xff0c;它们分别是&#xff1a; ActionBar&#xff1a;提供Actionbar用户界面模式的实现&#xff1b;AppCompatActivity&#xff1a;添加可…