flink-sql-client提交sql脚本文件

article/2025/9/14 9:37:38

标题: flink-sql-client提交sql脚本文件
日期: 2021-10-22 22:11:34
标签: [flink,sql-client]
分类: flink

我们知道,sql-client.sh可以提供给我们一个sql交互界面,让我们没执行一个sql,就可以看到执行结果,也可以交互式查询表的结果。

其实,我们也可以通过sql-client提交sql脚本,我们来看下。

flink

./bin/sql-client.sh -h 对应的帮助参数:

(base) [chenzuoli@chenzuolis-MacBook /Volumes/chenzuoli/Data/docker_img/flink-1.12.1]$./bin/sql-client.sh -h
./sql-client [MODE] [OPTIONS]The following options are available:Mode "embedded" submits Flink jobs from the local machine.Syntax: embedded [OPTIONS]"embedded" mode options:-d,--defaults <environment file>      The environment properties with whichevery new session is initialized.Properties might be overwritten bysession properties.-e,--environment <environment file>   The environment properties to beimported into the session. It mightoverwrite default environmentproperties.-h,--help                             Show the help message withdescriptions of all options.-hist,--history <History file path>   The file which you want to save thecommand history into. If notspecified, we will auto-generate oneunder your user's home directory.-j,--jar <JAR file>                   A JAR file to be imported into thesession. The file might containuser-defined classes needed for theexecution of statements such asfunctions, table sources, or sinks.Can be used multiple times.-l,--library <JAR directory>          A JAR file directory with which everynew session is initialized. The filesmight contain user-defined classesneeded for the execution ofstatements such as functions, tablesources, or sinks. Can be usedmultiple times.-pyarch,--pyArchives <arg>            Add python archive files for job. Thearchive files will be extracted tothe working directory of python UDFworker. Currently only zip-format issupported. For each archive file, atarget directory be specified. If thetarget directory name is specified,the archive file will be extracted toa name can directory with thespecified name. Otherwise, thearchive file will be extracted to adirectory with the same name of thearchive file. The files uploaded viathis option are accessible viarelative path. '#' could be used asthe separator of the archive filepath and the target directory name.Comma (',') could be used as theseparator to specify multiple archivefiles. This option can be used toupload the virtual environment, thedata files used in Python UDF (e.g.:--pyArchivesfile:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutablepy37.zip/py37/bin/python). The datafiles could be accessed in PythonUDF, e.g.: f = open('data/data.txt','r').-pyexec,--pyExecutable <arg>          Specify the path of the pythoninterpreter used to execute thepython UDF worker (e.g.:--pyExecutable/usr/local/bin/python3). The pythonUDF worker depends on Python 3.5+,Apache Beam (version == 2.23.0), Pip(version >= 7.1.0) and SetupTools(version >= 37.0.0). Please ensurethat the specified environment meetsthe above requirements.-pyfs,--pyFiles <pythonFiles>         Attach custom python files for job.These files will be added to thePYTHONPATH of both the local clientand the remote python UDF worker. Thestandard python resource filesuffixes such as .py/.egg/.zip ordirectory are all supported. Comma(',') could be used as the separatorto specify multiple files (e.g.:--pyFilesfile:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).-pyreq,--pyRequirements <arg>         Specify a requirements.txt file whichdefines the third-party dependencies.These dependencies will be installedand added to the PYTHONPATH of thepython UDF worker. A directory whichcontains the installation packages ofthese dependencies could be specifiedoptionally. Use '#' as the separatorif the optional parameter exists(e.g.: --pyRequirementsfile:///tmp/requirements.txt#file:///tmp/cached_dir).-s,--session <session identifier>     The identifier for a session.'default' is the default identifier.-u,--update <SQL update statement>    Experimental (for testing only!):Instructs the SQL Client toimmediately execute the given updatestatement after starting up. Theprocess is shut down after thestatement has been submitted to thecluster and returns an appropriatereturn code. Currently, this featureis only supported for INSERT INTOstatements that declare the targetsink table.

其中第一个参数-d,可以指定一些环境上的参数配置。

