Kudo介绍 + Spark\Python\Scala开发Kudu应用程序

article/2025/9/25 9:52:02

前半部分文章摘自:http://blog.csdn.net/a1043498776/article/details/72681890

Kudu的背景

Hadoop中有很多组件,为了实现复杂的功能通常都是使用混合架构,

  • Hbase:实现快速插入和修改,对大量的小规模查询也很迅速
  • HDFS/Parquet + Impala/Hive:对超大的数据集进行查询分析,对于这类场景, Parquet这种列式存储文件格式具有极大的优势。
  • HDFS/Parquet + Hbase:这种混合架构需要每隔一段时间将数据从hbase导出成Parquet文件,然后用impala来实现复杂的查询分析 
    以上的架构没办法把复杂的实时查询集成在Hbase上

这里写图片描述

这里写图片描述

Kudu的设计

  • Kudu是对HDFS和HBase功能上的补充,能提供快速的分析和实时计算能力,并且充分利用CPU和I/O资源,支持数据原地修改,支持简单的、可扩展 
    的数据模型。
  • Kudu的定位是提供”fast analytics on fast data”,kudu期望自己既能够满足分析的需求(快速的数据scan),也能够满足查询的需求(快速的随机访问)。它定位OLAP和少量的OLTP工作流,如果有大量的random accesses,官方建议还是使用HBase最为合适

这里写图片描述

这里写图片描述

Kudu的结构

这里写图片描述

其实跟Hbase是有点像的

Kudu的使用

1:支持主键(类似 关系型数据库) 
2:支持事务操作,可对数据增删改查数据 
3:支持各种数据类型 
4:支持 alter table。可删除列(非主键) 
5:支持 INSERT, UPDATE, DELETE, UPSERT 
6:支持Hash,Range分区 
进入Impala-shell -i node1ip 
具体的CURD语法可以查询官方文档,我就不一一列了 
http://kudu.apache.org/docs/kudu_impala_integration.html 
建表 
Create table kudu_table (Id string,Namestring,Age int, 
Primary key(id,name) 
)partition by hash partitions 16 
Stored as kudu; 
插入数据 
Insert into kudu_table 
Select * from impala_table; 
注意 
以上的sql语句都是在impala里面执行的。Kudu和hbase一样都是nosql查询的,Kudu本身只提供api。impala集成了kudu。 
这里写图片描述

Kudu Api

奉上我的Git地址: 
https://github.com/LinMingQiang/spark-util/tree/spark-kudu

Scala Api

pom.xml

