(Ported from memsql-public-chat Slack channel)
coradinibr [5:45 PM]
hi. I’m evaluating PIPELINES and I’m trying to create a simple "insert/update" via proc test. Here is my proc as an example:
DELIMITER //
CREATE PROCEDURE proc_resp(batch query(id bigint, new_date bigint)) AS
BEGIN
  INSERT INTO resp_updates(id, date_updated)
  SELECT id, from_unixtime(new_date) FROM batch
  ON DUPLICATE KEY UPDATE date_updated = from_unixtime(batch.new_date);
END //
DELIMITER ;
it fails when it tries to evaluate batch.new_date, saying the field doesn’t exist. If I replace the UPDATE clause with date_updated = now(), everything works.
Question: what’s the trick to using the “batch query” field in the UPDATE clause? Sub-selects don’t work by default in MemSQL, so querying the batch like in the INSERT statement is not an option here. (edited)
robbie [5:47 PM]
the right thing here would be
INSERT INTO resp_updates(id, date_updated) SELECT id, from_unixtime(new_date) FROM batch ON DUPLICATE KEY UPDATE date_updated = values(date_updated)
additionally, if you are just trying to do upserts, you can do that directly in your create pipeline without a stored procedure
coradinibr [5:48 PM]
I need the new_date coming from the proc call, so even when the id exists, the new timestamp comes from outside.
robbie [5:49 PM]
yea, that’s what
VALUES does. It will use the value that would have been inserted into that column
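(Editor’s note: a minimal sketch of the VALUES() behavior robbie describes, assuming the resp_updates schema mentioned later in this thread:)

```sql
-- Assumed schema from this thread.
CREATE TABLE resp_updates (
  id bigint PRIMARY KEY,
  date_updated datetime
);

-- VALUES(date_updated) refers to the value that *would have been*
-- inserted into date_updated for the conflicting row, so the upsert
-- reuses the from_unixtime(new_date) expression from the SELECT list
-- instead of referencing batch.new_date directly.
INSERT INTO resp_updates(id, date_updated)
SELECT id, from_unixtime(new_date) FROM batch
ON DUPLICATE KEY UPDATE date_updated = VALUES(date_updated);
```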
coradinibr [5:49 PM]
it worked!! great!!
I have to do upserts, but I’m receiving JSON objects from Kafka and picking specific fields.
robbie [5:53 PM]
oh I see
coradinibr [5:54 PM]
I used the
TRANSFORM memsql://json which uses jq
then, as I could not insert straight into the table because of the duplicate key, I created a proc
robbie [5:55 PM]
I see, I think you should be able to do the
ON DUPLICATE KEY as part of the
CREATE PIPELINE directly
coradinibr [5:55 PM]
it didn’t work, there might be some trick there.
robbie [5:56 PM]
it should just be
CREATE PIPELINE .... ON DUPLICATE KEY UPDATE date_updated = values(date_updated)
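(A sketch of what robbie is suggesting, using the pipeline, topic, and TRANSFORM arguments from later in this thread; whether the user’s MemSQL version accepts ON DUPLICATE KEY UPDATE in CREATE PIPELINE is an assumption here:)

```sql
CREATE PIPELINE GET_RESPONDENT AS
LOAD DATA KAFKA 'my_kafka_node/my_kafka_topic'
BATCH_INTERVAL 2500
WITH TRANSFORM ('memsql://json', '', '-r "[.respondentId, (.date/1000 | floor)] | @tsv"')
INTO TABLE resp_updates
-- Upsert directly in the pipeline, no stored procedure needed.
ON DUPLICATE KEY UPDATE date_updated = VALUES(date_updated);
```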
coradinibr [20 days ago]
hi @robbie, how do I do this?
Here is my PIPELINE
CREATE PIPELINE GET_RESPONDENT AS
LOAD DATA KAFKA 'my_kafka_node/my_kafka_topic'
BATCH_INTERVAL 2500
WITH TRANSFORM ('memsql://json', '', '-r "[.respondentId, (.date/1000 | floor)] | @tsv"')
INTO TABLE resp_updates;
I have no headers from the transform, my table has 2 fields (id bigint, date_updated datetime).
MemSQL doesn’t convert a Unix timestamp to a datetime automatically, which means I have to call from_unixtime().
here’s my stored procedure
DROP PROCEDURE proc_resp;
DELIMITER //
CREATE PROCEDURE proc_resp(batch query(respondent_id bigint, new_date bigint)) AS
BEGIN
  INSERT INTO resp_updates(respondent_id, date_updated)
  SELECT respondent_id, from_unixtime(new_date) FROM batch
  ON DUPLICATE KEY UPDATE date_updated = VALUES(date_updated);
END //
DELIMITER ;
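(Editor’s note: the thread never shows how the procedure gets wired to the pipeline. A hedged sketch, assuming the user’s MemSQL version supports CREATE PIPELINE ... INTO PROCEDURE:)

```sql
-- Route each extracted batch through proc_resp instead of loading
-- rows into the table directly; the procedure's query-typed parameter
-- receives the (respondent_id, new_date) pairs produced by the transform.
CREATE PIPELINE GET_RESPONDENT AS
LOAD DATA KAFKA 'my_kafka_node/my_kafka_topic'
BATCH_INTERVAL 2500
WITH TRANSFORM ('memsql://json', '', '-r "[.respondentId, (.date/1000 | floor)] | @tsv"')
INTO PROCEDURE proc_resp;
```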
It would work if I could run this transform
WITH TRANSFORM ('memsql://json', '', '-r " [ .respondentId, (.date/1000 | floor | strftime(\"%Y-%m-%d %k:%M:%S\"))] | @tsv"')
but the MemSQL parser strips the escaped double quotes from the strftime call above, so the external jq invocation fails
robbie [5:56 PM]
but this also works
coradinibr [5:56 PM]
btw, I found an issue with jq that you might be interested in investigating.
When calling additional external formatters, I could not find a way to escape double quotes
WITH TRANSFORM ('memsql://json', '', '-r " . | [ .respondentId, (.date/1000 | floor | strftime(\"%Y-%m-%d %k:%M:%S\"))] | @tsv"')
something happens; it seems like your parser removes the double quotes from the strftime call, even though I tried to escape them.
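(Editor’s note: the symptom described here is consistent with MySQL-style backslash escaping in string literals, which MemSQL follows: the SQL parser consumes each `\"` and emits a bare `"` before the argument ever reaches jq. If that is the cause, a likely workaround is to double-escape, so the literal survives as `\"` for the shell-quoted jq program:)

```sql
-- Hypothetical fix: \\\" in the SQL literal becomes \" after SQL
-- parsing, which the shell then turns into a plain " inside the
-- double-quoted jq filter, as strftime() requires.
WITH TRANSFORM ('memsql://json', '',
  '-r "[.respondentId, (.date/1000 | floor | strftime(\\\"%Y-%m-%d %k:%M:%S\\\"))] | @tsv"')
```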
my test is working, so it’s not urgent now. Thanks very much for your help!!