Flink 数据处理通用代码开发

1 背景介绍

Flink 在七猫大数据发展过程中,一直扮演着重要角色。Flink 作为实时计算引擎,经历了多个发展阶段,实时计算平台也在不断地迭代完善。在七猫内部,主要以 Flink Jar 包任务为主,并逐步引入 Flink Sql ,不断的降低了使用门槛和提高了任务的开发效率;从起初基础的 Flink 任务开发,发展到跨网络、跨云厂商的任务多版本任务开发,满足了业务发展的需求。

七猫内部 Flink 主要是基于 Yarn 资源管理的计算平台,支持了数仓、算法、数据分析、运营、和服务端的业务。实时任务数几百个,Flink Jar 任务占比 80% 左右。使用的 Yarn Vcores 超 10000+ 核,内存 20T +,其中单个任务峰值吞吐在 100 万 QPS,每天吞吐的数据规模超千亿。

2 实时平台建设

2.1 遇到的问题

在 Flink 探索发展的过程中,都会遇到 Flink 使用的一些痛点。总结起来,大概是以下几个方面:

  • 一是技术门槛高、对于不熟悉 Flink 代码的同学来说学习成本大、开发周期长;
  • 二是数据开发迭代效率比较低,重复逻辑反复的开发缺少复用;
  • 三是Jar 任务维护成本高,一些代码逻辑的改动会涉及到重新打包、上传,上线等动作;
  • 四是测试运维难,测试需要读 Java 代码才能理解确认业务逻辑,同时对任务维护同学来说,需要看代码才能进行问题的诊断。

上面的几个问题,可能是普遍的问题,所以各家公司都会基于内部自建或者开源项目二次开发,来满足自身任务开发管理需求。对于七猫,除了上述问题,还存在跨云厂商中遇到的问题需要解决,因为跨云厂商,环境和依赖版本都不一致等,开发者在编写 Flink 任务进行实时数据处理时,都需要处理依赖版本问题。

2.2 解决方案

针对以上问题,我们借鉴其他公司的成功实践,结合当前人力现状和业务优先原则,探索出短期和长期方案:

  • 短期方案:使用通用配置解决简单通用数据处理的需求,比如数据同步需求,使用 SQL 解决更复杂计算场景,比如数据聚合计算和关联计算场景,用户编辑完 SQL 或者配置文件后,便可提交任务执行。
  • 长期方案:使用任务管理系统进行代码的编辑、任务的提交和后期的运维。

使用 Flink SQL 开发可以借助开源的 Flink Sql-Client 实现,这里主要介绍使用配置进行作业生成的方案方案。该方案支持的 Source 有 Filesystem 和 Kafka,Sink Print、Kafka、Starrocks、Hive、Filesystem,以及 数据处理器 Processor 和数据过滤器 Filter 等。

2.3 优势

相较于直接使用 DataStream API,配置化的方式有以下优势:

  • 降低实时数据开发门槛,通过标准化的开发,实现低代码配置化数据加工;
  • 提升开发效率,提高复用性,一站式加工;
  • 提升代码管理效率,使用配置方案能保证 一套代码+多个配置文件= 多个实时任务,而普通方式则需要多套代码才能生成多个实时任务。

相较于直接使用Flink SQL,配置化的方式也有以下几种优势:

  • 一是sql 易会难精,使用方便,但是后期想基于 sql 进行定制化的开发门槛较高;
  • 二是配置化的方式更易集成,后期可通过拖拉拽的方式直接进行任务的生成;
  • 三是开源的 Flink SQL 本身有一定局限,比如全局唯一状态和并行度,这个问题在配置化的方式里很容易解决。

2.4 使用方法及原理

2.4.1 数据源配置

2.4.1.1 Kafka 数据源配置

该配置指定了 kafka source 信息,包括 kafka_source_server 和 kafka_source_topic 以及 kafka_group,启动模式配置,支持 timestamp、latest 、earliest 等常用启动模式。

# kafka source 配置
kafka_source_topic = xxxxxx
kafka_source_server = xxxxxx
kafka_group = dwd_adx_sdk_log_2_starrocks
kafka_partition_discovery_ms = 10000
start_mode=earliest

2.4.1.2 Json 解析

source 节点接入的数据是字符串类型的 Json,需要序列化成对象后才能进一步的处理和过滤,目前有两种序列化 Json 的方法,通过 bean 和通过 map,通过 bean 的方式在解析不同的数据源时,需要更改代码,不具有通用性,所以本文使用 map 进行序列化。