在这里插入图片描述

接下来,我们看看conf/sql-client-defaults.yaml文件,这个文件其实就是对应的配置文件。
创建测试用的数据文件:

mkdir sql_test
vim sql_test/book-store.csv枪炮、病菌和钢铁,18,社会学
APP UI设计之道,20,设计
通证经济,22,经济学
区块链的真正商机,21,经济学

我们再来创建一个自己的配置文件,读取csv文件,然后select出来,新建文件conf/book-store.yaml

vim conf/book-store.yamltables:- name: BookStoretype: source-tableupdate-mode: appendconnector:type: filesystempath: "/Users/zhaoqin/temp/202004/26/book-store.csv"format:type: csvfields:- name: BookNametype: VARCHAR- name: BookAmounttype: INT- name: BookCatalogtype: VARCHARline-delimiter: "\n"comment-prefix: ","schema:- name: BookNametype: VARCHAR- name: BookAmounttype: INT- name: BookCatalogtype: VARCHAR- name: MyBookViewtype: viewquery: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"execution:planner: blink                    # optional: either 'blink' (default) or 'old'type: streaming                   # required: execution mode either 'batch' or 'streaming'result-mode: table                # required: either 'table' or 'changelog'max-table-result-rows: 1000000    # optional: maximum number of maintained rows in#   'table' mode (1000000 by default, smaller 1 means unlimited)time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)parallelism: 1                    # optional: Flink's parallelism (1 by default)periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)min-idle-state-retention: 0       # optional: table program's minimum idle state timemax-idle-state-retention: 0       # optional: table program's maximum idle state time#   (default database of the current catalog by default)restart-strategy:                 # optional: restart strategytype: fallback                  #   "fallback" to global restart strategy by default# Configuration options for adjusting and tuning table programs.# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:table.optimizer.join-reorder-enabled: truetable.exec.spill-compression.enabled: truetable.exec.spill-compression.block-size: 128kb# Properties that describe the cluster to which table programs are submitted to.deployment:response-timeout: 5000

通过指定配置文件的方式,来启动一个session,执行相应的source-table和sink-table。
其中关于book-store.yaml配置文件,有几点需要注意:
a. tables.type等于source-table,表明这是数据源的配置信息;
b. tables.connector描述了详细的数据源信息,path是book-store.csv文件的完整路径,connector的type指定为filesystem,这跟我们写sql的时候指定的connector参数是一致的;
c. tables.format描述了文件内容,type为csv格式;
d. tables.schema描述了数据源表的表结构;
ed. type为view表示MyBookView是个视图(参考数据库的视图概念);

下面来看一下测试结果:

./bin/start-cluster.sh
./bin/sql-client.sh embedded -d conf/book-store.yaml

进入sql-client sql交互界面之后,可以看到环境已经配置好了,

Flink SQL> show tables;
BookStore
MyBookViewFlink SQL> desc BookStore;
+-------------+--------+------+-----+--------+-----------+
|        name |   type | null | key | extras | watermark |
+-------------+--------+------+-----+--------+-----------+
|    BookName | STRING | true |     |        |           |
|  BookAmount |    INT | true |     |        |           |
| BookCatalog | STRING | true |     |        |           |
+-------------+--------+------+-----+--------+-----------+
3 rows in setFlink SQL> desc MyBookView
> ;
+-------------+--------+------+-----+--------+-----------+
|        name |   type | null | key | extras | watermark |
+-------------+--------+------+-----+--------+-----------+
| BookCatalog | STRING | true |     |        |           |
|      Amount |    INT | true |     |        |           |
+-------------+--------+------+-----+--------+-----------+
2 rows in set

可以看到两个表已经创建好了,我们可以看一下数据:

select * from MyBookView;BookCatalog                    Amount社会学                        18设计                        20经济学                        43

对不对,ok了,你要是yaml文件中写有sink-table那么,直接就提交了一个flink job到flink集群了,是不是达到了提交flink sql脚本文件的效果了。

