I enjoyed reading the blog post about MemSQL for time series data. It introduces time_bucket
, first
, and last
functions. The implementations for first
and last
are listed in the supplemental material. Unfortunately, the implementation for time_bucket
seems to be missing. Could you please add it to the blog post and/or here?
Hmmm. We will fix the blog post! In the meanwhile, here is the full code for time_bucket
, first
, and last
:
/*
Time series function package for MemSQL
Functions included:
- time_bucket(bucket_description, timestamp_val) ==> bucket values by time for easy grouping
- first(dat_val, timestamp_val) ==> UD Agg to get first value ordered by time
- last(dat_val, timestamp_val) ==> UD Agg to get last value ordered by time
These functions make it easy to group time series data into desired intervals.
Then you can aggregate points in that interval using standard aggregate functions.
In addition, you can use the new UD Aggs first() and last() to get the first and
last values in a sequence. Of course, you can do something similar with rank(), a
window function, but first() and last() can be a useful shorthand.
*/
/* Example uses:
drop table if exists t;
create table t(c varchar(30), m varchar(80), ts datetime);
insert t values ("c1", "red", "2019-01-29 13:39:09"),
("c1", "green", "2019-01-29 13:39:01"),
("c2", "blue", "2019-01-29 13:39:07"),
("c1", "purple", "2019-01-29 13:39:05"),
("c1", "orange", "2019-01-29 13:39:04"),
("c1", "yellow", "2019-01-29 13:39:08");
select time_bucket('5 seconds', ts), min(ts), first(m, ts), max(ts), last(m, ts)
from t
group by 1;
+------------------------------+---------------------+--------------+---------------------+-------------+
| time_bucket('5 seconds', ts) | min(ts) | first(m, ts) | max(ts) | last(m, ts) |
+------------------------------+---------------------+--------------+---------------------+-------------+
| 2019-01-29 13:39:00.000000 | 2019-01-29 13:39:01 | green | 2019-01-29 13:39:04 | orange |
| 2019-01-29 13:39:05.000000 | 2019-01-29 13:39:05 | purple | 2019-01-29 13:39:09 | red |
+------------------------------+---------------------+--------------+---------------------+-------------+
drop table if exists well_reading;
create table well_reading(well_id varchar(30), ts datetime(6), flow_rate decimal(18,4));
drop table if exists trade;
create table trade(symbol varchar(5), ts datetime(6), shares decimal(18,4), price decimal(18,4));
*/
DELIMITER //
-- time_bucket(bucket_desc, ts): truncate ts down to the start of its time
-- bucket so rows can be grouped by interval.
--   bucket_desc: '<n> <unit>', unit one of second(s)/minute(s)/hour(s)/day(s),
--                e.g. '5 seconds', '2 hours'. Raises a user exception for an
--                unknown unit or a non-positive period count.
--   ts:          timestamp to bucket; returns the bucket start as datetime(6).
CREATE OR REPLACE FUNCTION time_bucket(
    bucket_desc varchar(64) NOT NULL,
    ts datetime(6)) RETURNS datetime(6) NULL AS
DECLARE
    num_periods bigint = -1;
    unit varchar(255) = NULL;
    num_str varchar(255) = NULL;
    unix_ts bigint;
    r datetime(6);
    days_since_epoch bigint;
BEGIN
    -- split '<n> <unit>' into its numeric count and unit name
    num_str = substring_index(bucket_desc, ' ', 1);
    num_periods = num_str :> bigint;
    unit = substr(bucket_desc, length(num_str) + 2, length(bucket_desc));
    -- reject zero/negative/unparseable counts early; the original would have
    -- failed later with an obscure modulo-by-zero instead
    IF num_periods < 1 THEN
        raise user_exception(concat("Invalid bucket count: ", num_str));
    END IF;
    -- normalize plural unit spellings to singular, rejecting unknown units
    IF unit = 'second' or unit = 'seconds' THEN
        unit = 'second';
    ELSIF unit = 'minute' or unit = 'minutes' THEN
        unit = 'minute';
    ELSIF unit = 'hour' or unit = 'hours' THEN
        unit = 'hour';
    ELSIF unit = 'day' or unit = 'days' THEN
        unit = 'day';
    ELSE
        raise user_exception(concat("Unknown time unit: ", unit));
    END IF;
    unix_ts = unix_timestamp(ts);
    -- truncate the epoch-seconds value down to a multiple of the bucket width
    IF unit = 'second' THEN
        r = from_unixtime(unix_ts - (unix_ts % num_periods));
    ELSIF unit = 'minute' THEN
        r = from_unixtime(unix_ts - (unix_ts % (num_periods * 60)));
    ELSIF unit = 'hour' THEN
        r = from_unixtime(unix_ts - (unix_ts % (num_periods * 60 * 60)));
    ELSIF unit = 'day' THEN
        -- NOTE(review): the fixed 4-hour shift appears to align day boundaries
        -- for a UTC-4 server offset; confirm it matches your server timezone.
        unix_ts += 4 * 60 * 60;
        days_since_epoch = unix_ts / (24 * 60 * 60);
        days_since_epoch = days_since_epoch - (days_since_epoch % num_periods);
        r = (from_unixtime(days_since_epoch * (24 * 60 * 60))) :> date;
    ELSE
        -- unreachable: unit was validated by the chain above
        raise user_exception("Internal error -- bad time unit");
    END IF;
    RETURN r;
