livy部署和使用

article/2025/10/13 7:57:37

部署

 使用版本:apache-livy-0.7.1

 环境:spark(配置中已添加hive-site.xml) , hive ,hadoop(hdfs+yarn) 基于centos

 livy只需要配置两个文件(livy-env.sh,livy.conf):

livy-env.sh 中添加环境变量配置

export JAVA_HOME=/usr/local/jdk/jdk1.8.0_141
export HADOOP_HOME=/usr/local/jdk/azkabanWebServer/hadoop-2.7.1
export SPARK_CONF_DIR=/usr/local/jdk/azkabanWebServer/spark-2.4.8/conf
export SPARK_HOME=/usr/local/jdk/azkabanWebServer/spark-2.4.8
export HADOOP_CONF_DIR=/usr/local/jdk/azkabanWebServer/hadoop-2.7.1/etc/hadoop
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"

其中HADOOP_CONF_DIR一定要添加,否则创建会话的时候,会话里面没有 appId 及appInfo信息,这是我之前踩过的坑,如下图:

 livy.conf配置:

livy.spark.master = yarn
livy.spark.deploy-mode = cluster
livy.environment = production
livy.impersonation.enabled = true
livy.server.port = 8998
livy.server.session.timeout = 3600000
livy.server.recovery.mode = recovery
livy.server.recovery.state-store=filesystem
livy.server.recovery.state-store.url=/opt/apache/livy/tmp
livy.task.max.concurrent.count = 20
livy.repl.enableHiveContext = true

如果用spark-sql能操作hive里面的数据  livy.repl.enableHiveContext 一定要配置为true,否则 你用livy提交创建表 sql语句的时候,此创建的表,只对当前livy session有效,在新建的livy session中并不存在其他session中创建的表,因为此表没有存放在hive元数据中

启动sh  livy-server start

使用livy

使用livy有两种操作模式:

1、交互式模式:这种模式就类似于在linux中spark-sql进入的交互模式一样,会给当前客户端创建一个会话,只要不进行quit,此会话一直会保留,且一次只能操作一个sql语句,操作的sql都能返回结果