<dependency><groupId>org.apache.hive</groupId><artifactId>hive-metastore</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-jdbc</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-service</artifactId><version>1.1.0</version><exclusions><exclusion><artifactId>servlet-api</artifactId><groupId>javax.servlet</groupId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-client</artifactId><version>1.3.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.10</artifactId><version>1.6.0</version>
</dependency>
<dependency><groupId>org.kududb</groupId><artifactId>kudu-spark_2.10</artifactId><version>1.3.1</version>
</dependency>
<dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-mapreduce</artifactId><version>1.3.1</version><exclusions><exclusion><artifactId>jsp-api</artifactId><groupId>javax.servlet.jsp</groupId></exclusion><exclusion><artifactId>servlet-api</artifactId><groupId>javax.servlet</groupId></exclusion></exclusions>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
        val client = new KuduClientBuilder("master2").build()val table = client.openTable("impala::default.kudu_pc_log")client.getTablesList.getTablesList.foreach { println }val schema = table.getSchema();val kp = KuduPredicate.newComparisonPredicate(schema.getColumn("id"), KuduPredicate.ComparisonOp.EQUAL, "1")val scanner = client.newScanTokenBuilder(table).addPredicate(kp).limit(100).build()val token = scanner.get(0)val scan = KuduScanToken.deserializeIntoScanner(token.serialize(), client)while (scan.hasMoreRows()) {val results = scan.nextRows()while (results.hasNext()) {val rowresult = results.next();println(rowresult.getString("id"))}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Spark Kudu Api

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Test"))val sparksql = new SQLContext(sc)import sparksql.implicits._val a = new KuduContext(kuduMaster, sc)
def getKuduRDD() {val tableName = "impala::default.kudu_pc_log"val columnProjection = Seq("id", "name")val kp = KuduPredicate.newComparisonPredicate(new ColumnSchemaBuilder("id", Type.STRING).build(), KuduPredicate.ComparisonOp.EQUAL, "q")val df = a.kuduRDD(sc, tableName, columnProjection,Array(kp))df.foreach { x => println(x.mkString(",")) }}def writetoKudu() {val tableName = "impala::default.student"val rdd = sc.parallelize(Array("k", "b", "a")).map { n => STU(n.hashCode, n) }val data = rdd.toDF()a.insertRows(data, tableName)}case class STU(id: Int, name: String)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18


Kudu与Spark集成

Kudu从1.0.0版开始,通过Data Source API与Spark集成。使用--packages选项包括kudu-spark依赖关系:

如果使用Spark与Scala 2.10,请使用kudu-spark_2.10工件

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0

如果在Scala 2.11中使用Spark 2,请使用kudu-spark2_2.11工件

spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.1.0

然后导入kudu-spark并创建一个数据框:

import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
import collection.JavaConverters._

// Read a table from Kudu
val df = sqlContext.read.options(Map("kudu.master" -> "kudu.master:7051","kudu.table" -> "kudu_table")).kudu

// Query using the Spark API...
df.select("id").filter("id" >= 5).show()

// ...or register a temporary table and use SQL
df.registerTempTable("kudu_table")
val filteredDF = sqlContext.sql("select id from kudu_table where id >= 5").show()

// Use KuduContext to create, delete, or write to Kudu tables
val kuduContext = new KuduContext("kudu.master:7051", sqlContext.sparkContext)

// Create a new Kudu table from a dataframe schema
// NB: No rows from the dataframe are inserted into the table
kuduContext.createTable("test_table", df.schema, Seq("key"),new CreateTableOptions().setNumReplicas(1).addHashPartitions(List("key").asJava, 3))

// Insert data
kuduContext.insertRows(df, "test_table")

// Delete data
kuduContext.deleteRows(filteredDF, "test_table")

// Upsert data
kuduContext.upsertRows(df, "test_table")

// Update data
val alteredDF = df.select("id", $"count" + 1)
kuduContext.updateRows(filteredRows, "test_table"

// Data can also be inserted into the Kudu table using the data source, though the methods on KuduContext are preferred
// NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert in the options map
// NB: Only mode Append is supported
df.write.options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table")).mode("append").kudu

// Check for the existence of a Kudu table
kuduContext.tableExists("another_table")

// Delete a Kudu table
kuduContext.deleteTable("unwanted_table")


Spark集成已知问题和限制

  • 注册为临时表时,必须为具有大写字母或非ASCII字符的名称的Kudu表分配备用名称。

  • 具有包含大写字母或非ASCII字符的列名称的Kudu表可能不与SparkSQL一起使用。为解决这个问题可能Columns会被重命名

  • <>并且OR谓词不被推送到Kudu,而是将被Spark任务解析。只有LIKE具有后缀通配符的谓词被推送到Kudo,这意味着LIKE "FOO%"被Kudo解析,但LIKE "FOO%BAR"不是可解析的通配符。

  • Kudo不支持SparkSQL支持的所有类型,如Date, Decimal和复杂类型。

  • Kudu表只能在SparkSQL中注册为临时表。可能不会使用HiveContext查询Kudu表。


Kudu Python客户端

Kudu Python客户端为C ++客户端API提供了一个Python友好的界面。下面的示例演示了部分Python客户端的使用。

import kudu
from kudu.client import Partitioning
from datetime import datetime

# Connect to Kudu master server
client = kudu.connect(host='kudu.master', port=7051)

# Define a schema for a new table
builder = kudu.schema_builder()
builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4')
schema = builder.build()

# Define partitioning schema
partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)

# Create new table
client.create_table('python-example', schema, partitioning)

# Open a table
table = client.table('python-example')

# Create a new session so that we can apply write operations
session = client.new_session()

# Insert a row
op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()})
session.apply(op)

# Upsert a row - upsert操作该操作的实现原理是通过判断插入的记录里是否存在主键冲突来决定是插入还是更新,当出现主键冲突时则进行更新操作
op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"})
session.apply(op)

# Updating a row
op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")})
session.apply(op)

# Delete a row
op = table.new_delete({'key': 2})
session.apply(op)

# Flush write operations, if failures occur, capture print them.
try:session.flush()
except kudu.KuduBadStatus as e:print(session.get_pending_errors())

# Create a scanner and add a predicate
scanner = table.scanner()
scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1))

