Java基础、中级、高级、架构面试资料

canal-adapter趟坑记录

JAVA herman 7174浏览
公告:“业余草”微信公众号提供免费CSDN下载服务(只下Java资源),关注业余草微信公众号,添加作者微信:xttblog2,发送下载链接帮助你免费下载!
本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
视频教程免费领
腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云

用过 canal 的都知道,canal 现在坑非常的多,本文记录一个我遇到的坑之一!

canal-adapter目前支持rdb、es、kafka、hbase等多个目标端的同步,最近几篇我会记录一下我在使用adapter向这些目标端同步时,是如何解决es、hbase版本适配,添加部分个性化需求,以及如何处理一些我遇到各种问题。今天先来记录一下adapter同步mysql。

同步mysql

application.yml 配置文件内容如下所示:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
# zookeeperHosts: slave1:2181
# mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  username:
  password:
  vhost:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
      username: root
      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
      # 在所有目标端的同步中key是一个必须要配置的参数,不配置可能会出现数据丢失的问题
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
          jdbc.username: root
          jdbc.password: 121212

conf/rdb下配置同步表信息。

dataSourceKey: defaultDS
destination: example
groupId: ''
# 在所有目标端的同步中outerAdapterKey是一个必须要配置的参数,不配置可能会出现数据丢失的问题
# 对应application.yml中key字段
outerAdapterKey: mysql1
concurrent: false
dbMapping:
  commitBatch: 5000
  etlCondition: null
  mirrorDb: false
  readBatch: 5000
  database: mytest # 源数据源的database/shcema
  table: tb1 # 源数据源表名
  targetTable: mytest2.tb1 # 目标数据源的库名.表名
  targetPk: # 主键映射
    ID: ID # 如果是复合主键可以换行映射多个
  mapAll: true

问题

在同步过程中,出现了insert成功,delete和update日志反馈成功,但实际数据没有同步的现象,查看同步源码。

com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService.java

for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
            
    //首先注意映射字段冒号前是目标字段名,冒号后是源端字段名
    String targetColumnName = entry.getKey();
    String srcColumnName = entry.getValue();
    
    //如果未配置冒号后字段,将其处理成与冒号前一致
    if (srcColumnName == null) {
        srcColumnName = Util.cleanColumn(targetColumnName);
    }

    Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
    if (type == null) {
        throw new RuntimeException("Target column: " + targetColumnName + " not matched");
    }
    
    //主要问题出在这里
    Object value = data.get(srcColumnName);
    BatchExecutor.setValue(values, type, value);
}

这段代码,能够看出,同步过程中更新的value值是存储在data这个Map结构中的,所以这就要求srcColumnName这个字段的值要与源端数据表的字段名大小写完全一致(即使数据库本身对于字段大小写是不敏感的)

举例

源端tb有ID、id1、Id2三列

目标端tb2有id、ID1、id2三列

其实在数据查询过程中,由于数据库对字段大小写不敏感,所有无论怎么写都是没问题的,但在配置同步表过程中,他们的表现就有所不同。

targetPk: # 主键映射
    ID: ID
    
targetPk: # 主键映射
 ID:
  
targetPk: # 主键映射
 id: ID
    
#以上几种写法,最后解析出来的srcColumnName 均为ID,所以可以从有binlog解析出来的数据转换得到的源端Map类型的data中获取到数据
targetPk: # 主键映射
  id:
#类似这种写法,解析出的srcColumnName 均为id,
#而Map类型中data中只包含ID这个key,而不包含id这个key,
#所以就取不到相应的值,导致需要主键操作的update和delete语句执行无效

这是我在同步rdb时遇到的需要注意的地方,当然也可以根据需要将这部分代码改造一下,这里就不作描述了,代码具体位置已给出。

如果你也遇到了一些坑,欢迎加我微信号:canal,拉你进群学 canal。推荐阅读:https://mp.weixin.qq.com/s/Qerwilr7D8HDdzd1-jd_4w

业余草公众号

最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号:xttblog2。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!

本文原文出处:业余草: » canal-adapter趟坑记录