好了,今天就这样,因为这几天在倒腾公司数据平台组开发的一个 流数据平台,发现他们是通过sql-client,提交到k8s上的,这一个提交任务方式,着实让我感到意外。因为之前翻译过一篇官方提供的flink submit job的文章,里面提到了四种提交方式:

  1. local cluster
  2. application mode
  3. per job mode
  4. session mode
    我以为只有这四种呢,其实仔细看,sql-client提交sql的方式类似于session的方式,在整个session启动过程中,你可以不听地执行sql语句,session关闭,则任务关闭。

ok,下次见。

flink, yyds.


书山有路勤为径,学海无涯苦作舟。

欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这2个公众号:
程序员写书

喜欢宠物的朋友可以关注:【电巴克宠物Pets】
电巴克宠物

一起学习,一起进步。


http://chatgpt.dhexx.cn/article/2ANQr8yK.shtml

相关文章

如何在mysql中执行sql脚本文件

一、sql脚本文件 简介 xxxx.sql这种文件被称为sql脚本文件。sql脚本文件中编写了大量的sql语句。我们执行sql脚本文件的时候&#xff0c;该文件中所有的sql语句会全部执行&#xff01;批量的执行SQL语句&#xff0c;可以使用sql脚本文件。 上面这个vip.sql就是sql脚本文件&am…

使用sql脚本创建数据库表

准备脚本语句&#xff1a; CREATE TABLE test (title varchar(100) DEFAULT NULL,author varchar(10) DEFAULT NULL,digest varchar(255) DEFAULT NULL,content text,content_source_url varchar(500) DEFAULT NULL,thumb_media_id varchar(255) DEFAULT NULL,need_open_comme…

PowerDesigner生成Sql脚本

点击工具栏上的“Database”&#xff0c;选择“Change Current DBMS”进行修改导出脚本类型&#xff0c;可以选择mysql、sql server/ oracle 、db2等主流的数据库。 在DBMS中点击下拉菜单&#xff0c;选择要导出的数据库脚本&#xff0c;对名字进行自定义&#xff0c;点击确定即…

PowerDesigner导入sql脚本

一个好的数据库建模,不但可以让人直观的理解模型,充分的利用数据库技术,优化数据库的设计,而且还可以让新员工快速的熟悉数据库表结构与业务之间的关系.无奈的是随着开发过程中,数据库表结构字段的增删以及关联关系的变动给数据库模型带来维护上的巨大工作量.现为了维护上的简单…

dbeaver导入sql脚本

新建数据库 执行脚本 选择脚本文件 选择mysql 然后按确定就行了

springboot + mybatis启动时执行sql脚本

目录 1. 创建数据版本表&#xff0c;结构如下&#xff1a; 2. 创建HdVersion对象 3. 创建执行sql的dao 4. 创建dao对应的xml 5.创建sql执行器&#xff0c;实现ApplicationRunner 6. 结语 背景&#xff1a;项目开发或发布阶段修改表结构&#xff0c;项目更新时需要手动执行脚…

SpringBoot启动自动执行sql脚本

在开发当中我们每次发布服务都需要手动执行脚本&#xff0c;然后重启服务&#xff0c;而SpringBoot有服务启动自动执行sql脚本的功能的&#xff0c;可以为我们省去手动执行脚本的这一步&#xff0c;只需要部署新的服务即可。 这个功能是SpringBoot自带的不需要引入额外的依赖&a…

Excel数据转化为sql脚本

在实际项目开发中&#xff0c;有时会遇到客户让我们把大量Excel数据导入数据库的情况。这时我们就可以通过将Excel数据转化为sql脚本来批量导入数据库。 1 在数据前插入一列单元格&#xff0c;用来拼写sql语句。 具体写法&#xff1a;"insert into t_student (id,name,ag…

MySQL导出sql脚本文件

⭐️前言⭐️ sql脚本文件在我们做项目时&#xff0c;特别是学习别人的开源项目时经常需要进行导入导出操作&#xff0c;才能在自己的系统上跑起来&#xff0c;这篇文章主要介绍如何导出sql脚本文件&#xff0c;具体操作如下&#xff0c;附带截图详解。 &#x1f349;博客主页…

