千亿级别历史增量去重方案

千亿级别历史增量去重方案

前言

消除重复数据是我们在实际业务中经常遇到的一类问题。在大数据领域,重复数据的删除有助于减少存储所需要的存储容量。而且在一些特定的业务场景中,重复数据是不可接受的,例如,精确统计APP每天的去重用户量,在明细表中统计每天每本书的阅读用户数。

一般情况下,我们可以直接使用SQL通过DISTINCT来进行直接去重计算,这种方式简单粗暴直接,对于一天、一周的数据去重计算,我们也许可以直接采用这种方式。但是很多情况下,去重计数是一个**增量和长期**的过程,并且不同的场景下因为效率和精度问题方案也需要变化。

业务场景

统计每本书每个章节的阅读历史累计UV、当日新增UV、新增时的点击次数(PV)。

需求分析

首先可以确定的是,这是一个历史累计增量去重的问题,第一浮现在脑海里的肯定是 count(distinct uid),也许可以,接下来再估算一下数据量,平台总章节数:几千万+,总用户数:几亿+,假如每个用户都看过所有章节,想象一下,需要对多少数据去重。

经过思考,我们可以构造一章历史全量表,按天分区,然后用当天数据和历史全量进行对比去重,最后得到当天新增的数据并存入当天分区。

举个例子,假如我们要计算2021-01-03这一天的指标,那我们需要如下操作:一张表是历史全量分区表,一张表是每日的阅读数据

1、历史全量分区表:

新增日期(分区) book_id chapter_id uid click_num
2021-01-01 1 12 a 100
2021-01-01 2 13 b 234
2021-01-01 2 14 b 123
2021-01-02 2 20 d 10

2、每日的阅读数据:

日期 book_id chapter_id uid click_num
2021-01-03 1 12 a 213
2021-01-03 1 34 a 124
2021-01-03 1 12 b 691
2021-01-03 2 14 b 451
2021-01-03 2 20 d 343

3、2021-01-03的数据对历史的数据进行去重操作(用户首次阅读本章时记录),插入历史表分区(新增数据如下表粗体所示):

新增日期(分区) book_id chapter_id uid click_num
2021-01-01 1 12 a 100
2021-01-01 2 13 b 234
2021-01-01 2 14 b 123
2021-01-02 2 20 d 10
2021-01-03 1 34 a 124
2021-01-03 1 12 b 691

4、最终,我们输出的结果就是:

日期 书籍ID 章节ID 历史累计UV 新增时的点击次数(PV) 当日新增UV
2021-01-03 1 12 2 791 1
2021-01-03 2 13 1 234 0
2021-01-03 2 14 1 123 0
2021-01-03 2 20 1 10 0
2021-01-03 1 34 1 124 1

至此,我们的大致框架思路已经确定,接下来我们就要调研采用哪种方法来实现步骤3。

去重方案调研

我们目前常用的去重方法有:

基于 HyperLogLog

HyperLogLog 是一种估计统计算法,被用来统计一个集合中不同数据的个数,也就是我们所说的去重统计。HyperLogLog 算法是用于基数统计的算法,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2 的 64 方个不同元素的基数。HyperLogLog 适用于大数据量的统计,因为成本相对来说是更低的,最多也就占用 12KB 内存。

我们可以使用HLL计算截止每天每本书每章的累计uv和点击次数pv,就是每天全量数据,建表语句如下:

create table temp.temp_test_hll(
	book_id int,
	chapter_id bigint,
	hll binary,
  accum_uv bigint,
  accum_pv bigint
) partitioned by (dt string) ;

在代码中新增依赖:

<dependency>
   <groupId>net.agkn</groupId>
  <artifactId>hll</artifactId>
   <version>1.6.0</version>
</dependency>

主流程(也可以自定义UDAF实现):

//desc 历史+今日计算
    spark.sql(
      """
        |select
        |t1.book_id,t1.chapter_id,t1.hll,t1.pv,t2.list
        |from temp.temp_test_hll t1 --昨日全量
        |left join (
        |     --当天阅读数据
        |     select
        |     book_id,
        |     chapter_id,
        |     collect_list(concat(cast(source_uid as string),',',cast(click_count as string))) as list
        |     from source
        |     group by book_id,chapter_id
        |) t2
        |on t1.book_id = t2.book_id and t1.chapter_id = t2.chapter_id
        |""".stripMargin)
      .rdd
      .mapPartitions(partition =>{
        // 因为我们的用户id为字符串,需要取hash
        val hashFunction = Hashing.murmur3_128()
        partition.map(row => {
          val bookId = row.getInt(0)
          val chapterId = row.getInt(1)
          // 将历史的二进制hll反序列化
          val hll = HLL.fromBytes(row.getAs[Array[Byte]](2))
          var pv = row.getLong(3)
          import scala.collection.JavaConverters._
          val list = row.getList[String](4).asScala.toList
          for (str <- list) {
            val Array(uid,click_count) = str.split(",")
            val result_1 = hll.cardinality()
            hll.addRaw(hashFunction.newHasher().putString(uid, StandardCharsets.UTF_8).hash().asLong())
            val result_2 = hll.cardinality()
            if (result_2 > result_1) pv += click_count.toLong
          }
          (bookId,chapterId,hll.toBytes(),hll.cardinality(),pv)
        })
      }).toDF().write.mode(SaveMode.Overwrite).insertInto("temp.temp_test_hll")

最后我们每天计算累计uv可以反序列化出来直接使用hll.cardinality(),效率十分高效,并且全量的数据占用磁盘空间也只有1/3。

addRaw 方法用于向 HyperLogLog 中插入元素。如果插入的元素非数值型的,则需要 hash 过后才能插入。accumulator.cardinality() 方法用于计算 HyperLogLog 中元素的基数。

