github
其实如果不使用可视化工具来解析parquet文件,很难看出parquet文件内部正常应该是什么样的。但借助可视化工具可以发现,parquet文件会像表格(如excel文件、csv文件)那样按行列排列数据。通过结构体写入时也和写入csv文件很像:写一次就是写一行。
安装
go get github.com/xitongsys/parquet-go
// Student is the row type used by the write/read example. Each field that
// carries a `parquet` struct tag is mapped to a column; the tag gives the
// column name and its Parquet type, plus an optional converted type and
// encoding.
type Student struct {
	Name   string  `parquet:"name=name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
	Age    int32   `parquet:"name=age, type=INT32, encoding=PLAIN"`
	Id     int64   `parquet:"name=id, type=INT64"`
	Weight float32 `parquet:"name=weight, type=FLOAT"`
	Sex    bool    `parquet:"name=sex, type=BOOLEAN"`
	Day    int32   `parquet:"name=day, type=INT32, convertedtype=DATE"`

	// Ignored has no parquet tag, so it is not written to the file.
	Ignored int32
}

// When a struct has slice-typed fields, each such field must use the valuetype
// tag, because the library's schema-building code branches on the field's Go
// type.
// Acc holds three float32 slices. Each slice field declares its element type
// through the valuetype key of the parquet tag, and is also given a JSON name
// with omitempty.
type Acc struct {
	Accel []float32 `parquet:"name=accel, valuetype=FLOAT" json:"accel,omitempty"`
	Gyro  []float32 `parquet:"name=gyro, valuetype=FLOAT" json:"gyro,omitempty"`
	Comp  []float32 `parquet:"name=comp, valuetype=FLOAT" json:"comp,omitempty"`
}
func TestParquetExp(t *testing.T) {var err errorw, err := os.Create("./flat.parquet")if err != nil {log.Println("Can't create local file", err)return}//writepw, err := writer.NewParquetWriterFromWriter(w, new(Student), 4)//使用4个协程,并发写if err != nil {log.Println("Can't create parquet writer", err)return}pw.RowGroupSize = 128 * 1024 * 1024 //128Mpw.CompressionType = parquet.CompressionCodec_SNAPPYnum := 100for i := 0; i < num; i++ {stu := Student{Name: "StudentName",Age: int32(20 + i%5),Id: int64(i),Weight: float32(50.0 + float32(i)*0.1),Sex: bool(i%2 == 0),Day: int32(time.Now().Unix() / 3600 / 24),}if err = pw.Write(stu); err != nil { //写了100次,也就是100行log.Println("Write error", err)}}if err = pw.WriteStop(); err != nil {log.Println("WriteStop error", err)return}log.Println("Write Finished")w.Close()///readfr, err := local.NewLocalFileReader("./flat.parquet")if err != nil {log.Println("Can't open file")return}pr, err := reader.NewParquetReader(fr, new(Student), 4)if err != nil {log.Println("Can't create parquet reader", err)return}num = int(pr.GetNumRows()) //写了100行,这里就返回100for i := 0; i < num; i++ {//if i%2 == 0 {// pr.SkipRows(10) //skip 10 rows 0-9,20-29,40-49,60-69,80-89被跳过// continue//}stus := make([]Student, 2) //这里的切片长度代表一次都多少行if err = pr.Read(&stus); err != nil {log.Println("Read error", err)}log.Println(stus)}pr.ReadStop()fr.Close()}
解析parquet文件
解析parquet文件
源码
func NewSchemaHandlerFromStruct(obj interface{}) (sh *SchemaHandler, err error)
func NewSchemaHandlerFromStruct(obj interface{}) (sh *SchemaHandler, err error) {defer func() {if r := recover(); r != nil {switch x := r.(type) {case string:err = errors.New(x)case error:err = xdefault:err = errors.New("error occurred")}}}()ot := reflect.TypeOf(obj).Elem() //获取传入的obj的类型(结构体?切片?)item := NewItem() //该方法中主要使用的实例item.GoType = GoType //给item的info属性的属性赋值item.Info.InName = "Parquet_go_root" //给item的info属性的属性赋值item.Info.ExName = "parquet_go_root" //给item的info属性的属性赋值item.Info.RepetitionType = parquet.FieldRepetitionType_REQUIRED //给item的info属性的属性赋值stack := make([]*Item, 1) //该数组应该是存放obj有几个属性stack[0] = NewItem //一开始,item是空的,stack也只有一个itemschemaElements := make([]*parquet.SchemaElement, 0) //存放模板元素的切片infos := make([]*common.Tag, 0) //应该是每一个属性的信息for len(stack) > 0 { //stack刚开始长度为1时,主要是在准备将一些东西放进去,如果是obj是结构体,就是要把obj的属性封装为*Item类型的实例放进去,用来处理。之后才是真正处理stack里面的内容ln := len(stack)item = stack[ln-1]stack = stack[:ln-1] //1.对于刚开始进入该循环的时候,这里的作用是将stack置空,用来之后存放封装了obj的属性的item 2.之后是真正的便开始处理每个obj的属性var newInfo *common.Tagif item.GoType.Kind() == reflect.Struct { //obj如果是结构体类型schema := parquet.NewSchemaElement() //表示模板中的元素。schema.Name = item.Info.InNameschema.RepetitionType = &item.Info.RepetitionTypenumField := int32(item.GoType.NumField()) //obj中的属性个数schema.NumChildren = &numFieldschemaElements = append(schemaElements, schema) //放一个模板元素进去newInfo = common.NewTag()common.DeepCopy(item.Info, newInfo) //item这个时候其实只有类型(GoType里面的东西)与传入的obj相关,并且复制了一个新的infoinfos = append(infos, newInfo) //把新的info存上for i := int(numField - 1); i >= 0; i-- { //开始遍历obj的属性f := item.GoType.Field(i) //获取obj的属性tagStr := f.Tag.Get("parquet") //获取属性的parquet 标签//ignore item without parquet tagif len(tagStr) <= 0 { //没有qarquet标签就跳过,忽略numField--continue}newItem := NewItem()//把字符标签“name=comp, type=FLOAT”,转换为*Tag类型的实例(以“,”拆分,再以“=”拆分)InName = {string} "Comp" ExName = {string} "comp" Type = {string} "FLOAT"newItem.Info, err = common.StringToTag(tagStr)if err != nil {return nil, fmt.Errorf("failed parse tag: %s", 
err.Error())}newItem.Info.InName = f.Name //将使用反射从结构获取的属性名称赋值给这个item的info属性的InName属性,但是这个属性已经有正确的值了呀newItem.GoType = f.Type //将使用反射从结构获取的属性类型赋值给item的GoType属性if f.Type.Kind() == reflect.Ptr {newItem.GoType = f.Type.Elem()newItem.Info.RepetitionType = parquet.FieldRepetitionType_OPTIONAL}stack = append(stack, newItem) //将封装了obj一个属性的item存起来}} else if item.GoType.Kind() == reflect.Slice &&item.Info.RepetitionType != parquet.FieldRepetitionType_REPEATED {schema := parquet.NewSchemaElement()schema.Name = item.Info.InNamert1 := item.Info.RepetitionTypeschema.RepetitionType = &rt1var numField int32 = 1schema.NumChildren = &numFieldct1 := parquet.ConvertedType_LISTschema.ConvertedType = &ct1schemaElements = append(schemaElements, schema)newInfo = common.NewTag()common.DeepCopy(item.Info, newInfo)infos = append(infos, newInfo)schema = parquet.NewSchemaElement()schema.Name = "List"rt2 := parquet.FieldRepetitionType_REPEATEDschema.RepetitionType = &rt2schema.NumChildren = &numFieldschemaElements = append(schemaElements, schema)newInfo = common.NewTag()common.DeepCopy(item.Info, newInfo)newInfo.InName, newInfo.ExName = "List", "list"infos = append(infos, newInfo)newItem := NewItem()newItem.Info = common.GetValueTagMap(item.Info) //有问题newItem.Info.InName = "Element"newItem.Info.ExName = "element"newItem.GoType = item.GoType.Elem()if newItem.GoType.Kind() == reflect.Ptr {newItem.Info.RepetitionType = parquet.FieldRepetitionType_OPTIONALnewItem.GoType = item.GoType.Elem().Elem()} else {newItem.Info.RepetitionType = parquet.FieldRepetitionType_REQUIRED}stack = append(stack, newItem)} else if item.GoType.Kind() == reflect.Slice &&item.Info.RepetitionType == parquet.FieldRepetitionType_REPEATED {newItem := NewItem()newItem.Info = item.InfonewItem.GoType = item.GoType.Elem()stack = append(stack, newItem)} else if item.GoType.Kind() == reflect.Map {schema := parquet.NewSchemaElement()schema.Name = item.Info.InNamert1 := item.Info.RepetitionTypeschema.RepetitionType = &rt1var numField1 
int32 = 1schema.NumChildren = &numField1ct1 := parquet.ConvertedType_MAPschema.ConvertedType = &ct1schemaElements = append(schemaElements, schema)newInfo = common.NewTag()common.DeepCopy(item.Info, newInfo)infos = append(infos, newInfo)schema = parquet.NewSchemaElement()schema.Name = "Key_value"rt2 := parquet.FieldRepetitionType_REPEATEDschema.RepetitionType = &rt2var numField2 int32 = 2schema.NumChildren = &numField2ct2 := parquet.ConvertedType_MAP_KEY_VALUEschema.ConvertedType = &ct2schemaElements = append(schemaElements, schema)newInfo = common.NewTag()common.DeepCopy(item.Info, newInfo)newInfo.InName, newInfo.ExName = "Key_value", "key_value"infos = append(infos, newInfo)newItem := NewItem()newItem.Info = common.GetValueTagMap(item.Info)newItem.GoType = item.GoType.Elem()if newItem.GoType.Kind() == reflect.Ptr {newItem.Info.RepetitionType = parquet.FieldRepetitionType_OPTIONALnewItem.GoType = item.GoType.Elem().Elem()} else {newItem.Info.RepetitionType = parquet.FieldRepetitionType_REQUIRED}stack = append(stack, newItem)newItem = NewItem()newItem.Info = common.GetKeyTagMap(item.Info)newItem.GoType = item.GoType.Key()newItem.Info.RepetitionType = parquet.FieldRepetitionType_REQUIREDstack = append(stack, newItem)} else {schema, err := common.NewSchemaElementFromTagMap(item.Info)if err != nil {return nil, fmt.Errorf("failed to create schema from tag map: %s", err.Error())}schemaElements = append(schemaElements, schema)newInfo = common.NewTag()common.DeepCopy(item.Info, newInfo)infos = append(infos, newInfo)}}res := NewSchemaHandlerFromSchemaList(schemaElements)res.Infos = infosres.CreateInExMap()return res, nil
}