StarRocks在七猫的应用(二)

一、为什么七猫要引入StarRocks

1. 背景

之前,我们会采用多种架构来满足业务需要,比如数据报表结果存储在Clickhouse、OLAP查询使用Trino、业务数据的实时同步采用Hudi等。随着业务的迅速发展,已经越来越难满足业务的需要,主要有以下几点原因:

  • 涉及到的组件很多,开发、运维人员需要去熟悉精通多个组件,成本较高。
  • Clickhouse不支持标准SQL,需要时刻注意SQL语法的问题。
  • Clickhouse的并发能力差,Join的性能也不理想。
  • Trino的查询响应时长不稳定。
  • Hudi在采用Merge-On-Read存储格式时,查询性能不理想。
    为了解决这些问题,我们调研了很多款技术组件,结合我们目前的使用场景,综合考虑,我们选择了StarRocks。

2. StarRocks能够支持的场景

StarRocks能够很好的支持我们目前业务中涉及到的各个场景,主要如下:

  • 实时分析
  • OLAP分析(内外表均可)
  • 数仓结果表查询(内外表均可)
  • 多维查询(内外表均可)
  • 业务库实时同步

当然在满足业务的同时,对数据开发、分析等业务人员比较友好的是:标准的SQL语法、兼容Mysql协议与函数

二、StarRocks 在七猫的使用场景

  1. 实时同步业务库数据:FlinkCDC监听数据库Binlog来实时同步更新至StarRocks,然后进行实时分析。
  2. Spark数据导入:针对数仓分析结果表数据(千万以内)同步至StarRocks而开发的同步工具。
  3. FlinkSQL数据导入:针对数仓中的任意数据集(明细表、结果表、主题表等)、Kafka数据等任意Flink能够支持的数据导入。
  4. 经营指标分析:一个非常综合的使用场景,包括:业务库数据实时同步实时分析、表存储模型的选型、多维数据分析、数据的导入。
  5. 互动域查询提速:借助于StarRocks强大的查询性能,采用合适的存储引擎,对近30天内的海量明细数据查询加速。

三、StarRocks 的技术点

在使用StarRocks时,表数据模型的选择是非常重要的,选择不合适的模型,有可能造成:查询性能不满足预期、数据维护不方便、业务体验差等。在此,列出了各个模型以及推荐的使用场景,来提供参考。

1. 明细模型

适用场景:

  • 分析原始数据,例如原始日志、原始操作记录等。
  • 查询方式灵活,不需要局限于预聚合的分析方式。
  • 导入日志数据或者时序数据,主要特点是旧数据不会更新,只会追加新的数据。

2. 聚合模型

适用于分析统计和汇总数据。比如:

  • 通过分析网站或 APP 的访问流量,统计用户的访问总时长、访问总次数。
  • 广告厂商为广告主提供的广告点击总量、展示总量、消费统计等。
  • 通过分析电商的全年交易数据,获得指定季度或者月份中,各类消费人群的爆款商品。

在这些场景中,数据查询和导入,具有以下特点:

  • 多为汇总类查询,比如 SUM、COUNT、MAX 等类型的查询。
  • 不需要查询原始的明细数据。
  • 旧数据更新不频繁,只会追加新的数据。

3. 更新模型

适用场景:实时和频繁更新的业务场景,例如分析电商订单。在电商场景中,订单的状态经常会发生变化,每天的订单更新量可突破上亿。

4. 主键模型

主键模型适用于实时和频繁更新的场景,例如:

  • 实时对接事务型数据至 StarRocks。事务型数据库中,除了插入数据外,一般还会涉及较多更新和删除数据的操作,因此事务型数据库的数据同步至 StarRocks 时,建议使用主键模型。实时同步增删改的数据至主键模型,可以简化数据同步流程,并且相对于 Merge-On-Read 策略的更新模型,查询性能能够提升 3~10 倍。通过 Flink-CDC 等工具直接对接 TP 的 Binlog
  • 利用部分列更新轻松实现多流 JOIN。在用户画像等分析场景中,一般会采用大宽表方式来提升多维分析的性能,同时简化数据分析师的使用模型。而这种场景中的上游数据,往往可能来自于多个不同业务(比如来自购物消费业务、快递业务、银行业务等)或系统(比如计算用户不同标签属性的机器学习系统),主键模型的部分列更新功能就很好地满足这种需求,不同业务直接各自按需更新与业务相关的列即可,并且继续享受主键模型的实时同步增删改数据及高效的查询性能。

