使用Canal监听MySQL数据变化
application.yml
canal:
  server:
    ip: 127.0.0.1
    port: 11111
  hotel:
    destination: hotel
    batchSize: 1000
CanalHotelConfig.java
package cn.j0n4than.gxa.cachestuff.config;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
 * Spring configuration that exposes the Canal connector used for the hotel destination.
 *
 * <p>Server address and destination are read from {@code canal.server.*} and
 * {@code canal.hotel.destination}. Username and password are optional; when they are not
 * configured the placeholder value {@code "blank"} is injected and later replaced by an
 * empty string.
 */
@Configuration
public class CanalHotelConfig {

    /** Placeholder injected by the {@code @Value} defaults when a credential is not configured. */
    private static final String UNSET_PLACEHOLDER = "blank";

    @Value("${canal.server.ip}")
    private String canalServerIp;

    @Value("${canal.server.port}")
    private int canalServerPort;

    @Value("${canal.server.username:blank}")
    private String userName;

    @Value("${canal.server.password:blank}")
    private String password;

    @Value("${canal.hotel.destination}")
    private String destination;

    /**
     * Builds the {@code hotelConnector} bean through
     * {@code CanalConnectors.newSingleConnector} using the configured address, destination
     * and credentials.
     *
     * @return the connector for the configured hotel destination
     */
    @Bean("hotelConnector")
    public CanalConnector newSingleConnector() {
        InetSocketAddress serverAddress = new InetSocketAddress(canalServerIp, canalServerPort);
        return CanalConnectors.newSingleConnector(
                serverAddress,
                destination,
                emptyIfUnset(userName),
                emptyIfUnset(password));
    }

    /**
     * Maps the {@code "blank"} placeholder to an empty string and returns any other value
     * (including {@code null}) unchanged.
     */
    private static String emptyIfUnset(String configured) {
        if (UNSET_PLACEHOLDER.equals(configured)) {
            return "";
        }
        return configured;
    }
}
TaskResetHotelCache.java
package cn.j0n4than.gxa.cachestuff.task;
import cn.j0n4than.gxa.cachestuff.service.HotelService;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class TaskResetHotelCache implements InitializingBean {
private final CanalConnector canalConnector;
private final String subscribe;
private final Integer batchSize;
private final HotelService hotelService;
public TaskResetHotelCache(
CanalConnector canalConnector,
@Value("${canal.hotel.subscribe:server}") String subscribe,
@Value("${canal.hotel.batchSize}") Integer batchSize,
HotelService hotelService) {
this.canalConnector = canalConnector;
this.subscribe = subscribe;
this.batchSize = batchSize;
this.hotelService = hotelService;
}
/**
* Runs this operation.
*/
@Scheduled(initialDelayString = "${canal.hotel.initialDelay:5000}", fixedDelayString = "${canal.hotel.fixedDelay:5000}")
@Async
public void run() throws InvalidProtocolBufferException {
if (!canalConnector.checkValid()) {
canalConnector.connect();
log.info("[-1] 连接Canal失败 下个周期执行任务");
return;
}
Message message = canalConnector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// 没必要输出
// log.info("[{}] 本次没有检测到酒店数据更新。", batchId);
return;
}
log.info("[{}] 酒店数据本次共有{}次更新需要处理", batchId, size);
// 一个表在一次周期内可能会被修改多次,而对Redis缓存的处理只需要处理一次即可
// Set<String> factKeys = new HashSet<>(); //unused
// 遍历Entry
for (CanalEntry.Entry entry : message.getEntries()) {
if (
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND
) {
continue;
}
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();
log.info(
"[{}]数据变更详情: 来自binlog[{}.{}],数据源{}.{},变更类型{}",
batchId,
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),
tableName,
eventType
);
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
// 遍历 Entry 中的 DataList
for (CanalEntry.RowData rowData : rowDatasList) {
JSONObject beforeData = new JSONObject();
JSONObject afterData = new JSONObject();
// 变更前和变更后的Columns
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
// 遍历
for (CanalEntry.Column column : beforeColumnsList) {
beforeData.put(column.getName(), column.getValue());
}
for (CanalEntry.Column column : afterColumnsList) {
afterData.put(column.getName(), column.getValue());
}
log.info("变更前数据: {}", beforeData);
log.info("变更后数据: {}", afterData);
// 重置缓存
hotelService.resetCache(afterData.getInteger("id"));
}
}
canalConnector.ack(batchId);
log.info("[{}] 同步完成", batchId);
}
@Override
public void afterPropertiesSet() {
connect();
}
@PreDestroy
public void onDestroy() {
canalConnector.disconnect();
}
private void connect() {
canalConnector.connect();
if ("server".equals(subscribe)) {
canalConnector.subscribe(null);
} else {
canalConnector.subscribe(subscribe);
}
canalConnector.rollback();
}
}
评论