# Open Scanner and read all tuples
# Note: This doesn't scale for large scans
result = scanner.open().read_all_tuples()


与MapReduce,YARN和其他框架集成

Kudu旨在与Hadoop生态系统中的MapReduce,YARN,Spark和其他框架集成。见 RowCounter.java 和 ImportCsv.java 的例子,你可以模拟在你自己的集成。请继续关注未来使用YARN和Spark的更多示例。



小结

  • Kudu简单来说就是加强版的Hbase,除了像hbase一样可以高效的单条数据查询,他的表结构是类型关系型数据库的。集合impala可以达到复杂sql的实时查询。适合做OLAP(官方也是这么定位的)
  • Kudu本质上是将性能的优化,寄托在以列式存储为核心的基础上,希望通过提高存储效率,加快字段投影过滤效率,降低查询时CPU开销等来提升性能。而其他绝大多数设计,都是为了解决在列式存储的基础上支持随机读写这样一个目的而存在的。比如类Sql的元数据结构,是提高列式存储效率的一个辅助手段,唯一主键的设定也是配合列式存储引入的定制策略,至于其他如Delta存储,compaction策略等都是在这个设定下为了支持随机读写,降低latency不确定性等引入的一些Tradeoff方案。 
    官方测试结果上,如果是存粹的随机读写,或者单行的检索请求这类场景,由于这些Tradeoff的存在,HBASE的性能吞吐率是要优于Kudu不少的(2倍到4倍),kudu的优势还是在支持类SQL检索这样经常需要进行投影操作的批量顺序检索分析场合。目前kudu还处在Incubator阶段,并且还没有成熟的线上应用(小米走在了前面,做了一些业务应用的尝试),在数据安全,备份,系统健壮性等方面也还要打个问号,所以是否使用kudu,什么场合,什么时间点使用,是个需要好好考量的问题 ;)


http://chatgpt.dhexx.cn/article/tF4NoMFj.shtml

相关文章

adb remount 挂载失败

打开cmd ,输入adb remount 挂载设备的时候失败&#xff0c;提示如下&#xff1a; 按照提示&#xff0c;输入adb root 再输入adb remount &#xff0c;成功。

adb remount overlayfs的说明

在android R项目中执行adb remount的时候&#xff0c;能看到"Using overlayfs for xxx"的打印&#xff0c;类似如下&#xff1a; #adb root restarting adbd as root#adb remount Disabling verity for /system Using overlayfs for /system Disabling verity for /…

[高通SDM450][Android9.0]adb无法进行remount的解决方案

文章目录 开发平台基本信息问题描述解决方法 开发平台基本信息 芯片: SDM450 版本: Android 9.0 kernel: msm-4.9 问题描述 在调试开发的时候&#xff0c;执行remount可以获得更高的权限&#xff0c;对系统的一些应用或者文件进行删除或替换&#xff0c;达到快速调试的目的&…

adb remount

使用adb remount失败了&#xff0c;提示 如下图。 解决方法 先执行 adb root 然后 ctrlc, 然后再adb remount就成功了

Android 11 无法remount问题

问题描述&#xff1a; 在Android 11开发的时候&#xff0c;想快速调试把单独编译好的模块push 到 /system 目录下&#xff0c;结果发现remount failed C:>adb root restarting adbd as rootC:>adb remount Skipping /system for remount Skipping /vendor for remount S…

Android P(9.0) userdebug 版本执行adb remount失败

当你执行 adb remount 时会发现提示 remount of the / superblock failed: Permission denied remount failed 原因是android P 版本后 google 启用 avb(Android Verified Boot)2.0&#xff0c;verified boot and DM-verity默认启用策略发生了变化。详情如下&#xff1a; DM-V…

remount

1. 需要获取手机的root权限&#xff0c;方法很多了&#xff0c;我用的是360一键Root&#xff0c;有时也用百度一键Root 2. 从其他手机拷贝sqlite3文件到PC&#xff0c;我是从模拟器copy出来的&#xff0c;为方便大家&#xff0c;附件就有&#xff0c;可以直接下载哈 3. 进入手机…

Typescript之接口(Interface)

