目 录CONTENT

文章目录

springboot集成flink-cdc

筱晶哥哥
2023-08-19 / 0 评论 / 0 点赞 / 203 阅读 / 28471 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2024-03-23,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

CDC 的全称是 Change Data Capture,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。

目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。

Flink CDC 是一个独立的开源项目,项目代码托管在 GitHub 上。

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。

前文

什么是CDC

CDC:全称是 Change Data Capture,即数据变更捕获技术,具体的含义是 通过识别和捕获对数据库中的数据所做的更改(包括数据或数据表的插入、更新、删除;数据库结构的变更调整等),然后将这些更改按发生的顺序完整记录下来,并实时通过中间技术桥梁(消息中间件、TCP等等)将变更顺序消息传送到下游流程或系统的过程。

CDC Connectors for Apache Flink 是一组用于Apache Flink ®的源连接器,使用变更数据捕获 (CDC) 从不同数据库获取变更。

用于 Apache Flink 的 CDC 连接器将 Debezium 集成为捕获数据更改的引擎,所以它可以充分发挥 Debezium 的能力。白话的意思是,Flink-CDC 一个成型的cdc技术实现(Debezium)的包装。

  1. 支持读取数据库快照,即使发生故障也能继续读取binlog,一次处理。

  2. DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。

  3. Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监控单个表的更改。

CDC与Flink毕业版本

下表显示了 Flink CDC 连接器和 Flink 之间的版本映射:

Flink CDC 版本Flink版本
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*
2.1.*1.13.*
2.2.*1.13.* , 1.14.*

SpringBoot项目整合Flink-CDC

说明

按常理来说,一个正常的 flink-job 最终我们并不会集成到springboot项目中,我们会直接编写一个maven项目,在发布时使用flink程序来启动任务。

比如官网示例:

本文既要使用flink-cdc进行数据变更捕获 (可以视作为一个flink-job),但又要契合我们的springboot项目,使用spring的特性,因此,我们需要转换一下思路,转换成什么样子呢?

就是不要将这个flink-cdc作为一个job使用flink程序进行发布提交,我们就当它在我们开发时一样,作为一个本地项目,main方法启动。

引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.9</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  
  <groupId>com.itjing</groupId>
  <artifactId>flink-cdc-demo</artifactId>
  <version>1.0.0</version>
  <packaging>jar</packaging>

  <name>flink-cdc-demo</name>
  
  <properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <flink.version>1.13.6</flink.version>
  </properties>
  <dependencies>
    
    <!-- web -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- fastjson -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>
    
    <!-- flink -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    
    <!-- mysql-cdc -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.0.0</version>
    </dependency>
    
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    
  </dependencies>
  
</project>

接入springboot项目

无法简单的使用main方法来启动 cdc 作业,因为如果这样的话,我们就无法与spring完美的契合。

因此我们可以利用springboot的特性, 实现 ApplicationRunner 将 flink-cdc 作为一个项目启动时需要运行的分支子任务即可。

创建测试表user

CREATE TABLE `user` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `name` varchar(50) DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

创建监听类实现 ApplicationRunner

package com.itjing.flink.listener;

import com.itjing.flink.MysqlDeserialization;
import com.itjing.flink.domain.DataChangeInfo;
import com.itjing.flink.sink.DataChangeSink;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * mysql变更监听
 *
 * @author lijing
 * @date 2023-08-19 21:20
 */
@Component
public class MysqlEventListener implements ApplicationRunner {
    
    private final DataChangeSink dataChangeSink;
    
    public MysqlEventListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }
    
    @Override
    public void run(ApplicationArguments args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env.addSource(dataChangeInfoMySqlSource, "mysql-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("mysql-stream-cdc");
        
    }
    
    /**
     * 构造变更数据源
     */
    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        return MySqlSource.<DataChangeInfo>builder().hostname("127.0.0.1").port(3306).databaseList("flink-cdc")
//                .tableList("flink-cdc.user_*").username("root").password("root123456")
                .tableList("flink-cdc.user").username("root").password("root123456")
                /*
                initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
                latest: 只进行增量导入(不读取历史变化)
                timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
                */
                .startupOptions(StartupOptions.latest()).deserializer(new MysqlDeserialization())
                .serverTimeZone("GMT+8").build();
    }
}

自定义数据读取解析器

这里解析为一个数据变更对象

package com.itjing.flink;

import com.alibaba.fastjson.JSONObject;
import com.itjing.flink.domain.DataChangeInfo;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
 * 自定义数据读取解析器 mysql消息读取自定义序列化
 *
 * @author lijing
 * @date 2023-08-19 21:33
 */
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {
    
    public static final String TS_MS = "ts_ms";
    
    public static final String BIN_FILE = "file";
    
    public static final String POS = "pos";
    
    public static final String CREATE = "CREATE";
    
    public static final String BEFORE = "before";
    
    public static final String AFTER = "after";
    
    public static final String SOURCE = "source";
    
    public static final String UPDATE = "UPDATE";
    
    /**
     * 反序列化数据,转为变更JSON对象
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        DataChangeInfo dataChangeInfo = new DataChangeInfo();
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
        // 获取操作类型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();
        int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
        dataChangeInfo.setEventType(eventType);
        dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(
                Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString()))
                .orElseGet(System::currentTimeMillis));
        // 输出数据
        collector.collect(dataChangeInfo);
    }
    
    /**
     * 从原始数据获取出变更之前或之后的数据
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (Objects.nonNull(element)) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }
    
    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
    
}

变更对象

package com.itjing.flink.domain;

import lombok.Data;

/**
 * 数据变更对象
 *
 * @author lijing
 * @date 2023-08-19 21:29
 */
@Data
public class DataChangeInfo {
    
    /**
     * 变更前数据
     */
    private String beforeData;
    
    /**
     * 变更后数据
     */
    private String afterData;
    
    /**
     * 变更类型 1新增 2修改 3删除
     */
    private Integer eventType;
    
    /**
     * binlog文件名
     */
    private String fileName;
    
    /**
     * binlog当前读取点位
     */
    private Integer filePos;
    
    /**
     * 数据库名
     */
    private String database;
    
    /**
     * 表名
     */
    private String tableName;
    
    /**
     * 变更时间
     */
    private Long changeTime;
    
}

自定义sink交由spring管理

package com.itjing.flink.sink;

import com.itjing.flink.domain.DataChangeInfo;
import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.springframework.stereotype.Component;

/**
 * Flink Sink是Apache Flink中的一个组件,用于将数据从Flink流式处理程序发送到外部系统。
 * 它充当了数据的最终目的地,可以将数据写入各种不同的存储系统,如数据库、消息队列、文件系统等。Sink的作用是将处理后的数据输出到外部系统,以便进一步的分析、存储或展示。
 * 在Flink中,你可以选择不同类型的Sink来满足你的需求,例如将数据写入Kafka、Hadoop、Elasticsearch等。
 *
 * @author lijing
 * @date 2023-08-19 21:31
 */
@Component
@Log4j2
public class DataChangeSink implements SinkFunction<DataChangeInfo> {
    
    @Override
    public void invoke(DataChangeInfo value, Context context) {
        log.info("收到变更数据:{}", value);
        // todo 数据处理 因为此sink也是交由了spring管理,您想进行任何操作都非常简单
    }
}

当然,以上仅仅只是整合思路,如果你想使用flink-cdc进行数据同步或日志记录等,结合您自身的需求进行调整,以上内容,大的架子是没问题的。

0

评论区