Citus分布式类型

Citus分布式类型

hash类型

分布式表会根据表的分布列,对其进行hash运算,并根据其数据范围将数据存储至对应的分片表中

hash散列值 表名 work节点
-1000 ~ 0 tab_1 work_1
0 ~ 1000 tab_2 work_2
1000 ~ 2000 tab_3 work_3

append类型

append distributed是一种需要谨慎使用的专业技术。对于大多数情况,hash distributed 是更好的选择

虽然Citus最常见的hash类型,但它也可以按时间顺序在可变数量的分片中分配时间序列数据。

顾名思义,append类型是基于追加数据的方式。例如基于事件的数据,该事件的数据按时间顺序到达。然后,您可以按时间顺序分片一个大表,并以N分钟为间隔将事件数据批量加载到Citus中。该数据模型可以推广到许多时间序列用例,例如,每行日志文件,活动日志或网站事件。基于append的分发支持更有效的范围查询。这是因为在给定分发键上的范围查询,Citus查询计划器可以轻松确定哪些分片与该范围匹配,并仅将查询发送到相关分片。
基于hash的分片更适合您希望对数据进行实时插入以及分析或希望通过非有序数据(例如,用户ID)进行分发的情况。

创建append分布式表

CREATE TABLE github_events
(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    actor jsonb,
    org jsonb,
    created_at timestamp
);

SELECT create_distributed_table('github_events', 'created_at', 'append');

此方法不执行特定分发,它只是告诉数据库在每个分片中保留created_at列的最小值和最大值,这些值稍后由数据库用于优化查询

删除过期append表

在append分发中,用户通常只想跟踪过去几个月/年的数据。在这种情况下,不再需要的分片仍然占用磁盘空间。为了解决这个问题,Citus提供了一个用户定义的函数master_apply_delete_command来删除旧的分片。该函数将DELETE命令作为输入,并删除与删除条件匹配的所有分片及其元数据。

该函数使用分片元数据来决定是否需要删除分片,因此它要求DELETE语句中的WHERE子句位于分发列上。如果未指定条件,则选择所有分片进行删除。然后,UDF连接到worker节点,并为需要删除的所有分片发出DROP命令。如果特定分片副本的删除失败,则该副本将标记为“TO DELETE”。标记为”TO DELETE”的分片副本不会被用于将来的查询,可以稍后删除。

下面的示例从github_events表中删除那些包含created_at> =’2015-01-01 00:00:00’的所有行的分片。请注意,该表分布在created_at列上。

SELECT  *  from  master_apply_delete_command ('DELETE FROM github_events WHERE created_at> =''2015-01-01 00:00:00''' ); 
 master_apply_delete_command 
----------------------------- 
                           3 
(1  行)

此功能是删除分片中的完整分片而不删除单行。

删除表数据

DELETE  FROM  github_events 
WHERE  created_at  > =  '2015-01-01 00:03:00' ;

与master_apply_delete_command不同,删除的是表中的行数据,而不是分片级别。清理过期无效的分片减少无用的元信息更有用

删除表

DROP  TABLE  github_events ;

数据加载

使用\ copy进行批量加载

\ copy使用master_create_empty_shard创建新的分片。然后该命令连接到Worker并将数据复制到分片中,直到大小达到shard_max_size,此时将创建另一个新分片。最后,该命令获取分片的统计信息并更新元数据。

SET citus.shard_max_size TO '64MB';
\copy github_events from 'github_events-2015-01-01-0.csv' WITH (format CSV, master_host 'coordinator-host')

值得注意的是,\ copy始终创建至少一个分片,并且不会附加到现有分片。

跨分片没有快照隔离的概念,这意味着与COPY并发运行的多分片SELECT可能会在某些分片上看到它,但在其他分片上却没有。如果用户正在存储事件数据,他可能偶尔会观察到最近数据中的小间隙。如果这是一个问题那将由应用程序处理(例如,从查询中排除最新数据,或使用某些锁定)。

通过append到现有分片来增量加载

\ copy命令在使用时始终会创建至少一个新的分片,最适合批量加载数据。使用\ copy加载较小的数据增量将导致许多小分片,这可能不是理想的方案。为了允许较小的增量数据附加到分布式表中,Citus提供了2个用户定义的功能。它们是master_create_empty_shard()和master_append_table_to_shard()。

master_create_empty_shard()可用于为表创建新的空分片。
master_append_table_to_shard()可用于将PostgreSQL表的内容附加到现有分片。它还返回分片填充率,这有助于确定是否应将更多数据附加到此分片或是否应创建新分片。

要使用上述功能,您可以先将传入数据插入常规PostgreSQL表(本地表)中。然后,您可以使用master_create_empty_shard()创建空分片。然后,使用master_append_table_to_shard(),可以将本地表的内容附加到指定的分片,之后您便可以删除本地表的数据。一旦append函数返回的分片填充率接近1,您就可以创建一个新分片并开始追加到新分片。

SELECT  *  from  master_create_empty_shard ('github_events' ); 
master_create_empty_shard 
--------------------------- 
                102089 
(1  行)

SELECT  *  from  master_append_table_to_shard (102089 , 'github_events_temp' , 'master-101' , 5432 ); 
master_append_table_to_shard 
------------------------------ 
        0.100548 
