先扯点蛋
公司有个项目要求使用InfluxDB时序数据库储存点东西。第一次听说还有这种数据库,哈哈哈,孤陋寡闻了,先从各位大佬的博客看起,慢慢学习,逐渐了解了之后在服务器上进行安装。直接使用官方包进行安装很简单,查看官方说明即可。安装之后使用才发现,开源的只支持单机版的,但是公司用不能这么low吧,怎么也要“高可用”一点,于是自己参考MySQL主从复制原理、半同步操作步骤及原理和饿了么 Influxdb 实践之路,做了InfluxDB主从系统。
客户端系统
客户端系统拓扑图
这个系统主要是用来从kafka获取数据源,经过处理之后存到InfluxDB中去,这里参考了「饿了么」那篇文章,但是我看过他们的源码,基本上搞不懂,就根据文章的描述搞了一个简陋版的东西吧,这里涉及到kafka和influxdb-java,项目源代码可以看这里。该项目启动运行参考其中的README。
InfluxDB主从同步系统
InfluxDB同步系统
主从同步架构,是简陋版的MySQL主从同步。脚本读取InfluxDB的增删改操作日志,使用脚本将记录写入从机中。经过测试,运行状况良好,只要主从机不挂,同步系统可以健康运行。
同步脚本使用Python编写,项目源代码在这里。该项目启动运行参考其中的README。
为什么使用Python脚本来编写主从同步代码?
- 主要是考虑到降低与数据库的代码耦合程度,InfluxDB源码如果出现大规模的升级改动,同步脚本只需略作修改就可以了。
- 毕竟Python是世界上**的语言。?
InfluxDB源码修改
下载源码
在github上下载InfluxDB源码(这里使用的是我的fork地址,已经修改好的源码)。
修改源码
由于脚本需要读取InfluxDB的增删改操作日志,需要对源码中这一部分操作的日志进行修改,方便脚本解析日志。官方版的日志不记录写入操作的数据值,所以需要我们自己修改源码中对应的记录写日志的地方。
修改文件influxdb/services/httpd/handler.go
// 记录日志的具体方法 func buildLogLine(l *responseLogger, r *http.Request, start time.Time, body string) string {redactPassword(r)username := parseUsername(r)host, _, err := net.SplitHostPort(r.RemoteAddr)if err != nil {host = r.RemoteAddr}if xff := r.Header["X-Forwarded-For"]; xff != nil {addrs := append(xff, host)host = strings.Join(addrs, ",")}uri := r.URL.RequestURI()referer := r.Referer()userAgent := r.UserAgent()// 新增请求中的请求路径path := r.URL.Path// 新增请求中的form值r.ParseForm()form := r.Form// 新增请求中的body的值newbody := strings.Replace(body, "\n", ";", -1)// 将日志记录变为json格式return fmt.Sprintf(`{"timeindex":%d,"host":"%s","username":"%s","method":"%s","path":"%s","uri":"%s","form":"%s","body":"%s","proto":"%s","status":"%s","size":"%s","referer":"%s","agent":"%s","reqId":"%s"}`,start.Nanosecond(),host,detect(username, "-"),r.Method,path,uri,form,newbody,r.Proto,detect(strconv.Itoa(l.Status()), "-"),strconv.Itoa(l.Size()),detect(referer, "-"),detect(userAgent, "-"),r.Header.Get("Request-Id")) }修改文件influxdb/services/httpd/handler.go其中两处调用上面修改过的函数buildLogLine的地方
func (h *Handler) logging(inner http.Handler, name string) http.Handler {return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {start := time.Now()l := &responseLogger{w: w}inner.ServeHTTP(l, r)// 增加请求中的bodyh.CLFLogger.Println(buildLogLine(l, r, start, h.body))// Log server errors.if l.Status()/100 == 5 {errStr := l.Header().Get("X-InfluxDB-Error")if errStr != "" {h.Logger.Error(fmt.Sprintf("[%d] - %q", l.Status(), errStr))}}}) }func (h *Handler) recovery(inner http.Handler, name string) http.Handler {return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {start := time.Now()l := &responseLogger{w: w}defer func() {if err := recover(); err != nil {// 增加请求中的bodylogLine := buildLogLine(l, r, start, h.body)logLine = fmt.Sprintf("%s [panic:%s] %s", logLine, err, debug.Stack())h.CLFLogger.Println(logLine)http.Error(w, http.StatusText(http.StatusInternalServerError), 500)atomic.AddInt64(&h.stats.RecoveredPanics, 1) // Capture the panic in _internal stats.if willCrash {h.CLFLogger.Println("\n\n=====\nAll goroutines now follow:")buf := debug.Stack()h.CLFLogger.Printf("%s\n", buf)os.Exit(1) // If we panic then the Go server will recover.}}}()inner.ServeHTTP(l, r)}) }到此InfluxDB源码修改完成。
为什么要在
func buildLogLine(l *responseLogger, r *http.Request, start time.Time, body string) string方法上增加一个字段body,而body明明是从h *Handler中获取的,而函数buildLogLine中明明是有r *http.Request的?因为经过测试,在
func buildLogLine中从r *http.Request里面实际上取不到body的值,我没有深入研究过这段代码,确实不懂为什么会这样。所以才从上层将body的值直接传入这个方法中。
编译源码和运行
使用源码构建时序数据库主从同步系统。
系统要求
Linux 64位系统即可。
依赖环境
本系统使用InfluxDB v1.5.2,所以需要Go 1.9.2或者以上的版本。
InfluxDB使用Dep管理依赖包,需要安装dep。
主从同步脚本使用Python 2.7开发。
需要从Github上拉取源码,环境中需要安装git。
安装InfluxDB
新建目录$YOUR_PATH/gocodez/src、$YOUR_PATH/gocodez/src,$YOUR_PATH为自定义目录:
mkdir -p $YOUR_PATH/gocodez/src
mkdir -p $YOUR_PATH/gocodez/bin
将源码下载解压到目录$YOUR_PATH/gocodez/src中:
cd $YOUR_PATH/gocodez/src/
git clone https://github.com/callELPSYCONGROO/influxdb.git
将目录$YOUR_PATH/设置为GOPATH:
export GOPATH=$YOUR_PATH/
进入目录中:
cd $YOUR_PATH/gocodez/src/influxdb
如果正确安装了dep,这使用这个命令安装依赖:
dep ensure
安装依赖时,如果遇到无法连接到依赖所需的服务器,需要查看所缺依赖,手动将这些依赖源码下载到
$YOUR_PATH/src/对应路径下。所需依赖在Github上均有源码可以下载。如果安装或使用dep不成功,可以跳过此步,在下一步时根据错误提示,手动安装所需依赖。
安装依赖可以使用依赖包,将依赖包的/src解压到$YOUR_PATH/gocodez目录下即可。
构建安装二进制执行码:
go clean ./...
go install ./...
安装完成后,二进制执行码放在$YOUR_PATH/gocodez/bin/中,启动时序数据库:
$YOUR_PATH/gocodez/bin/influxd
安装主从同步脚本
新建目录$YOUR_SCRIPT,$YOUR_SCRIPT为自定义脚本目录:
mkdir $YOUR_SCRIPT
从Github中拉取脚本:
cd $YOUR_SCRIPT
git clone https://github.com/callELPSYCONGROO/master_slave
参考其中的README.md完成脚本配置和运行。