END;
//
DELIMITER ;
/*
Generate data to test time_bucket()
*/
DELIMITER //
-- tb_test(): (re)create date_tbl and populate it with hourly timestamps from
-- 2019-01-01 00:00:00 through 2019-01-04 00:00:00 inclusive, as test input
-- for time_bucket().
CREATE OR REPLACE PROCEDURE tb_test() AS
DECLARE
    d datetime = '2019-01-01';
BEGIN
    DROP TABLE IF EXISTS date_tbl;
    CREATE TABLE date_tbl(d datetime);
    -- Insert before advancing: the original incremented first, which skipped
    -- the range start (2019-01-01 00:00) and overshot to 2019-01-04 01:00.
    WHILE d <= '2019-01-04' LOOP
        INSERT date_tbl VALUES(d);
        d = d + interval 1 hour;
    END LOOP;
END //
DELIMITER ;
/*
User-defined aggregate first(value, ts)
Return the first (earliest) value based on timestamp order.
Note that if more than one value has the same timestamp, then
the result is non-deterministic -- an arbitrary one of the
values with the lowest timestamp is returned.
State: v, d (where v is a value, d is a timestamp)
Get result: just return value of v
Update: if new d < min d so far, update d and v
*/
DELIMITER //
-- first() initial state: a sentinel value paired with the maximum
-- representable timestamp, so any real row's timestamp compares lower
-- and replaces it on the first iterate step.
CREATE OR REPLACE FUNCTION first_init() RETURNS RECORD(v TEXT, d datetime(6)) AS
DECLARE
    init_state RECORD(v TEXT, d datetime(6));
BEGIN
    init_state.v = "_empty_set_";
    init_state.d = '9999-12-31 23:59:59.999999';
    RETURN init_state;
END //
DELIMITER ;
DELIMITER //
-- first() iterate step: fold one (value, timestamp) pair into the running
-- state, keeping the value with the lowest timestamp seen so far.
-- On timestamp ties the existing state wins, so the result among equal
-- timestamps is non-deterministic (see the header comment above).
-- Fixed: removed unused locals nv/nd; uppercased keywords to match last_iter.
CREATE OR REPLACE FUNCTION first_iter(state RECORD(v TEXT, d DATETIME(6)),
                                      v TEXT, d DATETIME(6))
RETURNS RECORD(v TEXT, d DATETIME(6)) AS
DECLARE
    nr RECORD(v TEXT, d DATETIME(6));
BEGIN
    -- if the new timestamp precedes the lowest so far, replace the state
    IF state.d > d THEN
        nr.v = v;
        nr.d = d;
        RETURN nr;
    END IF;
    RETURN state;
END //
DELIMITER ;
DELIMITER //
-- first() merge step: combine two partial states by keeping whichever
-- carries the lower timestamp (timestamp ties yield state2).
CREATE OR REPLACE FUNCTION first_merge(state1 RECORD(v TEXT, d DATETIME(6)),
state2 RECORD(v TEXT, d DATETIME(6))) RETURNS RECORD(v TEXT, d DATETIME(6)) AS
BEGIN
    IF state1.d >= state2.d THEN
        RETURN state2;
    END IF;
    RETURN state1;
END //
DELIMITER ;
DELIMITER //
-- first() terminate step: the aggregate's answer is the value field of the
-- final state; the timestamp field is discarded.
CREATE OR REPLACE FUNCTION first_terminate(state RECORD(v TEXT, d DATETIME(6))) RETURNS TEXT AS
DECLARE
    result TEXT;
BEGIN
    result = state.v;
    RETURN result;