数据域的表多为dwt层表,都是已经经过清洗聚合加工处理过的数据,且包含主键,主要提供给业务方、报表查询加速使用。因此除了聚合模型外,剩余三种模型都可以使用,但在重新导入(overwrite)数据时,需要注意几点:

  • 明细模型:在导入前,需要先删除分区数据,否则数据会重复。
  • 更新模型:重新导入,虽然可以不用先删除分区,但是相同主键下会存在多个版本数据,查询时需要在线聚合多版本(Merge-On-Read),性能会受到影响,但是写入的吞吐量高。
  • 主键模型:重新导入,采用了 Delete+Insert 的策略,保证同一个主键下仅存在一条记录,这样就完全避免了 Merge 操作,但是写入吞吐较更新模型低。

四、最佳实践

1. 最佳实践-业务数据库实时同步

通过FlinkCDC工具监控数据库Binlog实时同步至Starrcosk,选用主键模型创建数据表。

  • StarRocks
    -- 实际广告收入表
    CREATE TABLE edw_ods.ods_partner_adv_base_profit (
     app string comment '产品标识',
     id int,
     ......
     updated_at int
    ) ENGINE=OLAP
    PRIMARY KEY (app,id)
    COMMENT '广告收益'
    DISTRIBUTED BY HASH(id) BUCKETS 5
    PROPERTIES (
     "replication_num" = "3",
       "in_memory" = "false",
       "storage_format" = "DEFAULT",
       "enable_persistent_index" = "true"
    ) ;
  • FlinkSQL
    CREATE TABLE source_table_qm (
      id int,
      ......
      updated_at int,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'xxx',
      'port' = '3306',
      'username' = 'xxx',
      'password' = 'xxx',
      'connect.max-retries' = '10',
      'scan.incremental.snapshot.enabled' = 'true',
      'database-name' = 'xxx',
      'table-name' = 'xxx'
    );

    CREATE TABLE source_table_xm (
      id int,
      ......
      updated_at int,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'xxx',
      'port' = '3306',
      'username' = 'xxx',
      'password' = 'xxx',
      'connect.max-retries' = '10',
      'scan.incremental.snapshot.enabled' = 'true',
      'database-name' = 'xxx',
      'table-name' = 'xxx'
    );

    CREATE TABLE sink_table (
      app string,
      id int,
      ......
      updated_at int,
      PRIMARY KEY (app, id) NOT ENFORCED
    )
    WITH (
     'connector' = 'starrocks',
     'jdbc-url'='jdbc:mysql://xxx1:9030,xxx2:9030,xxx3:9030?characterEncoding=utf-8&useSSL=false',
     'load-url'='xxx1:18030;xxx2:18030;xxx3:18030',
     'database-name' = 'edw_ods',
     'table-name' = 'ods_partner_adv_base_profit',
     'username' = 'xxx',
     'password' = 'xxx',
     'sink.properties.column_separator' = '\\x01',
     'sink.properties.row_delimiter' = '\\x02',
     'sink.max-retries' = '3',
     'sink.semantic' = 'exactly-once'
    );

    INSERT INTO sink_table
    SELECT
      cast('reader_free_all' as string) as app,
      id,
      ......
      updated_at
    FROM source_table_qm
    union all
    SELECT
      cast('reader_fast_panda_all' as string) as app,
      id,
      ......
      updated_at
    FROM source_table_xm;

2. 最佳实践-Spark数据导入

在Spark中写入StarRocks的方式有多种,包括使用JDBC连接、使用Hadoop输入/输出格式、使用Spark Connector等。每种方式都有其优缺点,不同的应用场景可能需要选择不同的方式。以下是各种方式的性能特点:

  • JDBC连接方式

使用JDBC连接的方式可以方便地将Spark中的数据写入StarRocks,但由于JDBC连接需要进行网络传输和数据转换,因此相对于其他方式,性能较低。如果数据量较小或对性能要求不高,可以选择使用JDBC连接的方式。

  • Hadoop输入/输出格式方式

使用Hadoop输入/输出格式的方式可以直接使用StarRocks的Load命令将数据写入数据库中,因此性能较高。但需要注意的是,该方式需要在代码中手动设置输出路径,且需要保证输出路径在Hadoop集群中是可用的。如果数据量较大或对性能要求较高,可以选择使用Hadoop输入/输出格式的方式。

  • Spark Connector方式

使用Spark Connector的方式可以在Spark中直接操作StarRocks表,性能较高且使用方便。但需要注意的是,该方式需要在代码中设置StarRocks相关的配置信息,并且需要使用特定版本的Spark Connector与StarRocks版本兼容。如果需要进行复杂的数据操作或对性能要求很高,可以选择使用Spark Connector的方式。