Map<String,Object> sourceMap = null;
try {
	sourceMap = gson.fromJson(s,Map.class);
} catch (Exception exception) {
	logger.error("->->-> Json 解析异常:" + s + exception);
}

2.4.2 数据处理器配置

通过反射的方式调用用户自定义的 Processor 的 process 方法,用户在 process 方法中可以进行字段生成和类型转换等操作,用户可以在一个 process 方法中定义多个处理逻辑,也可以实现多个 Processor,配置的时候逗号分隔。

# 数据处理函数,多个 processors 使用逗号分隔
register_processors = com.qm.dsp.bean.MinPointProcessor

for(Object processor : registerProcessors.split(",")){
    Class processorClass = Class.forName(processor.toString());
    Method processorMethod = processorClass.getMethod("process",Map.class);
    processorMethod.invoke(processorClass.newInstance(),sourceMap);
}

public interface Processor {
    void process(Map<String,Object> map);
}

public class MinPointProcessor implements Processor{
    @Override
    public void process(Map<String, Object> map) {
        ......
    }
}

2.4.3 数据过滤器配置

通过反射的方式调用用户自定义的 Filter 的 filter 方法,用户在 filter 方法中定义哪些数据不会被下发,用户可以在一个 filter 方法中定义多个排除逻辑,也可以实现多个 Filter,多个 Filter 之间是或关系,即只要触发一个 Filter,这条数据就会被排除。

# 数据过滤函数,多个 filters 使用逗号分隔
register_filters = com.qm.dsp.bean.AdEventFilter
for(String filter: registerFilters.split(",")){
    Class filterClass = Class.forName(filter);
    Method filterMethod = filterClass.getMethod("filter",Map.class);
    if(!(Boolean) filterMethod.invoke(filterClass.newInstance(),map)){
        return false;
    }
}
public interface Filter {
    public boolean filter(Map<String,Object> map);
}

public class AdEventFilter implements Filter{
    @Override
    public boolean filter(Map<String, Object> map) {
        if("adclick".equals(map.get("event_type"))){
            return true;
        }
        return false;
    }
}

2.4.4 数据存储配置

支持多种数据存储方案,包括 StarRocks、kafka、hive 和 OBS 、OSS、 HDFS 等文件系统。

2.4.4.1 Starrocks Sink 配置

Flink 写 Starrocks 使用 StarRocksSink 类实现,StarRocksSink 需要用户传入 TableSchema、StarRocksSinkOptions 和 StarRocksSinkRowBuilder,这些入参分别下面文件的配置生成。

sink_connector = starrocks
# starrocks sink 配置
# starrocks sink 连接信息配置
sink_jdbc_url = jdbc:mysql://xxxxxx:9030?characterEncoding=utf-8&useSSL=false
sink_load_url = xxxxxx:8030
sink_user_name = xxxxxx
sink_password = xxxxxx
sink_db_name = dwd
sink_table_name = xxxxxx
sink_thread = 10
batch_size = 1000000
flush_interval = 300000
# starrocks 表字段定义
sink_ddl_schema_config = {"columnList":[{"name":"col1","type":"BIGINT"},{"name":"col2","type":"VARCHAR","len":150}]}
# map 中要插入 starrocks 的 key。
sink_column_list = col1,col2

StarRocksSinkOptions 为表的连接信息和 StreamLoad 相关的配置信息, 表连接信息包括jdbc-url、username、password、database-name、table-name,StreamLoad 相关的配置包含 load-url、sink.properties.column_separator、sink.properties.row_delimiter、sink.buffer-flush.max-rows、sink.buffer-flush.interval-ms、sink.buffer-flush.max-bytes 和 sink.io.thread-count。

StarRocksSink.sink(
	// the table structure
	generateTableSchema(sinkDDLSchemaConfig,gson),
	// the sink options 
	StarRocksSinkOptions.builder()
		.withProperty("jdbc-url", sinkJdbcUrl)
		.withProperty("username", sinkUserName)
		.withProperty("password", sinkPassword)
		.withProperty("database-name", sinkDbName)
		.withProperty("table-name", sinkTableName)
		.withProperty("load-url", sinkLoadUrl)
		.withProperty("sink.properties.column_separator", "\\x01")
		.withProperty("sink.properties.row_delimiter", "\\x02")
		.withProperty("sink.buffer-flush.max-rows", batchSize)
		.withProperty("sink.buffer-flush.interval-ms", flushInterval)
		.withProperty("sink.buffer-flush.max-bytes", "943718400")
		.withProperty("sink.io.thread-count", sinkThread)
		.build(),
	new AdStarRocksSinkRowBuilder(sinkColumnList)
)

