StarRocks 在七猫的应用(三) - Real-Time Data Ingestion and Query Performance Optimization
Previous articles
We have already published a series of articles on our StarRocks applications:
This article continues the series and introduces how we apply StarRocks to real-time data.
1. Why We Chose StarRocks for Real-Time Data
Before adopting StarRocks, our real-time jobs relied almost entirely on computation inside Flink to cover every scenario. As the business grew, data sources multiplied, scenarios became more complex, and accuracy requirements kept rising, this development model exposed the following pain points:
- Because all computation lives inside the Flink job, development is costly, operations are complex, and later logic changes are cumbersome.
- With diverse data sources (Kafka, MySQL, Redis, etc.) and increasingly complex business scenarios, relying only on Flink state to process data makes it hard to guarantee stable, correct results.
- Flink jobs make heavy use of state and middleware caches, so restarting after a failure is risky, and troubleshooting and repair are difficult.
- Every logic change requires re-developing the code, repackaging, and re-uploading; extensibility is poor and historical data cannot be backfilled.
While evaluating alternatives for these pain points, we found that StarRocks addresses them well. Consider its key features:
- Delete & Insert model + a column-oriented storage engine that supports real-time updates = efficient real-time ingestion, with data updated at very low latency (under 1 second).
- A new cost-based optimizer (CBO) = high-performance multi-table join queries.
- Distributed execution framework + vectorized execution engine = efficient query performance, so results can be computed in real time.
- Compatibility with MySQL, Spark SQL, and Trino syntax = no development barrier and a low learning curve for consumers.
With this, when handling complex real-time business, we can synchronize data from multiple sources into StarRocks in real time and rely on StarRocks for all downstream logic and queries. The benefits are:
- Lower development complexity and higher efficiency: we only need to build the real-time sync jobs; whatever data the business needs is produced by writing StarRocks SQL that is queried in real time.
- Better data accuracy: all data lives in StarRocks, so we no longer need state or middleware caches for each scenario; we only need to make sure the data lands in StarRocks correctly.
- Simpler operations: troubleshooting is just a matter of querying the data in StarRocks.
- Easier extension: when business requirements change, in most cases we only adjust the real-time query logic in StarRocks instead of touching the Flink job.
Having covered the benefits, let us walk through the development workflow for writing real-time data into StarRocks.
2. Development Workflow for Writing Real-Time Data into StarRocks
2.1. Choosing the StarRocks Table Model
StarRocks supports four table models: the Duplicate Key model, the Aggregate Key model, the Unique Key model, and the Primary Key model. Together they cover a wide range of analytical scenarios such as log analysis, data aggregation, and real-time analytics.
From our production experience at 七猫, we use the following guidelines to pick a table model for a given business scenario:
- For large-scale detail data (hundreds of millions of rows or more) where queries mainly retrieve the raw records, prefer the Duplicate Key model. Typical examples are log or time-series data, whose key characteristic is that old data is never updated and new data is only appended.
- For data that is updated in real time and frequently, and that also needs efficient queries, use the Primary Key model or, in some cases, the Duplicate Key model. Member and order data, where rows must never be duplicated and query requirements are high, should use the Primary Key model.
- When the workload is mainly aggregation, such as total reading duration or average order amount, the Aggregate Key model is the sensible choice for better query performance and smaller storage.
Sample CREATE TABLE statements for each model:
--1. Duplicate Key model example
--Notes:
--In the CREATE TABLE statement, the sort key columns must be defined before all other columns;
--The sort key can be defined explicitly with DUPLICATE KEY;
--A bucketing key must be specified with the DISTRIBUTED BY HASH clause, otherwise table creation fails.
CREATE TABLE db.table
(
dt date not null comment '日期分区',
user_id varchar(65533) not null comment 'user_id',
event_id varchar(65533) null comment '埋点ID',
event_time varchar(65533) null comment '埋点上报时间',
info varchar(65533) null comment '埋点信息'
) ENGINE=OLAP
DUPLICATE KEY (dt,user_id)
COMMENT '明细模型样例'
PARTITION BY date_trunc('day', dt) --expression partitioning creates partitions automatically; supported since version 3.0
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "true",
"compression" = "ZSTD"
);
--2. Primary Key model example
--Notes:
--The primary key is defined with PRIMARY KEY;
--In the CREATE TABLE statement, the primary key columns must be defined before all other columns;
--The primary key supports the following data types: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME, VARCHAR/STRING, and must not be NULL;
--Partition columns and bucketing columns must be part of the primary key;
--If the table takes heavy real-time updates and has many, large primary key columns (for example, all-string keys), the primary key index may consume so much memory that data loading runs into trouble. In that case, if query performance requirements are not high, the Duplicate Key model can be used instead. In general, though, the Primary Key model is the stronger option and should be preferred.
CREATE TABLE db.table
(
dt date not null comment '日期分区',
user_id varchar(65533) not null comment 'user_id',
info varchar(65533) null comment '信息'
) ENGINE=OLAP
PRIMARY KEY(dt,user_id)
COMMENT "主键模型样例"
PARTITION BY date_trunc('day', dt) --expression partitioning creates partitions automatically; supported since version 3.0
DISTRIBUTED BY HASH(dt,user_id)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "true", --是否持久化主键索引,同时使用磁盘和内存存储主键索引,避免主键索引占用过大内存空间
"compression" = "LZ4"
);
--3. Unique Key model example
--Notes:
--The unique key is defined with UNIQUE KEY;
--In the CREATE TABLE statement, the unique key columns must be defined before all other columns;
--When creating the table, BITMAP or Bloom Filter indexes cannot be created on metric (value) columns.
CREATE TABLE db.table
(
dt date not null comment '日期分区',
user_id varchar(65533) not null comment 'user_id',
info varchar(65533) null comment '信息'
) ENGINE=OLAP
UNIQUE KEY(dt,user_id)
COMMENT "明细模型样例"
PARTITION BY date_trunc('day', dt) --自动创建分区,3.0版本后支持
DISTRIBUTED BY HASH(dt,user_id)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"compression" = "LZ4"
);
--4. Aggregate Key model example
--Notes:
--In the CREATE TABLE statement, the key columns must be defined before all other columns;
--The sort key must satisfy the uniqueness constraint, must contain all dimension columns, and its column values are never updated;
--At query time, filters on the sort key are applied before multi-version aggregation, while filters on metric columns are applied after it. Put frequently used filter columns in the sort key so that data is pruned before aggregation, which improves query performance;
--Because data goes through aggregation, repairing it after a problem is relatively difficult, so think ahead about how to fix data quickly if needed.
CREATE TABLE db.table
(
dt date not null comment '日期分区',
user_id varchar(65533) not null comment 'user_id',
max_score bigint(20) MAX NULL COMMENT '最大的score',
sum_score bigint(20) SUM NULL COMMENT '汇总的score'
) ENGINE=OLAP
AGGREGATE KEY (dt,user_id)
COMMENT "聚合模型样例"
PARTITION BY date_trunc('day', dt) --expression partitioning creates partitions automatically; supported since version 3.0
DISTRIBUTED BY HASH(dt,user_id) BUCKETS 36
PROPERTIES (
"replication_num" = "3",
"in_memory" = "true",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "true"
);
Feature comparison across the table models:

| | Primary Key table | Duplicate Key table | Aggregate table | Unique Key table |
|---|---|---|---|---|
| INSERT during loading | Supported. Internally, StarRocks treats both INSERT and UPDATE as UPSERT operations. | Supported | Supported (rows with the same aggregate key are aggregated) | Supported (rows with the same unique key are updated) |
| UPDATE during loading | Supported. Internally, StarRocks treats both INSERT and UPDATE as UPSERT operations. | Not supported | Supported (implemented with the REPLACE aggregate function) | Supported (a Unique Key table can itself be seen as an Aggregate table whose value columns use REPLACE) |
| DELETE during loading | Supported | Not supported | Not supported | Not supported |
| Completeness of loaded column values | All columns must be loaded by default. If partial column update (partial_update) is enabled, or a column has a default value, not all columns are required. | All columns must be loaded by default. Columns with default values do not need to be loaded. | All columns must be loaded by default. However, an Aggregate table can do partial column updates by setting the aggregate function of a value column to REPLACE_IF_NOT_NULL (see aggr_type for usage). Columns with default values also do not need to be loaded. | All columns must be loaded by default. Columns with default values do not need to be loaded. |
| DML INSERT | Supported | Supported | Supported | Supported |
| DML UPDATE | Filtering on key columns: supported. Filtering on value columns: supported. | Not supported | Not supported | Not supported |
| DML DELETE | Filtering on key columns: supported. Filtering on value columns: supported. | Filtering on key or value columns: supported; only simple conditions on the columns themselves (such as =, <, >) are allowed, not functions or subqueries. | Filtering on key columns: supported, again limited to simple conditions such as =, <, >; filtering on value columns: not supported. | Filtering on key columns: supported, with the same restriction to simple conditions; filtering on value columns: not supported. |
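As the comparison above mentions, an Aggregate table can achieve partial column updates by declaring value columns with the REPLACE_IF_NOT_NULL aggregate function. As a rough sketch (the table and column names below are made up for illustration, not one of our production tables), such a definition might look like this:
--Sketch: an Aggregate table whose value columns use REPLACE_IF_NOT_NULL,
--so a load that only carries some value columns leaves the other columns unchanged
CREATE TABLE db.user_profile_agg
(
dt date not null comment 'partition date',
user_id varchar(65533) not null comment 'user_id',
nickname varchar(65533) REPLACE_IF_NOT_NULL NULL COMMENT 'latest non-null nickname',
city varchar(65533) REPLACE_IF_NOT_NULL NULL COMMENT 'latest non-null city'
) ENGINE=OLAP
AGGREGATE KEY (dt,user_id)
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
"replication_num" = "3"
);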
For more details, see the official documentation on StarRocks table design.
Once the table model is chosen for the scenario, the next step is developing the real-time job script.
2.2. Developing the Flink SQL Job
First, obtain the Apache Flink® connector. The easiest way is to download the pre-built JAR from the official repository: https://repo1.maven.org/maven2/com/starrocks/
The table below shows the mapping between Flink connector, Flink, and StarRocks versions; pick the one that matches your cluster:
| Connector | Flink | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.2.10 | 1.15, 1.16, 1.17, 1.18, 1.19 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.9 | 1.15, 1.16, 1.17, 1.18 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.8 | 1.13, 1.14, 1.15, 1.16, 1.17 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.7 | 1.11, 1.12, 1.13, 1.14, 1.15 | 2.1 and later | 8 | 2.11, 2.12 |
The example below walks through the code for writing real-time data into StarRocks.
First, create a Duplicate Key model table:
CREATE TABLE ods.ods_event_inc_d
(
dt date not null comment '日期分区',
user_id varchar(65533) not null comment 'user_id',
event_id varchar(65533) null comment '埋点ID',
event_time varchar(65533) null comment '埋点上报时间',
info varchar(65533) null comment '埋点信息'
) ENGINE=OLAP
DUPLICATE KEY (dt,user_id)
COMMENT '日志明细数据'
PARTITION BY date_trunc('day', dt) --expression partitioning creates partitions automatically; supported since version 3.0
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "true",
"compression" = "ZSTD"
);
Next, develop the Flink SQL script:
---------------------------------------------------- Flink job-level configuration ---------------------------------------------
-- Default parallelism for all operators (such as aggregations, joins, filters). This takes precedence over the StreamExecutionEnvironment parallelism (it effectively overrides it). A value of -1 means no default is set, and the StreamExecutionEnvironment parallelism is used instead.
SET table.exec.resource.default-parallelism = 10;
-- Checkpoint directory
SET state.checkpoints.dir=xxx;
-- Job name on YARN
SET yarn.application.name=xxx;
-- Operator chaining lets non-shuffle operators run in the same thread, avoiding serialization/deserialization entirely (default true; set to false to make individual operators easier to observe)
SET pipeline.operator-chaining=true;
SET taskmanager.memory.process.size = '3g';
SET execution.checkpointing.interval = 5min;
---------------------------------------------------- Flink Source ---------------------------------------------
--Here we use Kafka as the source for the example
CREATE TABLE kafka_source (
dt STRING,
user_id STRING,
event_id STRING,
event_time STRING,
info STRING,
kafka_timestamp TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'event',
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'xxx',
'scan.startup.mode' = 'group-offsets',
'scan.topic-partition-discovery.interval' = '180',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
---------------------------------------------------- Flink Sink ---------------------------------------------
--A sink can be created this way for any StarRocks table model
CREATE TABLE starrocks_sink (
dt STRING,
user_id STRING,
event_id STRING,
event_time STRING,
info STRING,
PRIMARY KEY (dt,user_id) NOT ENFORCED
)
WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://ip:port?characterEncoding=utf-8&useSSL=false',
'load-url'='ip:port',
'database-name' = 'ods',
'table-name' = 'ods_event_inc_d',
'username' = 'xxxx',
'password' = 'xxxx' ,
'sink.buffer-flush.max-rows' = '200000',
'sink.buffer-flush.max-bytes' = '94371840',
'sink.buffer-flush.interval-ms' = '120000',
'sink.properties.column_separator' = '<col_sep>',
'sink.max-retries' = '3'
);
---------------------------------------------------- Flink business logic ---------------------------------------------
--Only a simple insert is shown here; adjust the logic to your business scenario
begin statement set;
INSERT INTO starrocks_sink
SELECT dt
, user_id
, event_id
, event_time
, info
FROM kafka_source
;
end;
As you can see, the overall code is fairly simple, so let us focus on the StarRocks connector parameters and what they mean.
Note: pay special attention to the parameters that have remarks in the table below.
| Parameter | Required | Default | Description | Remarks |
|---|---|---|---|---|
| connector | Yes | NONE | Fixed value: starrocks. | |
| jdbc-url | Yes | NONE | Address of the MySQL server on the FE nodes. Multiple addresses are separated by commas (,). Format: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>. | Recommend pointing this at the load-balancer address. Reason: after cluster maintenance or restarts the IP of a specific FE node may change, while the load-balancer address generally does not, so cluster-level operations rarely affect the job. |
| load-url | Yes | NONE | Address of the HTTP server on the FE nodes. Multiple addresses are separated by semicolons (;). Format: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>. | Keep it consistent with jdbc-url. |
| database-name | Yes | NONE | StarRocks database name. | |
| table-name | Yes | NONE | StarRocks table name. | |
| username | Yes | NONE | Username for the StarRocks cluster. Loading data through the Flink connector requires SELECT and INSERT privileges on the target table; if your account lacks them, grant them with GRANT. | |
| password | Yes | NONE | Password of the StarRocks cluster user. | |
| sink.version | No | AUTO | Interface used for loading, supported since Flink connector 1.2.4. V1: load via the Stream Load interface; connectors before 1.2.4 only support this mode. V2: load via the Stream Load transaction interface, which requires StarRocks 2.4 or later; the official docs recommend V2 for lower memory usage and a more stable exactly-once implementation. AUTO: automatically use V2 if the StarRocks version supports the transaction interface, otherwise V1. | We recommend V1. Although the official documentation recommends V2, engineers from Alibaba Cloud told us the V2 transaction handling has an internal defect (details unclear) and suggested using V1. |
| sink.buffer-flush.max-bytes | No | 94371840 (90 MB) | Maximum amount of data buffered in memory before it is loaded into StarRocks in one Stream Load. Range: [64MB, 10GB]. A larger value improves load throughput but may increase latency. Only effective when sink.semantic is at-least-once; with exactly-once, data is flushed only when a Flink checkpoint triggers, so this parameter has no effect. | The default is usually fine. |
| sink.buffer-flush.max-rows | No | 500000 | Maximum number of rows buffered in memory before one Stream Load. Range: [64000, 5000000]. Only effective when sink.version is V1 and sink.semantic is at-least-once. | Tune according to how fresh the data needs to be; a smaller threshold flushes more often. |
| sink.buffer-flush.interval-ms | No | 300000 | Flush interval, which controls the latency of writes into StarRocks. Range: [1000, 3600000]. Only effective when sink.semantic is at-least-once. | Tune according to how fresh the data needs to be; decrease it when lower latency is required. |
| sink.max-retries | No | 3 | Number of retries after a Stream Load failure; the load job fails once this limit is exceeded. Range: [0, 10]. Only effective when sink.version is V1. | |
| sink.connect.timeout-ms | No | 30000 | Timeout for establishing the HTTP connection to the FE. Range: [100, 60000]. Before Flink connector v1.2.9 the default was 1000. | |
| sink.ignore.update-before | No | TRUE | Supported since Flink connector 1.2.8. Whether to ignore UPDATE_BEFORE records from Flink when loading into a Primary Key table. If set to false, such a record is treated as a DELETE on the Primary Key table. | |
| sink.parallelism | No | NONE | Parallelism of the sink. Applies only to Flink SQL. If not set, the Flink planner decides. With multiple parallel writers, you must make sure data is written in the correct order. | Unless the scenario requires pushing a very large volume of real-time data into StarRocks within a very short time, tune it according to cluster resources; the default is usually fine. |
| sink.properties.* | No | NONE | Stream Load parameters that control load behavior. For example, sink.properties.format specifies the format of the data loaded via Stream Load, such as CSV or JSON. For the full list, see STREAM LOAD. | |
| sink.properties.format | No | csv | Format used for Stream Load. The Flink connector converts buffered data into this format before loading it into StarRocks via Stream Load. Valid values: CSV, JSON. | The connector writes via Stream Load, so data is first batched in an in-memory buffer in this format. Use CSV when upstream data is clean and well-formed; use JSON when upstream data may contain dirty values. Example: a table has three columns a, b, c. For a record a=1, b=2, c=3, CSV buffers it as 1\t2\t3\n and StarRocks splits columns and rows on \t and \n without issue; JSON buffers it as {a:1,b:2,c:3}, which also parses fine. For a record a=1\n, b=2, c=3, CSV buffers it as 1\n\t2\t3\n, so the embedded \n splits it into two rows (the first row is 1, the second is \t2\t3) and the first row fails with a column-count mismatch; JSON buffers it as {a:1\n,b:2,c:3} and still parses correctly. So JSON is the most general and safest choice, but the extra JSON structure costs more resources to process than CSV, so choose based on your scenario. |
| sink.properties.column_separator | No | \t | Column separator for CSV data. | |
| sink.properties.row_delimiter | No | \n | Row delimiter for CSV data. | |
| json.fail-on-missing-field | No | false | Strip the outer array: when the imported JSON data is an array, setting strip_outer_array to true strips the outermost array and treats each element as a separate record. Effective when sink.properties.format = JSON. | |
| json.ignore-parse-errors | No | false | Ignore parse errors: when set to true, StarRocks skips records that cannot be parsed during JSON parsing instead of aborting the whole load. Effective when sink.properties.format = JSON. | We recommend true. |
For more parameters, see the official documentation: Load data from Apache Flink.
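Following on from the CSV vs. JSON discussion above, here is a sketch of how the sink from the example script could be switched to JSON-format Stream Load. The connection values are placeholders, and the two sink.properties options are the ones we understand to matter for JSON loads; verify them against your connector version:
--Sketch: same sink as above, but buffering and loading data as JSON instead of CSV
CREATE TABLE starrocks_sink_json (
dt STRING,
user_id STRING,
event_id STRING,
event_time STRING,
info STRING
)
WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://ip:port?characterEncoding=utf-8&useSSL=false',
'load-url'='ip:port',
'database-name' = 'ods',
'table-name' = 'ods_event_inc_d',
'username' = 'xxxx',
'password' = 'xxxx',
'sink.properties.format' = 'json',            --buffer and load the data as JSON
'sink.properties.strip_outer_array' = 'true', --the connector batches rows into a JSON array, so strip the outer array on load
'sink.max-retries' = '3'
);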
Once all the required data is streamed into StarRocks using this development pattern, downstream development becomes much simpler. A multi-stream JOIN scenario, for example, turns into the following in StarRocks:
--Create a view that joins several real-time tables, relying on StarRocks' query performance to produce the results in real time.
CREATE OR REPLACE VIEW viw.viw1 AS
SELECT a.xxx
, b.yyy
, c.zzz
FROM table1 AS a
LEFT JOIN table2 AS b
ON a.x = b.x
LEFT JOIN table3 AS c
ON a.y = c.y
With this development model, from a small metric-definition change to a full business-logic overhaul, in the vast majority of cases we only adjust the logic of the corresponding StarRocks view rather than the real-time job itself, which greatly improves development efficiency.
3. StarRocks Query Performance Optimization
As mentioned above, our real-time reports are produced by running the agreed logic on the fly; thanks to StarRocks' query performance, the vast majority of queries complete within 5 seconds. With very large data volumes or complex business logic, however, slow queries still occasionally appear. We have been continuously optimizing for these cases; the methods are outlined below.
3.1. Capturing Slow Queries
The Serverless StarRocks cluster keeps an audit log that records every query statement and its resource usage for each day, stored in the table _starrocks_audit_db_.starrocks_audit_tbl.
Its schema is as follows:
CREATE TABLE `starrocks_audit_tbl` (
`queryId` varchar(64) NULL COMMENT "查询的唯一ID",
`timestamp` datetime NOT NULL COMMENT "查询开始时间",
`queryType` varchar(12) NULL COMMENT "查询类型(query, slow_query, connection)",
`clientIp` varchar(32) NULL COMMENT "客户端IP",
`user` varchar(64) NULL COMMENT "查询用户名",
`authorizedUser` varchar(64) NULL COMMENT "用户唯一标识,既user_identity",
`resourceGroup` varchar(64) NULL COMMENT "资源组名",
`catalog` varchar(32) NULL COMMENT "数据目录名",
`db` varchar(96) NULL COMMENT "查询所在数据库",
`state` varchar(8) NULL COMMENT "查询状态(EOF,ERR,OK)",
`errorCode` varchar(512) NULL COMMENT "错误码",
`queryTime` bigint(20) NULL COMMENT "查询执行时间(毫秒)",
`scanBytes` bigint(20) NULL COMMENT "查询扫描的字节数",
`scanRows` bigint(20) NULL COMMENT "查询扫描的记录行数",
`returnRows` bigint(20) NULL COMMENT "查询返回的结果行数",
`cpuCostNs` bigint(20) NULL COMMENT "查询CPU耗时(纳秒)",
`memCostBytes` bigint(20) NULL COMMENT "查询消耗内存(字节)",
`stmtId` int(11) NULL COMMENT "SQL语句增量ID",
`isQuery` tinyint(4) NULL COMMENT "SQL是否为查询(1或0)",
`feIp` varchar(128) NULL COMMENT "执行该语句的FE IP",
`stmt` varchar(1048576) NULL COMMENT "SQL原始语句",
`digest` varchar(32) NULL COMMENT "慢SQL指纹",
`planCpuCosts` double NULL COMMENT "查询规划阶段CPU占用(纳秒)",
`planMemCosts` double NULL COMMENT "查询规划阶段内存占用(字节)",
`warehouse` varchar(96) NULL DEFAULT "default_warehouse" COMMENT "查询使用的计算组",
`stmtType` varchar(8) NULL DEFAULT "OTHER" COMMENT "SQL类型(DQL,DML,DDL,DCL,OTHER)",
`isFilter` tinyint(4) NULL DEFAULT "1" COMMENT "SQL是否过滤(1或0)",
INDEX index_timestamp (`timestamp`) USING BITMAP COMMENT '查询开始时间排序索引',
INDEX index_queryTime (`queryTime`) USING BITMAP COMMENT '查询执行时间排序索引'
) ENGINE=OLAP
DUPLICATE KEY(`queryId`, `timestamp`, `queryType`)
COMMENT "审计日志表"
PARTITION BY RANGE(`timestamp`)
DISTRIBUTED BY HASH(`queryId`) BUCKETS 3
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "3",
"dynamic_partition.history_partition_num" = "0",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
);
Using the audit table, we can sort by query time and memory usage to get the day's slow queries. A reference query:
SELECT queryId --query ID
, timestamp --query start time
, user --user name
, queryTime --query execution time (ms)
, scanBytes --bytes scanned
, scanRows --rows scanned
, returnRows --rows returned
, cpuCostNs --CPU time (ns)
, memCostBytes --memory consumed (bytes)
, stmt --original SQL statement
FROM _starrocks_audit_db_.starrocks_audit_tbl
WHERE isQuery = 1 --whether the statement is a query: 1 yes, 0 no
AND stmtType = 'DQL' --statement type: DQL reads data, DML modifies data, DDL changes schema
AND queryTime > 5000 --filter by query duration
AND DATE(timestamp) = '2024-10-30'
AND planCpuCosts not in (0,-1) --planning-stage CPU cost (ns); 0 or -1 almost always means a metadata operation, so filter those out
ORDER BY queryTime DESC --sort by query duration first
, memCostBytes DESC --then by memory usage
;
Once we have the slow SQL statements, we can analyze their logic to find the recurring causes of slowness and optimize for them specifically.
After analyzing these queries over a long period, we found that slow queries essentially come down to two causes:
- The query logic is too complex
- Too much data is scanned
The two cases below show how we address each of them.
3.2. Using Materialized Views When the Query Logic Is Too Complex
In the earlier 七猫 article StarRocks之视图及物化视图的实践 we described how to use views and materialized views. Our repeated practice shows that a materialized view typically improves query efficiency by a factor of roughly 30 to 80.
Here is an example from a real-time query scenario in our new-media business:
As the figure below shows, the data model is built from a view that joins 6 tables with complex logic, further joined with three dimension tables. Every query against it executes a 600+ line SQL statement, and as data volume grows, query latency keeps increasing and hurts the user experience.
To address this, we use a materialized view to precompute the logic so that consumers query the result data directly, cutting query time dramatically.
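The earlier article covers the details of building materialized views, so here is only a rough sketch of the idea: the heavy JOIN logic is precomputed into an asynchronous materialized view that StarRocks refreshes on a schedule. The view name, refresh interval, source view, and columns below are placeholders rather than our production definition:
--Sketch: precompute the expensive multi-table JOIN into an async materialized view
CREATE MATERIALIZED VIEW newmedia_dm.mv_launch_rt_demo
REFRESH ASYNC EVERY (INTERVAL 10 MINUTE)
AS
SELECT dt
, source_channel
, sum(promotion_amt) AS promotion_amt
, sum(recharge_sum_amt_1d) AS recharge_sum_amt_1d
, sum(new_user_cnt) AS new_user_cnt
FROM viw.viw_launch_rt --stands in for the 600+ line JOIN view described above
GROUP BY dt, source_channel;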
Query statistics before using the materialized view: 5,849,972,522 rows were scanned and the query took 33 seconds.
--Query details before using the materialized view
queryId: 5b7e6df8-ed9f-11ef-a9bf-12fae80bf3e0
timestamp: 2025-02-18 10:23:36
queryType: slow_query
clientIp: xxxx
user: xxxx
authorizedUser: 'xxxx'@'%'
resourceGroup: default_wg
catalog: default_catalog
db:
state: EOF
errorCode:
queryTime: 33035 --query duration (ms)
scanBytes: 338461852262 --bytes scanned
scanRows: 5849972522 --total rows scanned
returnRows: 1
cpuCostNs: 986486312217
memCostBytes: 6940243011
stmtId: 771034
isQuery: 1
feIp: xxxx
stmt: SELECT sum(CAST(callback_recharge_amt AS DECIMAL(27, 9))) AS _col_20 , sum(`recharge_sum_amt_1d`) / sum(`promotion_amt`) AS _col_15 , sum(`promotion_amt`) / sum(`new_user_cnt`) AS _col_14 , sum(CAST(all_recharge_amt AS DECIMAL(27, 9))) AS _col_18 , sum(`promotion_amt`) / sum(`recharge_sum_user_cnt_1d`) AS _col_12 , sum(`media_recharge_amt`) AS _col_19 , sum(`recharge_sum_amt_1d`) / sum(`recharge_sum_user_cnt_1d`) AS _col_13 , sum(`new_user_cnt`) AS _col_17, sum(`recharge_sum_user_cnt_1d`) AS _col_16 FROM ( SELECT coalesce(str_to_date(`a`.`dt`, '%Y-%m-%d'), `b`.`date`) AS `dt`, coalesce(`a`.`project`, `b`.`project`) AS `project`, coalesce(`a`.`source_channel`, `b`.`channel_name`) AS `source_channel`, `a`.`mp_app_id`, `newmedia_ods`.`h`.`app_name` AS `mp_app_name`, `edw_dim`.`c`.`account_id`, `edw_dim`.`c`.`media_name`, `edw_dim`.`c`.`agent_name`, `edw_dim`.`c`.`admin_name`, `edw_dim`.`c`.`book_id` AS `newmedia_book_id`, `newmedia_ods`.`e`.`new_media_title` AS `newmedia_book_name`, `newmedia_ods`.`e`.`title` AS `qm_book_name`, `edw_dim`.`d`.`ad_name`, `edw_dim`.`d`.`ad_status`, `b`.`promotion_fee` AS `media_recharge_amt`, `b`.`after_discount` AS `promotion_amt`, `b`.`click` AS `click_cnt`, `b`.`media_recharge_cost_amt`, `edw_dim`.`d`.`bid` AS `bid_price`, `f`.`first_recharge_user_cnt_d`, `f`.`callback_first_recharge_user_cnt_d`, `f`.`surplus_first_recharge_user_cnt_d`, `f`.`all_recharge_amt`, `f`.`callback_recharge_amt`, `f`.`no_callback_recharge_amt`, `g`.`new_user_cnt`, `a`.`new_user_dur_1d`, `a`.`new_user_read_dur_1d`, `a`.`recharge_user_cnt_30minute`, `a`.`recharge_amt_30minute`, `a`.`recharge_user_cnt_1h`, `a`.`recharge_amt_1h`, `a`.`recharge_user_cnt_2h`, `a`.`recharge_amt_2h`, `a`.`recharge_user_cnt_6h`, `a`.`recharge_amt_6h`, `a`.`recharge_user_cnt_12h`, `a`.`recharge_amt_12h`, `a`.`recharge_user_cnt_24h`, `a`.`recharge_amt_24h`, `a`.`recharge_user_cnt_1d`, `a`.`recharge_amt_1d`, `a`.`repeat_recharge_user_cnt_1d`, `a`.`recharge_user_cnt_2d`, `a`.`recharge_amt_2d`, `a`.`recharge_member_cnt_30minute`, `a`.`recharge_member_amt_30minute`, `a`.`recharge_member_cnt_1h`, `a`.`recharge_member_amt_1h`, `a`.`recharge_member_cnt_2h`, `a`.`recharge_member_amt_2h`, `a`.`recharge_member_cnt_6h`, `a`.`recharge_member_amt_6h`, `a`.`recharge_member_cnt_12h`, `a`.`recharge_member_amt_12h`, `a`.`recharge_member_cnt_24h`, `a`.`recharge_member_amt_24h`, `a`.`recharge_member_cnt_1d`, `a`.`recharge_member_amt_1d`, `a`.`recharge_member_cnt_2d`, `a`.`recharge_member_amt_2d`, `a`.`recharge_sum_user_cnt_30minute`, `a`.`recharge_sum_amt_30minute`, `a`.`recharge_sum_user_cnt_1h`, `a`.`recharge_sum_amt_1h`, `a`.`recharge_sum_user_cnt_2h`, `a`.`recharge_sum_amt_2h`, `a`.`recharge_sum_user_cnt_6h`, `a`.`recharge_sum_amt_6h`, `a`.`recharge_sum_user_cnt_12h`, `a`.`recharge_sum_amt_12h`, `a`.`recharge_sum_user_cnt_24h`, `a`.`recharge_sum_amt_24h`, `a`.`recharge_sum_user_cnt_1d`, `a`.`recharge_sum_amt_1d`, `a`.`repeat_recharge_sum_user_cnt_1d`, `a`.`recharge_sum_user_cnt_2d`, `a`.`recharge_sum_amt_2d`, `edw_dim`.`c`.`agent_type_name`, `edw_dim`.`c`.`agent_type`, `edw_dim`.`c`.`media_id`, `edw_dim`.`c`.`channel_type`, `edw_dim`.`c`.`business_admin_id`, `edw_dim`.`c`.`ad_placement_type`, `edw_dim`.`c`.`group_name`, `edw_dim`.`c`.`group_id`, `edw_dim`.`c`.`admin_id`, `edw_dim`.`c`.`rebate_mode`, `edw_dim`.`c`.`search_keyword`, `edw_dim`.`c`.`ad_id`, `edw_dim`.`c`.`agent_id`, `edw_dim`.`c`.`unit_price`, `edw_dim`.`c`.`business_admin_name`, `edw_dim`.`c`.`rebate`, `edw_dim`.`c`.`account_type`, 
`newmedia_ods`.`e`.`title`, `newmedia_ods`.`e`.`author_name`, `newmedia_ods`.`e`.`orig_id`, `newmedia_ods`.`e`.`source_name`, `newmedia_ods`.`e`.`is_over`, `newmedia_ods`.`e`.`new_media_title`, `newmedia_ods`.`e`.`new_media_short_fiction_paid`, `newmedia_ods`.`e`.`new_media_status`, `newmedia_ods`.`e`.`new_media_characters`, `newmedia_ods`.`e`.`short_fiction_paid`, `newmedia_ods`.`e`.`new_media_fiction_type`, `newmedia_ods`.`e`.`new_media_cover_path`, `edw_dim`.`d`.`source`, `edw_dim`.`d`.`deep_bid`, `edw_dim`.`d`.`total_budget`, `edw_dim`.`d`.`daily_budget`, `edw_dim`.`d`.`bid_type`, `edw_dim`.`d`.`targeting`, `edw_dim`.`d`.`schedule_time`, `edw_dim`.`d`.`deep_optimization_goal`, `edw_dim`.`d`.`position_ids`, `edw_dim`.`d`.`position_names`, `edw_dim`.`d`.`expected_roi`, `edw_dim`.`d`.`delivery_mode`, `edw_dim`.`d`.`optimization_goal`, `edw_dim`.`c`.`ad_create_date`, `edw_dim`.`c`.`app_name`, `edw_dim`.`c`.`password`, `edw_dim`.`c`.`remark`, `edw_dim`.`c`.`create_time`, `edw_dim`.`c`.`update_time`, `edw_dim`.`c`.`extra`, `edw_dim`.`c`.`app_id`, `edw_dim`.`c`.`last_pull_time`, `edw_dim`.`c`.`status`, `a`.`recharge_book_cnt_30minute`, `a`.`recharge_book_amt_30minute`, `a`.`recharge_book_cnt_1h`, `a`.`recharge_book_amt_1h`, `a`.`recharge_book_cnt_2h`, `a`.`recharge_book_amt_2h`, `a`.`recharge_book_cnt_6h`, `a`.`recharge_book_amt_6h`, `a`.`recharge_book_cnt_12h`, `a`.`recharge_book_amt_12h`, `a`.`recharge_book_cnt_24h`, `a`.`recharge_book_amt_24h`, `a`.`recharge_book_cnt_1d`, `a`.`recharge_book_amt_1d`, `a`.`recharge_book_cnt_2d`, `a`.`recharge_book_amt_2d`, `a`.`repeat_recharge_book_cnt_1d` FROM (SELECT `t1`.`dt`, `t1`.`project`, `t1`.`source_channel`, `t1`.`mp_app_id`, `t1`.`mp_app_name`, sum(`t1`.`new_user_dur_1d`) AS `new_user_dur_1d`, sum(`t1`.`new_user_read_dur_1d`) AS `new_user_read_dur_1d`, count(CASE WHEN (`t1`.`recharge_user_cnt_30minute` > 0) THEN 1 END) AS `recharge_user_cnt_30minute`, sum(`t1`.`recharge_amt_30minute`) AS `recharge_amt_30minute`, count(CASE WHEN (`t1`.`recharge_user_cnt_1h` > 0) THEN 1 END) AS `recharge_user_cnt_1h`, sum(`t1`.`recharge_amt_1h`) AS `recharge_amt_1h`, count(CASE WHEN (`t1`.`recharge_user_cnt_2h` > 0) THEN 1 END) AS `recharge_user_cnt_2h`, sum(`t1`.`recharge_amt_2h`) AS `recharge_amt_2h`, count(CASE WHEN (`t1`.`recharge_user_cnt_6h` > 0) THEN 1 END) AS `recharge_user_cnt_6h`, sum(`t1`.`recharge_amt_6h`) AS `recharge_amt_6h`, count(CASE WHEN (`t1`.`recharge_user_cnt_12h` > 0) THEN 1 END) AS `recharge_user_cnt_12h`, sum(`t1`.`recharge_amt_12h`) AS `recharge_amt_12h`, count(CASE WHEN (`t1`.`recharge_user_cnt_24h` > 0) THEN 1 END) AS `recharge_user_cnt_24h`, sum(`t1`.`recharge_amt_24h`) AS `recharge_amt_24h`, count(CASE WHEN (`t1`.`recharge_user_cnt_1d` > 0) THEN 1 END) AS `recharge_user_cnt_1d`, sum(`t1`.`recharge_amt_1d`) AS `recharge_amt_1d`, count(CASE WHEN (`t1`.`recharge_user_cnt_1d` > 1) THEN 1 END) AS `repeat_recharge_user_cnt_1d`, count(CASE WHEN (`t1`.`recharge_user_cnt_2d` > 0) THEN 1 END) AS `recharge_user_cnt_2d`, sum(`t1`.`recharge_amt_2d`) AS `recharge_amt_2d`, count(CASE WHEN (`t1`.`recharge_member_cnt_30minute` > 0) THEN 1 END) AS `recharge_member_cnt_30minute`, sum(`t1`.`recharge_member_amt_30minute`) AS `recharge_member_amt_30minute`, count(CASE WHEN (`t1`.`recharge_member_cnt_1h` > 0) THEN 1 END) AS `recharge_member_cnt_1h`, sum(`t1`.`recharge_member_amt_1h`) AS `recharge_member_amt_1h`, count(CASE WHEN (`t1`.`recharge_member_cnt_2h` > 0) THEN 1 END) AS `recharge_member_cnt_2h`, sum(`t1`.`recharge_member_amt_2h`) AS 
`recharge_member_amt_2h`, count(CASE WHEN (`t1`.`recharge_member_cnt_6h` > 0) THEN 1 END) AS `recharge_member_cnt_6h`, sum(`t1`.`recharge_member_amt_6h`) AS `recharge_member_amt_6h`, count(CASE WHEN (`t1`.`recharge_member_cnt_12h` > 0) THEN 1 END) AS `recharge_member_cnt_12h`, sum(`t1`.`recharge_member_amt_12h`) AS `recharge_member_amt_12h`, count(CASE WHEN (`t1`.`recharge_member_cnt_24h` > 0) THEN 1 END) AS `recharge_member_cnt_24h`, sum(`t1`.`recharge_member_amt_24h`) AS `recharge_member_amt_24h`, count(CASE WHEN (`t1`.`recharge_member_cnt_1d` > 0) THEN 1 END) AS `recharge_member_cnt_1d`, sum(`t1`.`recharge_member_amt_1d`) AS `recharge_member_amt_1d`, count(CASE WHEN (`t1`.`recharge_member_cnt_2d` > 0) THEN 1 END) AS `recharge_member_cnt_2d`, sum(`t1`.`recharge_member_amt_2d`) AS `recharge_member_amt_2d`, count(CASE WHEN (`t1`.`recharge_sum_user_cnt_30minute` > 0) THEN 1 END) AS `recharge_sum_user_cnt_30minute`, sum(`t1`.`recharge_sum_amt_30minute`) AS `recharge_sum_amt_30minute`, count(CASE WHEN (`t1`.`recharge_sum_user_cnt_1h` > 0) THEN 1 END) AS `recharge_sum_user_cnt_1h`, sum(`t1`.`recharge_sum_amt_1h`) AS `recharge_sum_amt_1h`, count(CASE WHEN (`t1`.`recharge_sum_user_cnt_2h` > 0) THEN 1 END) AS `recharge_sum_user_cnt_2h`, sum(`t1`.`recharge_sum_amt_2h`) AS `recharge_sum_amt_2h`, count(CASE WHEN (`t1`.`recharge_sum_user_cnt_6h` > 0) THEN 1 END) AS `recharge_sum_user_cnt_6h`, sum(`t1`.`recharge_sum_amt_6h`) AS `recharge_sum_amt_6h`, count(CASE WHEN (`t1`.`recharge_sum_user_cnt_12h` > 0) THEN 1 END) AS `recharge_sum_user_cnt_12h`, sum(`t1`.`recharge_sum_amt_12h`) AS `recharge_sum_amt_12h`, count(CASE WHEN (`t1`.`recharge_sum_user_cnt_24h` > 0) THEN 1 END) AS `recharge_sum_user_cnt_24h`, sum(`t1`.`recharge_sum_amt_24h`) AS `recharge_sum_amt_24h`, count(CASE WHEN (`t1`.`recharge_sum_user_cnt_1d` > 0) THEN 1 END) AS `recharge_sum_user_cnt_1d`, sum(`t1`.`recharge_sum_amt_1d`) AS `recharge_sum_amt_1d`, count(CASE WHEN (`t1`.`recharge_sum_user_cnt_1d` > 1) THEN 1 END) AS `repeat_recharge_sum_user_cnt_1d`, count(CASE WHEN (`t1`.`recharge_sum_user_cnt_2d` > 0) THEN 1 END) AS `recharge_sum_user_cnt_2d`, sum(`t1`.`recharge_sum_amt_2d`) AS `recharge_sum_amt_2d`, count(CASE WHEN (`t1`.`recharge_book_cnt_30minute` > 0) THEN 1 END) AS `recharge_book_cnt_30minute`, sum(`t1`.`recharge_book_amt_30minute`) AS `recharge_book_amt_30minute`, count(CASE WHEN (`t1`.`recharge_book_cnt_1h` > 0) THEN 1 END) AS `recharge_book_cnt_1h`, sum(`t1`.`recharge_book_amt_1h`) AS `recharge_book_amt_1h`, count(CASE WHEN (`t1`.`recharge_book_cnt_2h` > 0) THEN 1 END) AS `recharge_book_cnt_2h`, sum(`t1`.`recharge_book_amt_2h`) AS `recharge_book_amt_2h`, count(CASE WHEN (`t1`.`recharge_book_cnt_6h` > 0) THEN 1 END) AS `recharge_book_cnt_6h`, sum(`t1`.`recharge_book_amt_6h`) AS `recharge_book_amt_6h`, count(CASE WHEN (`t1`.`recharge_book_cnt_12h` > 0) THEN 1 END) AS `recharge_book_cnt_12h`, sum(`t1`.`recharge_book_amt_12h`) AS `recharge_book_amt_12h`, count(CASE WHEN (`t1`.`recharge_book_cnt_24h` > 0) THEN 1 END) AS `recharge_book_cnt_24h`, sum(`t1`.`recharge_book_amt_24h`) AS `recharge_book_amt_24h`, count(CASE WHEN (`t1`.`recharge_book_cnt_1d` > 0) THEN 1 END) AS `recharge_book_cnt_1d`, sum(`t1`.`recharge_book_amt_1d`) AS `recharge_book_amt_1d`, count(CASE WHEN (`t1`.`recharge_book_cnt_2d` > 0) THEN 1 END) AS `recharge_book_cnt_2d`, sum(`t1`.`recharge_book_amt_2d`) AS `recharge_book_amt_2d`, count(CASE WHEN (`t1`.`recharge_book_cnt_1d` > 1) THEN 1 END) AS `repeat_recharge_book_cnt_1d` FROM (SELECT from_unixtime(`b`.`ts`, 
'yyyy-MM-dd') AS `dt`, `newmedia_dwd`.`a`.`project`, `b`.`attribution_channel` AS `source_channel`, CASE WHEN (`b`.`attribution_channel` IN ('rw-jrtt7433263664498556979', 'rw-jrtt7433335223255236619', 'rw-jrtt7433335224635326473')) THEN 'wx240f3228b4f5140f' ELSE `newmedia_dwd`.`a`.`mp_app_id` END AS `mp_app_id`, `newmedia_dwd`.`a`.`mp_app_name`, `newmedia_dwd`.`a`.`source_uid`, count(CASE WHEN ((from_unixtime(`b`.`ts`, 'yyyy-MM-dd')) = `newmedia_dwd`.`a`.`dt`) THEN 1 END) AS `new_user_cnt`, sum(CASE WHEN (`newmedia_dwd`.`a`.`event_id` = 'miniapp_#_#_hide') THEN `newmedia_dwd`.`a`.`duration` ELSE 0 END) AS `new_user_dur_1d`, sum(CASE WHEN (`newmedia_dwd`.`a`.`event_id` = 'reader_full_#_duration') THEN `newmedia_dwd`.`a`.`duration` ELSE 0 END) AS `new_user_read_dur_1d`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (30 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN 1 END) AS `recharge_user_cnt_30minute`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (30 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_amt_30minute`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (60 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN 1 END) AS `recharge_user_cnt_1h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (60 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_amt_1h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((2 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN 1 END) AS `recharge_user_cnt_2h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((2 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_amt_2h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((6 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN 1 END) AS `recharge_user_cnt_6h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((6 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_amt_6h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((12 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN 1 END) AS `recharge_user_cnt_12h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((12 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_amt_12h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((24 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN 1 END) AS `recharge_user_cnt_24h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((24 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_amt_24h`, count(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 1) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN 1 END) AS `recharge_user_cnt_1d`, 
sum(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 1) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` END) AS `recharge_amt_1d`, count(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 2) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN 1 END) AS `recharge_user_cnt_2d`, sum(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 2) AND (`newmedia_dwd`.`a`.`event_id` = 'server_coin-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` END) AS `recharge_amt_2d`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (30 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN 1 END) AS `recharge_member_cnt_30minute`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (30 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_member_amt_30minute`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (60 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN 1 END) AS `recharge_member_cnt_1h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (60 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_member_amt_1h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((2 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN 1 END) AS `recharge_member_cnt_2h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((2 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_member_amt_2h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((6 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN 1 END) AS `recharge_member_cnt_6h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((6 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_member_amt_6h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((12 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN 1 END) AS `recharge_member_cnt_12h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((12 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_member_amt_12h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((24 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN 1 END) AS `recharge_member_cnt_24h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((24 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_member_amt_24h`, count(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 1) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN 1 END) AS `recharge_member_cnt_1d`, sum(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 
'yyyy-MM-dd'))) < 1) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` END) AS `recharge_member_amt_1d`, count(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 2) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN 1 END) AS `recharge_member_cnt_2d`, sum(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 2) AND (`newmedia_dwd`.`a`.`event_id` = 'server_vip-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` END) AS `recharge_member_amt_2d`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (30 * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN 1 END) AS `recharge_sum_user_cnt_30minute`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (30 * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_sum_amt_30minute`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (60 * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN 1 END) AS `recharge_sum_user_cnt_1h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (60 * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_sum_amt_1h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((2 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN 1 END) AS `recharge_sum_user_cnt_2h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((2 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_sum_amt_2h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((6 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN 1 END) AS `recharge_sum_user_cnt_6h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((6 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_sum_amt_6h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((12 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN 1 END) AS `recharge_sum_user_cnt_12h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((12 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_sum_amt_12h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((24 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN 
('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN 1 END) AS `recharge_sum_user_cnt_24h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((24 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_sum_amt_24h`, count(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 1) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN 1 END) AS `recharge_sum_user_cnt_1d`, sum(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 1) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN `newmedia_dwd`.`a`.`amount` END) AS `recharge_sum_amt_1d`, count(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 2) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN 1 END) AS `recharge_sum_user_cnt_2d`, sum(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 2) AND (`newmedia_dwd`.`a`.`event_id` IN ('server_coin-charge_#_result', 'server_vip-charge_#_result', 'server_book-charge_#_result'))) THEN `newmedia_dwd`.`a`.`amount` END) AS `recharge_sum_amt_2d`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (30 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN 1 END) AS `recharge_book_cnt_30minute`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (30 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_book_amt_30minute`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (60 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN 1 END) AS `recharge_book_cnt_1h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + (60 * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_book_amt_1h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((2 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN 1 END) AS `recharge_book_cnt_2h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((2 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_book_amt_2h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((6 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN 1 END) AS `recharge_book_cnt_6h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((6 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_book_amt_6h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((12 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN 1 END) AS `recharge_book_cnt_12h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= 
(`b`.`ts` + ((12 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_book_amt_12h`, count(DISTINCT CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((24 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN 1 END) AS `recharge_book_cnt_24h`, sum(CASE WHEN ((`newmedia_dwd`.`a`.`server_timestamp` <= (`b`.`ts` + ((24 * 60) * 60))) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` ELSE 0 END) AS `recharge_book_amt_24h`, count(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 1) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN 1 END) AS `recharge_book_cnt_1d`, sum(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 1) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` END) AS `recharge_book_amt_1d`, count(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 2) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN 1 END) AS `recharge_book_cnt_2d`, sum(CASE WHEN (((datediff(`newmedia_dwd`.`a`.`dt`, from_unixtime(`b`.`ts`, 'yyyy-MM-dd'))) < 2) AND (`newmedia_dwd`.`a`.`event_id` = 'server_book-charge_#_result')) THEN `newmedia_dwd`.`a`.`amount` END) AS `recharge_book_amt_2d` FROM `newmedia_dwd`.`dwd_newmedia_launch_rt_inc_d` AS `a` LEFT OUTER JOIN (SELECT `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`project`, `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`uid`, unix_timestamp(`edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`ts`) AS `ts`, `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`attribution_channel`, lead(unix_timestamp(`edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`ts`), 1, 9999999999) OVER (PARTITION BY `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`project`, `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`uid` ORDER BY `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`ts` ASC ) AS `end_ts` FROM `edw_dim`.`dim_mkt_pull_new_user_change_inc_d` WHERE (`edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`project` IN ('reader_paid_dyminiapp', 'reader_paid_wxminiapp')) AND (`edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`data_date` >= '2024-08-19')) `b` ON ((`newmedia_dwd`.`a`.`project` = `b`.`project`) AND (`newmedia_dwd`.`a`.`source_uid` = `b`.`uid`)) AND (`newmedia_dwd`.`a`.`server_timestamp` BETWEEN `b`.`ts` AND `b`.`end_ts`) WHERE ((coalesce(`newmedia_dwd`.`a`.`promotion_type`, '')) IN ('self_operation', '', 'self-operation')) AND (`newmedia_dwd`.`a`.`dt` >= '2024-10-02') GROUP BY from_unixtime(`b`.`ts`, 'yyyy-MM-dd'), `newmedia_dwd`.`a`.`project`, `b`.`attribution_channel`, CASE WHEN (`b`.`attribution_channel` IN ('rw-jrtt7433263664498556979', 'rw-jrtt7433335223255236619', 'rw-jrtt7433335224635326473')) THEN 'wx240f3228b4f5140f' ELSE `newmedia_dwd`.`a`.`mp_app_id` END, `newmedia_dwd`.`a`.`mp_app_name`, `newmedia_dwd`.`a`.`source_uid`) `t1` WHERE (coalesce(`t1`.`source_channel`, '')) != '' GROUP BY `t1`.`dt`, `t1`.`project`, `t1`.`source_channel`, `t1`.`mp_app_id`, `t1`.`mp_app_name`) `a` FULL OUTER JOIN (SELECT `a`.`date`, `a`.`date_time`, `a`.`channel_name`, `a`.`promotion_fee`, `a`.`click`, `a`.`download`, `a`.`show`, `a`.`save_click`, `a`.`created_time`, `a`.`updated_time`, `a`.`after_discount`, `a`.`media_recharge_cost_amt`, `a`.`rn`, CASE WHEN (`a`.`channel_name` LIKE 'rd-%') THEN 
'reader_paid_dyminiapp' WHEN (`a`.`channel_name` LIKE 'rw-%') THEN 'reader_paid_wxminiapp' END AS `project` FROM (SELECT `edw_dim`.`rt_media_data`.`date`, `edw_dim`.`rt_media_data`.`date_time`, `edw_dim`.`rt_media_data`.`channel_name`, `edw_dim`.`rt_media_data`.`promotion_fee`, `edw_dim`.`rt_media_data`.`click`, `edw_dim`.`rt_media_data`.`download`, `edw_dim`.`rt_media_data`.`show`, `edw_dim`.`rt_media_data`.`save_click`, `edw_dim`.`rt_media_data`.`created_time`, `edw_dim`.`rt_media_data`.`updated_time`, `edw_dim`.`rt_media_data`.`after_discount`, `edw_dim`.`rt_media_data`.`media_recharge_cost_amt`, row_number() OVER (PARTITION BY `edw_dim`.`rt_media_data`.`date`, `edw_dim`.`rt_media_data`.`channel_name` ORDER BY `edw_dim`.`rt_media_data`.`created_time` DESC ) AS `rn` FROM `edw_dim`.`rt_media_data` WHERE (`edw_dim`.`rt_media_data`.`date` >= '2024-10-02') AND ((`edw_dim`.`rt_media_data`.`channel_name` LIKE 'rd-%') OR (`edw_dim`.`rt_media_data`.`channel_name` LIKE 'rw-%'))) `a` WHERE `a`.`rn` = 1) `b` ON ((`a`.`source_channel` = `b`.`channel_name`) AND (`a`.`dt` = `b`.`date`)) AND (`a`.`project` = `b`.`project`) LEFT OUTER JOIN `edw_dim`.`dim_mkt_p_channel_acc_d` AS `c` ON ((coalesce(`a`.`source_channel`, `b`.`channel_name`)) = `edw_dim`.`c`.`channel_name`) AND (`edw_dim`.`c`.`app_id` IN (44, 45)) LEFT OUTER JOIN `edw_dim`.`dim_mkt_m_ads_acc` AS `d` ON `edw_dim`.`c`.`ad_id` = `edw_dim`.`d`.`ad_id` LEFT OUTER JOIN `newmedia_ods`.`book` AS `e` ON `edw_dim`.`c`.`book_id` = `newmedia_ods`.`e`.`id` LEFT OUTER JOIN (SELECT `a`.`channel`, `a`.`event_date`, `a`.`project`, count(CASE WHEN (`a`.`is_first_charge` = 1) THEN 1 END) AS `first_recharge_user_cnt_d`, count(if((`a`.`callback_status` = 4) AND (`a`.`is_first_charge` = 1), 1, NULL)) AS `callback_first_recharge_user_cnt_d`, count(if((`a`.`callback_status` != 4) AND (`a`.`is_first_charge` = 1), 1, NULL)) AS `surplus_first_recharge_user_cnt_d`, sum(get_json_object(`a`.`event`, '$.params.amount')) AS `all_recharge_amt`, sum(if(`a`.`callback_status` = 4, get_json_object(`a`.`event`, '$.params.amount'), NULL)) AS `callback_recharge_amt`, sum(if(`a`.`callback_status` != 4, get_json_object(`a`.`event`, '$.params.amount'), NULL)) AS `no_callback_recharge_amt` FROM (SELECT `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`table_name`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`id`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`project`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`attribute_date`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`event_date`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`callback_time`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`uid`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`source`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`account_id`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`channel`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`event_type`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`callback_status`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`is_first_charge`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`click_id`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`event`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`extra`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`updated_time`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`created_time`, `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`mp_app_id`, row_number() OVER (PARTITION BY `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`id` ORDER BY `edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`callback_time` DESC ) AS `rn` FROM `edw_dwd`.`dwd_dyminiapp_callback_inc_d` WHERE (((`edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`project` IN 
('reader_paid_dyminiapp', 'reader_paid_wxminiapp')) AND (`edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`event_type` IN (2, 3, 6))) AND ((datediff(`edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`event_date`, date(`edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`attribute_date`))) < 8)) AND (`edw_dwd`.`dwd_dyminiapp_callback_inc_d`.`event_date` >= '2024-08-18')) `a` WHERE `a`.`rn` = 1 GROUP BY `a`.`channel`, `a`.`project`, `a`.`event_date`) `f` ON (((coalesce(`a`.`source_channel`, `b`.`channel_name`)) = `f`.`channel`) AND ((coalesce(`a`.`dt`, `b`.`date`)) = `f`.`event_date`)) AND ((coalesce(`a`.`project`, `b`.`project`)) = `f`.`project`) LEFT OUTER JOIN (SELECT `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`project`, from_unixtime(unix_timestamp(`edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`ts`), 'yyyy-MM-dd') AS `dt`, `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`attribution_channel`, count(DISTINCT `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`uid`) AS `new_user_cnt` FROM `edw_dim`.`dim_mkt_pull_new_user_change_inc_d` WHERE (`edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`project` IN ('reader_paid_dyminiapp', 'reader_paid_wxminiapp')) AND (`edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`data_date` >= '2024-08-18') GROUP BY `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`project`, from_unixtime(unix_timestamp(`edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`ts`), 'yyyy-MM-dd'), `edw_dim`.`dim_mkt_pull_new_user_change_inc_d`.`attribution_channel`) `g` ON ((`a`.`project` = `g`.`project`) AND (`a`.`dt` = `g`.`dt`)) AND (`a`.`source_channel` = `g`.`attribution_channel`) LEFT OUTER JOIN `newmedia_ods`.`miniapp` AS `h` ON (`a`.`project` = `newmedia_ods`.`h`.`project`) AND (`a`.`mp_app_id` = `newmedia_ods`.`h`.`app_id`) WHERE (coalesce(str_to_date(`a`.`dt`, '%Y-%m-%d'), `b`.`date`)) >= '2024-10-02' ) AS b WHERE `dt` BETWEEN '2025-01-02' AND '2025-02-10'
digest: 6ba59ada9288b07a9b2c26e587df7e13
planCpuCosts: 446804141912.2182
planMemCosts: 118313727370.19861
warehouse: default_warehouse
stmtType: DQL
isFilter: 0
Query statistics after using the materialized view: only 277,299 rows were scanned and the query took under 0.6 seconds, a roughly 55x performance improvement.
queryId: 7111a73e-ed9f-11ef-a9bf-12fae80bf3e0
timestamp: 2025-02-18 10:24:12
queryType: query
clientIp: xxxx
user: xxxx
authorizedUser: 'xxxx'@'%'
resourceGroup: default_wg
catalog: default_catalog
db:
state: EOF
errorCode:
queryTime: 597 --query duration (ms)
scanBytes: 7836135 --bytes scanned
scanRows: 277299 --total rows scanned
returnRows: 1
cpuCostNs: 184696355
memCostBytes: 23584176
stmtId: 771253
isQuery: 1
feIp: xxxx
stmt: SELECT sum(CAST(callback_recharge_amt AS DECIMAL(27, 9))) AS _col_20 , sum(`recharge_sum_amt_1d`) / sum(`promotion_amt`) AS _col_15 , sum(`promotion_amt`) / sum(`new_user_cnt`) AS _col_14 , sum(CAST(all_recharge_amt AS DECIMAL(27, 9))) AS _col_18 , sum(`promotion_amt`) / sum(`recharge_sum_user_cnt_1d`) AS _col_12 , sum(`media_recharge_amt`) AS _col_19 , sum(`recharge_sum_amt_1d`) / sum(`recharge_sum_user_cnt_1d`) AS _col_13 , sum(`new_user_cnt`) AS _col_17, sum(`recharge_sum_user_cnt_1d`) AS _col_16 from `newmedia_dm`.`dm_newmedia_materialized_view_source_channel_launch_rt_acc` WHERE `dt` BETWEEN '2025-01-02' AND '2025-02-10'
digest:
planCpuCosts: 1004802.8057553954
planMemCosts: 152
warehouse: default_warehouse
stmtType: DQL
isFilter: 0
3.3. Using Two-Level Partitioning When Too Much Data Is Scanned
In practice, storing a StarRocks table with a two-level partitioning scheme typically cuts the amount of data scanned by about 90%, and single-table SQL queries can become tens of times faster.
The figure below shows the base table of our A/B experiment business. Because of its size, most of every query's time was spent scanning data; in this situation, improving query performance is essentially a matter of reducing the amount of data scanned.
How to do that? We explored two approaches:
- Creating indexes on frequently used columns
- Using frequently used columns as additional partition levels
In practice we found that for a very large table, building indexes consumes a lot of resources while the query improvement is marginal, so that approach did not fit. We therefore turned to multi-level partitioning; our A/B experiment business is used as the example below.
First, the overall data volume (8.8+ billion rows):
Next, we turned the frequently used column exp_id into a second partition level and created a two-level partitioned table:
CREATE TABLE `dwt_ab_core_xxxx_test` (
`dt` date NOT NULL COMMENT "日期分区yyyy-MM-dd",
`exp_id` bigint(20) NOT NULL COMMENT "实验id",
`source_uid` varchar(65533) NULL COMMENT "归因source_uid",
`exp_name` varchar(65533) NULL COMMENT "实验名称",
......
) ENGINE=OLAP
DUPLICATE KEY(`dt`, `exp_id`, `source_uid`, `project`)
COMMENT "AB实验主题表数据"
PARTITION BY (`dt`,`exp_id`)
DISTRIBUTED BY HASH(`source_uid`, `project`) BUCKETS 1
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"compression" = "ZSTD"
);
Note: size the bucket count so that each bucket holds roughly 1 GB of data.
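One way to sanity-check that guideline after data has been loaded is to look at the per-partition statistics; a small sketch (on our version, the DataSize and Buckets columns of the output are the relevant ones):
--Sketch: check the size of each (dt, exp_id) partition to validate the ~1 GB-per-bucket sizing
SHOW PARTITIONS FROM edw_dwt.dwt_ab_core_xxxx_test;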
Taking experiment exp_id = 4330 as an example, we compared the query efficiency of the original table and the two-level partitioned table with the following SQL:
-- 1
-- Query against the original table
select group_id
, avg(inner_avg_data) as avg_value
, stddev(inner_avg_data) as stddev_value
, count(inner_avg_data) as count_value
from (select group_id, source_uid, sum(is_read_user) as inner_avg_data
from edw_dwt.dwt_ab_xxxx_inc_d_view
where 1=1 and exp_id = 4330
AND dt >= '2024-10-25'
AND dt <= '2025-01-02'
group by group_id, source_uid ) t1
GROUP BY group_id
order by group_id
;
--Query against the two-level partitioned table
select group_id
, avg(inner_avg_data) as avg_value
, stddev(inner_avg_data) as stddev_value
, count(inner_avg_data) as count_value
from (select group_id, source_uid, sum(is_read_user) as inner_avg_data
from edw_dwt.dwt_ab_core_xxxx_test
where 1=1 and exp_id = 4330
AND dt >= '2024-10-25'
AND dt <= '2025-01-02'
group by group_id, source_uid ) t1
GROUP BY group_id
order by group_id
;
--2
--Query against the original table
select group_id
, avg(inner_avg_data) as avg_value
, stddev(inner_avg_data) as stddev_value
, count(inner_avg_data) as count_value
from (select group_id, source_uid, count(dt) as inner_avg_data
from edw_dwt.dwt_ab_xxxx_inc_d_view
where 1=1 and exp_id = 4330
AND dt >= '2024-10-25'
AND dt <= '2025-01-02'
group by group_id, source_uid ) t1
GROUP BY group_id
order by group_id
;
--Query against the two-level partitioned table
select group_id
, avg(inner_avg_data) as avg_value
, stddev(inner_avg_data) as stddev_value
, count(inner_avg_data) as count_value
from (select group_id, source_uid, count(dt) as inner_avg_data
from edw_dwt.dwt_ab_core_xxxx_test
where 1=1 and exp_id = 4330
AND dt >= '2024-10-25'
AND dt <= '2025-01-02'
group by group_id, source_uid ) t1
GROUP BY group_id
order by group_id
As the comparison above shows, with identical logic, the two-level partitioned table's query time is only about one hundredth of the original table's, roughly a hundredfold improvement!
For analytical queries over very large data volumes, this approach is well worth considering.
4. Looking Ahead
- Finer-grained optimization driven by SQL query plans
The Serverless StarRocks cluster now provides a SQL diagnostics platform that records the query plan of each statement and the time spent in every node of the plan. By analyzing why individual nodes are slow, we can optimize the SQL in a targeted way.
For example, in the query shown below, one OLAP_SCAN node took 8 seconds; combined with the execution plan, we could see that scanning one particular table was what slowed down the whole query, so optimizing that table amounts to fixing the slow query.
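Alongside the diagnostics platform, the plan and cost estimates can also be pulled directly in SQL, which is how we expect to drill into cases like the one above. The statement below is only a placeholder query, and the exact profile-related commands depend on the StarRocks version:
--Sketch: inspect the optimizer plan and cost estimates of a suspect statement to locate the slow scan node
EXPLAIN COSTS
SELECT group_id, count(*)
FROM edw_dwt.dwt_ab_core_xxxx_test
WHERE exp_id = 4330
AND dt BETWEEN '2024-10-25' AND '2025-01-02'
GROUP BY group_id;

--Collect runtime profiles for subsequent queries in this session, then review them on the diagnostics platform
SET enable_profile = true;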