END //
DELIMITER ;
-- Register the first(value, ts) user-defined aggregate from its four helper
-- functions: init builds the sentinel state, iterate folds in each row,
-- merge combines partial states across partitions, terminate extracts v.
CREATE AGGREGATE first(TEXT, DATETIME(6)) RETURNS TEXT
WITH STATE RECORD(v TEXT, d DATETIME(6))
INITIALIZE WITH first_init
ITERATE WITH first_iter
MERGE WITH first_merge
TERMINATE WITH first_terminate;
/*
User-defined aggregate last(value, ts)
Return the last (latest) value based on timestamp order.
Note that if more than one value has the same timestamp, then
the result is non-deterministic -- an arbitrary one of the
values with the highest timestamp is returned.
State: v, d (where v is a value, d is a timestamp)
Get result: just return value of v
Update: if new d > max d so far, update d and v
*/
DELIMITER //
-- last() initial state: a sentinel value paired with the minimum
-- representable timestamp, so any real row's timestamp compares higher
-- and replaces it on the first iterate step.
CREATE OR REPLACE FUNCTION last_init() RETURNS RECORD(v TEXT, d datetime(6)) AS
DECLARE
    init_state RECORD(v TEXT, d datetime(6));
BEGIN
    init_state.v = "_empty_set_";
    init_state.d = '1000-01-01 00:00:00.000000';
    RETURN init_state;
END //
DELIMITER ;
DELIMITER //
-- last() iterate step: fold one (value, timestamp) pair into the running
-- state, keeping the value with the highest timestamp seen so far.
-- On timestamp ties the existing state wins, so the result among equal
-- timestamps is non-deterministic (see the header comment above).
-- Fixed: removed unused locals nv/nd.
CREATE OR REPLACE FUNCTION last_iter(state RECORD(v TEXT, d DATETIME(6)),
                                     v TEXT, d DATETIME(6))
RETURNS RECORD(v TEXT, d DATETIME(6)) AS
DECLARE
    nr RECORD(v TEXT, d DATETIME(6));
BEGIN
    -- if the new timestamp exceeds the highest so far, replace the state
    IF state.d < d THEN
        nr.v = v;
        nr.d = d;
        RETURN nr;
    END IF;
    RETURN state;
END //
DELIMITER ;
DELIMITER //
-- last() merge step: combine two partial states by keeping whichever
-- carries the higher timestamp (timestamp ties yield state2).
CREATE OR REPLACE FUNCTION last_merge(state1 RECORD(v TEXT, d DATETIME(6)),
state2 RECORD(v TEXT, d DATETIME(6))) RETURNS RECORD(v TEXT, d DATETIME(6)) AS
BEGIN
    IF state1.d <= state2.d THEN
        RETURN state2;
    END IF;
    RETURN state1;
END //
DELIMITER ;
DELIMITER //
-- last() terminate step: the aggregate's answer is the value field of the
-- final state; the timestamp field is discarded.
CREATE OR REPLACE FUNCTION last_terminate(state RECORD(v TEXT, d DATETIME(6))) RETURNS TEXT AS
DECLARE
    result TEXT;
BEGIN
    result = state.v;
    RETURN result;
END //
DELIMITER ;
-- Register the last(value, ts) user-defined aggregate from its four helper
-- functions: init builds the sentinel state, iterate folds in each row,
-- merge combines partial states across partitions, terminate extracts v.
CREATE AGGREGATE last(TEXT, DATETIME(6)) RETURNS TEXT
WITH STATE RECORD(v TEXT, d DATETIME(6))
INITIALIZE WITH last_init
ITERATE WITH last_iter
MERGE WITH last_merge
TERMINATE WITH last_terminate;
In addition, another good method to bucket by K seconds is to take the integer number of seconds, divide it by K with integer division (DIV), then multiply by K again. Here are functions that use this to bucket by second and minute, respectively.
DELIMITER //
-- bucket_s(n, ts): truncate ts down to the start of its n-second bucket by
-- removing the remainder of the epoch-seconds value modulo n.
CREATE OR REPLACE FUNCTION bucket_s(n int, ts datetime(6))
RETURNS datetime AS
DECLARE
    epoch_s bigint;
BEGIN
    epoch_s = unix_timestamp(ts);
    RETURN from_unixtime(epoch_s - epoch_s % n);
END //
-- bucket_m(n, ts): truncate ts down to the start of its n-minute bucket by
-- removing the remainder of the epoch-seconds value modulo (60 * n).
CREATE OR REPLACE FUNCTION bucket_m(n int, ts datetime(6))
RETURNS datetime AS
DECLARE
    bucket_seconds bigint;
    epoch_s bigint;
BEGIN
    bucket_seconds = 60 * n;
    epoch_s = unix_timestamp(ts);
    RETURN from_unixtime(epoch_s - epoch_s % bucket_seconds);
END //
DELIMITER ;
For example, you could do this to bucket by 5 second intervals:
SELECT bucket_s(5, ts), avg(value)
FROM mySeries
GROUP BY 1
ORDER BY 1;
1 Like
Thank you! Works like a charm.