基础
安装好es
本文使用版本canal
1.1.5
https://github.com/alibaba/canal
server端安装
docker安装
sh run.sh -e canal.auto.scan=false \
-e canal.destinations=blog \
-e canal.instance.master.address=127.0.0.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false
安装包安装
- 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
- 修改配置
#conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
#修改数据库地址(注释需单独成行,properties文件中行尾的#内容会被当作值的一部分)
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
#修改数据库账号
canal.instance.dbUsername = canal
#修改数据库密码
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .*\\..*
- 启动
sh bin/startup.sh
cat logs/canal/canal.log
cat logs/example/example.log
sh bin/stop.sh
es适配器
- 下载适配器,不要下载正式版,有bug
- 修改启动配置
#application.yml
canal.conf:
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  mode: tcp
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true #修改数据库链接
      username: root #修改数据库账号
      password: 121212 #修改数据库密码
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      -
        key: exampleKey
        name: es7 #修改为es7
        hosts: 127.0.0.1:9300 #es 集群地址, 逗号分隔
        properties:
          mode: transport # or rest #可指定transport模式或者rest模式
          cluster.name: elasticsearch #es cluster name
- 修改映射配置
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
outerAdapterKey: exampleKey # 对应application.yml中es配置的key
destination: example # canal的instance或者MQ的topic
groupId: # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: mytest_user # es 的索引名称
  _type: _doc # es 的type名称, es7下无需配置此项
  _id: _id # es 的_id, 如果不配置该项必须配置下面的pk项, 此时_id则会由es自动分配
#  pk: id # 如果不需要_id, 则需要指定一个属性为主键属性
  # sql映射
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:; # 数组或者对象属性, array:; 代表该字段的值以;分隔
#    _obj: object # json对象
  etlCondition: "where a.c_time>='{0}'" # etl 的条件参数
  commitBatch: 3000 # 提交批大小
- 启动
sh bin/startup.sh
sh bin/stop.sh
es同步
- 初始化索引
put
http://127.0.0.1:9200/blog
{
"settings": {
"analysis.analyzer.default.type": "ik_max_word",
"number_of_shards": 5,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"tag": {
"type": "keyword"
},
"tag_desc": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"is_delete": {
"type": "short"
},
"is_release": {
"type": "short"
},
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
}
}
}
}
- 全量同步
post
curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml
FAQ
- 如果系统是1个cpu,需要将canal.instance.parser.parallel设置为false
- 启动失败一般来说只需调整startup里的内存配置即可