如果对性能要求较高,可以考虑使用Hadoop输入/输出格式的方式;如果对性能要求不高,可以选择使用JDBC连接的方式;如果需要更方便地操作StarRocks表,可以选择使用Spark Connector的方式。

早期,我们使用了JDBC的方式写入,但是导入性能不理想,后来自己实现通过Spark+Stream Load的导入方式,它适合导入数据量较少的场景下使用(千万级别以下),一般用于分析结果数据的导入,速度很快。但是当前版本无法控制批次等缓冲策略,导入超大数据量会导致BE节点瞬间打满,会导入失败。
未来,我们会尝试Hadoop输入/输出格式方式的方式来提高导入性能和稳定性。

3. 最佳实践-FlinkSQL数据导入

FlinkSQL写StarRocks一般我们会采用JDBC和Flink Connector来写入,Flink Connector的底层原理也是使用Stream Load来进行写入,需要注意分隔符的问题。JDBC的写入性能较差,一般我们不去采用,两种写入方式的Sink Table语句如下:

  • JDBC方式
    CREATE TABLE starrocks_table (
     id INT,
     name VARCHAR(20),
     age INT
    ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:mysql://<host>:<port>/<database>',
     'table-name' = '<table>',
     'username' = '<username>',
     'password' = '<password>',
     'sink.buffer-flush.max-rows' = '5000' -- 可选,控制批量写入的行数
    );
  • Flink Connector方式
    CREATE TABLE starrocks_table (
     id INT,
     name VARCHAR(20),
     age INT
    ) WITH (
     'connector' = 'starrocks',
     'jdbc-url'='jdbc:mysql://<host>:9030?characterEncoding=utf-8&useSSL=false',
     'load-url'='<host>:18030',
     'database-name' = '<database>',
     'table-name' = '<table>',
     'username' = '<username>',
     'password' = '<password>',
     'sink.properties.column_separator' = '\\x01',
     'sink.properties.row_delimiter' = '\\x02',
     'sink.max-retries' = '3',
     'sink.semantic' = 'exactly-once'
    );

4. 最佳实践-经营指标分析

在经营分析中大量使用StarRock,主要使用了功能如下:

  • FlinkCDC同步内容、原创业务库数据:通过监听Mysql的Binlog来实时同步业务库数据至Starrocks,具体实现方式上面已介绍
  • Spark导入结果数据:大盘、总览数据由于纬度已经固定,由数仓平台计算完成后,通过自研的导入工具导入数据,具体实现方式上面已介绍
  • 视图View:由于经营分析涉及到的结果表多达二三十张,页面加载时会同时创建很多连接,并且会查询几十次,性能十分低下,通过View将多张表的结果union后,后台只用查询一个View就可以同时查询出几张至十几张表的结果,实际性能提升十分明显。
  • 交互式即席查询OLAP:将在数仓中预处理好的中间明细结果通过Spark导入,再根据页面自定义维度来实时即席查询,性能十分高效。
  • 使用了主键/更新模型表:在导入数据时,不用删除对应分区的数据,直接insert就可以实现更新对应的数据,优化了业务人员的使用体验,尤其在回溯历史数据的时候,效果很好。

5. 最佳实践-互动域查询提速

将数仓中的数据有针对性的处理后导入StarRocks中,再借助StarRocks的强大性能来对查询进行加速。目前我们对外提供的查询引擎为Trino,Trino本身不存储数据,数据源为数仓Hive中的数据(外表),业务人员经常反馈查询慢,因此进行了提速专项工作。

  • 主要流程:收集常用埋点过滤条件以及常用字段,在StarRocks中创建明细模型表(日志数据),通过FlinkSQL同步至StarRocks表中。
  • StarRocks建表语句:日志数据通常都比较大,考虑到存储成本,因此我们只保留近30天的数据,自动删除过期分区数据,自动创建新分区。

CREATE TABLE edw_dwd.dwd_interact_qm_aggs_log (
   `dt、`     date comment '分区字段',
   `project` string comment '项目名称',
   `event_id` string comment '事件id',
    ......
`count` int
)
ENGINE=OLAP
DUPLICATE KEY(dt,project,event_id)
PARTITION BY RANGE (dt) (
   START ("2023-01-01") END ("2025-12-31") EVERY (INTERVAL 1 day)
)
DISTRIBUTED BY HASH(source_uid) BUCKETS 320
PROPERTIES (
"replication_num" = "2",
 "in_memory" = "false",
 "storage_format" = "DEFAULT",
 "enable_persistent_index" = "true",
 "compression" = "ZSTD",
 "dynamic_partition.enable" = "true", -- 动态分区
 "dynamic_partition.time_unit" = "DAY",
 "dynamic_partition.start" = "-30", -- 自动保留近30天分区,删除30天前的分区
 "dynamic_partition.end" = "3",-- 自动创建后3天分区
 "dynamic_partition.prefix" = "p", -- 分区前缀
 "dynamic_partition.buckets" = "320" -- 自动创建的分区buckets
);