dataGrip导出sql脚本

1.打开dataGrip。 2.选择要导出的数据库表。 3.点击右键->选择"Dump Data to File(s)", 同时选择&#xff0c;Skip Computed Columns(sql),Add Table Definition(sql),Overwrite Exsting Files和Single File。 4.点击sql Inserts 5.选择文件保存位置 6.生成sql脚…

linux下plsql怎么执行sql脚本,plsql怎么执行sql脚本

首先,我们需要登录需要执行sql文件的用户,在我们确保sql文件无误的情况下,进入plsqldeveloper: 1,找到tools---》import tables ---》选择sql insert,不要选中sqlplus,选择最下面的那个导入sql文件,选中好sql文件后,点击import就会执行sql语句,生成日志。 2,如果执行…

DBeaver执行SQL脚本文件

1、右键库名&#xff0c;点击工具-->执行脚本 2、在弹出窗口中选择输入文件&#xff0c;并修改Extra command args:--default-character-setutf8&#xff0c;防止中文乱码&#xff0c;点击开始按钮。 3、执行完成。

kettle执行SQL脚本

参考一下kettle官方文档 kettle什么时候需要创建临时表呢 SELECT * WHERE cid IN(xxx) 的数据太多&#xff0c;占了很大内存。 目标表有没有必要做逻辑删除&#xff0c;如果做逻辑删除&#xff0c;后期数据量增长过快。 目标表增量更新&#xff1a;1、sql直接插入&#xff1…

SQL Server SQL脚本

本节的主要内容是要教大家怎么通过编写 SQL 脚本来查询、更新并且运行数据库。 利用 SQL 脚本我们能做很多事情&#xff0c;比如插入数据、读取数据、更新数据以及删除数据等&#xff1b;它们也可以用于创建数据库对象&#xff0c;如表&#xff0c;视图&#xff0c;存储过程&a…

SQL Server 数据库之生成与执行 SQL 脚本

生成与执行 SQL 脚本 1. 将数据库生成2. 将数据表生成 SQL 脚本4. 执行 SQL 脚本 1. 将数据库生成 使用对象资源管理器能快速创建整个数据的脚本&#xff0c;也能使用默认选项创建单个数据库对象的脚本&#xff1b; 用户能在查询编辑器窗口中对文件或剪贴板创建脚本&#xff0…

SQL 常用脚本大全

1、行转列的用法PIVOT CREATE table test (id int,name nvarchar(20),quarter int,number int) insert into test values(1,N苹果,1,1000) insert into test values(1,N苹果,2,2000) insert into test values(1,N苹果,3,4000) insert into test values(1,N苹果,4,5000) insert …

打砖游戏,详解每一行代码,历经三个小时解析,初学可看

打转游戏详解版 网上只要搜一下“打砖游戏”&#xff0c;基本会看到很多一样的代码&#xff0c;主要是注释也很少&#xff0c;对于python不熟悉的人来说&#xff0c;根本看不懂&#xff0c;只会拿来运行着玩玩。 于是我历经三个小时&#xff0c;把代码几乎每一行都注释了一遍&…

今天开始敲代码

新手小白今天开始自学敲代码了呜呜呜 加油加油加油

开发12年,整整6百万行代码,史上最烂的开发项目长这样

程序员&#xff08;ID&#xff1a;imkuqin&#xff09;猿妹编译 原文&#xff1a;https://projectfailures.wordpress.com 最近有个史称世界上最烂的开发项目在朋友圈刷屏&#xff0c;这个项目到底有多烂呢&#xff1f; 这个项目拖了整整12年&#xff0c;造出6百万行代码&#…

“低代码”抢走程序员的饭碗?没有的事

编者按&#xff1a;眼下发展势头正猛的低代码&#xff0c;承受着两种截然不同的声音。一种声音觉得低代码是“减负”神器&#xff0c;可以帮助减轻开发工作量&#xff0c;大幅提升开发效率&#xff1b;另一种声音却觉得低代码非“善类”&#xff08;主要是程序员群体&#xff0…