Set up a canal + MySQL environment with Docker and integrate it into Spring Boot using a third-party wrapper library

Versions used

1. MySQL 5.7.24
2. canal-server 1.1.3
3. Spring Boot

Installing and configuring MySQL

Installing MySQL with Docker

1. Pull the image
docker pull mysql:5.7.24

2.1 Start the service
docker run -d --restart=always --name mysql_canal -p 3307:3306 -v /usr/canal/data:/var/lib/mysql -v /usr/canal/conf:/etc/mysql/conf.d -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=root mysql:5.7.24 --character-set-server=utf8mb4 --collation-server=utf8mb4_general_ci

2.2 Stop the service
docker stop mysql_canal

2.3 Remove the service (for a full reset, remember to delete the mapped data directory as well)
docker rm mysql_canal
rm -rf /usr/canal/data

Configure MySQL replication (binlog) and add a canal account

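Create a configuration file in the mapped configuration directory
touch /usr/canal/conf/my.cnf

Write the replication-related settings into the configuration file
vi /usr/canal/conf/my.cnf
[mysqld]
log-bin=mysql-bin # enable the binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId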
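
canal depends on the three settings above: the binlog must be enabled, its format must be ROW, and the server needs an id. As a rough illustration, the following standard-library Java sketch checks a my.cnf fragment for them before the container is restarted. The class and method names (BinlogConfigCheck, parse, readyForCanal) are hypothetical and exist only for this example; it does not talk to MySQL.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BinlogConfigCheck {

    // Parse "key=value # comment" lines of a my.cnf fragment into a map.
    // Section headers such as [mysqld] and blank lines are skipped.
    static Map<String, String> parse(String cnf) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String raw : cnf.split("\n")) {
            String line = raw;
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop the trailing comment
            }
            line = line.trim();
            if (line.isEmpty() || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            // MySQL option files treat '-' and '_' in option names as equivalent.
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('-', '_');
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            settings.put(key, value);
        }
        return settings;
    }

    // True when the binlog is on, the format is ROW and a server id is set.
    static boolean readyForCanal(Map<String, String> settings) {
        return settings.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(settings.get("binlog_format"))
                && settings.containsKey("server_id");
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\n"
                + "log-bin=mysql-bin # enable the binlog\n"
                + "binlog-format=ROW\n"
                + "server_id=1\n";
        System.out.println(readyForCanal(parse(cnf))); // prints "true"
    }
}
```

The authoritative check is still the `show variables` output inside the container; this sketch only catches typos in the file itself.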

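Restart the service
docker restart mysql_canal

Enter the container and check that the binlog settings have taken effect
docker exec -it mysql_canal /bin/bash
mysql -uroot -p
# at the prompt, enter the MYSQL_ROOT_PASSWORD value passed to docker run (root in the command above)

# check that ROW format is in effect
show variables like 'binlog_format';
# check that the binlog is enabled
show variables like 'log_bin';
# check the master status
show master status\G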


Create the canal account
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- The next statement grants all privileges; decide case by case whether it is needed
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

# check the grants
show grants for 'canal';

Installing and configuring canal

Installing canal

Pull the image
docker pull canal/canal-server:v1.1.3
docker pull canal/canal-server:v1.1.5

Start the container
docker run -d --name canal -p 11111:11111 canal/canal-server:v1.1.3

Stop the container
docker stop canal
Remove the container
docker rm canal

Configuring canal (for this test, option 2 was used directly)

Option 1
# With the configuration file mapped into the container
docker run -p 11111:11111 --name canal -v /usr/canal/canal_work/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties -d canal/canal-server

In instance.properties, change the address of the database container, the username and the password:
canal.instance.master.address=192.168.3.2:23306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal


#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.3.2:23306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://192.168.3.2:23306/canal_test_db
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
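
The `canal.instance.filter.regex` value is easy to misread because the backslashes are doubled in the properties file. The sketch below, using only the Java standard library, shows what the value looks like once the file is loaded and how a comma-separated list of expressions can be matched against a `schema.table` name. It uses `java.util.regex` as an approximation of canal's own matcher, and the names FilterRegexDemo, loadFilter and accepts, as well as the `user` table, are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

final class FilterRegexDemo {

    // Read the filter from properties-file text. Properties.load turns the
    // escaped "\\" written in the file into a single backslash.
    static String loadFilter(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("canal.instance.filter.regex");
    }

    // True if any comma-separated expression matches the whole "schema.table" name.
    static boolean accepts(String filter, String schemaDotTable) {
        for (String expr : filter.split(",")) {
            if (Pattern.matches(expr.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In a Java string literal, four backslashes produce the two that appear in the file.
        String all = loadFilter("canal.instance.filter.regex=.*\\\\..*");
        System.out.println(all);                                 // prints ".*\..*"
        System.out.println(accepts(all, "canal_test_db.user"));  // prints "true"

        String oneSchema = loadFilter("canal.instance.filter.regex=canal_test_db\\\\..*");
        System.out.println(accepts(oneSchema, "other_db.user")); // prints "false"
    }
}
```

Narrowing the filter to one schema, as in the second call, is usually preferable to `.*\\..*` once the test setup works, because the client then receives fewer events.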
Option 2
Pass the configuration directly as startup parameters
docker run --name canal -e canal.instance.master.address=192.168.0.109:3307 -e canal.instance.dbUsername=canal -e canal.instance.dbPassword=canal -p 11111:11111 -d canal/canal-server:v1.1.3

docker run --name canal -e canal.instance.master.address=192.168.0.109:3307 -e canal.instance.dbUsername=canal -e canal.instance.dbPassword=canal -p 11111:11111 -d canal/canal-server:v1.1.4



All parameters (see the official GitHub usage documentation); each one is passed to docker run with -e
-e canal.auto.scan=false \
-e canal.destinations=test \
-e canal.instance.master.address=127.0.0.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false
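
Every canal property becomes one `-e key=value` pair on the `docker run` command line. If the command has to be generated, for example from a test harness, it can be assembled roughly as in the following sketch. CanalDockerCommand and build are hypothetical names; the code only builds the argument list and does not start Docker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CanalDockerCommand {

    // Build the argument list for "docker run", one "-e key=value" pair per property.
    static List<String> build(String containerName, String image, int port,
                              Map<String, String> canalProperties) {
        List<String> cmd = new ArrayList<>();
        cmd.add("docker");
        cmd.add("run");
        cmd.add("-d");
        cmd.add("--name");
        cmd.add(containerName);
        cmd.add("-p");
        cmd.add(port + ":" + port);
        for (Map.Entry<String, String> property : canalProperties.entrySet()) {
            cmd.add("-e");
            cmd.add(property.getKey() + "=" + property.getValue());
        }
        cmd.add(image); // the image name goes last, after all options
        return cmd;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the properties in insertion order.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("canal.instance.master.address", "192.168.0.109:3307");
        props.put("canal.instance.dbUsername", "canal");
        props.put("canal.instance.dbPassword", "canal");
        System.out.println(String.join(" ",
                build("canal", "canal/canal-server:v1.1.3", 11111, props)));
    }
}
```

The resulting list can be handed to `ProcessBuilder` as is; keeping the arguments separate avoids shell quoting problems.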

Installing and configuring Spring Boot

Testing with the official demo

Import the dependency
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
Java test code
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Create the connection: canal-server address and port, destination, username, password
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.3.2"/*AddressUtils.getHostIp()*/,
                11111), "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            // Exit after 120 consecutive empty polls (about two minutes)
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without acknowledging
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored on purpose in this demo; the loop keeps polling
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // processing failed, roll the batch back
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChange.getEventType();
            // The header exposes the binlog file and offset, the schema, the table and the type of operation
            String logFileName = entry.getHeader().getLogfileName();
            long logFileOffset = entry.getHeader().getLogfileOffset();
            String dbName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            System.out.println(String.format("=======> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    logFileName, logFileOffset,
                    dbName, tableName,
                    eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // delete: only the "before" image exists
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // insert: only the "after" image exists
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

}
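
The demo acknowledges every batch unconditionally and leaves the rollback call commented out. When the handler can fail, the usual pattern is to acknowledge only after the batch has been processed and to roll back otherwise, so that the same batch is delivered again. The sketch below illustrates that control flow with the standard library only. BatchSource is a hypothetical stand-in for the connector calls used above (getWithoutAck, ack, rollback), not the canal API, and FakeSource is an in-memory test double.

```java
import java.util.ArrayList;
import java.util.List;

final class AckOrRollbackSketch {

    // Hypothetical stand-in for the three connector calls used in the demo.
    interface BatchSource {
        long fetch();                 // returns the batch id, or -1 when there is no data
        void ack(long batchId);
        void rollback(long batchId);
    }

    // Process one batch: ack on success, roll back if the handler throws.
    static String processOne(BatchSource source, Runnable handler) {
        long batchId = source.fetch();
        if (batchId == -1) {
            return "empty";
        }
        try {
            handler.run();
            source.ack(batchId);
            return "acked";
        } catch (RuntimeException e) {
            source.rollback(batchId);
            return "rolled back";
        }
    }

    // In-memory fake that records which calls were made.
    static final class FakeSource implements BatchSource {
        final List<String> calls = new ArrayList<>();
        private final long batchId;

        FakeSource(long batchId) {
            this.batchId = batchId;
        }

        @Override
        public long fetch() {
            return batchId;
        }

        @Override
        public void ack(long id) {
            calls.add("ack:" + id);
        }

        @Override
        public void rollback(long id) {
            calls.add("rollback:" + id);
        }
    }

    public static void main(String[] args) {
        FakeSource ok = new FakeSource(7);
        System.out.println(processOne(ok, () -> { }) + " " + ok.calls);

        FakeSource failing = new FakeSource(8);
        System.out.println(processOne(failing, () -> {
            throw new IllegalStateException("handler failed");
        }) + " " + failing.calls);
    }
}
```

With the real connector, the rollback means the next `getWithoutAck` call returns the unacknowledged data again, so the handler should be safe to run more than once on the same rows.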

Integrating into the project with a third-party open-source library

Add the dependencies
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
Code test
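Tested with the library's official demo code; the official demo also provides the table-creation SQL
The entity class was replaced with the one from the official demo
No other dependencies were used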
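
Conceptually, a wrapper like this removes the polling loop from application code and routes each row change to a handler registered for its table. The following standard-library sketch illustrates that idea only. TableDispatchSketch and RowHandler are hypothetical names and are not the starter's API; the real handler interface and annotations are documented in the project linked under References.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TableDispatchSketch {

    // Hypothetical handler shape: one callback per kind of row change.
    interface RowHandler {
        void insert(Map<String, String> after);
        void update(Map<String, String> before, Map<String, String> after);
        void delete(Map<String, String> before);
    }

    private final Map<String, RowHandler> handlersByTable = new HashMap<>();

    void register(String table, RowHandler handler) {
        handlersByTable.put(table, handler);
    }

    // Route one row change to the handler of its table.
    // Returns false when no handler is registered or the event type is unknown.
    boolean dispatch(String table, String eventType,
                     Map<String, String> before, Map<String, String> after) {
        RowHandler handler = handlersByTable.get(table);
        if (handler == null) {
            return false;
        }
        switch (eventType) {
            case "INSERT":
                handler.insert(after);
                return true;
            case "UPDATE":
                handler.update(before, after);
                return true;
            case "DELETE":
                handler.delete(before);
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        TableDispatchSketch dispatcher = new TableDispatchSketch();
        // "t_user" is a made-up table name for the example.
        dispatcher.register("t_user", new RowHandler() {
            @Override
            public void insert(Map<String, String> after) {
                log.add("insert " + after.get("id"));
            }

            @Override
            public void update(Map<String, String> before, Map<String, String> after) {
                log.add("update " + before.get("name") + " -> " + after.get("name"));
            }

            @Override
            public void delete(Map<String, String> before) {
                log.add("delete " + before.get("id"));
            }
        });

        Map<String, String> row = new HashMap<>();
        row.put("id", "1");
        row.put("name", "alice");
        dispatcher.dispatch("t_user", "INSERT", null, row);
        System.out.println(log); // prints "[insert 1]"
    }
}
```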
Other usage notes
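The library runs a while(true) polling loop that keeps writing log output
Suppress it in the configuration file:
application.yml:
logging:
  level:
    # Only silence the log line that is printed on every loop iteration
    top.javatool.canal.client.client.AbstractCanalClient: WARN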

References

Setting up MySQL + canal subscription with Docker
https://blog.csdn.net/boonya/article/details/89406067
https://blog.csdn.net/colspanprince/article/details/103033073

Official canal GitHub repository
https://github.com/alibaba/canal

Official explanation of the Docker startup parameters
https://github.com/alibaba/canal/wiki/Docker-QuickStart

Third-party Spring Boot wrapper for canal: https://github.com/NormanGyllenhaal/canal-client