相同查询相同数据量在Trino、StarRocks外表、StarRocks内表的查询如下图所示(StarRocks机器配置略低):

  • Trino
  • StarRocks外表
  • StarRocks内表

结论:

  • StarRocks经过互动域埋点ETL导入内表后,所有查询均在毫秒级完成,符合预期。
  • 通过外表查询Hive场景下,StarRocks单dt查询性能相较Trino有一倍提升,符合预期。
  • 通过外表查询Hive场景下,StarRocks多dt查询性能相较Trino需要sql计算复杂度,由于Trino节点数、机器配置更高,在聚合维度很细的场景下,查询更快,Starrocks容易达到资源、IO瓶颈、会出现查询内存超过worker节点的错误。
    提速效果得到了互动域小组的高度认可

五、踩坑点

1. 删除分区数据

-- 当我们进行删除的时候,如果没有添加force,数据不会被删除,数据会被放置到BE节点存储目录下的trash目录,一天后会被删除。在这个过程中,我们可以进行数据的恢复。如果需要立刻删除数据则必须添加force 关键字。
-- 查看PartitionName
show partitions from edw_dwd.dwd_interact_qm_aggs_log;
-- 对于动态分区表需要先设置"dynamic_partition.enable" = "false",删除完分区后再设置回来
ALTER TABLE edw_dwd.dwd_interact_qm_aggs_log SET ("dynamic_partition.enable" = "false");
 alter table edw_dwd.dwd_interact_qm_aggs_log drop partition p20230410 force;
ALTER TABLE edw_dwd.dwd_interact_qm_aggs_log SET ("dynamic_partition.enable" = "true");
-- 对于非动态分区表可直接删除

2. Delete数据

-- delete只支持删除少量数据,比如几千条这样,删除大数据量,会导致be节点hang住
delete from edw_dwd.dwd_interact_qm_aggs_log where dt = '2023-04-11' ;

3. 删除表

-- 当我们进行删除的时候,如果没有添加force,数据不会被删除,数据会被放置到BE节点存储目录下的trash目录,一天后会被删除。在这个过程中,我们可以进行数据的恢复。如果需要立刻删除数据则必须添加force 关键字。
drop table edw_dwd.dwd_interact_qm_aggs_log force;

4. FlinkSQL导入数据

  • 在FlinkSQL中StarRocks的Sink表必须设置PRIMARY KEY
  • load-url参数中端口为fe-http端口,开源版本默认为8030,阿里云默认为18030
  • 写入时要尤其注意分隔符的问题,一般只要业务库中的字符串中没有乱码、换行符,用默认即可,如果怎么调整都不行,可以使用sql 'sink.properties.format' = 'json','sink.properties.strip_outer_array' = 'true'参数用json的方式导入,但是会略微降低导入性能。

六、展望未来

目前StarRocks在七猫的应用,已经处在比较成熟的阶段,但我们依然在不断进步,不断探索新的技术优化点,未来,我们将会去探索StarRocks的很多高级功能特性,提高资源利用率、查询性能,并且能够有效控制成本。主要技术点如下:

  • CBO 统计信息

CBO 优化器是查询优化的关键。一条 SQL 查询到达 StarRocks 后,会解析为一条逻辑执行计划,CBO 优化器对逻辑计划进行改写和转换,生成多个物理执行计划。通过估算计划中每个算子的执行代价(CPU、内存、网络、I/O 等资源消耗),选择代价最低的一条查询路径作为最终的物理查询计划。

  • 异步物化视图

异步物化视图支持多表关联以及更加丰富的聚合算子。异步物化视图可以通过手动调用或定时任务的方式刷新,并且支持刷新部分分区,可以大幅降低刷新成本。除此之外,异步物化视图支持多种查询改写场景,实现自动、透明查询加速。

  • Colocate Join

Colocate Join 功能是分布式系统实现 Join 数据分布的策略之一,能够减少数据多节点分布时 Join 操作引起的数据移动和网络传输,从而提高查询性

  • Query Cache

StarRocks 提供的 Query Cache 特性,可以帮助您极大地提升聚合查询的性能。

展示评论