TableSchema 为 StarRocks 中的字段定义,包括字段名称的定义和字段类型的定义,根据用户配置的 sinkDDLSchemaConfig 生成,字段类型目前仅支持 VARCHAR 和 BIGINT ,需要用到其他类型时在下面添加即可。

public static TableSchema generateTableSchema(String sinkDDLSchemaConfig,Gson gson){
    TableSchema.Builder builder = TableSchema.builder();
    SchemaConfig schemaConfig = gson.fromJson(sinkDDLSchemaConfig, SchemaConfig.class);
    for(Column column:schemaConfig.getColumnList()){
        if("VARCHAR".equals(column.getType().toUpperCase().trim())){
            builder.field(column.getName(),DataTypes.VARCHAR(column.getLen()));
        }else if("BIGINT".equals(column.getType().toUpperCase().trim())){
            builder.field(column.getName(),DataTypes.BIGINT());
        }else {
            throw new IllegalArgumentException("Unsupport DataType:"+column.getType());
        }
    }
    return builder.build();
}

StarRocksSinkRowBuilder 定义了流元素对象和表字段的一一对应关系,根据用户配置的 sinkColumnList ,从 Map 取出数据,写入 StarRocks 表中,顺序与 TableSchema 的字段一一对应。

public class AdStarRocksSinkRowBuilder implements StarRocksSinkRowBuilder<Map> {
    private String sinkColumnList;

    public AdStarRocksSinkRowBuilder(String sinkColumnList) {
        this.sinkColumnList = sinkColumnList;
    }

    @Override
    public void accept(Object[] objects, Map map) {
        int i = 0;
        for (String column : sinkColumnList.split(",")){
            objects[i++] = map.get(column);
        }
    }
}

2.4.4.2 分布式文件系统 Sink

Flink 写分布式文件系统使用 StreamingFileSink 类实现,配置StreamingFileSink 需要用户传入 BulkFormatBuilder ,BulkFormatBuilder 内含 basePath、bucketAssigner 和 writerFactory 等对象,basePath 定义了文件写入的根路径,bucketAssigner 定义了分区路径的生成方式,writerFactory 定义了写入的文件的工厂类,不同的文件格式有不同的 writerFactory 实现,由于 Hive 表大多数格式是 Parquet ,此处传入 Parquet 的工具类 ParquetWriterFactory,该工具类初始化时需要指定 Schema 信息,用户指定 Schema 文件的路径,使用改路径初始化 Schema。

# sink 数据存储类型,目前支持 kafka、obs和starrocks
sink_connector = obs
# 存储路径配置
sink_obs_path = obs://xxxxxx
# 文件 schema 配置
sink_obs_schema_path = /path/ParquetPojo.avsc
final StreamingFileSink<GenericRecord> obsSink = StreamingFileSink
        .forBulkFormat(new Path(sinkObsPath), ParquetAvroWriters.forGenericRecord(schema))
        .withBucketAssigner(new EventTimeDataHourBucketAssigner<>())
        .withBucketCheckInterval(1000L)
        .build();
public String getBucketId(IN element, BucketAssigner.Context context) {
	if (this.eventTimeFormatter == null) {
		this.eventTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId);
	}
	GenericData.Record record = (GenericData.Record)element;
	Long ts = (Long) record.get("ts");
	return "dt="+this.eventTimeFormatter.format(Instant.ofEpochMilli(ts)).replace(":","/hour=");
}

2.4.4.3 Hive Sink 配置


Flink 写 Hive 是通过 Table API 实现的,写入前需要使用 hive-site.xml 文件所在路径初始化 HiveCatalog,然后再基于 DataStream 创建临时视图,该 DataStream 中的流元素需要为结构化的 Row,所以需要使用 MapFunction 将 Map 元素转成 Row 元素,另外 Row 对象在使用时需要定义字段名称和类型,使用 sink_ddl_schema_config 参数生成, 最后调用executeSql 方法查询临时视图写 Hive。

