Citus Real-Time Analytics Use Case Guide
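We will use a simple schema for ingesting HTTP event data and serving it to a real-time dashboard.
Creating the tables
http_request is hash-distributed on the site_id column. This means that all data for a given site lives in the same shard.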
CREATE TABLE http_request (
site_id INT,
ingest_time TIMESTAMPTZ DEFAULT now(),
url TEXT,
request_country TEXT,
ip_address TEXT,
status_code INT,
response_time_msec INT
);
SELECT create_distributed_table('http_request', 'site_id');
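We recommend using 2 to 4 times as many shards as there are CPU cores in the cluster. Having this many shards lets you rebalance data across the cluster after you add new worker nodes.
Inserting data
Run the following anonymous block in psql to generate test data. It loops forever, inserting one random request every 0 to 0.25 seconds; stop it with Ctrl+C.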
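To make the hash-distribution idea concrete, here is a minimal Python sketch. It assumes a hypothetical hash function (the first 4 bytes of an MD5 digest, read as a signed 32-bit integer) in place of the hash Citus really applies to site_id, and splits the 32-bit hash space into equal ranges, one per shard. This mirrors the general scheme of hash-distributed tables, not Citus's exact code. The helper names `hash_int32`, `shard_for` and `recommended_shard_counts` are illustrative only.

```python
import hashlib

HASH_MIN = -(2 ** 31)   # the 32-bit hash space is [-2^31, 2^31)
HASH_SPACE = 2 ** 32


def hash_int32(value: int) -> int:
    """Hypothetical stand-in for the hash applied to site_id (NOT Citus's real function)."""
    digest = hashlib.md5(str(value).encode()).digest()
    return int.from_bytes(digest[:4], "big", signed=True)


def shard_for(site_id: int, shard_count: int) -> int:
    """Index of the shard whose equal-width hash range contains hash(site_id)."""
    offset = hash_int32(site_id) - HASH_MIN      # 0 .. 2^32 - 1
    return offset * shard_count // HASH_SPACE    # 0 .. shard_count - 1


def recommended_shard_counts(cpu_cores: int) -> range:
    """Shard counts from 2x to 4x the cluster's CPU cores, inclusive."""
    return range(2 * cpu_cores, 4 * cpu_cores + 1)


counts = recommended_shard_counts(16)
print(counts.start, counts.stop - 1)  # 32 64
# Every row of site 7 hashes to the same value, hence lands in the same shard.
print(shard_for(7, 48) == shard_for(7, 48))  # True
```

Because the shard is a pure function of site_id, any query filtered or grouped by site_id can be answered by a single shard per site.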
DO $$
BEGIN LOOP
INSERT INTO http_request (
site_id, ingest_time, url, request_country,
ip_address, status_code, response_time_msec
) VALUES (
trunc(random()*32), clock_timestamp(),
concat('http://example.com/', md5(random()::text)),
('{China,India,USA,Indonesia}'::text[])[ceil(random()*4)],
concat(
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2)
)::inet,
('{200,404}'::int[])[ceil(random()*2)],
5+trunc(random()*150)
);
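-- commit each row so that other sessions (e.g. the dashboard query) can see it;
-- COMMIT inside a DO block requires PostgreSQL 11 or later
COMMIT;
PERFORM pg_sleep(random() * 0.25);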
END LOOP;
END $$;
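If you want to exercise dashboard code without a database, a Python sketch that draws rows from roughly the same distributions as the DO block can help. It is only an illustration: `fake_request` and `fake_requests` are hypothetical names, the rows stay in memory instead of being inserted, and `random.choice` is used as a uniform stand-in for the `ceil(random()*N)` array indexing.

```python
import hashlib
import math
import random
from datetime import datetime, timezone

COUNTRIES = ["China", "India", "USA", "Indonesia"]


def fake_request(rng: random.Random) -> dict:
    """One synthetic http_request row, drawn like the VALUES list of the DO block."""
    octets = [str(math.trunc(rng.random() * 250 + 2)) for _ in range(4)]  # each 2..251
    return {
        "site_id": math.trunc(rng.random() * 32),                          # 0..31
        "ingest_time": datetime.now(timezone.utc),                         # ~clock_timestamp()
        "url": "http://example.com/" + hashlib.md5(str(rng.random()).encode()).hexdigest(),
        "request_country": rng.choice(COUNTRIES),
        "ip_address": ".".join(octets),
        "status_code": rng.choice([200, 404]),
        "response_time_msec": 5 + math.trunc(rng.random() * 150),          # 5..154
    }


def fake_requests(n: int, seed: int = 0) -> list:
    """n rows from a seeded generator, so runs are reproducible."""
    rng = random.Random(seed)
    return [fake_request(rng) for _ in range(n)]


rows = fake_requests(5)
print(rows[0]["site_id"], rows[0]["request_country"], rows[0]["status_code"])
```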
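Running queries
A dashboard can then run a query like the following, which reports each site's request count, success and error counts, and average response time per minute over the last five minutes: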
SELECT
site_id,
date_trunc('minute', ingest_time) as minute,
COUNT(1) AS request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM http_request
WHERE date_trunc('minute', ingest_time) > now() - '5 minutes'::interval
GROUP BY site_id, minute
ORDER BY minute ASC;
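The same aggregation, written as an in-memory Python sketch, makes the semantics explicit: truncate to the minute, keep minutes newer than five minutes ago, group by (site_id, minute), and divide with integer arithmetic as PostgreSQL does for `SUM(int) / COUNT(1)`. The function `per_minute_stats` is a hypothetical helper, and rows are assumed to be dicts with the column names of http_request.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def per_minute_stats(rows, now: datetime) -> list:
    """Mirror of the SQL query: stats per (site_id, minute) for the last 5 minutes."""
    cutoff = now - timedelta(minutes=5)
    groups = defaultdict(list)
    for r in rows:
        minute = r["ingest_time"].replace(second=0, microsecond=0)  # date_trunc('minute', ...)
        if minute > cutoff:                                         # the WHERE clause
            groups[(r["site_id"], minute)].append(r)
    result = []
    for (site_id, minute), grp in groups.items():
        success = sum(1 for r in grp if 200 <= r["status_code"] <= 299)
        result.append({
            "site_id": site_id,
            "minute": minute,
            "request_count": len(grp),
            "success_count": success,
            "error_count": len(grp) - success,
            # integer division; same as PostgreSQL here because values are non-negative
            "average_response_time_msec": sum(r["response_time_msec"] for r in grp) // len(grp),
        })
    return sorted(result, key=lambda s: s["minute"])                # ORDER BY minute ASC


now = datetime(2024, 1, 1, 12, 10, 30)
sample = [
    {"site_id": 1, "ingest_time": datetime(2024, 1, 1, 12, 9, 5), "status_code": 200, "response_time_msec": 10},
    {"site_id": 1, "ingest_time": datetime(2024, 1, 1, 12, 9, 50), "status_code": 404, "response_time_msec": 15},
    {"site_id": 2, "ingest_time": datetime(2024, 1, 1, 12, 0, 0), "status_code": 200, "response_time_msec": 99},
]
for row in per_minute_stats(sample, now):
    print(row["site_id"], row["minute"], row["request_count"], row["average_response_time_msec"])
# 1 2024-01-01 12:09:00 2 12
```

Note that every call walks all the rows it is given, which is exactly the first drawback discussed next.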
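The query above works, but it has two drawbacks:
1. Your HTTP analytics dashboard must scan every row each time it needs to draw a chart. If your customers are interested in trends over the past year, your query will aggregate every row from the past year from scratch.
2. Your storage costs grow in proportion to the ingest rate and to the length of the queryable history.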
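A back-of-the-envelope calculation shows how the second drawback scales. This sketch assumes about 8 requests per second (the DO block's mean sleep of 0.125 s, ignoring the time the INSERT itself takes) over the 32 site_ids it generates, kept for one year; the helper names are hypothetical. It compares the raw table with a per-site, per-minute rollup, whose size does not depend on the ingest rate at all.

```python
SECONDS_PER_DAY = 86_400
MINUTES_PER_DAY = 1_440


def raw_rows(rows_per_second: float, days: int) -> int:
    """Raw http_request rows kept: ingest rate times retained history."""
    return round(rows_per_second * SECONDS_PER_DAY * days)


def rollup_rows(sites: int, days: int) -> int:
    """Upper bound for a 1-minute rollup: one row per site per minute, whatever the ingest rate."""
    return sites * MINUTES_PER_DAY * days


print(raw_rows(8, 365))      # 252288000
print(rollup_rows(32, 365))  # 16819200
```

Doubling the ingest rate doubles the raw row count but leaves the rollup bound unchanged.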
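Rolling up data
You can overcome both drawbacks by rolling up the raw data into a pre-aggregated form. We will aggregate the raw data into a table that stores summaries at 1-minute intervals. In a production system you would probably also want 1-hour and 1-day intervals, each corresponding to a zoom level in the dashboard. When a user wants request times for the last month, the dashboard can then simply read and chart one value for each of the last 30 days.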
CREATE TABLE http_request_1min (
site_id INT,
ingest_time TIMESTAMPTZ, -- which minute this row represents
error_count INT,
success_count INT,
request_count INT,
average_response_time_msec INT,
CHECK (request_count = error_count + success_count),
CHECK (ingest_time = date_trunc('minute', ingest_time))
);
SELECT create_distributed_table('http_request_1min', 'site_id');
CREATE INDEX http_request_1min_idx ON http_request_1min (site_id, ingest_time);
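Because http_request_1min is distributed on the same column (site_id) and with the same shard count as http_request, the two tables are co-located: the shards holding a given site's rows in both tables are placed on the same node. Queries that combine the two tables, such as the INSERT ... SELECT in the rollup function below, therefore run without restrictions.
Next, create a table that records the last minute that was rolled up, and the rollup function itself: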
CREATE TABLE latest_rollup (
minute timestamptz PRIMARY KEY,
-- "minute" should be no more precise than a minute
CHECK (minute = date_trunc('minute', minute))
);
-- initialize to a time long ago
INSERT INTO latest_rollup VALUES ('10-10-1901');
-- function to do the rollup
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
DECLARE
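-- roll up only complete minutes: stop at the minute before the current one,
-- otherwise rows arriving later in the current minute would never be rolled up
curr_rollup_time timestamptz := date_trunc('minute', now() - interval '1 minute');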
last_rollup_time timestamptz := minute from latest_rollup;
BEGIN
INSERT INTO http_request_1min (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec
) SELECT
site_id,
date_trunc('minute', ingest_time),
COUNT(1) as request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM http_request
-- roll up only data new since last_rollup_time
WHERE date_trunc('minute', ingest_time) <@
tstzrange(last_rollup_time, curr_rollup_time, '(]')
GROUP BY 1, 2;
-- update the value in latest_rollup so that next time we run the
-- rollup it will operate on data newer than curr_rollup_time
UPDATE latest_rollup SET minute = curr_rollup_time;
END;
$$ LANGUAGE plpgsql;
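The watermark logic is easy to get subtly wrong, so here is an in-memory Python model of it (the class `MinuteRollup` is hypothetical and tracks only request counts). Each run rolls up the minutes in the half-open range (last, curr], like `tstzrange(last_rollup_time, curr_rollup_time, '(]')`, and then advances the watermark. The model treats the newest minute to roll up as the one before the current minute: if the still-filling current minute were rolled up, rows arriving later in that minute would fall outside the next run's range and never be counted.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def minute_of(ts: datetime) -> datetime:
    """Equivalent of date_trunc('minute', ts)."""
    return ts.replace(second=0, microsecond=0)


class MinuteRollup:
    """In-memory model of rollup_http_request() plus the latest_rollup watermark."""

    def __init__(self) -> None:
        # the watermark starts "long ago", like the initial row in latest_rollup
        self.last_rollup = datetime(1901, 10, 10, tzinfo=timezone.utc)
        self.request_counts = {}  # (site_id, minute) -> request_count

    def run(self, raw_rows, now: datetime) -> None:
        curr = minute_of(now - timedelta(minutes=1))  # newest *complete* minute
        new_counts = defaultdict(int)
        for row in raw_rows:
            m = minute_of(row["ingest_time"])
            if self.last_rollup < m <= curr:          # range (last, curr]
                new_counts[(row["site_id"], m)] += 1
        for key, n in new_counts.items():
            # add rather than overwrite, so a minute rolled up twice would show a wrong count
            self.request_counts[key] = self.request_counts.get(key, 0) + n
        self.last_rollup = curr                       # UPDATE latest_rollup SET minute = curr


def at(h: int, m: int, s: int = 0) -> datetime:
    return datetime(2024, 1, 1, h, m, s, tzinfo=timezone.utc)


raw = [{"site_id": 1, "ingest_time": at(12, 0, 10)}, {"site_id": 1, "ingest_time": at(12, 1, 5)}]
rollup = MinuteRollup()
rollup.run(raw, now=at(12, 1, 30))   # rolls up 12:00 only; 12:01 is still in progress
raw.append({"site_id": 1, "ingest_time": at(12, 1, 50)})
rollup.run(raw, now=at(12, 2, 10))   # 12:01 is now complete and both of its rows count
print(rollup.request_counts[(1, at(12, 1))])  # 2
```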
On the coordinator node, run SELECT rollup_http_request(); periodically, using a tool such as crontab or pg_cron.
Rewriting the query
The dashboard query can now read from the rollup table instead of the raw data:
SELECT site_id, ingest_time as minute, request_count,
success_count, error_count, average_response_time_msec
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
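For the coarser zoom levels mentioned earlier (1 hour, 1 day), 1-minute rows can be merged further. One pitfall: averaging the per-minute averages over-weights quiet minutes, so each minute's average must be weighted by its request count. The sketch below (hypothetical `to_hourly`, rows shaped like the output of the query above) does that. It is approximate because the stored per-minute averages are already truncated integers; keeping a sum of response times instead of an average would make the merge exact, which is a design choice of ours, not part of the schema above.

```python
from collections import defaultdict
from datetime import datetime, timezone


def to_hourly(minute_rows) -> dict:
    """Merge 1-minute rollup rows into one row per (site_id, hour)."""
    acc = defaultdict(lambda: {"request_count": 0, "success_count": 0,
                               "error_count": 0, "weighted_time": 0})
    for r in minute_rows:
        hour = r["minute"].replace(minute=0, second=0, microsecond=0)
        a = acc[(r["site_id"], hour)]
        a["request_count"] += r["request_count"]
        a["success_count"] += r["success_count"]
        a["error_count"] += r["error_count"]
        # weight each minute's average by the number of requests behind it
        a["weighted_time"] += r["average_response_time_msec"] * r["request_count"]
    result = {}
    for key, a in acc.items():
        result[key] = {
            "request_count": a["request_count"],
            "success_count": a["success_count"],
            "error_count": a["error_count"],
            "average_response_time_msec": a["weighted_time"] // a["request_count"],
        }
    return result


hour = datetime(2024, 1, 1, 12, tzinfo=timezone.utc)
sample = [
    {"site_id": 1, "minute": hour.replace(minute=0), "request_count": 1,
     "success_count": 1, "error_count": 0, "average_response_time_msec": 100},
    {"site_id": 1, "minute": hour.replace(minute=1), "request_count": 9,
     "success_count": 8, "error_count": 1, "average_response_time_msec": 10},
]
print(to_hourly(sample)[(1, hour)]["average_response_time_msec"])  # 19, not the naive 55
```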
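Expiring old data
Rollups make queries faster, but we still need to delete old data to avoid unbounded storage costs. For example, keep raw data for one day and 1-minute rollups for one month: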
DELETE FROM http_request WHERE ingest_time < now() - interval '1 day';
DELETE FROM http_request_1min WHERE ingest_time < now() - interval '1 month';
In production, you could wrap these queries in a function and call it every minute from a cron job.
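As a sketch of what such an expiry function does, here is the same retention policy applied to in-memory rows in Python. The name `expire_old_data` is hypothetical, and 30 days is our approximation of `interval '1 month'`, which PostgreSQL actually applies as a calendar month. Note the boundary: `DELETE ... WHERE ingest_time < cutoff` keeps every row at or after the cutoff.

```python
from datetime import datetime, timedelta

RAW_RETENTION = timedelta(days=1)
# Assumption: 30 days approximates interval '1 month' (PostgreSQL uses a calendar month).
ROLLUP_RETENTION = timedelta(days=30)


def expire_old_data(raw_rows, rollup_rows, now: datetime):
    """Drop what the two DELETE statements would remove; return the remaining rows."""
    raw_cutoff = now - RAW_RETENTION
    rollup_cutoff = now - ROLLUP_RETENTION
    kept_raw = [r for r in raw_rows if r["ingest_time"] >= raw_cutoff]
    kept_rollups = [r for r in rollup_rows if r["ingest_time"] >= rollup_cutoff]
    return kept_raw, kept_rollups


now = datetime(2024, 3, 31, 12, 0)
raw = [{"ingest_time": now - timedelta(hours=23)}, {"ingest_time": now - timedelta(days=2)}]
rollups = [{"ingest_time": now - timedelta(days=29)}, {"ingest_time": now - timedelta(days=31)}]
kept_raw, kept_rollups = expire_old_data(raw, rollups, now)
print(len(kept_raw), len(kept_rollups))  # 1 1
```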
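Distinct counts
A common question in HTTP analytics concerns distinct counts: how many unique visitors visited your site in the last month? Answering this exactly requires storing the list of all previously seen visitors in the rollup tables, which is a prohibitively large amount of data. An approximate answer, however, is much more manageable.
A data type called hyperloglog, or HLL, can answer such questions approximately; it takes a surprisingly small amount of space to tell you how many unique elements are in a set, and its accuracy can be tuned. We will use settings that, with only 1280 bytes, can count up to tens of billions of unique visitors with a typical error of about 2.2%.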
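The 1280-byte figure follows from the HLL parameters. Assuming the postgresql-hll defaults of log2m = 11 (2048 registers) and regwidth = 5 bits per register (our assumption about which settings are meant), the dense register array takes 2048 × 5 / 8 = 1280 bytes. The textbook HyperLogLog relative standard error, 1.04/√m, then comes to about 2.3%, close to the figure quoted above. The helper names are hypothetical and header bytes are ignored.

```python
import math


def hll_size_bytes(log2m: int, regwidth: int) -> int:
    """Dense HLL payload: 2**log2m registers of regwidth bits each (header ignored)."""
    return (2 ** log2m) * regwidth // 8


def hll_standard_error(log2m: int) -> float:
    """Textbook HyperLogLog relative standard error, 1.04 / sqrt(m)."""
    return 1.04 / math.sqrt(2 ** log2m)


print(hll_size_bytes(11, 5))             # 1280
print(round(hll_standard_error(11), 4))  # 0.023
# Each extra bit of log2m doubles the size and divides the error by sqrt(2).
print(hll_size_bytes(12, 5))             # 2560
```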
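- Add the hll column:
-- the hll type comes from the postgresql-hll extension
-- (on older Citus versions, also create the extension on each worker)
CREATE EXTENSION IF NOT EXISTS hll;
ALTER TABLE http_request_1min ADD COLUMN distinct_ip_addresses hll;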
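- Include the HLL in the rollup by changing the INSERT inside rollup_http_request() to:
INSERT INTO http_request_1min (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec,
distinct_ip_addresses -- new column
) SELECT
site_id,
date_trunc('minute', ingest_time),
COUNT(1) as request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec,
-- new: one HLL of hashed IP addresses per site and minute
hll_add_agg(hll_hash_text(ip_address)) AS distinct_ip_addresses
FROM http_request
WHERE date_trunc('minute', ingest_time) <@
tstzrange(last_rollup_time, curr_rollup_time, '(]')
GROUP BY 1, 2;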
- Query the approximate number of distinct IP addresses per minute:
SELECT site_id, ingest_time as minute, request_count,
success_count, error_count, average_response_time_msec,
hll_cardinality(distinct_ip_addresses) AS distinct_ip_address_count
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - interval '5 minutes';
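HLL is not only faster; it also lets you do things you could not do before. Suppose we had done our rollups but, instead of using HLL, had saved exact per-minute unique counts. That works for a single minute, but such counts cannot be added up across minutes, because a visitor seen in several minutes would be counted several times. You could therefore not answer questions like "how many distinct sessions were there during this one-week period in the past, whose raw data we have already deleted?"
With HLL this is easy. You can compute the distinct IP count over a time period with the following query: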
SELECT hll_cardinality(hll_union_agg(distinct_ip_addresses))
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
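Why can HLLs be combined when exact counts cannot? An HLL is an array of registers, and the union of two HLLs is the element-wise maximum of their registers, so per-minute sketches can be merged into a sketch for any longer period, which is what `hll_union_agg` relies on. The toy Python implementation below shows add, union and cardinality. The class `ToyHLL` is hypothetical and deliberately simplified (SHA-1 as the hash, dense registers only, a basic small-range correction); it is not the postgresql-hll implementation.

```python
import hashlib
import math


class ToyHLL:
    """Simplified dense HyperLogLog, for illustration only."""

    def __init__(self, log2m: int = 11) -> None:
        self.p = log2m
        self.m = 1 << log2m
        self.registers = [0] * self.m

    @staticmethod
    def _hash64(value: str) -> int:
        # 64 bits taken from SHA-1; postgresql-hll uses its own hash functions
        return int.from_bytes(hashlib.sha1(value.encode()).digest()[:8], "big")

    def add(self, value: str) -> None:
        h = self._hash64(value)
        index = h >> (64 - self.p)                    # top p bits choose a register
        rest = h & ((1 << (64 - self.p)) - 1)         # remaining 64 - p bits
        rank = (64 - self.p) - rest.bit_length() + 1  # position of the first 1-bit
        self.registers[index] = max(self.registers[index], rank)

    def union(self, other: "ToyHLL") -> "ToyHLL":
        if other.p != self.p:
            raise ValueError("can only union HLLs with the same log2m")
        merged = ToyHLL(self.p)
        merged.registers = [max(a, b) for a, b in zip(self.registers, other.registers)]
        return merged

    def cardinality(self) -> float:
        alpha = 0.7213 / (1 + 1.079 / self.m)
        estimate = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if estimate <= 2.5 * self.m and zeros:
            estimate = self.m * math.log(self.m / zeros)  # linear counting for small sets
        return estimate


# Two "minutes" of visitors that share 5,000 IP addresses.
minute_a, minute_b = ToyHLL(), ToyHLL()
for i in range(10_000):
    minute_a.add(f"ip-{i}")
for i in range(5_000, 15_000):
    minute_b.add(f"ip-{i}")
# An estimate of the 15,000 distinct IPs; summing exact per-minute counts would give 20,000.
print(round(minute_a.union(minute_b).cardinality()))
```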
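Using JSONB to store unstructured data
Let's track the number of visitors from each country. A semi-structured data type saves you from adding a column for every country and ending up with rows that have hundreds of sparsely populated columns.
- First, add the new column to the rollup table:
ALTER TABLE http_request_1min ADD COLUMN country_counters JSONB;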
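- Next, include it in the rollup by changing the INSERT inside rollup_http_request() to:
INSERT INTO http_request_1min (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec,
country_counters -- new column
) SELECT
site_id,
date_trunc('minute', ingest_time),
COUNT(1) as request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec,
-- new: one {country: count} object per site and minute
jsonb_object_agg(request_country, country_count) AS country_counters
FROM (
-- new: each row carries the number of requests for its site, minute and country
SELECT *,
count(1) OVER (
PARTITION BY site_id, date_trunc('minute', ingest_time), request_country
) AS country_count
FROM http_request
-- filtering here is safe because every window partition lies within one minute
WHERE date_trunc('minute', ingest_time) <@
tstzrange(last_rollup_time, curr_rollup_time, '(]')
) h
GROUP BY 1, 2;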
- Get the number of requests from the USA:
SELECT
request_count, success_count, error_count, average_response_time_msec,
COALESCE(country_counters->>'USA', '0')::int AS american_visitors
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
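The window function plus `jsonb_object_agg` builds, for each site and minute, a JSON object mapping every country to its request count. The Python sketch below computes the same per-minute objects and also shows how such objects can be summed key by key into totals for a longer period; a missing country counts as 0, like the `COALESCE(..., '0')` above. The function names are hypothetical, and minutes are plain strings for brevity.

```python
from collections import Counter, defaultdict


def country_counters(rows) -> dict:
    """Per (site_id, minute): {country: request count}, like the jsonb_object_agg output."""
    out = defaultdict(Counter)
    for r in rows:
        out[(r["site_id"], r["minute"])][r["request_country"]] += 1
    return {key: dict(counts) for key, counts in out.items()}


def total_by_country(counters) -> Counter:
    """Add several {country: count} objects key by key."""
    total = Counter()
    for c in counters:
        total.update(c)  # Counter.update adds counts instead of replacing them
    return total


rows = [
    {"site_id": 1, "minute": "12:00", "request_country": "USA"},
    {"site_id": 1, "minute": "12:00", "request_country": "USA"},
    {"site_id": 1, "minute": "12:00", "request_country": "China"},
    {"site_id": 1, "minute": "12:01", "request_country": "India"},
]
counters = country_counters(rows)
print(counters[(1, "12:00")])                              # {'USA': 2, 'China': 1}
print(total_by_country(counters.values()).get("USA", 0))  # 2
```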
Owner of [CitusDB China], PostgreSQL fan, currently working on Citus development.
May Citus keep growing in China.