阿里巴巴开源的 Canal 使用教程

JAVA herman 27656浏览
公告:“业余草”微信公众号提供免费CSDN下载服务(只下Java资源),关注业余草微信公众号,添加作者微信:codedq,发送下载链接帮助你免费下载!
本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:codedq,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
视频教程免费领

我百度了一下 Canal,发现与 Canal 相关的技术文章并不多,再加上我上一篇文章《阿里 canal 内存溢出 Java heap space 问题解决》中走入的误区,导致了我想要写一篇关于 Canal 教程的文章。所以便有了本文,下面就和我一起来看看如何使用 canal 吧。

Canal 教程

根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 dump 协议。

mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。

经 canal 解析过的对象,我们使用起来就非常的方便了。

再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。

根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.2.tar.gz 进行解析 MySQL 的 binlog 日志。

Canal 部署安装

下载后,复制 canal.deployer-1.1.2.tar.gz 到 MySQL 主机上,比如放在 /var/xttblog/otter 目录下。然后依次执行下面的命令:

mkdir canal
cd canal
tar -zxvf ../canal.deployer-1.1.2.tar.gz

然后修改 canal 的配置文件 vi conf/example/instance.properties。

instance.properties 配置

这三项改成你自己的,比如我的配置如下:

canal.instance.dbUsername=xttblog
canal.instance.dbPassword=xttblog
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =xttblog

然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)

接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。

show variables like 'binlog_format';
show variables like 'log_bin';
select version();

canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。

执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。

执行 vi /etc/my.cnf 命令。添加下面 3 个配置。

log-bin=mysql-bin #添加这一行就ok 
binlog-format=ROW #选择row模式 
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)

接着执行 sudo service mysqld restart 重启 MySQL。

需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate 操作的权利。

如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。

CREATE USER xttblog IDENTIFIED BY ‘xttblog’; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘‘xttblog’’@’%’;
FLUSH PRIVILEGES;

MySQL 启动后,就可以开启 canal 服务了。

/var/xttblog/otter/canal/bin/startup.sh

开启后,观察 canal 服务的日志,确保服务正常。

tail -n 200 /var/xttblog/otter/canal/logs/canal/canal.log

查看 canal 的日志

确定没有问题后,开始编写我们的测试程序。

pom.xml 中导入下面的依赖。

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>

然后编写下面的 Java 代码。

package com.xttblog.canal.test;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
 * CanalTest
 * @author www.xttblog.com
 * @date 2019/2/27 上午10:01
 */
public class CanalTest {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN 
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event " +
                        "has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; " +
                            "binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + 
                    " : " + column.getValue() + 
                    "    update=" + column.getUpdated());
        }
    }
}

然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。

业余草公众号

最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号1:xmtxtt(5000人已满),微信号2:xttblog(5000人已满),微信号3:codedq(超3800)。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!

本文原文出处:业余草: » 阿里巴巴开源的 Canal 使用教程