Citus Real-Time Analytics Use Case Guide

We will use a simple schema to ingest HTTP event data and serve it to a real-time dashboard.

Creating the tables

The http_request table is hash-distributed on the site_id column. This means that all data for a given site lives in the same shard.

CREATE TABLE http_request (
  site_id INT,
  ingest_time TIMESTAMPTZ DEFAULT now(),

  url TEXT,
  request_country TEXT,
  ip_address TEXT,

  status_code INT,
  response_time_msec INT
);

SELECT create_distributed_table('http_request', 'site_id');

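The recommended shard count is 2-4 times the number of CPU cores in the cluster. Having this many shards lets you rebalance data across the cluster after adding new worker nodes.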
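To make the placement rule concrete, here is a small Python sketch of hash distribution. It is an illustration only: SHARD_COUNT, shard_for, and the crc32 hash are assumptions made for this example, and Citus uses its own hash function and shard metadata.

```python
import zlib

SHARD_COUNT = 32  # hypothetical value for this sketch, not read from Citus


def shard_for(site_id: int, shard_count: int = SHARD_COUNT) -> int:
    """Map a distribution-column value to a shard index.

    crc32 is a stand-in hash chosen for this sketch; the point is only
    that the shard depends on site_id and nothing else.
    """
    return zlib.crc32(str(site_id).encode("utf-8")) % shard_count


# (site_id, url) pairs standing in for rows of http_request
rows = [(7, "/a"), (7, "/b"), (12, "/c"), (7, "/d")]

# Group rows by the shard they would be routed to
placement = {}
for site_id, url in rows:
    placement.setdefault(shard_for(site_id), []).append((site_id, url))
```

Because the shard is a function of site_id alone, every row for site 7 ends up in the same entry of placement, which is what lets per-site queries touch a single shard.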

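Inserting data

Run the following anonymous block in psql. The loop has no exit condition, so it keeps inserting rows until you interrupt it (for example with Ctrl-C).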

DO $$
  BEGIN LOOP
    INSERT INTO http_request (
      site_id, ingest_time, url, request_country,
      ip_address, status_code, response_time_msec
    ) VALUES (
      trunc(random()*32), clock_timestamp(),
      concat('http://example.com/', md5(random()::text)),
      ('{China,India,USA,Indonesia}'::text[])[ceil(random()*4)],
      concat(
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2)
      )::inet,
      ('{200,404}'::int[])[ceil(random()*2)],
      5+trunc(random()*150)
    );
    PERFORM pg_sleep(random() * 0.25);
  END LOOP;
END $$;

Running queries

SELECT
  site_id,
  date_trunc('minute', ingest_time) as minute,
  COUNT(1) AS request_count,
  SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
  SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
  SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM http_request
WHERE date_trunc('minute', ingest_time) > now() - '5 minutes'::interval
GROUP BY site_id, minute
ORDER BY minute ASC;

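The query above works, but it has two drawbacks:
1. Your HTTP analytics dashboard must scan every row each time it needs to render a chart. If your customers are interested in trends over the past year, the query aggregates every row from the past year from scratch.
2. Your storage costs grow in proportion to the ingest rate and the length of the queryable history.

Rolling up data

You can overcome both drawbacks by rolling up the raw data into a pre-aggregated form. Here we aggregate the raw data into a table that stores summaries at 1-minute intervals. In a production system you would probably also want 1-hour and 1-day intervals, each corresponding to a zoom level in the dashboard. When a user wants request times for the last month, the dashboard can simply read and chart the values for each of the last 30 days.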

CREATE TABLE http_request_1min (
  site_id INT,
  ingest_time TIMESTAMPTZ, -- which minute this row represents

  error_count INT,
  success_count INT,
  request_count INT,
  average_response_time_msec INT,
  CHECK (request_count = error_count + success_count),
  CHECK (ingest_time = date_trunc('minute', ingest_time))
);

SELECT create_distributed_table('http_request_1min', 'site_id');

CREATE INDEX http_request_1min_idx ON http_request_1min (site_id, ingest_time);

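http_request_1min is co-located with http_request, because both tables are distributed on site_id. Queries that combine the two tables, such as the rollup below, therefore run without restrictions.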

CREATE TABLE latest_rollup (
  minute timestamptz PRIMARY KEY,

  -- "minute" should be no more precise than a minute
  CHECK (minute = date_trunc('minute', minute))
);

-- initialize to a time long ago
INSERT INTO latest_rollup VALUES ('10-10-1901');

-- function to do the rollup
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
DECLARE
  curr_rollup_time timestamptz := date_trunc('minute', now());
  last_rollup_time timestamptz := minute from latest_rollup;
BEGIN
  INSERT INTO http_request_1min (
    site_id, ingest_time, request_count,
    success_count, error_count, average_response_time_msec
  ) SELECT
    site_id,
    date_trunc('minute', ingest_time),
    COUNT(1) as request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
    SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
  FROM http_request
  -- roll up only data new since last_rollup_time
  WHERE date_trunc('minute', ingest_time) <@
          tstzrange(last_rollup_time, curr_rollup_time, '(]')
  GROUP BY 1, 2;

  -- update the value in latest_rollup so that next time we run the
  -- rollup it will operate on data newer than curr_rollup_time
  UPDATE latest_rollup SET minute = curr_rollup_time;
END;
$$ LANGUAGE plpgsql;

On the coordinator node, run SELECT rollup_http_request(); at regular intervals using a tool such as crontab or pg_cron.
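The aggregation that rollup_http_request() performs can be sketched in plain Python. This is a model of the logic, not of how Citus executes it: the rollup and at helpers and the sample events are made up for this example.

```python
from collections import defaultdict
from datetime import datetime, timezone


def rollup(events, last_rollup_time, curr_rollup_time):
    """Aggregate raw events into per-(site_id, minute) summaries.

    events: iterable of (site_id, ingest_time, status_code, response_time_msec).
    Only minutes in the window (last_rollup_time, curr_rollup_time] are
    rolled up, mirroring tstzrange(..., '(]') in the SQL function.
    """
    totals = defaultdict(lambda: [0, 0, 0])  # requests, successes, total msec
    for site_id, ingest_time, status_code, msec in events:
        # Equivalent of date_trunc('minute', ingest_time)
        minute = ingest_time.replace(second=0, microsecond=0)
        if not (last_rollup_time < minute <= curr_rollup_time):
            continue
        row = totals[(site_id, minute)]
        row[0] += 1
        row[1] += 1 if 200 <= status_code <= 299 else 0
        row[2] += msec
    return {
        key: {
            "request_count": n,
            "success_count": ok,
            "error_count": n - ok,
            # Integer division, like SUM(...) / COUNT(1) on integer columns
            "average_response_time_msec": total // n,
        }
        for key, (n, ok, total) in totals.items()
    }


def at(minute, second):
    """Hypothetical helper: a timestamp on 2024-01-01 at 12:MM:SS UTC."""
    return datetime(2024, 1, 1, 12, minute, second, tzinfo=timezone.utc)


events = [
    (1, at(0, 10), 200, 100),
    (1, at(0, 50), 404, 51),
    (1, at(1, 5), 200, 30),
    (2, at(0, 30), 200, 10),
    (2, at(2, 1), 200, 99),  # minute 12:02 is past curr_rollup_time
]

summary = rollup(
    events,
    last_rollup_time=datetime(1901, 10, 10, tzinfo=timezone.utc),
    curr_rollup_time=at(1, 0),
)
```

The event at 12:02 is left out of summary and would be picked up by the next run, once the window has advanced past it.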

Rewriting the query

The new query reads from the rollup table instead of the raw data:

SELECT site_id, ingest_time as minute, request_count,
       success_count, error_count, average_response_time_msec
  FROM http_request_1min
 WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;

Expiring old data

Rollups make queries faster, but we still need to expire old data to avoid unbounded storage costs.

DELETE FROM http_request WHERE ingest_time < now() - interval '1 day';
DELETE FROM http_request_1min WHERE ingest_time < now() - interval '1 month';

In production, you could wrap these queries in a function and call it every minute from a cron job.

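Approximate distinct counts

A common question in HTTP analytics involves distinct counts: how many unique visitors came to your site over the last month? Answering it exactly requires storing the list of all previously seen visitors in the rollup tables, which is a prohibitively large amount of data. An approximate answer is far more manageable.

A data type called HyperLogLog, or HLL, can answer the question approximately. It needs a surprisingly small amount of space to tell you roughly how many unique elements are in a set, and its accuracy can be tuned. We will use a configuration that needs only 1280 bytes, can count up to tens of billions of unique visitors, and has at most 2.2% error.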

  • Add an hll column (the hll type is provided by the hll extension)
ALTER TABLE http_request_1min ADD COLUMN distinct_ip_addresses hll;
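  • Populate the hll column by extending the INSERT inside rollup_http_request()
INSERT INTO http_request_1min (
    site_id, ingest_time, request_count,
    success_count, error_count, average_response_time_msec,
    distinct_ip_addresses  -- new column
  ) SELECT
    site_id,
    date_trunc('minute', ingest_time),
    COUNT(1) AS request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) AS success_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) AS error_count,
    SUM(response_time_msec) / COUNT(1) AS average_response_time_msec,
    hll_add_agg(hll_hash_text(ip_address)) AS distinct_ip_addresses  -- new aggregate
  FROM http_request
  -- same time filter and grouping as in rollup_http_request()
  WHERE date_trunc('minute', ingest_time) <@
          tstzrange(last_rollup_time, curr_rollup_time, '(]')
  GROUP BY 1, 2;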
  • Query the approximate number of distinct IP addresses
SELECT site_id, ingest_time as minute, request_count,
       success_count, error_count, average_response_time_msec,
       hll_cardinality(distinct_ip_addresses) AS distinct_ip_address_count
  FROM http_request_1min
 WHERE ingest_time > date_trunc('minute', now()) - interval '5 minutes';

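HLLs are not just faster; they let you do things you could not do before. Suppose we had built the rollups but, instead of using HLLs, had saved exact distinct counts. That works fine, but you could no longer answer questions such as "how many distinct sessions were there during a given week in the past?" once the raw data has been deleted, because exact per-minute counts cannot be added together without counting repeat visitors more than once.

With HLLs, this is easy. You can compute the distinct IP count over a period of time with the following query: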

SELECT hll_cardinality(hll_union_agg(distinct_ip_addresses))
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
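Why a union of sketches works where a sum of exact counts does not can be shown with a toy HyperLogLog in Python. This is a teaching sketch under stated assumptions, not the hll extension's implementation: the register count, the SHA-1 hash, and the names sketch_new, sketch_add, sketch_union and sketch_cardinality are all choices made for this example.

```python
import hashlib
import math

P = 12       # 2**12 = 4096 registers; an arbitrary choice for this sketch
M = 1 << P


def _hash64(value: str) -> int:
    """64-bit hash taken from SHA-1 (a stand-in, not what the extension uses)."""
    return int.from_bytes(hashlib.sha1(value.encode("utf-8")).digest()[:8], "big")


def sketch_add(registers, value):
    h = _hash64(value)
    idx = h >> (64 - P)                        # top P bits select a register
    rest = h & ((1 << (64 - P)) - 1)           # remaining 64 - P bits
    rank = (64 - P) - rest.bit_length() + 1    # position of the leftmost 1 bit
    registers[idx] = max(registers[idx], rank)


def sketch_new(values=()):
    registers = [0] * M
    for v in values:
        sketch_add(registers, v)
    return registers


def sketch_union(a, b):
    """Merging two sketches is an element-wise max of their registers."""
    return [max(x, y) for x, y in zip(a, b)]


def sketch_cardinality(registers):
    alpha = 0.7213 / (1 + 1.079 / M)
    raw = alpha * M * M / sum(2.0 ** -r for r in registers)
    zeros = registers.count(0)
    if raw <= 2.5 * M and zeros:
        return M * math.log(M / zeros)         # linear counting for small sets
    return raw


# Two overlapping periods: 6000 visitors each, 2000 of them in both
period_a = sketch_new(f"ip-{i}" for i in range(0, 6000))
period_b = sketch_new(f"ip-{i}" for i in range(4000, 10000))

merged = sketch_union(period_a, period_b)
estimate = sketch_cardinality(merged)
naive_sum = sketch_cardinality(period_a) + sketch_cardinality(period_b)
```

The merged sketch is identical to one built from all 10000 visitors directly, so estimate approximates the true distinct count, while naive_sum counts the 2000 repeat visitors twice.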

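Storing unstructured data with JSONB

Let's track the number of visitors coming from each country. Using a semi-structured data type saves you from adding a column for every country and ending up with rows that have hundreds of sparsely populated columns.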

  • First, add the new column to the rollup table:
ALTER TABLE http_request_1min ADD COLUMN country_counters JSONB;
  • Next, include it in the rollups by modifying the INSERT inside rollup_http_request()
INSERT INTO http_request_1min (
    site_id, ingest_time, request_count,
    success_count, error_count, average_response_time_msec,
    country_counters  -- new column
  ) SELECT
    site_id,
    date_trunc('minute', ingest_time),
    COUNT(1) AS request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) AS success_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) AS error_count,
    SUM(response_time_msec) / COUNT(1) AS average_response_time_msec,
    jsonb_object_agg(request_country, country_count) AS country_counters  -- new aggregate
  FROM (
    -- replaces "FROM http_request": attach a per-country count to every row
    SELECT *,
      count(1) OVER (
        PARTITION BY site_id, date_trunc('minute', ingest_time), request_country
      ) AS country_count
    FROM http_request
  ) h
  -- same time filter and grouping as in rollup_http_request()
  WHERE date_trunc('minute', ingest_time) <@
          tstzrange(last_rollup_time, curr_rollup_time, '(]')
  GROUP BY 1, 2;
  • Query the number of requests that came from the USA
SELECT
  request_count, success_count, error_count, average_response_time_msec,
  COALESCE(country_counters->>'USA', '0')::int AS american_visitors
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
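The shape of the stored counters, and the fallback to 0 for a country that never appears, can be sketched in Python. The helper names country_counters and visitors_from and the sample data are hypothetical and only model what one rollup row holds.

```python
import json
from collections import Counter


def country_counters(request_countries):
    """Build the JSON object stored for one (site_id, minute) rollup row.

    request_countries: iterable of request_country values for that minute.
    """
    return json.dumps(Counter(request_countries), sort_keys=True)


def visitors_from(stored_json, country):
    """Mirror COALESCE(country_counters->>'USA', '0')::int for any country."""
    return int(json.loads(stored_json).get(country, 0))


stored = country_counters(["USA", "China", "USA", "India"])
american_visitors = visitors_from(stored, "USA")
indonesian_visitors = visitors_from(stored, "Indonesia")  # key absent, falls back to 0
```

Only countries that actually sent requests in a given minute take up space in the stored object, which is the advantage over one sparsely populated column per country.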