部署
使用版本:apache-livy-0.7.1
环境:spark(配置中已添加hive-site.xml) , hive ,hadoop(hdfs+yarn) 基于centos
livy只需要配置两个文件(livy-env.sh,livy.conf):
livy-env.sh 中添加环境变量配置
export JAVA_HOME=/usr/local/jdk/jdk1.8.0_141
export HADOOP_HOME=/usr/local/jdk/azkabanWebServer/hadoop-2.7.1
export SPARK_CONF_DIR=/usr/local/jdk/azkabanWebServer/spark-2.4.8/conf
export SPARK_HOME=/usr/local/jdk/azkabanWebServer/spark-2.4.8
export HADOOP_CONF_DIR=/usr/local/jdk/azkabanWebServer/hadoop-2.7.1/etc/hadoop
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"
其中HADOOP_CONF_DIR一定要添加,否则创建会话的时候,会话里面没有 appId 及appInfo信息,这是我之前踩过的坑,如下图:
livy.conf配置:
livy.spark.master = yarn
livy.spark.deploy-mode = cluster
livy.environment = production
livy.impersonation.enabled = true
livy.server.port = 8998
livy.server.session.timeout = 3600000
livy.server.recovery.mode = recovery
livy.server.recovery.state-store=filesystem
livy.server.recovery.state-store.url=/opt/apache/livy/tmp
livy.task.max.concurrent.count = 20
livy.repl.enableHiveContext = true
如果希望通过livy用spark-sql操作hive里面的数据,livy.repl.enableHiveContext 一定要配置为true;否则用livy提交建表sql语句时,所创建的表只对当前livy session有效,在新建的livy session中看不到其他session创建的表,因为该表没有存放在hive元数据中
启动命令:sh livy-server start
使用livy
使用livy有两种操作模式:
1、交互式模式:这种模式就类似于在linux中spark-sql进入的交互模式一样,会给当前客户端创建一个会话,只要不进行quit,此会话一直会保留,且一次只能操作一个sql语句,操作的sql都能返回结果
如:创建会话rest接口(可以配置yarn运行的参数)
作业提交方式rest接口:
查询结果rest接口:
注意:这里查询结果返回默认最多只有1000条
看源码里面LivyConf有个配置:
2、非交互式模式(即batch模式):这种模式常见的使用方式是:先编写scala程序,打成jar文件,放到hdfs上,再进行rest batch提交,提交时要指定执行哪个jar及main类;这种模式不会给调用方返回结果,因为创建的会话在程序执行完后会自动释放,并不会保留
此种模式并不需要创建会话,如:
以下是交互式模式的Java操作rest接口的工具类:
package com.kyexpress.bdp.common.utils;import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Stopwatch;
import com.kyexpress.bdp.common.enums.LivySessionState;
import com.kyexpress.bdp.common.model.BdpLivyInfo;
import com.kyexpress.bdp.common.model.livy.LivySession;
import com.kyexpress.bdp.common.model.livy.LivySessionListResponse;
import com.kyexpress.bdp.common.model.livy.LivyStatement;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.lang3.StringUtils;import java.io.IOException;
import java.util.concurrent.TimeUnit;@Slf4j
public class LivyClient {protected String url;public static final int CREATE_SESSION_TIMEOUT = 10 * 60;public static String NAME_PREFIX = "bdp@";private OkHttpClient okHttpClient;public LivyClient(String livyUrl) {this.url = livyUrl;this.okHttpClient = new OkHttpClient.Builder().connectTimeout(50000, TimeUnit.MILLISECONDS).readTimeout(50000, TimeUnit.MILLISECONDS).build();}//create sessionpublic LivySession createSession(BdpLivyInfo bdpLivyInfo) throws IOException {JSONObject body = new JSONObject();body.put("proxyUser", bdpLivyInfo.getProxyUser());body.put("kind", "sql");body.put("driverMemory", bdpLivyInfo.getDriverMemory());body.put("driverCores", bdpLivyInfo.getDriverCores());body.put("executorMemory", bdpLivyInfo.getExecutorMemory());body.put("executorCores", bdpLivyInfo.getExecutorCores());body.put("numExecutors", bdpLivyInfo.getNumExecutors());body.put("queue", bdpLivyInfo.getQueue());body.put("heartbeatTimeoutInSecond",bdpLivyInfo.getHeartbeatTimeoutInSecond());body.put("name", NAME_PREFIX + GuidUtils.newGuild());JSONObject sparkConf = new JSONObject();sparkConf.put("spark.dynamicAllocation.enabled", true);sparkConf.put("spark.shuffle.service.enabled", true);body.put("conf", sparkConf);RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), body.toString());Request request = new Request.Builder().url(url + "/sessions").post(requestBody).header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("create session exception: " + responseContent);}LivySession session = JSONObject.parseObject(responseContent, LivySession.class);session.setLivyClient(this);session.setBdpLivyInfo(bdpLivyInfo);//轮询等待Stopwatch sw = Stopwatch.createStarted();for (; ; ) {try {Thread.sleep(2000);log.info("waiting some time for creating session:{}", 
session.getId());//超时处理long elapsed = sw.elapsed(TimeUnit.SECONDS);if (elapsed > CREATE_SESSION_TIMEOUT) {log.error("create session timeout(s) : " + CREATE_SESSION_TIMEOUT);deleteSession(session.getId());throw new IOException("create session timeout");}LivySession session_current = getSession(session.getId());session_current.setLivyClient(this);session_current.setBdpLivyInfo(bdpLivyInfo);if (StringUtils.equals(session_current.getState(), LivySessionState.idle.name())) {return session_current;} else if (StringUtils.equals(session_current.getState(), LivySessionState.starting.name())) {continue;} else {deleteSession(session.getId());log.error("create session error,state:{}", session_current.getState());}} catch (Exception e) {log.error("create session error:{}", e.getMessage());throw new IOException("create session error : " + e.getMessage());}}}//delete sessionpublic void deleteSession(String sessionId) throws IOException {Request request = new Request.Builder().url(url + "/sessions/" + sessionId).delete().header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("delete session exception: " + responseContent);}}//get sessionpublic LivySession getSession(String sessionId) throws IOException {Request request = new Request.Builder().url(url + "/sessions/" + sessionId).get().header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (code == 404) {return null;}if (!(code >= 200 && code < 300)) {throw new IOException("get session exception: " + responseContent);}LivySession session = JSONObject.parseObject(responseContent, LivySession.class);return session;}//list sessionpublic LivySessionListResponse listSession() throws IOException {Request request = new 
Request.Builder().url(url + "/sessions/").get().header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("list session exception: " + responseContent);}LivySessionListResponse listSessionResponse = JSONObject.parseObject(responseContent, LivySessionListResponse.class);return listSessionResponse;}//execute statementpublic LivyStatement executeStatement(String sessionId, String code_, String kind) throws IOException {JSONObject body = new JSONObject();body.put("code", code_);body.put("kind", kind);RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), body.toString());Request request = new Request.Builder().url(url + "/sessions/" + sessionId + "/statements").post(requestBody).header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("execute statement exception: " + responseContent);}LivyStatement statement = JSONObject.parseObject(responseContent, LivyStatement.class);return statement;}//get statementpublic LivyStatement getStatement(String sessionId, String statementId) throws IOException {Request request = new Request.Builder().url(url + "/sessions/" + sessionId + "/statements/" + statementId).get().header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {log.info("------response------------"+response.toString());throw new IOException("get statement exception: " + responseContent);}LivyStatement statement = JSONObject.parseObject(responseContent, LivyStatement.class);return statement;}//cancel 
statementpublic void cancelStatement(String sessionId, String statementId) throws IOException {Request request = new Request.Builder().url(url + "/sessions/" + sessionId + "/statements/" + statementId + "/cancel").post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), "")).header("X-Requested-By", "DataQuery").build();Response response = okHttpClient.newCall(request).execute();String responseContent = response.body().string();int code = response.code();if (!(code >= 200 && code < 300)) {throw new IOException("cancel statement exception: " + responseContent);}}
}