# hive sink 配置
sink_connector = hive
sink_hive_db = ods
sink_hive_view_name = test
hive_config_dir = /home/httpd/bigdata-dsp-platform
sink_hive_insert_sql = insert into  ods_cc_dsp_log_realtime   select col1,cast(col2 AS INT) as col2 test where ts is not null  
sink_ddl_schema_config = {"columnList":[{"name":"col1","type":"VARCHAR"},{"name":"col2","type":"INT"}]}
sink_column_list = col1,col2
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("hive", hive);
tEnv.useCatalog("hive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase(sinkHiveDb);
DataStream<Row> rowDateStream = outputDataStream.map(new RichMapFunction<Map, Row>() {
	private String[] sinkColumnArr;
	@Override
	public void open(Configuration parameters){
		sinkColumnArr = sinkColumnList.split(",");
	}
	@Override
	public Row map(Map map) throws Exception {
		Row row = new Row(sinkColumnArr.length);
		for(int i = 0; i < sinkColumnArr.length; i++){
			row.setField(i,map.get(sinkColumnArr[i]));
		}
		return row;
	}
},getRowTypeInfo(sinkDDLSchemaConfig,gson));
tEnv.createTemporaryView(sinkHiveViewName, rowDateStream);
tEnv.executeSql(sinkHiveInsertSql);
return;

2.4.4.4 Kafka Sink 配置


Flink 写 Hive 是通过 FlinkKafkaProducer 类实现的,该类最少只需要配置两个参数,主题名(Kafka topic) 和所在的机器 (bootstrap.servers),当 processor 和 filter 为空时,直接将 source 落地写 Kafka;否则,需要将 source 序列化,进行处理过滤之后再反序列化再写 Kafka。

# sink 数据存储类型,目前支持 kafka、obs和starrocks
sink_connector = kafka
# kafka sink 配置
kafka_sink_topic = xxxxxx
kafka_sink_server = xxxxxx

2.4.4.5 SQL Sink 配置

以上几种数据同步方式只能适用于简单的端到端数据传输, 对于更加复杂的数据处理,处理数据聚合,采用自定义  SQL 的方式实现。用户只需要传入表定义语句(SQL_DDL)和插入语句即可(SQL_DML)。该方法和 Flink Sql-Client 提交都效果相同,用户可根据需要自行选择。

String viewName = parameters.get(SQL_VIEW_NAME);
tEnv.createTemporaryView(viewName,sourceOutputStream,$("message"));      
tEnv.executeSql(parameters.get(SQL_DDL));
tEnv.executeSql(parameters.get(SQL_DML));

2.4.5 示例任务配置

最后,我们使用一个简单的实例将以上配置穿起来,并且使用 Flink 提供的脚本进行任务的提交。注意,由于通用处理逻辑都被封装在 FlinkToStartRocksRealtimeCommonJob 类中,所以对于 Flink 内核而言,对于普通 Jar 包任务并无区别。

# kafka source 配置
kafka_source_topic = xxxxxx
kafka_source_server = xxxxxx
kafka_group = ods_cc_qm_ad_log_2_kafka
#启动模式配置,支持 timestamp、latest 、earliest ,若为 timestamp,需要配置 start_up_ms 作为启动时间,若未配置则取当前系统为启动时间。
start_mode = timestamp
start_up_ms= 1678549200000
kafka_partition_discovery_ms = 10000
# 节点并行度配置
source_parallelism = 300
map_parallelism = 300
filter_parallelism = 300
sink_parallelism = 300
# sink 节点配置
# sink 数据存储类型,目前支持 kafka、obs和starrocks
sink_connector = kafka
# kafka sink 配置
kafka_sink_topic = xxxxxx
kafka_sink_server = xxxxxx
# 注册过滤器配置
register_filters = com.qm.dsp.bean.AdEventFilter
# checkpoint 数据存储地址配置
checkpoint_path = obs://xxxxxx

3. 不足与展望

3.1 不足

  • 一是任务管理功能缺失,其中多租户、历史版本回溯、开发版本和线上版本管理、UDF 管理、血缘管理是实时平台管理的重要内容;
  • 二是 Flink 引擎本身管理,主要涉及到多 Flink 版本管理,任务参数配置、常用 Connector 的二次开发、多资源环境管理等问题;
  • 三是配置方案支持的功能远没有 SQL 全面,所以碰到复杂的数据处理需要还是使用业界常用的 SQL + UDF 方式开发。

3.2 展望

正是因为配置化的方案具备易用、易学和易管理等特征,Flink SQL 作为业界通用的数据处理方案,支持丰富的实时数据处理,所以后期会考虑会考虑开发一套基于页面的开发工具,同时支持配置和 SQL 编辑提交等功能,进行实时数据处理。