kudu介绍
Kudu是运行在hadoop平台上的列式存储系统,拥有Hadoop生态系统应用的常见技术特性,运行在一般的商用硬件上,支持水平扩展,高可用。
kudu的优势
1)一个table由多个tablet组成,对分区查看、扩容和数据高可用支持非常好
2)支持update和upsert操作。
3)与presto集成或spark集成后(dataframe)可通过标准的sql操作,使用起来很方便
4)可与spark系统集成
kudu的劣势
1)只有主键可以设置range分区,且只能由一个主键,也就是一个表只能有一个字段range分区,且该字段必须是主键。
2)如果是pyspark连接kudu,则不能对kudu进行额外的操作;而scala的spark可以调用kudu本身的库,支持kudu的各种语法。
3)kudu的shell客户端不提供表schema查看。如果你不通过imapla连接kudu,且想要查看表的元数据信息,需要用spark加载数据为dataframe,通过查看dataframe的schema查看表的元数据信息。
3)kudu的shell客户端不提供表内容查看。如果你想要表的据信息,要么自己写脚本,要么通过spark、imapla查看。
4)如果使用range 分区需要手动添加分区。假设id为分区字段,需要手动设置第一个分区为1-30.第二个分区为30-60等等
问题描述
在项目中有个功能是外导用户群,每次导入重复的记录数,总是显示重复记录数为0。因为我们是将导入的文件内容先存储在group_input表中,表结构如下
CREATE TABLE group_input (
l_date varchar commen ‘日期’,
file_id varchar ‘导入文件id’,
id_type varchar ‘id类型’,
id_str varchar ‘id值’,
super_id varchar ‘唯一id’
)
WITH (
column_design = ‘{“l_date”:{“key”:true,“nullable”:false,“encoding”:“AUTO_ENCODING”,“compression”:“DEFAULT_COMPRESSION”},“file_id”:{“key”:true,“nullable”:false,“encoding”:“AUTO_ENCODING”,“compression”:“DEFAULT_COMPRESSION”},“id_type”:{“key”:true,“nullable”:false,“encoding”:“AUTO_ENCODING”,“compression”:“DEFAULT_COMPRESSION”},“id_str”:{“key”:true,“nullable”:false,“encoding”:“AUTO_ENCODING”,“compression”:“DEFAULT_COMPRESSION”},“super_id”:{“key”:false,“nullable”:true,“encoding”:“AUTO_ENCODING”,“compression”:“DEFAULT_COMPRESSION”}}’,
partition_design = ‘{“hash”:[{“columns”:[“file_id”,“id_type”,“id_str”],“buckets”:3}],“range”:null}’
);
我们设计是以l_date,id_str,id_type,file_id作为联合主键,现在使用java API操作kudu,往表里面添加10条重复数据。
public static void main(String[] args) throws KuduException {String kuduAddress = "nffapp01:7051,nffapp02:7051,nffdata02:7051";KuduClient kuduClient = new KuduClient.KuduClientBuilder(kuduAddress).build();//创建表//创建kudu表字段LinkedList<ColumnSchema> columnSchemas = new LinkedList<>();columnSchemas.add(KuduUtil.newColumn("l_date", Type.STRING,true));columnSchemas.add(KuduUtil.newColumn("file_id",Type.INT32,true));columnSchemas.add(KuduUtil.newColumn("id_type",Type.STRING,true));columnSchemas.add(KuduUtil.newColumn("id_str",Type.STRING,true));columnSchemas.add(KuduUtil.newColumn("super_id",Type.STRING,false));//创建schemaSchema schema = new Schema(columnSchemas);//创建表提供的所有选项CreateTableOptions createTableOptions = new CreateTableOptions();//设置副本数createTableOptions.setNumReplicas(1);//设置范围分区的规则LinkedList<String> list = new LinkedList<String>();list.add("l_date");//设置hash分区createTableOptions.addHashPartitions(list,5);kuduClient.createTable("group_input",schema,createTableOptions);//向表内插入新数据KuduTable table = kuduClient.openTable("group_input");KuduSession session = kuduClient.newSession();session.setTimeoutMillis(60000);for (int i = 0; i < 10; i++) {logger.info("----------------insert "+i+"---------------");Insert insert = table.newInsert();PartialRow row = insert.getRow();row.addString(0, "2019-08-01");row.addInt(1,20);row.addString(2, "20"+i);row.addString(3, "18664301061");row.addString(4, "super_id:9980767");session.apply(insert);}kuduClient.close();
}
}
获得的结果,根据id_type可以看出插入了第一条记录后其他记录都未插入。
当修改id_type和file_id值的时候,则数据可以全部插入,结果如图
总结
产品在设计之时没有考虑到kudu表主键的唯一性。当同一个文件中包含相同的记录时,只能导入一条记录。为了满足项目需要,只能修改统计规则,总记录条数=导入成功数+导入失败数+重复数,这样的话就重复数=成功重复数+失败重复数