Implementation for time_bucket function

I enjoyed reading the blog post about MemSQL for time series data. It introduces time_bucket, first, and last functions. The implementations for first and last are listed in the supplemental material. Unfortunately, the implementation for time_bucket seems to be missing. Could you please add it to the blog post and/or here?

Hmmm. We will fix the blog post! In the meanwhile, here is the full code for time_bucket, first, and last:

/*
Time series function package for MemSQL

Functions included:

  - time_bucket(bucket_description, timestamp_val) ==> bucket values by time for easy grouping
  - first(dat_val, timestamp_val) ==> UD Agg to get first value ordered by time
  - last(dat_val, timestamp_val) ==> UD Agg to get last value ordered by time

These functions make it easy to group time series data into desired intervals.
Then you can aggregate points in that interval using standard aggregate functions.
In addition, you can use the new UD Aggs first() and last() to get the firt and 
last vales in a sequence. Of course, you can do something similar with rank(), a  
window function, but first() and last() can be a useful shorthand.
*/

/* Example uses:

drop table if exists t;
create table t(c varchar(30), m varchar(80), ts datetime);
insert t values ("c1", "red", "2019-01-29 13:39:09"),
 ("c1", "green", "2019-01-29 13:39:01"),
 ("c2", "blue", "2019-01-29 13:39:07"),
 ("c1", "purple", "2019-01-29 13:39:05"),
 ("c1", "orange", "2019-01-29 13:39:04"),
 ("c1", "yellow", "2019-01-29 13:39:08");

 select time_bucket('5 seconds', ts), min(ts), first(m, ts), max(ts), last(m, ts)
 from t
 group by 1;
+------------------------------+---------------------+--------------+---------------------+-------------+
| time_bucket('5 seconds', ts) | min(ts)             | first(m, ts) | max(ts)             | last(m, ts) |
+------------------------------+---------------------+--------------+---------------------+-------------+
| 2019-01-29 13:39:00.000000   | 2019-01-29 13:39:01 | green        | 2019-01-29 13:39:04 | orange      |
| 2019-01-29 13:39:05.000000   | 2019-01-29 13:39:05 | purple       | 2019-01-29 13:39:09 | red         |
+------------------------------+---------------------+--------------+---------------------+-------------+

drop table if exists well_reading(varchar(30) well_id, ts datetime(6), flow_rate decimal(18,4));
drop table if exists trade;
create table trade(symbol varchar(5), ts datetime(6), shares decimal(18,4), price decimal(18,4));

*/

DELIMITER //
CREATE OR REPLACE FUNCTION time_bucket(
  bucket_desc varchar(64) NOT NULL, 
  ts datetime(6)) RETURNS datetime(6) NULL AS
DECLARE
  num_periods bigint = -1; 
  second_part_offset int = -1;
  unit varchar(255) = NULL;
  num_str varchar(255) = NULL;
  unix_ts bigint;
  r datetime(6);
  days_since_epoch bigint;
BEGIN
  num_str = substring_index(bucket_desc, ' ', 1);
  num_periods = num_str :> bigint;
  unit = substr(bucket_desc, length(num_str) + 2, length(bucket_desc));
  IF unit = 'second' or unit = 'seconds' THEN
    unit = 'second';
  ELSIF unit = 'minute' or unit = 'minutes' THEN
    unit = 'minute';
  ELSIF unit = 'hour' or unit = 'hours' THEN
    unit = 'hour';
  ELSIF unit = 'day' or unit = 'days' THEN
    unit = 'day';
  ELSE
    raise user_exception(concat("Unknown time unit: ", unit));
  END IF;

  unix_ts = unix_timestamp(ts);
  
  IF unit = 'second' THEN
    r = from_unixtime(unix_ts - (unix_ts % num_periods));
  ELSIF unit = 'minute' THEN 
    r = from_unixtime(unix_ts - (unix_ts % (num_periods * 60)));
  ELSIF unit = 'hour' THEN 
    r = from_unixtime(unix_ts - (unix_ts % (num_periods * 60 * 60)));
  ELSIF unit = 'day' THEN 
    unix_ts += 4 * 60 * 60; -- adjust to align day boundary
    days_since_epoch = unix_ts / (24 * 60 * 60);
    days_since_epoch = days_since_epoch - (days_since_epoch % num_periods);
    r = (from_unixtime(days_since_epoch * (24 * 60 * 60))) :> date;
  ELSE
    raise user_exception("Internal error -- bad time unit");
  END IF;

  RETURN r;
END;
//
DELIMITER ;

/* 
Generate data to test time_bucket()
*/
DELIMITER //
CREATE OR REPLACE PROCEDURE tb_test() AS
DECLARE 
  d datetime = '2019-01-01';
BEGIN
 DROP TABLE IF EXISTS date_tbl;
 CREATE TABLE date_tbl(d datetime);
 WHILE d <= '2019-01-04' LOOP
   d = d + interval 1 hour;
   INSERT date_tbl VALUES(d);
 END LOOP;
END //
DELIMITER ;

