模板聚合的翻译 【翻译】Flink Table Api & SQL — 性能调优 — 流式聚合

日期:2023-03-11 12:39:36 / 人气: 570 / 发布者:成都翻译公司

在此页面中,我们将介绍一些有用的优化选项以及流聚合的内部原理,这将在某些情况下带来很大的改进。注意:当前,仅对无边界聚合支持流聚合优化。小型批处理聚合的核心思想是将一组输入缓存在聚合运算符内部的缓冲区中。数据流中的记录可能会倾斜,因此聚合运算符的某些实例会比其他实例处理更多的记录,这会导致热点。注意:但是,当前,拆分优化不支持包含用户定义的AggregateFunction的聚合。

本文翻译自官网:Streaming Aggregation

Flink Table Api & SQL 翻译目录

SQL 是*广泛使用的数据分析语言。Flink 的 Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。而且,Flink Table API 和 SQL 都得到了有效的优化,集成了很多查询优化和优化算子的实现。但是,并非所有优化都默认启用,因此对于某些工作负载,您可以通过打开某些选项来提高性能。

在这个页面中,我们将介绍一些有用的优化选项和流聚合的内部原理模板聚合的翻译,它们在某些情况下会带来很大的改进。

注意:目前只有 Blink planner 支持本页提到的优化选项。

注意:目前,流聚合优化仅支持无边界聚合。以后会支持窗口聚合的优化。

默认情况下,无界聚合算子对输入的记录进行一条一条的处理,即(1)从状态中读取累加器,(2)将记录累加/收回到累加器中,(3)将累加器写回状态,(4)下一条记录会从(1))再次处理。这种处理方式可能会增加StateBackend的开销(尤其是RocksDB StateBackend)。另外,生产中很常见的数据倾斜会加剧问题并使工作更容易受到背压情况的影响。

小批量聚合

小批量聚合的核心思想是在聚合算子内部的缓冲区中缓存一组输入。当输入被触发进行处理时,每个键只需一个操作即可访问状态。这可以大大减少状态开销并获得更好的吞吐量。但是,这可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。

下图说明了小批量聚合如何减少状态操作。

默认情况下禁用 MiniBatch 优化。要启用此优化,您应该设置 table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency 和 table.exec.mini-batch.size。请参阅配置页面了解更多详细信息。

以下示例显示了如何启用这些选项。

// instantiate table environment
val tEnv: TableEnvironment = ...
// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task

本地全局聚合

提出将局部聚合分为两个阶段来解决数据倾斜的问题,即先在上游进行局部聚合,然后在下游进行全局聚合,类似于MapReduce中的Combine+Reduce模式. 例如,请考虑以下 SQL:

SELECT color, sum(id)
FROM T
GROUP BY color

数据流中的记录可能会发生倾斜,因此聚合运算符的某些实例将处理比其他实例更多的记录,这可能会导致热点。本地聚合可以帮助将一定数量的具有相同密钥的输入累积到单个累加器中。全局摘要只会接收减少的累加器,而不是大量的原始输入。这样可以大大降低网络重组和状态访问的成本。每个本地聚合累积的输入数量基于*小批处理间隔。这意味着本地-全局聚合取决于启用小批量优化。

下图显示了本地全局聚合如何提高性能。

以下示例显示如何启用本地全局聚合。

// instantiate table environment
val tEnv: TableEnvironment = ...
// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation

拆分不同的聚合

局部和全局优化可以有效消除常规聚合的数据倾斜,如SUM、COUNT、MAX、MIN、AVG。然而,在处理不同的聚合反应时,其性能并不令人满意。

例如,如果我们要分析今天有多少独立用户登录。我们可能有以下查询:

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

如果distinct key(即user_id)的值是稀疏的,COUNT DISTINCT不能减少记录。即使启用了局部和全局优化,也无济于事。因为累加器仍然包含几乎所有的原始记录,全局聚合将成为瓶颈(大部分重累加器由一个任务处理,即同一天)。

