Having some trouble with "batch query" using in PIPELINES

(Ported from memsql-public-chat Slack channel)

coradinibr [5:45 PM]
hi. I’m evaluating PIPELINES and I’m trying to create a simple "insert/update’ via proc test. Here is my proc as example:

DELIMITER //
CREATE PROCEDURE proc_resp(batch query(id bigint, new_date bigint))
AS
BEGIN
    INSERT INTO resp_updates(id, date_updated)
      SELECT id, from_unixtime(new_date) FROM batch
    ON DUPLICATE KEY
      UPDATE date_updated = from_unixtime(batch.new_date);
END //
DELIMITER ;

it fails when it tries to execute the batch.new_date saying the field doesn’t exist. If I replace the UPDATE line by UPDATE date_updated = now(), everything works.

Question: What’s the trick to use the “batch query” field in the UPDATE clause. Sub-selects don’t work by default in MemSQL, so querying the batch like in the INSERT statement is not an option here. (edited)

robbie [5:47 PM]
the right thing here would be

INSERT INTO resp_updates(id, date_updated)
      SELECT id, from_unixtime(new_date) FROM batch
    ON DUPLICATE KEY
      UPDATE date_updated = values(date_updated)

additionally, if you are just trying to do upserts, you can do that directly in your create pipeline without a stored procedure

coradinibr [5:48 PM]
I need the new_date coming from the proc call, so even when the id exists, the new timestamp is coming from outside.

robbie [5:49 PM]
yea, that’s what VALUES does. It will use the value that would have been inserted into that column

coradinibr [5:49 PM]
interesting

it worked!! great!!

I’ve to do upserts, but I’m receiving json objects from kafka and picking specific fields.

robbie [5:53 PM]
oh I see

coradinibr [5:54 PM]
I used the TRANSFORM memsql://json which uses jq

then as I could not insert straight to the table because of the duplicated key, I created a proc

robbie [5:55 PM]
I see, I think you should be able to do the ON DUPLICATE KEY as part of the CREATE PIPELINE directly

coradinibr [5:55 PM]
it didn’t work, there might be some trick there. :relaxed:

robbie [5:56 PM]
it should just be

CREATE PIPELINE .... ON DUPLICATE KEY UPDATE date_updated = values(date_updated)

coradinibr [20 days ago]
hi @robbie, how do I do this?
Here is my PIPELINE

CREATE PIPELINE GET_RESPONDENT
    AS 
    LOAD DATA 
         KAFKA 'my_kafka_node/my_kafka_topic'
    BATCH_INTERVAL 2500
WITH TRANSFORM ('memsql://json', '', '-r "[.respondentId, (.date/1000 | floor)] | @tsv"')
INTO TABLE resp_updates;

I have no headers from the transform, my table has 2 fields (id bigint, date_updated datetime).

memsql doesn’t convert unix timestamp to datetime automatically, it means I have to call from_unixtime().

here’s my stored procedure

DROP PROCEDURE proc_resp;
DELIMITER //
CREATE PROCEDURE proc_resp(batch query(respondent_id bigint, new_date bigint))
AS
BEGIN
    INSERT INTO resp_updates(respondent_id, date_updated)
      SELECT respondent_id, from_unixtime(new_date) FROM batch
    ON DUPLICATE KEY
      UPDATE date_updated = values(date_updated);
END //
DELIMITER ;

It would work if I could run this transform

WITH TRANSFORM ('memsql://json', '', '-r " [ .respondentId, (.date/1000 | floor | strftime(\"%Y-%m-%d %k:%M:%S\"))] | @tsv"')

but memsql parser is removing the escaped double quotes of strftime above and failing when trying to call jq externally

robbie [5:56 PM]
but this also works

coradinibr [5:56 PM]
I’ll try
btw, I found an issue with jq that you might be interested in investigating.
When calling additional external formatters, I could not find a way to scape double quotes

example:

WITH TRANSFORM ('memsql://json', '', '-r " . | [ .respondentId, (.date/1000 | floor | strftime(\"%Y-%m-%d %k:%M:%S\"))] | @tsv"')

something happens, seems like your parser removes the double quotes from the strftime, even if I tried to scape them.

my test is working, not important now. Thanks very much for your help!!