如:创建会话rest接口(可以配置yarn运行的参数

作业提交方式rest接口:

 查询结果rest接口:

注意:这里查询结果返回默认最多只有1000条

看源码里面LivyConf有个配置:

 

2、非交互式模式(及batch模式):这种模式如常见的使用方式是:需要我们编写scala程序,打成jar文件,放到hdfs上,在进行rest batch提交的时候要指定执行哪个jar及main类,不会给调用方返回结果,因为创建的会话,在执行程序后,会自动释放掉,并不会保留

此种模式并不需要创建会话,如:

 

 以下是交互式模式的Java操作rest接口的工具类:

package com.kyexpress.bdp.common.utils;import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Stopwatch;
import com.kyexpress.bdp.common.enums.LivySessionState;
import com.kyexpress.bdp.common.model.BdpLivyInfo;
import com.kyexpress.bdp.common.model.livy.LivySession;
import com.kyexpress.bdp.common.model.livy.LivySessionListResponse;
import com.kyexpress.bdp.common.model.livy.LivyStatement;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.lang3.StringUtils;import java.io.IOException;
import java.util.concurrent.TimeUnit;@Slf4j
public class LivyClient {protected String url;public static final int CREATE_SESSION_TIMEOUT = 10 * 60;public static String NAME_PREFIX = "bdp@";private OkHttpClient okHttpClient;public LivyClient(String livyUrl) {this.url = livyUrl;this.okHttpClient = new OkHttpClient.Builder().connectTimeout(50000, TimeUnit.MILLISECONDS).readTimeout(50000, TimeUnit.MILLISECONDS).build();}//create sessionpublic LivySession createSession(BdpLivyInfo bdpLivyInfo) throws IOException {JSONObject body = new JSONObject();body.put("proxyUser", bdpLivyInfo.getProxyUser());body.put("kind", "sql");body.put("driverMemory", bdpLivyInfo.getDriverMemory());body.put("driverCores", bdpLivyInfo.getDriverCores());body.put("executorMemory", bdpLivyInfo.getExecutorMemory());body.put("executorCores", bdpLivyInfo.getExecutorCores());body.put("numExecutors", bdpLivyInfo.getNumExecutors());body.put("queue", bdpLivyInfo.getQueue());body.put("heartbeatTimeoutInSecond",bdpLivyInfo.getHeartbeatTimeoutInSecond());body.put("name", NAME_PREFIX + GuidUtils.newGuild());JSONObject sparkConf = new JSONObject();sparkConf.put("spark.dynamicAllocation.enabled", true);sparkConf.put("spark.shuffle.service.enabled", true);body.put("conf", sparkConf);RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), body.toString());Request request = new Request.Builder().url(url + "/sessions").post(requestBody).header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("create session exception: " + responseContent);}LivySession session = JSONObject.parseObject(responseContent, LivySession.class);session.setLivyClient(this);session.setBdpLivyInfo(bdpLivyInfo);//轮询等待Stopwatch sw = Stopwatch.createStarted();for (; ; ) {try {Thread.sleep(2000);log.info("waiting some time for creating session:{}", session.getId());//超时处理long elapsed = sw.elapsed(TimeUnit.SECONDS);if (elapsed > CREATE_SESSION_TIMEOUT) {log.error("create session timeout(s) : " + CREATE_SESSION_TIMEOUT);deleteSession(session.getId());throw new IOException("create session timeout");}LivySession session_current = getSession(session.getId());session_current.setLivyClient(this);session_current.setBdpLivyInfo(bdpLivyInfo);if (StringUtils.equals(session_current.getState(), LivySessionState.idle.name())) {return session_current;} else if (StringUtils.equals(session_current.getState(), LivySessionState.starting.name())) {continue;} else {deleteSession(session.getId());log.error("create session error,state:{}", session_current.getState());}} catch (Exception e) {log.error("create session error:{}", e.getMessage());throw new IOException("create session error : " + e.getMessage());}}}//delete sessionpublic void deleteSession(String sessionId) throws IOException {Request request = new Request.Builder().url(url + "/sessions/" + sessionId).delete().header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("delete session exception: " + responseContent);}}//get sessionpublic LivySession getSession(String sessionId) throws IOException {Request request = new Request.Builder().url(url + "/sessions/" + sessionId).get().header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (code == 404) {return null;}if (!(code >= 200 && code < 300)) {throw new IOException("get session exception: " + responseContent);}LivySession session = JSONObject.parseObject(responseContent, LivySession.class);return session;}//list sessionpublic LivySessionListResponse listSession() throws IOException {Request request = new Request.Builder().url(url + "/sessions/").get().header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("list session exception: " + responseContent);}LivySessionListResponse listSessionResponse = JSONObject.parseObject(responseContent, LivySessionListResponse.class);return listSessionResponse;}//execute statementpublic LivyStatement executeStatement(String sessionId, String code_, String kind) throws IOException {JSONObject body = new JSONObject();body.put("code", code_);body.put("kind", kind);RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), body.toString());Request request = new Request.Builder().url(url + "/sessions/" + sessionId + "/statements").post(requestBody).header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("execute statement exception: " + responseContent);}LivyStatement statement = JSONObject.parseObject(responseContent, LivyStatement.class);return statement;}//get statementpublic LivyStatement getStatement(String sessionId, String statementId) throws IOException {Request request = new Request.Builder().url(url + "/sessions/" + sessionId + "/statements/" + statementId).get().header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {log.info("------response------------"+response.toString());throw new IOException("get statement exception: " + responseContent);}LivyStatement statement = JSONObject.parseObject(responseContent, LivyStatement.class);return statement;}//cancel statementpublic void cancelStatement(String sessionId, String statementId) throws IOException {Request request = new Request.Builder().url(url + "/sessions/" + sessionId + "/statements/" + statementId + "/cancel").post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), "")).header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("cancel statement exception: " + responseContent);}}
}


http://chatgpt.dhexx.cn/article/kgjvNphn.shtml

相关文章

Spark开源REST服务——Apache Livy(Spark 客户端)

文章目录 一、概述二、Apache Livy模块介绍1&#xff09;Client2&#xff09;router3&#xff09;权限管理4&#xff09;生成 Spark App5&#xff09;交互式 Driver6&#xff09;状态数据存储 三、Apache Livy架构1&#xff09;Livy架构2&#xff09;Livy执行作业流程 四、环境…

spark系列-应用篇之通过livy提交Spark任务

#前言 上一篇使用yarn api的提交spark任务比较麻烦&#xff0c; 这次我们使用livy来提交spark任务。livy的使用十分简单&#xff0c;功能上比yarn api还要强大&#xff0c;支持提交spark代码片断&#xff0c;session共享 #安装 下载livy0.5.0&#xff0c;下载完成后进行解压。…

Spark Livy 指南及livy部署访问实践

背景&#xff1a; Apache Spark 是一个比较流行的大数据框架、广泛运用于数据处理、数据分析、机器学习中&#xff0c;它提供了两种方式进行数据处理&#xff0c;一是交互式处理&#xff1a;比如用户使用spark-shell&#xff0c;编写交互式代码编译成spark作业提交到集群上去执…

Livy简单使用 架构解读

Livy使用 —— 关于Session的操作 官网案例&#xff1a;http://livy.incubator.apache.org/examples/ REST API&#xff1a;http://livy.incubator.apache.org/docs/latest/rest-api.html 通过REST API的方式去获取到session&#xff0c;返回活的交互式session 打开Postman&a…

【云原生】Apache Livy on k8s 讲解与实战操作

文章目录 一、概述二、开始编排部署1&#xff09;部署包准备1&#xff09;构建镜像2&#xff09;创建livy chart模板3&#xff09;修改yaml编排4&#xff09;开始部署5&#xff09;测试验证6&#xff09;卸载 一、概述 Livy是一个提供Rest接口和spark集群交互的服务。它可以提交…

Livy:基于Apache Spark的REST服务

原文&#xff1a;http://geek.csdn.net/news/detail/208943 Apache Spark提供的两种基于命令行的处理交互方式虽然足够灵活&#xff0c;但在企业应用中面临诸如部署、安全等问题。为此本文引入Livy这样一个基于Apache Spark的REST服务&#xff0c;它不仅以REST的方式代替了Spar…

Livy安装使用

本次部署的livy是0.7.0版&#xff0c;zip包下载地址&#xff1a;https://mirrors.tuna.tsinghua.edu.cn/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip 安装java jdk1.8 步骤&#xff1a; 一、将下载好的livy的zip包用命令unzip去解压(如果…

livy的安装使用

2019独角兽企业重金招聘Python工程师标准>>> livy简介 Livy是一个提供rest接口和spark集群交互的服务。它可以提交spark job或者spark一段代码,同步或者异步的返回结果;也提供sparkcontext的管理,通过restfull接口或RPC客户端库。Livy也简化了与spark与应用服务的交…

Livy任务提交源码解析

文章目录 简介任务路由代码断任务远程Driver上建立RpcServerLivyServer接收客户端提交代码断任务LivyServer向远程Driver的RpcServer提交任务远程Driver的RpcServer接收任务Driver执行代码断任务Batch任务LivyServer接收batch任务创建BatchSession以提交Spark任务创建SparkYarn…

livy简介

livy简介 什么是livy Livy通过提供REST服务来简化与Spark集群的交互。它可以通过job或者代码片段的方式来提交Spark任务&#xff0c;并同步或者异步地获得任务的结果&#xff0c;以及管理spark context&#xff0c;上述功能通过简单的REST接口或者RPC服务来实现。livy也可以简…

livy部署及应用

一、介绍 Livy把spark交互式和批处理都搬到了web上&#xff0c;提供restful接口&#xff0c;Livy一方面接收并解析客户端提交的REST请求&#xff0c;转换成相应的操作&#xff0c;另一方面它管理着客户端所启动的spark集群 Livy会为用户运行多个session&#xff0c;每个sessio…

C/C++ 实现字符串IP与整数型IP的相互转换

#include <stdio.h> int main() {char ip[32] "192.168.1.151"; //IP值char scIPAddress[32] ""; //存储字符串IPunsigned int nIPAddress 0; //存储整形IPint nTmpIP[4] {0}; //分割IPint i0;//字符串转整形sscanf(ip,"%d.%d.%d.%…

数字字符串转化成 IP 地址

数字字符串转化成 IP 地址 1、参考资料 https://www.nowcoder.com/practice/ce73540d47374dbe85b3125f57727e1e 2、题目要求 题目描述 现在有一个只包含数字的字符串&#xff0c;将该字符串转化成IP地址的形式&#xff0c;返回所有可能的情况。 例如&#xff1a; 给出的字…

IP地址(IPV6)与long数组之间的转换

IP地址&#xff08;IPV6&#xff09;与long数组之间的转换 《IP地址&#xff08;IPV4&#xff09;与int类型之间的转换》《IP地址&#xff08;IPV6&#xff09;与long数组之间的转换》 一、前言 IPv6是英文“Internet Protocol Version 6”&#xff08;互联网协议第6版&#…

如何将字符数串和IP地址进行转换?

这一部分主要是网络编程中会使用&#xff0c;将数串和IP地址进行转换&#xff0c;在进行转换之前&#xff0c;我们需要知道IP地址在linux系统中的结构体定义 地址类型结构体 具体如下表&#xff1a; 结构体功能特性struct sockaddr套接字地址结构IPv4/IPv6通用struct sockad…

如何将IP地址字符串转换为数字数组

如何将IP地址字符串转换为数字数组 最近在做一个项目用到LWIP&#xff0c;通过触摸屏幕上的数字键盘输入要设置的IP地址和网关地址&#xff0c;然后再用输入的地址去设置重新设置lwip。那么问题就来了&#xff0c;输入的IP地址字符串应该怎么去转换成 ip[4] 数组呢&#xff1f…

IP地址字符串和数组相互转换

需求描述&#xff1a; 将字符串“192.168.2.126”&#xff0c;转成Byte类型&#xff0c;存放在字节数组中。数组内容为 192,168,2&#xff0c;126。反之亦然。 实现方法&#xff1a; 通过C# 库中的IPAddress类完成。 IPAddress类 对应的命名空间是using System.Net; 实现代…

花3个月面过华为测开岗,拿个30K不过分吧?

计算机专业&#xff0c;代码能力一般&#xff0c;之前有过两段实习以及一个学校项目经历。第一份实习是大二暑期在深圳的一家互联网公司做前端开发&#xff0c;第二份实习由于大三暑假回国的时间比较短&#xff08;小于两个月&#xff09;&#xff0c;于是找的实习是在一家初创…

华为OD德科面试+机试记录

一、机试&#xff08;6.25&#xff09; 三道编程题&#xff0c;难度偏中。由于时间久远&#xff0c;只记得其中两道题目 1、找车位&#xff08;动态规划&#xff09; 2、题目不记得了&#xff0c;后面如果找到会补充&#xff08;双指针&#xff09; 3、高效的任务规划&#x…

准备4个月过华为测试岗,拿个23k应该不多吧

我大学是学的编程专业&#xff0c;写代码能力非常一般&#xff0c;之前有一个学校项目经验和两段实习。第一份实习是在进大三之前的暑假在广州一家软件公司做前端&#xff0c;第二份实习时大三暑假两个月在一家刚创业的公司做全栈。 我面试的是测试开发&#xff0c;在2022年初…