/*
User-defined aggregate first(value, ts)

Return the first (earliest) value based on timestamp order.

Note that if more than one value has the same timestamp, then 
the result is non-deterministic -- an arbitrary one of the
values with the lowest timestamp is returned.

State: v, d (where v is a value, d is a timestamp)
Get result: just return value of v
Update: if new d < min d so far, update d and v
*/
DELIMITER //
CREATE OR REPLACE FUNCTION first_init() RETURNS RECORD(v TEXT, d datetime(6)) AS
  BEGIN
    RETURN ROW("_empty_set_", '9999-12-31 23:59:59.999999');
  END //
DELIMITER ;

DELIMITER //
CREATE OR REPLACE FUNCTION first_iter(state RECORD(v TEXT, d DATETIME(6)), 
   v TEXT, d DATETIME(6)) 
  RETURNS RECORD(v TEXT, d DATETIME(6)) AS
  DECLARE 
    nv TEXT;
    nd DATETIME(6);
    nr RECORD(v TEXT, d DATETIME(6)); 
  BEGIN
    -- if new timestamp is less than lowest before, update state
    if state.d > d then
      nr.v = v;
      nr.d = d;
      return nr;
    end if;
    return state; 
  END //
DELIMITER ;

DELIMITER //
CREATE OR REPLACE FUNCTION first_merge(state1 RECORD(v TEXT, d DATETIME(6)), 
   state2 RECORD(v TEXT, d DATETIME(6))) RETURNS RECORD(v TEXT, d DATETIME(6)) AS
  BEGIN
    if state1.d < state2.d then
      RETURN state1;
    end if;
    RETURN state2;
  END //
DELIMITER ;

DELIMITER //
CREATE OR REPLACE FUNCTION first_terminate(state RECORD(v TEXT, d DATETIME(6))) RETURNS TEXT AS
  BEGIN
    RETURN state.v;
  END //
DELIMITER ;

CREATE AGGREGATE first(TEXT, DATETIME(6)) RETURNS TEXT
  WITH STATE RECORD(v TEXT, d DATETIME(6))
  INITIALIZE WITH first_init
  ITERATE WITH first_iter
  MERGE WITH first_merge
  TERMINATE WITH first_terminate;

/*
User-defined aggregate last(value, ts)

Return the last (oldest) value based on timestamp order.

Note that if more than one value has the same timestamp, then 
the result is non-deterministic -- an arbitrary one of the
values with the highest timestamp is returned.

State: v, d (where v is a value, d is a timestamp)
Get result: just return value of v
Update: if new d > max d so far, update d and v
*/
DELIMITER //
CREATE OR REPLACE FUNCTION last_init() RETURNS RECORD(v TEXT, d datetime(6)) AS
  BEGIN
    RETURN ROW("_empty_set_", '1000-01-01 00:00:00.000000');
  END //
DELIMITER ;

DELIMITER //
CREATE OR REPLACE FUNCTION last_iter(state RECORD(v TEXT, d DATETIME(6)), 
   v TEXT, d DATETIME(6)) 
  RETURNS RECORD(v TEXT, d DATETIME(6)) AS
  DECLARE 
    nv TEXT;
    nd DATETIME(6);
    nr RECORD(v TEXT, d DATETIME(6)); 
  BEGIN
    -- if new timestamp is greater than largest before, update state
    IF state.d < d THEN
      nr.v = v;
      nr.d = d;
      RETURN nr;
    END IF;
    RETURN state; 
  END //
DELIMITER ;

DELIMITER //
CREATE OR REPLACE FUNCTION last_merge(state1 RECORD(v TEXT, d DATETIME(6)), 
   state2 RECORD(v TEXT, d DATETIME(6))) RETURNS RECORD(v TEXT, d DATETIME(6)) AS
  BEGIN
    IF state1.d > state2.d THEN
      RETURN state1;
    END IF;
    RETURN state2;
  END //
DELIMITER ;

DELIMITER //
CREATE OR REPLACE FUNCTION last_terminate(state RECORD(v TEXT, d DATETIME(6))) RETURNS TEXT AS
  BEGIN
    RETURN state.v;
  END //
DELIMITER ;

CREATE AGGREGATE last(TEXT, DATETIME(6)) RETURNS TEXT
  WITH STATE RECORD(v TEXT, d DATETIME(6))
  INITIALIZE WITH last_init
  ITERATE WITH last_iter
  MERGE WITH last_merge
  TERMINATE WITH last_terminate;

In addition, another good method to bucket by K seconds is to take the integer number of seconds, divide it by K with integer division (DIV), then multiply by K again. Here are functions that use this to bucket by second and minute, respectively.

DELIMITER //
CREATE OR REPLACE FUNCTION bucket_s(n int, ts datetime(6)) 
RETURNS datetime AS
BEGIN
  RETURN from_unixtime(unix_timestamp(ts) DIV n * n); 
END //

CREATE OR REPLACE FUNCTION bucket_m(n int, ts datetime(6)) 
RETURNS datetime AS
BEGIN
  RETURN from_unixtime(unix_timestamp(ts) DIV (60 * n) * (60 * n)); 
END //
DELIMITER ;

For example, you could do this to bucket by 5 second intervals:

SELECT bucket_s(5, ts), avg(value)
FROM mySeries
ORDER BY 1;
1 Like

Thank you! Works like charm.