最全livy代码实战

article/2025/10/13 7:57:35

livy分析应用实战

Github地址:最全livy代码实战

Apache Livy提供Rest service来与Apache Spark进行交互,通过Rest interface或RPC client来简化spark job和spark code snippet的提交,同步或异步获取结果,并提供对spark context的管理。

看下llivy官网的图,我们可以大概了解到livy的工作过程,其实就是充当了当我们提交作业到spark上的这个角色。以前我们直接要到有spark client的机器上进行submit的操作,现在我们只要使用restful就能完成所有的操作。

本文重点是我们从0到1分析运用最最实际的用法,其他的用法可以在此基础上探索。

1、livy安装

1、下载

下载最新版代码

https://github.com/apache/incubator-livy

2、源码`编译

由于我要对接的是spark3.1.1版本,没有现成的livy版本,需要修改后再编辑

修改pom

我用git bash进行编译的,因为里面有bash命令

mvn -DskipTests clean package

2、安装

unzip apache-livy-0.7.1-incubating-bin.zip
cd /opt
mv apache-livy-0.7.1-incubating-bin livy

3、配置

cd /opt/livy
mkdir logs
mkdir tmfile
cd /opt/livy/conf
cp livy.conf.template livy.conf
cp livy-client.conf.template livy-client.conf
cp livy-env.sh.template livy-env.sh
cp log4j.properties.template log4j.properties

vi livy.conf

livy.server.port = 18998
livy.server.host = 192.168.52.101
livy.spark.master = spark://192.168.52.101:7077
livy.server.session.timeout = 1h
livy.file.local-dir-whitelist = /opt/livy/tmpfile
livy.session.staging-dir=/opt/livy/tmpfile/tmp
livy.spark.deploy-mode = client
livy.repl.enable-hive-context = true

vi livy-env.sh

#java路径
export JAVA_HOME=/usr/local/jdk
#spark路径
export SPARK_HOME=/opt/spark
#livyserver内存
export LIVY SERVER JAVA OPTS="-XmX1G"
#livy log存放位置
export LIVY_LOG_DIR=/opt/livy/logs
#livy PID文件存放位置
export LIVY_PID_DIR=/opt/livy/tmpfile/tmp
export LIVY_TEST=true

vi livy-client.conf

livy.rsc.rpc.server.address = 192.168.52.101

4、启动

启动之前把spark先启动了,这边spark的启动不展开了。

cd /opt/livy/bin
./livy-server start

2、livy使用

livy包括两种会话模型,我们一一通过代码去分析。

Github地址:最全livy代码实战

1、Batch Session方式

用户可以通过Livy以批处理的方式启动Spark应用,与spark中的sumbit是一样的,livy中就叫Batch Session,批量的,执行成功后,会关闭当前会话。

1、首先我们写一个spark算子

package com.mn.demoimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** @program:hadoop* @description* @author:miaoneng* @create:2023-02-05 17:28**/
object WordCount {def main(args: Array[String]): Unit = {if(args.length < 2){println("要输入input和output")System.exit(1);}//spark上下文执行环境val conf:SparkConf = new SparkConf().setAppName("wordcount");val sc:SparkContext = new SparkContext(conf);val lines:RDD[String]= sc.textFile(args(0))val words:RDD[String] = lines.flatMap(_.split(" "))val wordAndOnes:RDD[(String,Int)] = words.map((_,1))val result:RDD[(String,Int)] = wordAndOnes.reduceByKey(_+_)result.repartition(1).saveAsTextFile(args(1))sc.stop()}
}

2、打包

我们把spark程序打包,然后上传到livy的机器上【这边livy上可以加个函数,接收jar,放置在指定的地方】

我们的路径是 /opt/livy/tmpfile/jars/spark-1.0-SNAPSHOT.jar,同时这个路径也是在上面livy中配置了白名单的livy.file.local-dir-whitelist。否则你的jar将会提交不上去。

3、利用livy进行提交

package com.mn.livy;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** @program:hadoop* @description* @author:miaoneng* @create:2023-02-05 20:18**/
public class BatchDemo {private final static Logger logger = LoggerFactory.getLogger(BatchDemo.class);public static void main(String[] args) throws IOException, URISyntaxException {String livyUrl = "http://192.168.52.101:18998";Map<String ,Object> params = new HashMap<>();params.put("file","/opt/livy/tmpfile/jars/spark-1.0-SNAPSHOT.jar");params.put("className","com.mn.demo.WordCount");List<String > t = new ArrayList<>();t.add("/opt/test.txt");t.add("/opt/result");params.put("args",t);String ret = post(livyUrl + "/batches",new JSONObject(params).toString());Map<String, Object> v = (Map<String, Object>) JSON.parse(ret);String sessionId = v.get("id").toString();logger.info(sessionId);}public static String post(String url, String jsonStr) throws IOException {String mssg;CloseableHttpClient httpClient = HttpClients.createDefault();HttpPost httpPost = new HttpPost(url);RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(30000).setConnectionRequestTimeout(30000).setSocketTimeout(60000).build();httpPost.setConfig(requestConfig);httpPost.setHeader("Content-type", "application/json");CloseableHttpResponse httpResponse = null;try {httpPost.setEntity(new StringEntity(jsonStr, "utf-8"));httpResponse = httpClient.execute(httpPost);HttpEntity httpEntity = httpResponse.getEntity();return EntityUtils.toString(httpEntity);} catch (IOException e) {throw e;} finally {try {if (httpResponse != null) {httpResponse.close();}if (httpClient != null) {httpClient.close();}} catch (IOException e) {e.printStackTrace();}}}
}

4、结果

生成的结果也OK

2、Interactive session方式

第二种方式是交互式的,类似于spark-shell,交互式的。

我们第二个例子就不按照官网的demo来。

我们假设一个场景,我们用scala写好了一段代码,打个一个包,我让别人用java,根据类名,参数进行不断的提交。

例如:提交“add”,参数为10,10,那么spark帮我们计算出是20,提交“sub”,20和10,spark帮我们计算出为10。当然实际工作中会复杂很多。

通过连续性的工作就类似与spark-shell。下面我们来实现上面的场景。

分为核心算法jar:spark.jar

调用核心算法jar:livy.jar

未来还可以进一步封装成一个调用接口类:目前没有做。这样写业务的直接调用接口,完全感知不到livy的存在。

1、spark.jar

先进行写一个简单的scala程序。实现加,减操作。

此块代码就是spark处理的核心代码jar中。我们写在spark的jar中。

package com.mn.demoimport com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.sql.SparkSession/*** @program:hadoop* @description* @author:miaoneng* @create:2023-02-11 12:29**/
object DataOperate {def operateData(spark:SparkSession,stepId:String,jsonDataStr:String):Int ={println(stepId)println(jsonDataStr)var obj:JSONObject = JSON.parse(jsonDataStr).asInstanceOf[JSONObject]val x = obj.get("x").asInstanceOf[Int]val y = obj.get("y").asInstanceOf[Int]if(stepId=="add"){add(x,y)}else if(stepId=="sub"){sub(x,y)}else{0}}def add(x:Int,y:Int): Int ={x+y}def sub(x:Int,y:Int):Int= {x-y}
}

2、livy.jar

我们通过livy来写一个类调用spark代码的core类。

此代码放在我们自己写的livy的jar中。

package com.mn.core;import com.mn.demo.DataOperate;
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.sql.SparkSession;/*** @program:hadoop* @description 作业* @author:miaoneng* @create:2023-02-11 13:19**/
public class MyJob implements Job<Integer> {// 方法名private String methodName = "";// 所存步骤参数private String params;public MyJob(String methodName, String params) {this.methodName = methodName;this.params = params;}@Overridepublic Integer call(JobContext jobContext) throws Exception {//让spark算子包进行计算return callSpark(jobContext.sparkSession(), methodName, params);}public Integer callSpark(SparkSession sparkSession, String methodName, String params) {return DataOperate.operateData(sparkSession, methodName, params);}
}

3、业务调用

1、定义步骤:List stepList

2、拆解步骤。

3、通过livy调用spark。

目前我们演示,直接写在livy.jar中。

package com.mn.livy;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mn.core.MyJob;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;/*** @program:hadoop* @description* @author:miaoneng* @create:2023-02-11 13:31**/
public class SeesionDemo {private final static Logger logger = LoggerFactory.getLogger(SeesionDemo.class);public static String url = "http://192.168.52.101:18998";public static void main(String[] args) {//定义步骤1,调用add方法,两个参数为x:10,y:10String jsonString1 =  "{\"methodName\":\"add\",\"params\":{\"x\":10,\"y\":10}}";//定义步骤2,调用sub方法,两个参数为x:20,y:10String jsonString2 =  "{\"methodName\":\"sub\",\"params\":{\"x\":20,\"y\":10}}";List<String> stepList = new ArrayList<>();stepList.add(jsonString1);stepList.add(jsonString2);try {callSpark(stepList);} catch (IOException e) {e.printStackTrace();} catch (URISyntaxException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}public static void callSpark(List<String> stepList) throws IOException, URISyntaxException, ExecutionException, InterruptedException {LivyClient livyClient = new LivyClientBuilder().setURI(new URI(url)).build();//换成你自己的路径//用livy客户端将我们写的Spark的算子包提交到spark环境中livyClient.uploadJar(new File("E:\\github\\hadoop\\spark\\target\\spark-1.0-SNAPSHOT.jar")).get();//livy写的业务操作jarlivyClient.uploadJar(new File("E:\\github\\hadoop\\livy\\target\\livy-1.0-SNAPSHOT.jar")).get();//spark.jar中用到的fastjson包//1、可以在打spark.jar的时候可以打成一个胖jar包//2、可以写个代码把要上传的jar规范好,让大家按照这个格式列好spark.jar用到的额外jar,然后一个个upload上去livyClient.uploadJar(new File("E:\\github\\jars\\fastjson-1.2.83.jar")).get();//拆解需要执行的步骤Integer ret = 0;for(int i =0 ;i < stepList.size();i++){JSONObject jsonObject = JSON.parseObject(stepList.get(i));String methodName = jsonObject.get("methodName").toString();String params = jsonObject.get("params").toString();ret = livyClient.submit(new MyJob(methodName,params)).get();logger.info("步骤"+i+":所做的操作是"+methodName+",结果为:"+ret);}livyClient.stop(true);}
}

4、参数

一些参数就在后期的使用中慢慢来加入。


http://chatgpt.dhexx.cn/article/QFmnUQD2.shtml

相关文章

livy部署和使用

部署 使用版本&#xff1a;apache-livy-0.7.1 环境&#xff1a;spark(配置中已添加hive-site.xml) , hive ,hadoop(hdfsyarn) 基于centos livy只需要配置两个文件(livy-env.sh,livy.conf)&#xff1a; livy-env.sh 中添加环境变量配置 export JAVA_HOME/usr/local/jdk/jdk1.8…

Spark开源REST服务——Apache Livy(Spark 客户端)

文章目录 一、概述二、Apache Livy模块介绍1&#xff09;Client2&#xff09;router3&#xff09;权限管理4&#xff09;生成 Spark App5&#xff09;交互式 Driver6&#xff09;状态数据存储 三、Apache Livy架构1&#xff09;Livy架构2&#xff09;Livy执行作业流程 四、环境…

spark系列-应用篇之通过livy提交Spark任务

#前言 上一篇使用yarn api的提交spark任务比较麻烦&#xff0c; 这次我们使用livy来提交spark任务。livy的使用十分简单&#xff0c;功能上比yarn api还要强大&#xff0c;支持提交spark代码片断&#xff0c;session共享 #安装 下载livy0.5.0&#xff0c;下载完成后进行解压。…

Spark Livy 指南及livy部署访问实践

背景&#xff1a; Apache Spark 是一个比较流行的大数据框架、广泛运用于数据处理、数据分析、机器学习中&#xff0c;它提供了两种方式进行数据处理&#xff0c;一是交互式处理&#xff1a;比如用户使用spark-shell&#xff0c;编写交互式代码编译成spark作业提交到集群上去执…

Livy简单使用 架构解读

Livy使用 —— 关于Session的操作 官网案例&#xff1a;http://livy.incubator.apache.org/examples/ REST API&#xff1a;http://livy.incubator.apache.org/docs/latest/rest-api.html 通过REST API的方式去获取到session&#xff0c;返回活的交互式session 打开Postman&a…

【云原生】Apache Livy on k8s 讲解与实战操作

文章目录 一、概述二、开始编排部署1&#xff09;部署包准备1&#xff09;构建镜像2&#xff09;创建livy chart模板3&#xff09;修改yaml编排4&#xff09;开始部署5&#xff09;测试验证6&#xff09;卸载 一、概述 Livy是一个提供Rest接口和spark集群交互的服务。它可以提交…

Livy:基于Apache Spark的REST服务

原文&#xff1a;http://geek.csdn.net/news/detail/208943 Apache Spark提供的两种基于命令行的处理交互方式虽然足够灵活&#xff0c;但在企业应用中面临诸如部署、安全等问题。为此本文引入Livy这样一个基于Apache Spark的REST服务&#xff0c;它不仅以REST的方式代替了Spar…

Livy安装使用

本次部署的livy是0.7.0版&#xff0c;zip包下载地址&#xff1a;https://mirrors.tuna.tsinghua.edu.cn/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip 安装java jdk1.8 步骤&#xff1a; 一、将下载好的livy的zip包用命令unzip去解压(如果…

livy的安装使用

2019独角兽企业重金招聘Python工程师标准>>> livy简介 Livy是一个提供rest接口和spark集群交互的服务。它可以提交spark job或者spark一段代码,同步或者异步的返回结果;也提供sparkcontext的管理,通过restfull接口或RPC客户端库。Livy也简化了与spark与应用服务的交…

Livy任务提交源码解析

文章目录 简介任务路由代码断任务远程Driver上建立RpcServerLivyServer接收客户端提交代码断任务LivyServer向远程Driver的RpcServer提交任务远程Driver的RpcServer接收任务Driver执行代码断任务Batch任务LivyServer接收batch任务创建BatchSession以提交Spark任务创建SparkYarn…

livy简介

livy简介 什么是livy Livy通过提供REST服务来简化与Spark集群的交互。它可以通过job或者代码片段的方式来提交Spark任务&#xff0c;并同步或者异步地获得任务的结果&#xff0c;以及管理spark context&#xff0c;上述功能通过简单的REST接口或者RPC服务来实现。livy也可以简…

livy部署及应用

一、介绍 Livy把spark交互式和批处理都搬到了web上&#xff0c;提供restful接口&#xff0c;Livy一方面接收并解析客户端提交的REST请求&#xff0c;转换成相应的操作&#xff0c;另一方面它管理着客户端所启动的spark集群 Livy会为用户运行多个session&#xff0c;每个sessio…

C/C++ 实现字符串IP与整数型IP的相互转换

#include <stdio.h> int main() {char ip[32] "192.168.1.151"; //IP值char scIPAddress[32] ""; //存储字符串IPunsigned int nIPAddress 0; //存储整形IPint nTmpIP[4] {0}; //分割IPint i0;//字符串转整形sscanf(ip,"%d.%d.%d.%…

数字字符串转化成 IP 地址

数字字符串转化成 IP 地址 1、参考资料 https://www.nowcoder.com/practice/ce73540d47374dbe85b3125f57727e1e 2、题目要求 题目描述 现在有一个只包含数字的字符串&#xff0c;将该字符串转化成IP地址的形式&#xff0c;返回所有可能的情况。 例如&#xff1a; 给出的字…

IP地址(IPV6)与long数组之间的转换

IP地址&#xff08;IPV6&#xff09;与long数组之间的转换 《IP地址&#xff08;IPV4&#xff09;与int类型之间的转换》《IP地址&#xff08;IPV6&#xff09;与long数组之间的转换》 一、前言 IPv6是英文“Internet Protocol Version 6”&#xff08;互联网协议第6版&#…

如何将字符数串和IP地址进行转换?

这一部分主要是网络编程中会使用&#xff0c;将数串和IP地址进行转换&#xff0c;在进行转换之前&#xff0c;我们需要知道IP地址在linux系统中的结构体定义 地址类型结构体 具体如下表&#xff1a; 结构体功能特性struct sockaddr套接字地址结构IPv4/IPv6通用struct sockad…

如何将IP地址字符串转换为数字数组

如何将IP地址字符串转换为数字数组 最近在做一个项目用到LWIP&#xff0c;通过触摸屏幕上的数字键盘输入要设置的IP地址和网关地址&#xff0c;然后再用输入的地址去设置重新设置lwip。那么问题就来了&#xff0c;输入的IP地址字符串应该怎么去转换成 ip[4] 数组呢&#xff1f…

IP地址字符串和数组相互转换

需求描述&#xff1a; 将字符串“192.168.2.126”&#xff0c;转成Byte类型&#xff0c;存放在字节数组中。数组内容为 192,168,2&#xff0c;126。反之亦然。 实现方法&#xff1a; 通过C# 库中的IPAddress类完成。 IPAddress类 对应的命名空间是using System.Net; 实现代…

花3个月面过华为测开岗,拿个30K不过分吧?

计算机专业&#xff0c;代码能力一般&#xff0c;之前有过两段实习以及一个学校项目经历。第一份实习是大二暑期在深圳的一家互联网公司做前端开发&#xff0c;第二份实习由于大三暑假回国的时间比较短&#xff08;小于两个月&#xff09;&#xff0c;于是找的实习是在一家初创…

华为OD德科面试+机试记录

一、机试&#xff08;6.25&#xff09; 三道编程题&#xff0c;难度偏中。由于时间久远&#xff0c;只记得其中两道题目 1、找车位&#xff08;动态规划&#xff09; 2、题目不记得了&#xff0c;后面如果找到会补充&#xff08;双指针&#xff09; 3、高效的任务规划&#x…