(1  行)

提高数据加载性能

上述方法使您能够实现高容量负载率,足以满足大多数用例。如果需要更高的数据加载速率,可以通过多种方式使用上述功能并编写脚本以更好地控制分片和数据加载。下一节将介绍如何更快地完成任务。

缩放数据加载

如果您的用例不需要实时摄取,那么使用append分布式表将为您提供最高的摄取率。这种方法更适用于使用时间序列数据的用例,数据库可能落后几分钟或更长时间。

协调器节点批量加载(100k / s-200k / s)

append分布式表的COPY仅打开新分片的一个连接。append分布式表命令的COPY不会在多个连接上并行摄取行,但并行运行多个命令是安全的。

CREATE TABLE events (time timestamp, data jsonb);
SELECT create_distributed_table('events', 'time', 'append');
\COPY events FROM 'path-to-csv-file' WITH CSV

COPY每次使用时都会创建新的分片,但如果查询最终涉及数千个分片,则可能会导致问题。摄取数据的另一种方法是使用master_append_table_to_shard函数将其附加到现有分片。要使用master_append_table_to_shard,需要将数据加载到临时表中,并且需要一些自定义逻辑来选择适当的分片。

-- Prepare a staging table
CREATE TABLE stage_1 (LIKE events);
\COPY stage_1 FROM 'path-to-csv-file WITH CSV

-- In a separate transaction, append the staging table
SELECT master_append_table_to_shard(select_events_shard(), 'stage_1', 'coordinator-host', 5432);

下面给出了分片选择功能的示例。它append到一个现有分片,直到它的大小大于1GB,然后创建一个新碎片,其缺点是一次只允许一个追加,但充分填充分片是它的优点。

CREATE OR REPLACE FUNCTION select_events_shard() RETURNS bigint AS $$
DECLARE
  shard_id bigint;
BEGIN
  SELECT shardid INTO shard_id
  FROM pg_dist_shard JOIN pg_dist_placement USING (shardid)
  WHERE logicalrelid = 'events'::regclass AND shardlength < 1024*1024*1024;

  IF shard_id IS NULL THEN
    /* no shard smaller than 1GB, create a new one */
    SELECT master_create_empty_shard('events') INTO shard_id;
  END IF;

  RETURN shard_id;
END;
$$ LANGUAGE plpgsql;

工作节点批量加载(100k / s-1M / s)

对于非常高的数据摄取率,数据可以通过Worker节点进行。此方法可以横向扩展并提供最高的摄取率,但使用起来可能更复杂。因此,我们建议仅在前面描述的方法无法解决您的数据提取率时才尝试此方法。

append分布式表通过worker支持COPY,通过在master_host选项中指定协调器的地址,以及可选的master_port选项(默认为5432)。通过worker的COPY与通过协调器的COPY具有相同的属性,除了在协调器上没有瓶颈。

psql -h worker-host-n -c "\COPY events FROM 'data.csv' WITH (FORMAT CSV, MASTER_HOST 'coordinator-host')"

使用COPY的另一种方法是创建一个临时表,并使用标准SQL客户端将其append到分布式表,这类似于通过worker暂存数据。使用psql通过worker暂存文件的示例如下:

stage_table=$(psql -tA -h worker-host-n -c "SELECT 'stage_'||nextval('stage_id_sequence')")
psql -h worker-host-n -c "CREATE TABLE $stage_table (time timestamp, data jsonb)"
psql -h worker-host-n -c "\COPY $stage_table FROM 'data.csv' WITH CSV"
psql -h coordinator-host -c "SELECT master_append_table_to_shard(choose_underutilized_shard(), '$stage_table', 'worker-host-n', 5432)"
psql -h worker-host-n -c "DROP TABLE $stage_table"

上面的示例使用choose_underutilized_shard函数来选择要追加的分片。为确保并行数据摄取,此功能应在许多不同的分片之间进行并行。

下面的示例choose_underutilized_shard函数随机选择20个最小的分片中的一个,或者如果1GB以下少于20,则创建一个新分片。这允许20个并发,允许数据摄取高达100万行/秒(取决于索引,大小,容量)。

/* Choose a shard to which to append */
CREATE OR REPLACE FUNCTION choose_underutilized_shard()
RETURNS bigint LANGUAGE plpgsql
AS $function$
DECLARE
  shard_id bigint;
  num_small_shards int;
BEGIN
  SELECT shardid, count(*) OVER () INTO shard_id, num_small_shards
  FROM pg_dist_shard JOIN pg_dist_placement USING (shardid)
  WHERE logicalrelid = 'events'::regclass AND shardlength < 1024*1024*1024
  GROUP BY shardid ORDER BY RANDOM() ASC;

  IF num_small_shards IS NULL OR num_small_shards < 20 THEN
    SELECT master_create_empty_shard('events') INTO shard_id;
  END IF;

  RETURN shard_id;
END;
$function$;

同时摄入多个分片的缺点是分片可能跨越更长的时间范围,这意味着特定时间段的查询可能涉及包含该时段之外的大量数据的分片。

文章浏览总量 791 次

要发表评论,您必须先登录