使用Canal监听MySQL数据变化

application.yml

# Canal client settings read via @Value in CanalHotelConfig and TaskResetHotelCache.
canal:
  server:
    # Host and port used to build the InetSocketAddress handed to the Canal connector.
    # Optional canal.server.username / canal.server.password fall back to the
    # placeholder default "blank", which CanalHotelConfig maps to an empty string.
    ip: 127.0.0.1
    port: 11111
  hotel:
    # Destination name passed to CanalConnectors.newSingleConnector.
    destination: hotel
    # Value passed to CanalConnector.getWithoutAck on every scheduled run.
    # Optional canal.hotel.subscribe (default "server"), canal.hotel.initialDelay and
    # canal.hotel.fixedDelay (both default 5000) are read by TaskResetHotelCache.
    batchSize: 1000

CanalHotelConfig.java

package cn.j0n4than.gxa.cachestuff.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * Spring configuration that exposes the Canal connector bean named
 * {@code hotelConnector}.
 *
 * <p>Server address and destination come from the {@code canal.server.*} and
 * {@code canal.hotel.destination} properties. User name and password are optional;
 * when they are not configured the placeholder default {@code "blank"} is injected
 * and replaced by an empty string before the connector is created.
 */
@Configuration
public class CanalHotelConfig {

    @Value("${canal.server.ip}")
    private String canalServerIp;

    @Value("${canal.server.port}")
    private int canalServerPort;

    @Value("${canal.server.username:blank}")
    private String userName;

    @Value("${canal.server.password:blank}")
    private String password;

    @Value("${canal.hotel.destination}")
    private String destination;

    /**
     * Creates the single-node Canal connector for the configured destination.
     *
     * @return connector built from the injected address, destination and credentials
     */
    @Bean("hotelConnector")
    public CanalConnector newSingleConnector() {
        InetSocketAddress serverAddress = new InetSocketAddress(canalServerIp, canalServerPort);
        return CanalConnectors.newSingleConnector(
                serverAddress,
                destination,
                blankToEmpty(userName),
                blankToEmpty(password));
    }

    /**
     * Maps the placeholder default {@code "blank"} to an empty string.
     *
     * @param configured value injected from the environment
     * @return {@code ""} when the value equals {@code "blank"}, otherwise the value itself
     */
    private static String blankToEmpty(String configured) {
        if ("blank".equals(configured)) {
            return "";
        }
        return configured;
    }
}

TaskResetHotelCache.java

package cn.j0n4than.gxa.cachestuff.task;

import cn.j0n4than.gxa.cachestuff.service.HotelService;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class TaskResetHotelCache implements InitializingBean {

    private final CanalConnector canalConnector;
    private final String subscribe;
    private final Integer batchSize;
    private final HotelService hotelService;

    public TaskResetHotelCache(
            CanalConnector canalConnector,
            @Value("${canal.hotel.subscribe:server}") String subscribe,
            @Value("${canal.hotel.batchSize}") Integer batchSize,
            HotelService hotelService) {
        this.canalConnector = canalConnector;
        this.subscribe = subscribe;
        this.batchSize = batchSize;
        this.hotelService = hotelService;
    }

    /**
     * Runs this operation.
     */
    @Scheduled(initialDelayString = "${canal.hotel.initialDelay:5000}", fixedDelayString = "${canal.hotel.fixedDelay:5000}")
    @Async
    public void run() throws InvalidProtocolBufferException {

        if (!canalConnector.checkValid()) {
            canalConnector.connect();

            log.info("[-1] 连接Canal失败 下个周期执行任务");
            return;
        }
        Message message = canalConnector.getWithoutAck(batchSize);
        long batchId = message.getId();

        int size = message.getEntries().size();
        if (batchId == -1 || size == 0) {
            // 没必要输出
            // log.info("[{}] 本次没有检测到酒店数据更新。", batchId);
            return;
        }

        log.info("[{}] 酒店数据本次共有{}次更新需要处理", batchId, size);
        // 一个表在一次周期内可能会被修改多次,而对Redis缓存的处理只需要处理一次即可
        // Set<String> factKeys = new HashSet<>();  //unused

        // 遍历Entry
        for (CanalEntry.Entry entry : message.getEntries()) {
            if (
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                            entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND
            ) {
                continue;
            }

            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            String tableName = entry.getHeader().getTableName();
            CanalEntry.EventType eventType = rowChange.getEventType();
            log.info(
                    "[{}]数据变更详情: 来自binlog[{}.{}],数据源{}.{},变更类型{}",
                    batchId,
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    tableName,
                    eventType
            );

            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

            // 遍历 Entry 中的 DataList
            for (CanalEntry.RowData rowData : rowDatasList) {

                JSONObject beforeData = new JSONObject();
                JSONObject afterData = new JSONObject();

                // 变更前和变更后的Columns
                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

                // 遍历
                for (CanalEntry.Column column : beforeColumnsList) {
                    beforeData.put(column.getName(), column.getValue());
                }
                for (CanalEntry.Column column : afterColumnsList) {
                    afterData.put(column.getName(), column.getValue());
                }

                log.info("变更前数据: {}", beforeData);
                log.info("变更后数据: {}", afterData);

                // 重置缓存
                hotelService.resetCache(afterData.getInteger("id"));
            }
        }

        canalConnector.ack(batchId);
        log.info("[{}] 同步完成", batchId);
    }

    @Override
    public void afterPropertiesSet() {
        connect();
    }

    @PreDestroy
    public void onDestroy() {
        canalConnector.disconnect();
    }

    private void connect() {
        canalConnector.connect();
        if ("server".equals(subscribe)) {
            canalConnector.subscribe(null);
        } else {
            canalConnector.subscribe(subscribe);
        }

        canalConnector.rollback();
    }
}

评论