需要注意的是,HyperLogLog 并不是精准的去重,如果业务场景追求 100% 正确,那么一定不要使用这种方法。

基于 BitMap

HyperLogLog 和 BloomFilter 虽然减少了存储但是丢失了精度, 这在某些业务场景下是无法被接受的。下面的这种方法不仅可以减少存储,而且还可以做到完全准确,那就是使用 BitMap。

Bit-Map 的基本思想是用一个 bit 位来标记某个元素对应的 Value,而 Key 即是该元素。由于采用了 Bit 为单位来存储数据,因此可以大大节省存储空间。

假设有这样一个需求:在 20 亿个随机整数中找出某个数 m 是否存在其中,并假设 32 位操作系统,4G 内存。在 Java 中,int 占 4 字节,1 字节 = 8 位(1 byte = 8 bit)

如果每个数字用 int 存储,那就是 20 亿个 int,因而占用的空间约为 (2000000000*4/1024/1024/1024)≈7.45G

如果按位存储就不一样了,20 亿个数就是 20 亿位,占用空间约为 (2000000000/8/1024/1024/1024)≈0.233G

在使用 BitMap 算法前,如果你需要去重的对象不是数字,那么需要先转换成数字。例如,用户可以自己创造一个映射器,将需要去重的对象和数字进行映射,最简单的办法是,可以直接使用数据库维度表中自增 ID。

总体处理思路和HyperLogLog的思路一样,都是每天计算全量存入当日分区,然后每天计算用当日数据和历史关联计算,BitMap的大致用法如下:

添加依赖:

<dependency>
   <groupId>org.roaringbitmap</groupId>
   <artifactId>RoaringBitmap</artifactId>
   <version>0.8.0</version>
</dependency>

常用操作:

val roaring64NavigableMap = new Roaring64NavigableMap()
roaring64NavigableMap.addLong(1233453453345L) //添加元素
roaring64NavigableMap.runOptimize() //压缩
roaring64NavigableMap.getLongCardinality() //计算结果

需要注意的是,BitMap有多种实现,需要根据我们的数据情况来合理选择

  1. Java 中的 Bitmap : BitSet

  2. 压缩 Bitmap: RoaringBitmap

  3. 长整型压缩 Bitmap: Roaring64NavigableMap

在我们没有数值型ID的时候,我们需要对字符串进行hash,建议采用碰撞率更低效率更高的Hashing.murmur3_128(),通常hash出来的为长整形,这时候推荐使用Roaring64NavigableMap,反之,如果有数值ID,则可以使用任意一种Bitmap。

基于布隆过滤器(BloomFilter)

BloomFilter(布隆过滤器)类似于一个 HashSet,用于快速判断某个元素是否存在于集合中,其典型的应用场景就是能够快速判断一个 key 是否存在于某容器,不存在就直接返回。

需要注意的是,和 HyperLogLog 一样,布隆过滤器不能保证 100% 精确。但是它的插入和查询效率都很高。

方案评估与选择

  • HyperLogLog
    • 性能最高效,占用的存储空间最小。
    • 非精准去重,业务方不能接受误差的存在。
    • 我们没有唯一数值型id,通过hash后取得的id会使计算准确性更加不可控。
  • BitMap
    • 性能高效,能精准去重。
    • 我们没有唯一数值型id,导致计算结果可能不准。
    • 当元素数量达到上千万的时候,序列化和反序列化需要消耗大量的内存资源,分析性能下降严重。
  • BloomFilter
    • 非精准去重,业务方不能接受误差的存在。
    • 没有唯一数值型id。

我们发现,最大的问题已经不是这几种方法是否能做到精准去重了,而是我们没有全局唯一性的id。

此时,在各种方法都不能采用的时候,我们可能只能来取反集了,即用当日的数据和历史全量的数据进行对比取历史不存在的数据,得到的数据就是新增的数据,然后存入当日分区。最后再用昨日的结果加上当日的新增结果来得到最终结果。

但是在这里,我们就要考虑如何让这个性能更加高效了,有如下两种方式对历史去重获取当日新增的数据:

--获取20210103当天的新增
--这边需要说明一下数据量,历史数据目前已经6TB(2000亿条数据)
--方法一
insert overwrite table history_data partition(min_dt)
select book_id,chapter_id,source_uid,min(dt) as min_dt from (
	select dt,book_id,chapter_id,source_uid from history_data
	union all
	select dt,book_id,chapter_id,source_uid from today_data --20210103数据
) t
group by book_id,chapter_id,source_uid
having min_dt = '20210103'

--方法二
insert overwrite table history_data partition(dt)
select a.dt,a.book_id,a.chapter_id,a.source_uid 
from today_data a --20210103数据
left anti join history_data b
on a.book_id = b.book_id and a.chapter_id = b.chapter_id and a.source_uid = b.source_uid

我们可以猜想一下上面两种方法哪个性能高,估计很大一部分人都认为方法一性能高,原因是方法一没有进行多表join的操作,认为单表group by 的性能会比 join的性能高,其实这个是不一定的,经过实际测试,在目前6T的超大数据量情况下,方法二的性能比方法一的性能会高出很多。

未来优化方向

海量数据的精准去重是大数据计算中最大的难点,而历史全量累计去重又是一个长期的过程,随着数据量的不断增加,可能就需要改变方案。

未来我们会尝试使用Kylin来进行精准去重计算,Kylin支持任意粒度的上卷聚合、支持String等非数值型数据。通过构建全局字典来构建一个全局唯一的id,然后使用RoaringBitmap来进行精准去重。具体可以参考:https://blog.bcmeng.com/post/kylin-distinct-count-global-dict.html

展示评论