多种数据同步方案在七猫的实践
背景
伴随着七猫的发展,大数据团队针对不同业务场景和不同数据功能,调研和运用多种技术栈和数据库来来解决了各类数据存放和使用的问题。本文将七猫大数据团队在实践过程中的一些开发技巧和代码示例进行整理,一是用于技术的沉淀记录,二是希望通过该文章能帮助大家对各种数据库同步场景所有启发。
数据同步概览
篇章1.离线数据同步
该篇章介绍七猫在各数据库中进行数据相互同步的实践例子。
MySQL2Hive
1.小数据量导入数仓
对于千万级别及以下的数据,为了简化开发,七猫选择了 Sqoop 来进行数据同步。
使用例子:
#mysql实例信息
mysql_host="xxxx"
mysql_port="xxxx"
mysql_db="xxxx"
mysql_user="xxxx"
mysql_pwd="xxxx"
jdbc_url="jdbc:mysql://${mysql_host}/${mysql_db}?tinyInt1isBit=false&useUnicode=true&characterEncoding=utf-8"
sqoop_import=`which sqoop-import`
${sqoop_import} -D mapred.job.queue.name=important --connect "${jdbc_url}" --username ${mysql_user} --password ${mysql_pwd} \
--table xxxx \
--columns col1,col2,col3 \
--where "col2 = 1 AND col3 > 100"
--hive-import \
-m 4 \
--split-by col1 \
--hive-overwrite \
--hive-database xxxx \
--hive-table xxxx \
--hive-drop-import-delims \
--input-null-string '\\N' \
--input-null-non-string '\\N' \
--fields-terminated-by "\001" \
--delete-target-dir
优化参数使用解释:
1.并行度设置
-m:通过这个参数设置map task数量
--split-by:通过这个参数指定map tas按照表中哪个字段进行切片,两者同时使用
2.导入数据时 NULL 存储一致性问题
Hive 中的 NULL 在底层是以“\N”来存储,而 MySQL 中的 NULL 在底层就是 NULL,为了保证数据两端的一致性,设置如下两个参数
--input-null-string '\N'
--input-null-non-string '\N' \
3.换行符 \n 和\r 等特殊符号处理
--hive-drop-import-delims:通过这个参数设置将特殊字符进行丢弃
--hive-delims-replacement:通过这个参数设置将特殊字符进行替换
2.大数据量导入数仓
为了保证数据稳定性,七猫开发通用 Spark 代码进行数据导入。
例子:
import org.apache.spark.sql.SparkSession
import scalikejdbc.{ConnectionPool, ConnectionPoolSettings, DB, SQL}
//初始化数据库连接池
val settings = ConnectionPoolSettings(
initialSize = 10,
maxSize = 20,
connectionTimeoutMillis = 3000L,
validationQuery = s"select 1 from xxxx"
)
ConnectionPool.singleton(url, user, password, settings)
//从表中获取上下界
def getDataBound(tableName: String, colName: String): (Long, Long) = {
var lowerBound = 0L
var upperBound = 0L
DB.readOnly(implicit session => {
SQL(s"select min(${colName}) as lower_bound,max(${colName}) as upper_bound from ${tableName} ").map(result => {
(result.int("lower_bound"), result.int("upper_bound"))
}).list().apply()
}).foreach { bound =>
lowerBound = bound._1
upperBound = bound._2
println(s"表${tableName}->${colName}的上下界为${lowerBound}、${upperBound}")
}
(lowerBound, upperBound)
}
//并行读取表中数据
val jdbcDF = spark.read
.format("jdbc")
.option("driver", driverName)
.option("url", url)
.option("dbtable", readTable)
.option("numPartitions", 10)
.option("fetchsize", 1000)
.option("partitionColumn", "id")
.option("lowerBound", tableBound._1)
.option("upperBound", tableBound._2)
.option("user", user)
.option("password", password)
.load()
注:spark读取MySQL默认的并行度只有1,即使手动指定了并行度numPartitions,也不会生效,这在读取大表的时候会成为性能瓶颈。想要提高并行度读取MySQL,则需要获取到该表主键的上界(upperBound)下界(lowerBound),然后指定并行度(numPartitions),程序就会按照设置的并行度均匀拆分,每个task会去并行读取拆分后的数据。
ClickHouse2Hive
例子:
object ClickHouseToHiveExporter {
private val logger = LoggerFactory.getLogger(this.getClass.getName)
def main(args: Array[String]): Unit = {
val options: Options = new Options
options.addOption("hostName", true, "数据库地址")
options.addOption("db", true, "数据库库名")
options.addOption("sourceTableName", true, "数据表名")
options.addOption("username", true, "用户名")
options.addOption("password", true, "密码")
options.addOption("destTableName", true, "目标Hive表表名")
options.addOption("port", true, "ClickHouse端口号")
options.addOption("data_minute", true, "作业执行时间所处的分钟,格式:yyyyMMddHHMM")
options.addOption("job_date", true, "作业执行日期,格式:yyyy-MM-dd")
val parser = new PosixParser
val line = parser.parse(options, args)
if (!line.hasOption("hostName") ||
!line.hasOption("db") ||
!line.hasOption("sourceTableName") ||
!line.hasOption("username") ||
!line.hasOption("password") ||
!line.hasOption("destTableName") ||
!line.hasOption("port")
) {
logger.error(
"""
|缺少必要的参数:
| 1. hostName
| 2. db
| 3. sourceTableName
| 4. username
| 5. password
| 6. destTableName
| 7. port
|""".stripMargin)
System.exit(1)
}
val hostName = line.getOptionValue("hostName")
val db = line.getOptionValue("db")
val sourceTableName = line.getOptionValue("sourceTableName")
val username = line.getOptionValue("username")
val password = line.getOptionValue("password")
val destTableName = line.getOptionValue("destTableName")
val port = line.getOptionValue("port")
logger.info(
s"""
|接收参数:
|hostName:$hostName
|db:$db
|username:$username
|destTableName:$destTableName
|port:$port
|""".stripMargin)
val sourceTableSql =
s"""(
|
|SELECT col1
| , col2
|FROM $db.$sourceTableName
|WHERE col3 = "xxxx"
|) tempTable
""".stripMargin
val sparkSession = SparkSession
.builder()
.enableHiveSupport()
.getOrCreate()
val sourceDF: DataFrame = sparkSession.read.format("jdbc")
.option("url", s"jdbc:clickhouse://$hostName:$port")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", username)
.option("password", password)
.option("dbtable", sourceTableSql)
.load()
sourceDF.write
.format("ORC")
.mode(SaveMode.Overwrite)
.insertInto(destTableName)
}
}
对于 AggregatingMergeTree 引擎表同步 Hive,只需要根据ck表中字段类型相应调整下查询 SQL 即可。
例如ck表结构为:
CREATE TABLE ck_table (
col1 String,
col2 String,
col3 SimpleAggregateFunction(anyLast, Nullable(Int32)),
col4 SimpleAggregateFunction(anyLast, Nullable(Int32)),
col5 SimpleAggregateFunction(max, DateTime64(3))
)
ENGINE = AggregatingMergeTree()
PARTITION BY col1
PRIMARY KEY (col2)
ORDER BY (col2)
SETTINGS index_granularity = 8192
以上面例子做如下调整
......
val sourceTableSql: String =
s"""
|(
|SELECT col1
| , col2
| , anyLast(col3) AS col3
| , anyLast(col4) AS col4
| , toString(max(col5)) AS col5
|FROM $db.$sourceTableName
|WHERE data_date = '${data_date}'
|GROUP BY col1
| , col1
|) temp
|""".stripMargin
......
题外话,也可通过 Sqoop 导入数据
CONDITIONS='''$CONDITIONS'''
${sqoop_import} -D mapred.job.queue.name=important \
--connect "jdbc:clickhouse://xxxx:xxxx/xxxx" \
--driver ru.yandex.clickhouse.ClickHouseDriver \
--username xxxx \
--password xxxx \
--hive-import \
-m 1 \
--hive-overwrite \
--hive-database xxxx \
--hive-table xxxxx \
--hive-drop-import-delims \
--input-null-string '\\N' \
--input-null-non-string '\\N' \
--fields-terminated-by "\001" \
--target-dir /xxxx/xxxx \
--delete-target-dir \
--query "
SELECT col1
, col2
FROM xxxx
WHERE col3 = xxxx
AND $CONDITIONS
"
StarRocks2Hive
通过使用 StarRocks 自带的 Spark 连接器和语法。
官方介绍: https://docs.starrocks.io/zh-cn/latest/unloading/Spark_connector
例子:
${spark_sql} --master yarn-client --deploy-mode client \
......
--jars /xxxx/starrocks-spark2_2.11-1.0.0.jar \
-e "
CREATE TEMPORARY VIEW spark_starrocks
USING starrocks
OPTIONS
(
\"starrocks.table.identifier\" = \"sr_database.sr_table\",
\"starrocks.fenodes\" = \"fenodes:fenodes_port\",
\"user\" = \"xxxx\",
\"password\" = \"xxxx\"
);
INSERT OVERWRITE TABLE hive_database.hive_table partition (dt = 'xxxx')
SELECT
FROM spark_starrocks
;
"
Hive2StarRocks
1.小数据量场景导入
使用 StarRocks 本身读外表能力进行数据导入。
注:该方式导入数据量控制在百万以下,否则会对StarRocks 集群造成太大压力导致宕机。读 Hive 表需要在表名前加上hive.
例子:
#导入新starrocks集群
# Hive 数据更新后,为了 StarRocks 能够获取到相应元数据,建议手动刷新
mysql -hfenode_port -uxxxx -pxxxx -Pfenode_port -e "
REFRESH EXTERNAL TABLE hive.hive_database.hive_table;
"
sleep 120
mysql -hfenode_port -uxxxx -pxxxx -Pfenode_port -e "
INSERT INTO sr_database.sr_table
SELECT col1
, col2
FROM hive.hive_database.hive_table
WHERE col3 = 'xxxx'
2.中等规模数据量(千万以下)场景导入
通过 StarRocks 的 Stream Load 功能实现。
工具代码关键部分展示:
object SparkWrite2StarRocks {
LoggerUtil.setSparkLogLevels()
def main(args: Array[String]): Unit = {
var srFePort = xxxx
var srFilterRatio = 0.0
var sinkParallelism = 1
var debug = false
val options: Options = new Options
options.addOption("srFe", true, "sr fe节点hostname")
options.addOption("srFePort", true, "sr fe节点http port,默认8030")
options.addOption("srUser", true, "sr用户名")
options.addOption("srPass", true, "sr密码")
options.addOption("srDB", true, "sr数据库名")
options.addOption("srTable", true, "sr数据库名")
options.addOption("srColumns", true, "sr列名")
options.addOption("srFilterRatio", true, "sr导入数据错误率,默认0,表示不允许有错")
options.addOption("sinkParallelism", true, "sink并行度,默认1,需要根据数据量判断,例如50亿条数据,可能需要设置5000以上")
options.addOption("sqlText", true, "数据查询语句")
options.addOption("debug", true, "是否开启debug模式,默认不开启")
val parser = new PosixParser
val line = parser.parse(options, args)
if (
!line.hasOption("srFe")
&& !line.hasOption("srUser")
&& !line.hasOption("srPass")
&& !line.hasOption("srDB")
&& !line.hasOption("srTable")
&& !line.hasOption("srColumns")
&& !line.hasOption("sqlText")
) {
LoggerUtil.error(
"""
|ERROR: 缺少必要的参数
|srFe -> sr fe节点hostname
|srUser -> sr用户名
|srPass -> sr密码
|srDB -> sr数据库名
|srTable -> sr数据库名
|srColumns -> sr列名
|sqlText -> 数据查询语句
|""".stripMargin)
System.exit(1)
}
val srFe = line.getOptionValue("srFe")
val srUser = line.getOptionValue("srUser")
val srPass = line.getOptionValue("srPass")
val srDB = line.getOptionValue("srDB")
val srTable = line.getOptionValue("srTable")
val srColumns = line.getOptionValue("srColumns")
val sqlText = line.getOptionValue("sqlText")
if (line.hasOption("srFePort")) srFePort = line.getOptionValue("srFePort").toInt
if (line.hasOption("srFilterRatio")) srFilterRatio = line.getOptionValue("srFilterRatio").toDouble
if (line.hasOption("sinkParallelism")) sinkParallelism = line.getOptionValue("sinkParallelism").toInt
if (line.hasOption("debug")) debug = line.getOptionValue("debug").toBoolean
LoggerUtil.warn(
s"""
|任务参数:
|srFe -> $srFe
|srFePort -> $srFePort
|srUser -> $srUser
|srPass -> $srPass
|srDB -> $srDB
|srTable -> $srTable
|srColumns -> $srColumns
|srFilterRatio -> $srFilterRatio
|sinkParallelism -> $sinkParallelism
|sqlText -> $sqlText
|debug -> $debug
|""".stripMargin)
val spark = SparkSession
.builder()
.enableHiveSupport()
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryoserializer.buffer", "10")
.config("spark.debug.maxToStringFields", "300")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
val resDf = spark.sql(sqlText)
resDf.show()
resDf.map(_.mkString(Consts.columnSeparator))
.repartition(sinkParallelism)
.foreachPartition(
itr => {
val sink = new MySrSink(Map(
"max_filter_ratio" -> s"$srFilterRatio",
"columns" -> srColumns,
"column_separator" -> Consts.columnSeparator,
"timeout" -> Consts.timeout),
srDB,
srUser,
srPass,
srTable,
srFe,
srFePort,
debug,
debug)
if (itr.hasNext) sink.invoke(itr.mkString("\n"))
if (!sink.status) {
throw new Exception("写入StarRocks异常!!!")
System.exit(1)
}
}
)
spark.close()
}
}
object SparkConnector2StarRocks {
// parameters
private val starRocksName = "xxxx"
private val tblNameSrc = "xxxx"
private val tblNameDst = "xxxx"
private val userName = "root"
private val password = ""
private val srFe = "xxxx" // fe hostname
private val port = xxxx // fe http port
private val filterRatio = 0.2
private val columns = "xxxx"
private val master = "xxxx"
private val appName = "xxxx"
private val partitions = 2 // computing parallelism
private val buckets = 1 // sink parallelism
private val debug = false
LoggerUtil.setSparkLogLevels()
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(appName)
.setMaster(master)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val spark = SparkSession.builder().config(conf).master(master).enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
val starrocksSparkDF = spark.read.format("starrocks")
.option("starrocks.table.identifier", s"${starRocksName}.${tblNameSrc}")
.option("starrocks.fenodes", s"${srFe}:${port}")
.option("user", s"${userName}")
.option("password", s"${password}")
.load().repartition(partitions)
starrocksSparkDF.show(5, false)
starrocksSparkDF.createOrReplaceTempView("view_tb1")
val resDf = spark.sql(
"""
|select uid, date, hour, minute, site
|from view_tb1
|lateral view explode(split(uid_list_str,',')) temp_tbl as uid
|""".stripMargin)
resDf.show(5, false) // IDEA/REPL local outputs
resDf.map(x => x.toString().replaceAll("\\[|\\]", "").replace(",", Consts.columnSeparator))
.repartition(buckets).foreachPartition(
itr => {
val sink = new MySrSink(Map(
"max_filter_ratio" -> s"${filterRatio}",
"columns" -> columns,
"column_separator" -> Consts.columnSeparator),
starRocksName,
userName,
password,
tblNameDst,
srFe,
port,
debug,
debug)
if (itr.hasNext) sink.invoke(itr.mkString("\n"))
}
)
spark.close()
}
}
class MySrSink(headers:Map[String,String],
dbName:String,
userName:String,
password:String,
tblName:String,
hostName:String,
port:Int = 18030,
debug:Boolean = true, showPayLoad: Boolean = false) extends Serializable {
val CHARSET = "UTF-8"
val BINARY_CT = "application/octet-stream"
val CONTENT_TYPE = "text/plain"
var TIMEOUT = 30000
var api = s"http://${hostName}:${port}/api/${dbName}/${tblName}/_stream_load"
var httpClient: CloseableHttpClient = _
var response:CloseableHttpResponse = _
var status :Boolean = true
def invoke(value: String): Unit = {
httpClient = PutUtil.clientGen(userName, password)
try {
val res = PutUtil.put(httpClient, value, api, CONTENT_TYPE, headers, debug, showPayLoad)
status = res._1
httpClient = res._2
response = res._3
} catch {
case ex:Exception => {
println("### invoke ERROR:")
ex.printStackTrace()
}
} finally {
try {
httpClient.close()
response.close()
} catch {
case ex:Exception => {
println("### http close ERROR:")
ex.printStackTrace()
}
}
}
}
}
3.数据量(亿级别以上)场景导入
通过 StarRocks 的 BROKER LOAD 功能实现。
注:在阿里云环境上使用时需注意 StarRocks 当前版本是否兼容直接读取oss-hdfs文件。如果不兼容则需要先将数据从oss-hdfs同步至oss,再从oss上读取文件。
例子:
# step 1 先将数据从oss-hdfs同步至oss oss-hdfs不支持直接导入sr 该命令如果分区相同 则是覆盖写
hadoop jar /opt/apps/JINDOSDK//jindosdk-4.6.1/tools/jindo-distcp-tool-4.6.1.jar \
--src oss://xxxx/user/hive/warehouse/${sr_db}.db/${sr_table}/dt=${input_date} \
--dest xxxx/user/hive/warehouse/${sr_db}.db/${sr_table}/dt=${input_date} \
--hadoopConf mapreduce.job.queuename=xxxx \
--hadoopConf mapreduce.map.memory.mb=3280 \
--hadoopConf mapreduce.task.timeout=60000000 \
--hadoopConf fs.oss.accessKeyId=xxxx \
--hadoopConf fs.oss.accessKeySecret=xxxx \
--parallelism 500 \
--jobBatch 50000 \
--taskBatch 2 \
--bandWidth 100
# ===========================数据导入sr配置====================================
# brokeload写入sr
load_path="oss://xxxx/user/hive/warehouse/${sr_db}.db/${sr_table}/dt=${input_date}/*"
access_key_id="xxxx"
access_key_secret="xxxx"
end_point="xxxx"
load_info="
LOAD LABEL ${label_name}
(
DATA INFILE ('$load_path')
INTO TABLE ${sr_table}
FORMAT AS ${sr_format}
(
--字段
col1,
col2,
col3
)
-- 从路径中获取分区信息 作为第一个字段
COLUMNS FROM PATH AS (dt)
)
WITH BROKER
(
'fs.oss.accessKeyId' = '${access_key_id}',
'fs.oss.accessKeySecret' = '${access_key_secret}',
'fs.oss.endpoint' = '${end_point}'
)
;
"
#执行导入命令
mysql -h ${sr_host} -P${sr_port} -u${sr_user} -p${sr_password} -D${sr_db} -e "${load_info}"
# ===========================导入状态检查=====================================
# 检查导入状态
finish_state="FINISHED"
end_count_condition=0
get_state=""
# 循环获取导入状态 直到finished结束 或者达到一定次数结束
while [ x$finish_state != x${get_state} ] ;
do
# 获取导入
get_broke_load_info=`mysql -h ${sr_host} -P${sr_port} -u${sr_user} -p${sr_password} -D${sr_db} -e "show load WHERE Label = '${label_name}'"`
get_state=$(echo "$get_broke_load_info" | grep -oP '(FINISHED|CANCELLED|LOADING|PENDING)' )
echo $get_state
echo "正在执行中......"
end_count_condition=$((end_count_condition+1))
# 如果超过10次则直接跳出获取状态的程序 直接结束 120次 1个小时
if [[ ${end_count_condition} -gt 120 ]]; then
echo "导入失败,已超出导入时间"
break
fi
case x$get_state in
xCANCELLED )
echo "导入失败,已被取消"
break
;;
x${finish_state} )
echo "导入成功"
break
;;
esac
sleep 30
done
echo "导入结束"
Hive2ClickHouse
通过 Spark 的 jdbc 功能,七猫开发通用工具进行数据导入。
工具代码:
import java.util.Properties
import org.apache.commons.cli._
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import ru.yandex.clickhouse.ClickHouseDataSource
object DataExportToClickhouse {
private val logger: Logger = LoggerFactory.getLogger(this.getClass.getName)
private val ckDriver = "ru.yandex.clickhouse.ClickHouseDriver"
def main(args: Array[String]): Unit = {
val options: Options = new Options
options.addOption("ckHost", true, "clickhouse-host")
options.addOption("ckPort", true, "clickhouse端口,默认8123")
options.addOption("ckUser", true, "clickhouse用户名")
options.addOption("ckPass", true, "clickhouse密码")
options.addOption("ckDb", true, "clickhouse数据库名")
options.addOption("ckTable", true, "clickhouse表名")
options.addOption("ckPartition",true,"clickhouse分区字段名")
options.addOption("ckPartitionValue", true, "clickhouse表分区")
options.addOption("hiveTable", true, "hive表名")
options.addOption("hiveColumns", true, "hive查询列")
options.addOption("hivePartition", true, "hive分区字段名")
options.addOption("hivePartitionValue", true, "hive分区值")
options.addOption("ckTableType", true, "clickhouse表类型,默认是非集群版,可填写cluster")
options.addOption("parallelize", true, "write并行度,默认是8")
options.addOption("batchSize", true, "每批写入条数,默认是100000")
val parser = new PosixParser
val line = parser.parse(options, args)
if (!line.hasOption("ckHost")) {
logger.error("缺少必要ck_host参数");
System.exit(1)
}
else if (!line.hasOption("ckUser")) {
logger.error("缺少必要ck_user参数");
System.exit(1)
}
else if (!line.hasOption("ckPass")) {
logger.error("缺少必要ck_pass参数");
System.exit(1)
}
else if (!line.hasOption("ckDb")) {
logger.error("缺少必要ck_db参数");
System.exit(1)
}
else if (!line.hasOption("ckTable")) {
logger.error("缺少必要ck_table参数");
System.exit(1)
}
else if (!line.hasOption("hiveTable")) {
logger.error("缺少必要hive_table参数");
System.exit(1)
}
else if (!line.hasOption("hiveColumns")) {
logger.error("缺少必要hive_columns参数");
System.exit(1)
}
val ckTableType = if (line.hasOption("ckTableType") && "cluster".equals(line.getOptionValue("ckTableType"))) "cluster" else "single"
val writeParallelize = if (line.hasOption("parallelize")) line.getOptionValue("parallelize").toInt else 8
val batchSize = if (line.hasOption("batchSize")) line.getOptionValue("batchSize").toInt else 100000
val ckHost = line.getOptionValue("ckHost")
val ckPort = if (line.hasOption("ckPort")) line.getOptionValue("ckPort") else "8123"
val ckUser = line.getOptionValue("ckUser")
val ckPass = line.getOptionValue("ckPass")
val ckDb = line.getOptionValue("ckDb")
val ckTb = line.getOptionValue("ckTable")
val ckUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$ckDb"
val ckProp = new Properties()
ckProp.put("driver", ckDriver)
ckProp.put("user", ckUser)
ckProp.put("password", ckPass)
val hiveTable: String = line.getOptionValue("hiveTable")
val hivePartition: String = line.getOptionValue("hivePartition")
val hivePartitionValue: String = line.getOptionValue("hivePartitionValue")
val columns: String = line.getOptionValue("hiveColumns")
val ckPartition: String = line.getOptionValue("ckPartition")
val ckPartitionValue: String = line.getOptionValue("ckPartitionValue")
val spark = SparkSession
.builder()
.enableHiveSupport()
.appName("zhuyitian-CommercialAdvMinutes")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryoserializer.buffer", "10")
.getOrCreate()
var df: DataFrame = null
var hiveCondition: String = null
var ckCondition: String = null
if (StringUtils.isNotBlank(hivePartition) && StringUtils.isNotBlank(ckPartitionValue)) {
//hive和ck都指定了分区,删除ck分区数据,倒入hive指定分区数据
dropClickHousePartition(new ClickHouseDataSource(ckUrl, ckProp), ckDb, ckTb, ckPartitionValue, ckTableType)
df = spark.sql(s"select $columns from $hiveTable where $hivePartition = '$hivePartitionValue'")
println(s"select $columns from $hiveTable where $hivePartition = '$hivePartitionValue'")
hiveCondition = s" $hivePartition = '$hivePartitionValue'"
ckCondition = s" $ckPartition='$ckPartitionValue' "
} else if (StringUtils.isNotBlank(hivePartition) && StringUtils.isBlank(ckPartitionValue)) {
//hive指定了分区,清空ck表,倒入指定分区数据
truncateClickHouseTable(new ClickHouseDataSource(ckUrl, ckProp), ckDb, ckTb, ckTableType)
df = spark.sql(s"select $columns from $hiveTable where $hivePartition = '$hivePartitionValue'")
println(s"select $columns from $hiveTable where $hivePartition = '$hivePartitionValue'")
hiveCondition = s" $hivePartition = '$hivePartitionValue'"
} else {
// hive和ck均未指定分区,全表清空,全表覆盖
truncateClickHouseTable(new ClickHouseDataSource(ckUrl, ckProp), ckDb, ckTb, ckTableType)
df = spark.sql(s"select $columns from $hiveTable")
println(s"select $columns from $hiveTable")
}
df.show()
df.coalesce(writeParallelize)
.write
.mode(saveMode = "append")
.option("batchsize", batchSize)
.option("isolationLevel", "NONE") // 设置事务
.jdbc(ckUrl, s"$ckDb.$ckTb", ckProp)
checkClickhouseDataNum(hiveTable, s"$ckDb.$ckTb", hiveCondition ,ckCondition , spark, new ClickHouseDataSource(ckUrl, ckProp) )
}
private def dropClickHousePartition(chDatasource: ClickHouseDataSource, ckDb: String, ckTb: String, partition: String, ckTableType: String): Unit = {
try {
val chConn = chDatasource.getConnection()
val dropSQL = ckTableType match {
case "single" => s"ALTER TABlE $ckDb.$ckTb DROP PARTITION '$partition'"
case "cluster" => s"ALTER TABlE $ckDb.${ckTb}_local DROP PARTITION '$partition'"
case _ => s"ALTER TABlE $ckDb.$ckTb DROP PARTITION '$partition'"
}
println(dropSQL)
val psmt = chConn.prepareStatement(dropSQL)
psmt.execute()
logger.warn(dropSQL)
} catch {
case e: Exception =>
e.printStackTrace()
logger.error("删除clickhouse分区失败,程序退出")
System.exit(1)
}
}
private def truncateClickHouseTable(chDatasource: ClickHouseDataSource, ckDb: String, ckTb: String, ckTableType: String): Unit = {
try {
val chConn = chDatasource.getConnection()
val truncateSQL = ckTableType match {
case "single" => s"TRUNCATE TABlE $ckDb.$ckTb"
case "cluster" => s"TRUNCATE TABlE $ckDb.${ckTb}_local"
case _ => s"TRUNCATE TABlE $ckDb.$ckTb"
}
println(truncateSQL)
val psmt = chConn.prepareStatement(truncateSQL)
psmt.execute()
logger.warn(truncateSQL)
} catch {
case e: Exception =>
e.printStackTrace()
logger.error("清空clickhouse表数据失败,程序退出")
System.exit(1)
}
}
/**
* 检测 Clickhouse的数据量是否和hive 数据量一致
*/
private def checkClickhouseDataNum(hiveTableName: String, ckTableName: String, hivePartition: String, ckPartition:String ,spark: SparkSession, chDatasource: ClickHouseDataSource): Unit = {
logger.info("enter checkClickhouseDataNum...")
var hiveSql: String = s"SELECT count(1) FROM $hiveTableName"
var ckSql: String = s"SELECT count(1) FROM $ckTableName"
if(StringUtils.isNotBlank(hivePartition)){
hiveSql = hiveSql.concat(s" WHERE $hivePartition")
}
if(StringUtils.isNotBlank(ckPartition)){
ckSql = ckSql.concat(s" WHERE $ckPartition")
}
logger.info("hiveSql:{}", hiveSql)
logger.info("ckSql:{}", ckSql)
val df = spark.sql(hiveSql)
val chConn = chDatasource.getConnection
val psmt = chConn.createStatement();
val chResult = psmt.executeQuery(ckSql)
chResult.next();
if (df.first().getLong(0).longValue() != chResult.getLong(1).longValue()) {
logger.error("checkClickhouseDataNum failed, hive-count:{},ck-count:{}", df.first().getLong(0), chResult.getLong(1))
System.exit(1)
}
logger.info("checkClickhouseDataNum success, hive-count:{},ck-count:{}", df.first().getLong(0), chResult.getLong(1))
}
}
篇章2.实时数据任务
在七猫大数据开发中,优先使用 Flink Sql 作为实时任务的主要技术栈。该篇章会介绍一下Flink Sql的各种 Source 和 Sink 的写法与运用。通过不同的 Source 和 Sink 组合就可以实现各数据库之间的数据同步。
Kafka As Source
例子:
kafka中数据(json格式):
{
"json_array_string":
[
{
"id": "1",
"params":
{
"col1": "0",
"col2": "0"
}
}
],
"json_map_string":
{
"col3": "0",
"col4": "0"
},
"ts": 0
}
CREATE TABLE kafka_source_table_bigdata_reader_smallproject (
json_array_string ARRAY<ROW<id STRING,params MAP<STRING,STRING>>>,
json_map_string MAP<STRING,STRING>,
ts bigint,
--水位线的设置
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 0),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = 'ip1:port1,ip2:port2,ip3:port3',
'properties.group.id' = 'groupid',
'scan.startup.mode' = 'group-offsets',
-- 'scan.startup.timestamp-millis' = '1684771200000',
'scan.topic-partition-discovery.interval' = '180',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
基础参数介绍:
1.connector:指定使用的连接器,Kafka 连接器使用 'kafka'。
2.topic:当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 'topic-1;topic-2'。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。
3.topic-pattern:匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。
4.properties.bootstrap.servers:逗号分隔的 Kafka broker 列表。
5.properties.group.id:Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 "KafkaSource-{tableIdentifier}" 作为消费组 ID。
6.format:用来序列化或反序列化 Kafka 消息的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 'value.format' 二者必需其一。
7.value.format:序列化和反序列化 Kafka 消息体时使用的格式。
8.scan.startup.mode:Kafka consumer 的启动模式。有效值为:'earliest-offset','latest-offset','group-offsets','timestamp' 和 'specific-offsets'。
MySQL CDC as Source
原理:通过 Flink SQL 来监听 MySQL 的 bin log,获取 MySQL 的数据。
例子:
CREATE TABLE source_table (
--元数据写法
database_name STRING METADATA VIRTUAL, --
table_name STRING METADATA VIRTUAL,
col1 STRING,
col2 STRING
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxxx',
'port' = 'xxxx',
'username' = 'xxxx',
'password' = 'xxxx',
'connect.max-retries' = '10',
'scan.incremental.snapshot.enabled' = 'true',
'server-id' = '7100-7120',
'database-name' = 'xxxx',
'table-name' = 'xxxx_[0-9]+'
)
基础参数介绍:
1.connector:指定要使用的连接器, 这里应该是 'mysql-cdc'.
2.hostname:MySQL 数据库服务器的 IP 地址或主机名。
3.username:连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。
4.password:连接 MySQL 数据库服务器时使用的密码。
5.database-name:要监视的 MySQL 服务器的数据库名称。数据库名称还支持正则表达式。
6.table-name:需要监视的 MySQL 数据库的表名。表名支持正则表达式。
7.scan.incremental.snapshot.enabled: 增量快照是一种读取表快照的新机制,与旧的快照机制相比, 增量快照有许多优点,包括: (1)在快照读取期间,Source 支持并发读取, (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint, (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 Source 并行运行,则每个并行 Readers 都应该具有唯一的 Server id,所以 Server id 必须是类似 5400-6400
的范围,并且该范围必须大于并行度。
8.server-id:读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。
支持的元数据:
table_name:当前记录所属的 Mysql 表名称。
database_name:当前记录所属的库名称。
op_ts:当前记录表在数据库中更新的时间。如果从表的快照而不是 binlog 读取记录,该值将始终为0。
MySQL as Source
使用场景:Mysql 数据作为维度表使用。
例子:
CREATE TABLE dim_topic_info (
id BIGINT,
is_del INT,
`status` INT,
tag_ids STRING,
analysis_tag STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = 'xxxx',
'port' = 'xxxx',
'username' = 'xxxx',
'password' = 'xxxx',
'database-name' = 'xxxx',
'table-name' = 'xxxx',
'lookup.cache.strategy' = 'ALL',
'lookup.cache.ttl' = '5min'
)
基础参数介绍:
1.lookup.cache.strategy:
支持以下三种缓存策略:
None(默认值):无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。使用该缓存策略时,必须配置lookup.cache.max-rows参数。
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。
2.lookup.cache.max-rows:当选择LRU缓存策略后,必须设置缓存大小。当选择ALL缓存策略后,可以不设置缓存大小。
3.lookup.cache.ttl:lookup.cache.ttl的配置和lookup.cache.strategy有关,详情如下:
如果lookup.cache.strategy配置为None,则lookup.cache.ttl可以不配置,表示缓存不超时。
如果lookup.cache.strategy配置为LRU,则lookup.cache.ttl为缓存超时时间。默认不过期。
如果lookup.cache.strategy配置为ALL,则lookup.cache.ttl为缓存加载时间。默认不重新加载。
填写时请使用时间格式,例如1min或10s。
StarRocks As Source
例子:
CREATE TABLE flink_test
(
`id` INT,
`name` STRING,
`score` INT
)
WITH
(
'connector'='starrocks',
'scan-url'='192.168.xxx.xxx:8030',
'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
'username'='xxxxxx',
'password'='xxxxxx',
'database-name'='test',
'table-name'='score_board',
'scan.connect.timeout-ms' = '1000',
'scan.params.keep-alive-min' = '10',
'scan.params.query-timeout-s' = '600',
'scan.params.mem-limit-byte' = '1073741824',
'scan.max-retries' = '1'
);
连接参数同 Mysql Source
特殊参数介绍:
1.scan.connect.timeout-ms:Flink Connector 连接 StarRocks 集群的时间上限。单位:毫秒。默认值:1000。超过该时间上限,则数据读取任务会报错
2.scan.params.keep-alive-min:数据读取任务的保活时间,通过轮询机制定期检查。单位:分钟。默认值:10。建议取值大于等于 5
3.scan.params.query-timeout-s:数据读取任务的超时时间,在任务执行过程中进行检查。单位:秒。默认值:600。如果超过该时间,仍未返回读取结果,则停止数据读取任务
4.scan.params.mem-limit-byte:BE 节点中单个查询的内存上限。单位:字节。默认值:1073741824(即 1 GB)
5.scan.max-retries:数据读取失败时的最大重试次数。默认值:1。超过该数量上限,则数据读取任务报错
注:
1.仅支持使用部分 SQL 语句读取 StarRocks 中的数据,如 SELECT ... FROM <table_name> WHERE ...。暂不支持除 count 以外的聚合函数。
2.使用 SQL 语句时,支持自动进行谓词下推。如过滤条件 char_1 <> 'A' and int_1 = -126,会下推到 Flink Connector 中并转换成适用于 StarRocks 的语句后,再执行查询,不需要额外配置。
3.不支持 LIMIT 语句。
4.StarRocks 暂时不支持 Checkpoint 机制。因此,如果读取任务失败,则无法保证数据一致性
Redis As Source
使用场景:Redis 数据作为维度表使用。
例子:
CREATE TEMPORARY TABLE redis_dim (
id STRING,
name STRING,
PRIMARY KEY (id) NOT ENFORCED --Redis中的Row Key字段。
) WITH (
'connector' = 'redis',
'host' = 'xxxx',
'port' = 'xxxx',
'password' = 'xxxx',
'cache' = 'LRU'
);
基础参数介绍:
1.connector:固定值为redis。
2.host:Redis Server连接地址。
3.post:Redis Server连接端口。
4.password:Redis数据库密码。
5.dbNum:选择操作的数据库编号。
6.mode:对应Redis的数据结构。一般为string或者hash
维度独有参数:
1.cache:支持以下两种缓存策略:
None(默认值):无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
2.cacheSize:选择LRU缓存策略后,可以设置缓存大小。默认10000
3.cacheTTLMs:cacheTTLMs配置和cache有关:如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
注:
Redis维表必须声明且只能声明一个主键。
Redis维表仅支持声明两个字段,且字段类型必须为STRING。
Redis维表仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。
Redis维表JOIN时,ON条件必须包含所有主键的等值条件。
Redis维表仅支持None和LRU两种缓存策略。
Kafka As Sink
例子:
CREATE TABLE kafka_sink_table (
col1 string,
col2 string
) WITH (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = 'ip1:port1,ip2:port2,ip3:port3',
'scan.topic-partition-discovery.interval' = '60',
'format' = 'json',
'sink.partitioner' = 'round-robin',
'sink.parallelism' = '10'
);
基础参数介绍:
1.sink.partitioner:
default:使用 Kafka 默认的分区器对消息进行分区;
fixed:每个 Flink partition 最终对应最多一个 Kafka partition;
round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition;只有当未指定消息的消息键时生效。
自定义 FlinkKafkaPartitioner 的子类:例如 'org.mycompany.MyPartitioner'。
2.sink.parallelism:定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。
注:当 Source 源是 mysql-cdc 时获取的数据类型为Changelog格式,所以需要指定format=debezium-json
StarRocks As Sink
例子:
1.sink为默认csv格式(数据中无特殊字符建议使用)
CREATE TABLE sink_table (
database_name STRING ,
table_name STRING,
id bigint,
PRIMARY KEY (col1) NOT ENFORCED
)
WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://node1:port1,node2:port2,node3:port3?characterEncoding=utf-8&useSSL=false',
'load-url'='node1:port1,node2:port2,node3:port3?',
'database-name' = 'xxxx',
'table-name' = 'xxxx',
'username' = 'xxxx',
'password' = 'xxxx' ,
'sink.buffer-flush.max-rows' = '500000',
'sink.buffer-flush.max-bytes' = '94371840',
'sink.buffer-flush.interval-ms' = '120000',
'sink.properties.column_separator' = '<col_sep>',
'sink.max-retries' = '3'
);
2.sink为json格式(数据中可能出现特殊字符时建议使用)
CREATE TABLE sink_table (
col1 date,
col2 string,
col3 string
)
WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://node1:port1,node2:port2,node3:port3?characterEncoding=utf-8&useSSL=false',
'load-url'='node1:port1,node2:port2,node3:port3?',
'database-name' = 'xxxx',
'table-name' = 'xxxx',
'username' = 'xxxx',
'password' = 'xxxx' ,
'sink.buffer-flush.max-rows' = '1000000',
'sink.buffer-flush.max-bytes' = '94371840',
'sink.buffer-flush.interval-ms' = '120000',
'sink.max-retries' = '3',
'sink.parallelism' = '5',
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true',
'sink.properties.ignore_json_size' = 'true'
);
注:sink.properties.format参数代表 StarRocks 写入时的格式,默认为csv。当数据中有类似分隔符或者换行符这种特殊符号时,csv格式会将行和列解析错误,该情况建设设置成json格式使用(会加大资源消耗,按需调整资源大小)
sink独有参数介绍:
1.sink.buffer-flush.max-rows:积攒在内存的数据条数,达到该阈值后数据通过 Stream Load 一次性导入 StarRocks。取值范围:[64000, 5000000]
2.sink.buffer-flush.max-bytes:积攒在内存的数据大小,达到该阈值后数据通过 Stream Load 一次性导入 StarRocks。取值范围:[64MB, 10GB]。将此参数设置为较大的值可以提高导入性能,但可能会增加导入延迟。 该参数只在 sink.semantic 为at-least-once才会生效。 sink.semantic 为 exactly-once,则只有 Flink checkpoint 触发时 flush 内存的数据,因此该参数不生效。
3.sink.buffer-flush.interval-ms:数据发送的间隔,用于控制数据写入 StarRocks 的延迟,取值范围:[1000, 3600000]。该参数只在 sink.semantic 为 at-least-once才会生效。
4.sink.properties.column_separator:CSV 数据的列分隔符。
5.sink.max-retries:Stream Load 失败后的重试次数。超过该数量上限,则数据导入任务报错。取值范围:[0, 10]。该参数只在 sink.version 为 V1 才会生效。
json格式独有参数介绍:
1.sink.properties.strip_outer_array:用于指定是否裁剪最外层的数组结构。取值范围:true 和 false。默认值:false。真实业务场景中,待导入的 JSON 数据可能在最外层有一对表示数组结构的中括号 []。这种情况下,一般建议您指定该参数取值为 true,这样 StarRocks 会剪裁掉外层的中括号 [],并把中括号 [] 里的每个内层数组都作为一行单独的数据导入
2.sink.properties.ignore_json_size:用于指定是否检查 HTTP 请求中 JSON Body 的大小。HTTP 请求中 JSON Body 的大小默认不能超过 100 MB。如果 JSON Body 的大小超过 100 MB,会提示 "The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Set ignore_json_size to skip check, although it may lead huge memory consuming." 错误。为避免该报错,可以在 HTTP 请求头中添加 "ignore_json_size:true" 设置,忽略对 JSON Body 大小的检查。
Redis As Sink
例子:
hash类型:
CREATE TABLE redis_sink (
key STRING,
hash_key STRING,
hash_value STRING,
PRIMARY KEY (key) NOT ENFORCED -- 必填。
) WITH (
'connector' = 'redis',
'host' = 'xxxx',
'dbNum' = 'xxxx',
'mode' = 'hashmap',
'expiration' = '86400000'
);
string类型:
CREATE TABLE redis_sink (
source_uid STRING,
json_info STRING,
PRIMARY KEY (source_uid) NOT ENFORCED -- 必填。
) WITH (
'connector' = 'redis',
'host' = 'xxxx',
'dbNum' = 'xxxx',
'mode' = 'string',
'expiration' = '86400000'
);
基础参数参考 Reids As Source
sink独有参数:
1.ignoreDelete:是否忽略Retraction消息。true:收到Retraction消息时,忽略Retraction消息。false:收到Retraction消息时,同时删除数据对应的key及已插入的数据。
2.expiration:为写入数据对应的Key设置TTL。如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。
Hive As Sink
例子:
CREATE CATALOG dlfcatalog WITH (
'type' = 'hive',
'default-database' = 'temp',
'hive-version' = '3.1.2',
'hive-conf-dir' = 'xxxx'
);
begin statement set;
insert into dlfcatalog.hive_datbase.hive_table
select col1
, col2
, col3
FROM source
;
end;
Flink Sql 需要通过 YAML 配置,使用 Catalog 接口 和 HiveCatalog 连接到现有的 Hive 集群。
定义 HiveCatalog 时所支持的参数介绍:
1.type:Catalog 的类型。 创建 HiveCatalog 时,该参数必须设置为'hive'。
2.hive-conf-dir:指向包含 hive-site.xml 目录的 URI。 该 URI 必须是 Hadoop 文件系统所支持的类型。 如果指定一个相对 URI,即不包含 scheme,则默认为本地文件系统。如果该参数没有指定,我们会在 class path 下查找hive-site.xml。
3.default-database:当一个catalog被设为当前catalog时,所使用的默认当前database。
4.hive-version:HiveCatalog 能够自动检测使用的 Hive 版本。
注:该模式只支持增量写入,不支持更新
总结
本篇文章只介绍了七猫在常用数据库下的数据同步。无论是Hbase、MongoDb、ES等其他数据库的使用,还是数据同步平台化的建设,七猫有有着一套完整的体系。通过这个文档希望大家能够对数据同步有所收获,也对七猫未来的文章有所期待。