我们可以通过Interface关键字来定义限制数据的类型。 1.给对象定义类型 /*** 定义一种类型&#xff0c;名称叫做PersonInfo&#xff0c;里面有三个属性* name 人物的名字,类型为string* age 人物的年龄&#xff0c;类型为number* say 人物的方法&#xff0c;类型为函数类型&a…

astype

anp.array([1.1,1.2]) print(a数据类型&#xff1a;,a.dtype) print(astype修改数据类型&#xff1a;,a.astype(float).dtype) print(原数据类型未改变,a.dtype)#正确操作 aa.astype(float32) print(修改类型后再次操作&#xff0c;类型改变&#xff1a;,a.dtype) ba.astype(in…

TypeScript中的interface和type区别

&#x1f482; 个人网站: 【紫陌】【笔记分享网】 &#x1f485; 想寻找共同学习交流、共同成长的伙伴&#xff0c; 请点击【前端学习交流群】 在 TypeScript中&#xff0c;type 和 interface有些相似&#xff0c;都可以给类型命名并通过该名字来引用表示的类型。不过它们之间使…

TypeScript接口——interface

目录 一、接口概述&#xff1a; 二、接口类型介绍&#xff1a; 1、属性接口&#xff1a; 2、 函数接口&#xff1a; 3、可索引接口&#xff1a; &#xff08;1&#xff09;可索引接口约束数组示例&#xff1a; &#xff08;2&#xff09; 可索引接口约束对象示例&#xf…

【TypeScript】接口类型 Interfaces 的使用理解

导语&#xff1a; 什么是 类型接口&#xff1f; 在面向对象语言中&#xff0c;接口&#xff08;Interfaces&#xff09;是一个很重要的概念&#xff0c;它是对行为的抽象&#xff0c;而具体如何行动需要由类&#xff08;classes&#xff09;去实现&#xff08;implement&#x…

type 与 interface 的区别,你真的懂了吗?

大厂技术 高级前端 Node进阶 点击上方 程序员成长指北&#xff0c;关注公众号 回复1&#xff0c;加入高级Node交流群 在写 ts 相关代码的过程中&#xff0c;总能看到 interface 和 type 的身影。它们的作用好像都一样的&#xff0c;相同的功能用哪一个都可以实现&#xff0c;…

自定义字体 Typeface ttf

一、简介 有时候界面在设计app时会使用一些比较美观的字体&#xff0c;在安卓中使用起来也并不困难&#xff0c;随着安卓SDK的更新&#xff0c;它的实现方式也有所不同&#xff0c;该文章来看看怎么实现自定义字体。 二、普通方法 设置字体TextView.setTypeface(Typeface) pub…

TypeScript - Interfaces(接口)

目录 1、接口介绍 1.1 接口示例 2、可选属性 3、只读属性 4、额外的属性检查 5、函数类型 6、可索引的类型 &#xff17;、类类型 &#xff17;.1 类静态部分和实例部分 &#xff18;、继承接口 &#xff19;、混合类型 1&#xff10;、接口继承类 1、接口介绍 Ty…

type 与 interface

type 与 interface 官方文档是这么说的&#xff1a; For the most part, you can choose based on personal preference, and TypeScript will tell you if it needs something to be the other kind of declaration. If you would like a heuristic, use interface until you…

TypeScript中type和interface区别

typescript中interface介绍&#xff1a;TypeScript 中的接口 interface_疆~的博客-CSDN博客通常使用接口&#xff08;Interface&#xff09;来定义对象的类型。https://blog.csdn.net/qq_40323256/article/details/128478749 type type关键字是声明类型别名的关键字。用来给一…

TypeScript中interface 与 type的区别,你真的懂吗?

在写 ts 相关代码的过程中&#xff0c;总能看到 interface 和 type 的身影。它们的作用好像都一样的&#xff0c;相同的功能用哪一个都可以实现&#xff0c;也都很好用&#xff0c;所以也很少去真正的理解它们之间到底有啥区别&#xff0c; 分别在什么场景下使用&#xff0c;将…

Android 设置字体的三种方法(TypeFace)

http://blog.csdn.net/legend12300/article/details/69875816 Android系统默认字体支持四种字体&#xff0c;分别为&#xff1a; noraml &#xff08;普通字体,系统默认使用的字体&#xff09;sans&#xff08;非衬线字体&#xff09;serif &#xff08;衬线字体&#xff09;…

Android 之 Paint API —— Typeface (字型)

本节带来Paint API系列的最后一个API&#xff0c;Typeface(字型)&#xff0c;由字义&#xff0c;我们大概可以猜到&#xff0c;这个 API是用来设置字体以及字体风格的&#xff0c;使用起来也非常的简单&#xff01;下面我们来学习下Typeface的一些相关 的用法&#xff01; 官方…