这个优化的思路是把不同的聚合(比如COUNT(DISTINCT col))分成两个层次。第一个聚合由组键和其他桶键打乱。用于计算桶键 HASH_CODE(distinct_key)% BUCKET_NUM。BUCKET_NUM 默认为 1024,可以通过 table.optimizer.distinct-agg.split.bucket-num 选项进行配置。第二次聚合由原始组键打乱,用于 SUM 聚合来自不同桶的 COUNT DISTINCT 值。由于相同的唯一键只会在同一个桶中计算,因此转换是等效的。桶键作为额外的组键,分担组键中热点的负担。Bucket 关键字使工作具有可扩展性,以解决不同聚合中的数据倾斜/热点问题。

拆分非重复聚合后,上述查询将自动改写为以下查询:

SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

下图展示了拆分非重复聚合如何提高性能(假设颜色代表日期,字母代表user_id)。

注意:以上是*简单的例子,可以从这个优化中受益。此外,Flink 支持拆分更复杂的聚合查询,例如多个不同key的(如不同的集合COUNT(DISTINCT a)、SUM(DISTINCT b)),以及其他非重复的聚合工作(例如模板聚合的翻译,总和、*大值、*小值、计数)。

注意:但是,目前,拆分优化不支持包含用户定义的 AggregateFunction 的聚合。

以下示例显示了如何启用拆分不同聚合优化。

// instantiate table environment
val tEnv: TableEnvironment = ...
tEnv.getConfig         // access high-level configuration
  .getConfiguration    // set low-level key-value options
  .setString("table.optimizer.distinct-agg.split.enabled", "true")  // enable distinct agg split

在不同的聚合上使用 FILTER 修饰符

在某些情况下,用户可能需要从不同维度计算 UV(唯一身份访问者)的数量,例如 Android UV、iPhone UV、Web UV 和总 UV。很多用户会选择CASE WHEN来支持这个功能,例如:

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

但是,在这种情况下,建议使用 FILTER 语法而不是 CASE WHEN。因为FILTER更符合SQL标准,会得到更多的性能提升。FILTER 是聚合函数中使用的修饰符,用于限制聚合中使用的值。用 FILTER 修饰符替换上面的例子,如下所示:

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

Flink SQL 优化器可以在同一个唯一键上识别不同的过滤参数。例如,在上面的示例中,所有三个 COUNT DISTINCT 都在 user_id 列上。那么 Flink 可以只使用一个共享状态实例而不是三个状态实例来减少状态访问和状态大小。在某些工作负载中,这可以显着提高性能。

相关阅读Relate

  • 法国签证营业执照翻译件模板 你与申根签证只有一条推送的距离
  • 江苏省增值税发票翻译模板 江苏税务局出口货物退(免)税申报管理系统软件
  • 肄业证书翻译模板 复旦大学学生学业证明文书管理细则(试行)
  • 四级英语作文模板带翻译 大学英语四级翻译模拟训练及答案
  • 社会某信用代码证翻译模板 js验证某社会信用代码,某社会信用代码 验证js,js+验证+社会信用代码证
  • 美国移民证件翻译模板 日语签证翻译聊聊身份证翻译模板
  • 翻译软件模板 人类史上*实用的的文档快速翻译指南
  • 江苏省增值税发票翻译模板 江苏出口货物退(免)税申报管理服务平台
  • 瑞士签证房产证翻译件模板 瑞士探亲签证—就读子女
  • 日语户口本翻译模板 户口本翻译价格_户口本翻译一般多少钱?
  • 模板聚合的翻译 【翻译】Flink Table Api & SQL — 性能调优 — 流式聚合 www.chinazxzy.com/fymb/5407.html
    
    本站部分内容和图片来源于网络用户和读者投稿,不确定投稿用户享有完全著作权,根据《信息网络传播权保护条例》,如果侵犯了您的权利,请联系:chinazxzy@163.com,及时删除。
    Go To Top 回顶部
    • 扫一扫,微信在线