Correct Configuration of TRANSFORM PIPELINE for S3 NDJSON data



I’m currently evaluating MemSQL and am in the process of loading data into the developer instance of the server.

I am attempting to import NDJSON data from S3 into MemSQL. The data is stored as .json.gz files.

The documentation is unclear on this point, so I am assuming that .gz files are automatically decompressed on import.
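
One way to rule out the files themselves, independently of how MemSQL handles compression, is to check a sample locally. The sketch below assumes one object has been downloaded from the bucket to a local path (the name `sample.json.gz` in the usage note is hypothetical). It confirms that the file really is gzip data and that each non-empty line parses as a JSON object. The helper name `check_ndjson_gz` is introduced here for illustration only.

```python
import gzip
import json


def check_ndjson_gz(path, max_lines=1000):
    """Return (rows_ok, errors) for the first max_lines lines of a gzip NDJSON file."""
    rows_ok = 0
    errors = []
    # Reading raises an OSError subclass if the file is not valid gzip data.
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        for lineno, line in enumerate(fh, start=1):
            if lineno > max_lines:
                break
            if not line.strip():
                continue  # ignore blank lines
            try:
                obj = json.loads(line)
            except json.JSONDecodeError as exc:
                errors.append((lineno, str(exc)))
                continue
            if isinstance(obj, dict):
                rows_ok += 1
            else:
                errors.append((lineno, "not a JSON object"))
    return rows_ok, errors
```

Calling `check_ndjson_gz("sample.json.gz")` returns the number of well-formed rows and a list of `(line number, message)` pairs for anything that did not parse.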

I have the following:

Newline-delimited JSON files containing rows like this:

{"user":"fc2ebc75782e6d395d0807081bd3bfb2","country":"AT","region":"AT-9","location":"12","loc_key":"AT~AT-9~12","birth_year":"1992","stream_date":"2019-01-01","upc":"22122122122","isrc":"BAZZL1100254","track_artists":"Foo Bar Baz","track_title":"Bar Baz","track_version":"2","album_artist":"Foo Bar Baz","album_name":"BarBaz Bar","parent_identifier":"22122122122","content_type":"audio","territory":"AT","vendor_identifier":"1c6c7cd166bab4dfba949af06d8d4a0e","playlist_id":"","units":"1","max_seconds":240,"source":"other","device":"speaker","os":"speaker","gender":"male","age":"25to34","account":"premium","shuffles":"0","repeats":"0","completed":"0","skipped":"1"}

Unfortunately the documentation is rather sparse, and the few JSON parsing examples I can find so far all use nested JSON as the sample. My pipeline definition is as follows:

    CREATE PIPELINE IF NOT EXISTS pipeline_memsql_test_streaming_dataset AS
    LOAD DATA S3 "bucket-test/data/prefix/xxx_user_streams"
    CREDENTIALS '{"aws_secret_access_key": "******", "aws_access_key_id": "*****"}'
    WITH TRANSFORM ('memsql://json', '', '-r " [.user, .country, .region, .location, .loc_key, .birth_year, .stream_date|tostring, .upc|tostring, .isrc|tostring, .track_artists, .track_title, .track_version|tonumber, .album_artist, .album_name, .parent_identifier, .content_type, .territory, .vendor_identifier, .playlist_id, .units|tonumber, .max_seconds|tonumber, .source, .device, .os, .gender, .age, .account, .shuffles|tonumber, .repeats|tonumber, .completed|tonumber, .skipped|tonumber] | @tsv"')
    INTO TABLE xxx_user_streams

Running TEST PIPELINE produces no output, where I would expect to see rows.

How can I go about debugging this and getting it working?
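
A rough way to see what the jq filter above is meant to emit is to reproduce it outside the pipeline. The sketch below is a Python stand-in for the filter, not the MemSQL transform itself: it takes one NDJSON line, applies the same field order and the same `tonumber` conversions, and joins the result with tabs. The names `FIELDS`, `NUMERIC`, `tsv_escape` and `row_to_tsv` are introduced here for illustration, and the escaping only approximates jq's `@tsv` for string and numeric values.

```python
import json

# Field order copied from the jq filter in the pipeline definition.
FIELDS = [
    "user", "country", "region", "location", "loc_key", "birth_year",
    "stream_date", "upc", "isrc", "track_artists", "track_title",
    "track_version", "album_artist", "album_name", "parent_identifier",
    "content_type", "territory", "vendor_identifier", "playlist_id",
    "units", "max_seconds", "source", "device", "os", "gender", "age",
    "account", "shuffles", "repeats", "completed", "skipped",
]

# Fields that the filter pipes through `tonumber`.
NUMERIC = {
    "track_version", "units", "max_seconds",
    "shuffles", "repeats", "completed", "skipped",
}


def tsv_escape(value):
    """Escape a value roughly the way jq's @tsv does (None becomes empty)."""
    if value is None:
        return ""
    text = str(value)
    return (text.replace("\\", "\\\\")
                .replace("\t", "\\t")
                .replace("\n", "\\n")
                .replace("\r", "\\r"))


def row_to_tsv(line):
    """Convert one NDJSON line to a tab-separated row in FIELDS order."""
    obj = json.loads(line)
    out = []
    for name in FIELDS:
        value = obj.get(name)
        if name in NUMERIC:
            # float() raises on "", None or non-numeric text, which is
            # roughly where a numeric conversion in the filter would fail too.
            number = float(value)
            value = int(number) if number.is_integer() else number
        out.append(tsv_escape(value))
    return "\t".join(out)
```

Feeding a few real rows through `row_to_tsv` shows the column count (31 here) and any row whose numeric fields cannot be converted, both of which can then be compared against the definition of `xxx_user_streams`.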


Hi Ian

Apologies for the delay. I am fetching some people to help you get this figured out. Stay tuned.


Hi @Jacky,

Many thanks for the response. I did succeed in resolving the issue with the following approach.

I would however still like to understand better how the Transform works when using the approach above.

    DROP PIPELINE IF EXISTS pipeline_memsql_test_streaming_dataset_xxx;
    CREATE PIPELINE IF NOT EXISTS pipeline_memsql_test_streaming_dataset_xxx AS
    LOAD DATA S3 "bucket-test/data/xxx"
    CREDENTIALS '{"aws_secret_access_key": "*********", "aws_access_key_id": "*******"}'
    INTO TABLE xxx_user_streams(
        user <- user,
        country <- country,
        region <- region,
        location <- location,
        loc_key <- loc_key,
        birth_year <- birth_year,
        stream_date <- stream_date,
        upc <- upc,
        isrc <- isrc,
        track_artists <- track_artists,
        track_title <- track_title,
        track_version <- track_version,
        album_artist <- album_artist,
        album_name <- album_name,
        parent_identifier <- parent_identifier,
        content_type <- content_type,
        territory <- territory,
        vendor_identifier <- vendor_identifier,
        playlist_id <- playlist_id,
        units <- units,
        max_seconds <- max_seconds,
        source <- source,
        device <- device,
        os <- os,
        gender <- gender,
        age <- age,
        account <- account,
        shuffles <- shuffles,
        repeats <- repeats,
        completed <- completed,
        skipped <- skipped
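
For anyone reading the mapping list above: each `column <- key` entry pairs a table column (left) with a key in the JSON object (right), so values are picked by name rather than by position. A minimal Python sketch of that idea follows, using only a subset of the columns for brevity. The names `MAPPING` and `map_row` are hypothetical, and raising on a missing key is this sketch's own choice, not a statement about how MemSQL behaves.

```python
import json

# (column, json_key) pairs; a subset of the mapping above, for brevity.
MAPPING = [
    ("user", "user"),
    ("country", "country"),
    ("max_seconds", "max_seconds"),
    ("skipped", "skipped"),
]


def map_row(line, mapping=MAPPING):
    """Build a {column: value} dict from one NDJSON line using name-based mapping."""
    obj = json.loads(line)
    missing = [key for _, key in mapping if key not in obj]
    if missing:
        # This sketch treats a missing key as an error.
        raise KeyError(f"keys missing from row: {missing}")
    # Keys in the row that are not listed in the mapping are simply ignored.
    return {column: obj[key] for column, key in mapping}
```

Because the lookup is by name, the order of keys within each JSON row does not matter, unlike the positional TSV output of a jq filter.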

Thanks for getting back to me.