使用 go-mysql-elasticsearch 把 MySQL 中的业务日志导入 Elasticsearch
太长不看版:
go-mysql-elasticsearch
能把 MySQL 数据库中的数据导入到 Elasticsearch 之中。
前言
相当一部分应用的日志是保存在数据库之中的,这些陈旧又稳定的应用在支撑着业务的运行。它们产生的日志通常来说也是有其价值的,但是如果能够融入到目前较为通用的 Elasticsearch 当中的话,可能有助于降低运维工作量,防止信息孤岛,并且进一步挖掘既有应用和业务的商业价值。
go-mysql-elasticsearch
就是这样一个项目,它可以从 MySQL 的数据表中读取指定数据表的数据,发送到 ElasticSearch 之中。它会使用 mysqldump
命令处理现有存量数据,并借助 binlog 的方式跟踪增量数据,从而保证 Elasticsearch 的数据和 MySQL 数据库中的数据保持同步。下面会简单讲一下这一项目的配置,并试验一个简单例子,最后根据实际情况进行一些改进。
条件和假设
- 目前该工具支持 MySQL 和 ES 的版本都是 5.x。
- MySQL 服务器需要开启 row 模式的 binlog。
- 因为要使用
mysqldump
命令,因此该进程的所在的服务器需要部署这一工具。 - 这一工具使用 GoLang 开发,需要 Go 1.9+ 的环境进行构建。
- 可用的 MySQL、Elasticsearch 以及 Kibana 实例。
另外为了进行演示,这里做一点假设:
业务日志表
CREATE TABLE `biz_log` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`receive_content` text,
`send_content` text,
`fd_receive_content` text,
`fd_send_content` text,
`log_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8mb4;
四个 text
字段使用 JSON 格式存储了几个不同的日志种类。
工具构建
go get github.com/siddontang/go-mysql-elasticsearch
cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch
make
转换配置
执行 go-mysql-elasticsearch --help
,会看到一系列的参数,最主要的参数就是 -config
,这个参数用于设置转换过程所需的参数配置文件,在源码的 /etc/river.toml
中包含了一份配置样本。作者在这一配置样本中提供了非常详尽的注释,可以对转换过程做出很多定制。但是由于工具本身具备很好的适应能力,加上 ES 的强大功能,只需要一点简单的设置,就能够顺利完成常见任务了。
一个简单的配置:
# MySQL 的相关配置
# 指定用户必须具备复制权限
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = "Flzx3000c"
my_charset = "utf8mb4"
# ES 相关配置
es_addr = "127.0.0.1:9200"
es_user = ""
es_pass = ""
# 数据源配置
# 以 Slave 模式工作
server_id = 10001
# mysql/mariadb
flavor = "mysql"
# mysqldump 路径,如果为空或者未设置,会跳过这一环节。
mysqldump = "mysqldump"
bulk_size = 128
flush_bulk_time = "200ms"
skip_no_pk_table = false
[[source]]
# 数据库名称
schema = "biz"
# 数据表同步范围,支持通配符
tables = ["biz_log"]
# 规则定义
[[rule]]
# 数据库名称
schema = "biz"
# 规则对应的数据表,支持通配符
table = "biz_log"
# 目标 ES 索引
index = "biz"
# 该规则在 ES 中生成的文档类型
type = "log_db"
同步
配置文件完成之后,就可以执行 ./go-mysql-elasticsearch -config=./river.toml
,日志中会显示首先执行 mysqldump
导出存量数据,然后开始守护进程阶段,跟踪 binlog 并进行同步。
此时打开 Kibana,执行 GET _search
,会看到数据库记录已经进入了 ES 中,并且按照我们定义的规则进行了索引。在守护进行运行期间,如果有新的数据插入,也会同步到 ES 之中。
但是这里我们会发现一个小问题,前面提到的 JSON 字段被作为单一的字符串存入了 ES 索引。这样就根据 JSON 中的特定字段进行搜索的需要就比较费劲了,而我们也知道,如果直接向 ES 提交文档,其中的 JSON 是会被映射为 Object 类型的。如果对 ES 索引进行数据类型的定义,会发现直接将 JSON 字段映射到 Object 类型后,同步过程会失败,返回错误认为将无效内容映射到了这一类型。因此可以推测是字符串并没有使用原有格式提交给 ES。
经过对代码的阅读跟踪,发现在 elastic/client.go
中对数据进行了一次 Json 编码:
default:
//for create and index
data, err = json.Marshal(r.Data)
if err != nil {
return errors.Trace(err)
}
下面就尝试进行一点改动,使之支持嵌套在字段内容中的 JSON 内容。
JSON
这里我想到了一个简单粗暴的办法就是,对数据报文进行一次检查,如果该字段内容是有效 JSON 的话,就使用 github.com/buger/jsonparser
的 set
方法,将压缩后的 JSON 字符串重新赋值给编码后的 byte[]
。
首先给 BulkRequest
定义一个新方法,用于数据编码
func (b *BulkRequest) encodeData() []byte {
jsonResults,_ := json.Marshal(b.Data)
// 判断是否有效的 JSON 数据
isJson := func(s string) bool {
var js map[string]interface{}
return json.Unmarshal([]byte(s), &js) == nil
}
for key, value := range b.Data {
stringValue, ok := value.(string)
// 如果字段内容是字符串并且是 JSON 格式
if ok && isJson(stringValue) {
// 设置编码后内容该字段的值为原文
jsonResults,_ = jsonparser.Set(jsonResults, []byte(stringValue), key)
}
}
return jsonResults
}
然后将原有的 data, err = json.Marshal(r.Data)
替换为 data = r.encodeData()
,再次构建运行。会看到 ES 成功的将 JSON 字段进行了解析,生成了 Object 类型的映射关系。
补充说明
这里引用go-mysql-elasticsearch功能及性能验证 一文的性能测试结果:
1.全量同步
支持:需要安装mysqldump(mysql自带),同步11.5w数据,耗时3分13秒。
全量基于mysqldump,需要将工具和mysql安装在同一个节点,其它方式尚未找到。
2.增量同步
支持。
增量插入20W数据,耗时8分钟。
删除20w条数据,耗时6分。
更新20w条数据,12分钟。
这一工具还有一些其它亮点,例如多表聚合、字段过滤、自定义字段映射等。
相关链接
- 项目地址:
https://github.com/siddontang/go-mysql-elasticsearch
- 配置文件样本:
https://github.com/siddontang/go-mysql-elasticsearch/blob/master/etc/river.toml
- Fork 地址:
https://github.com/fleeto/go-mysql-elasticsearch
- 作者原版教程:
https://www.jianshu.com/p/96c7858b580f
- go-mysql-elasticsearch 功能及性能验证 :
https://my.oschina.net/u/2282993/blog/1821930
- JsonParser:
https://github.com/buger/jsonparser