I'm trying to store messages from a Kafka topic using a pipeline and a stored procedure.
a) The pipeline is subscribed to the Kafka topic.
b) The MemSQL pipeline calls the stored procedure.
Everything works fine in the positive scenarios. However, if a corrupted message arrives from the Kafka topic, the remaining valid messages in the same batch are ignored.
I tried processing the batch in a loop only when a corrupt message is received, but that didn't work either: the remaining valid messages are still ignored.
For example, if the batch contains 3 messages and 1 of them is invalid, I expect the two valid records to be inserted into TABLE_1 and the invalid one to go into the Errors table.
Instead, the invalid message is written to the Errors table, but the other two messages are not inserted into any table.
CREATE OR REPLACE PROCEDURE log_errors(sample_table_1 query(a JSON, b JSON)) AS
DECLARE
  arr ARRAY(RECORD(a JSON, b JSON)) = COLLECT(sample_table_1);
  -- variable types are placeholders
  _col_1 TEXT;
  _col_2 TEXT;
  _col_3 TEXT;
BEGIN
  FOR r IN arr LOOP
    _col_1 = r.a::$col_1;
    _col_2 = r.a::$col_2;
    _col_3 = r.a::$col_3;
    INSERT INTO TABLE_1 (COL_1, COL_2, COL_3) VALUES (_col_1, _col_2, _col_3);
  END LOOP;
EXCEPTION
  WHEN OTHERS THEN
    DECLARE tmp text = exception_message();
    BEGIN
      INSERT INTO ERRORS VALUES (_col_1, tmp, current_timestamp());
    END;
END;
CREATE OR REPLACE PROCEDURE parse_procedure_1(sample_table_1 query(a JSON, b JSON)) AS
BEGIN
  INSERT INTO TABLE_1 (COL_1, COL_2, COL_3)
  SELECT DISTINCT b::$col_1, b::$col_2, b::$col_3 FROM sample_table_1 tble;
EXCEPTION
  WHEN OTHERS THEN
    CALL log_errors(sample_table_1);
END;
CREATE PIPELINE my_pipeline AS
LOAD DATA KAFKA '*:9092/'
INTO PROCEDURE parse_procedure_1
FORMAT JSON (
  a <- a default NULL,
  b <- b default NULL
);
Note: Please ignore the syntax.
Since multiple transactions are not allowed when a pipeline calls a stored procedure, please suggest an alternative.
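One direction I'm considering is to handle the exception per row instead of once for the whole loop, so that a bad message is logged and the loop carries on with the next one. The following is only an untested sketch. It assumes a nested BEGIN ... EXCEPTION ... END block is allowed inside the FOR loop, that the variables can be declared as TEXT, and that the ERRORS table takes its columns in the order used above. The procedure name log_errors_per_row is made up for illustration.

```sql
DELIMITER //
-- Hypothetical variant of log_errors: the handler sits inside the loop,
-- so one failing row does not end processing of the rest of the batch.
CREATE OR REPLACE PROCEDURE log_errors_per_row(sample_table_1 QUERY(a JSON, b JSON)) AS
DECLARE
  arr ARRAY(RECORD(a JSON, b JSON)) = COLLECT(sample_table_1);
  -- variable types are assumptions
  _col_1 TEXT;
  _col_2 TEXT;
  _col_3 TEXT;
BEGIN
  FOR r IN arr LOOP
    -- reset so a failed extraction does not log the previous row's value
    _col_1 = NULL;
    BEGIN
      _col_1 = r.a::$col_1;
      _col_2 = r.a::$col_2;
      _col_3 = r.a::$col_3;
      INSERT INTO TABLE_1 (COL_1, COL_2, COL_3) VALUES (_col_1, _col_2, _col_3);
    EXCEPTION
      WHEN OTHERS THEN
        DECLARE tmp TEXT = exception_message();
        BEGIN
          -- log the failing row, then continue with the next iteration
          INSERT INTO ERRORS VALUES (_col_1, tmp, current_timestamp());
        END;
    END;
  END LOOP;
END //
DELIMITER ;
```

With this variant, parse_procedure_1 would call log_errors_per_row(sample_table_1) in its own exception handler. I have not confirmed whether this behaves as expected when the procedure is run by a pipeline.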