Handling failure messages in a batch using MemSQL Pipeline


#1

Hi,

I'm trying to store messages that are posted to a Kafka topic, using a Pipeline and a Stored Procedure.
a) The Pipeline is subscribed to the Kafka topic.
b) The MemSQL Pipeline triggers the stored procedure.

Everything works fine in the positive scenario. However, when a corrupted message is received from the Kafka topic, the rest of the valid messages in the same batch get ignored.

I tried executing the batch in a loop only when I receive a corrupt message, but that didn't work out either; it still ignores the remaining valid messages.

For example:
If I receive 3 messages in a batch, of which 1 is invalid, then I expect the two valid records to be inserted into TABLE_1 and the remaining one to go into the Errors table.

Instead, the one invalid message reaches the Errors table, but the remaining two messages are not inserted into any table.

Queries:

delimiter //
CREATE OR REPLACE PROCEDURE log_errors(sample_table_1 query(a JSON, b JSON))
AS
DECLARE
  arr ARRAY(RECORD(a JSON, b JSON)) = COLLECT(sample_table_1);
  _col_1 VARCHAR(50);
  _col_2 VARCHAR(50);
  _col_3 VARCHAR(50);
BEGIN
  FOR r IN arr LOOP
    _col_1 = r.a::$col_1;
    _col_2 = r.a::$col_2;
    _col_3 = r.a::$col_3;

    INSERT INTO TABLE_1 (COL_1, COL_2, COL_3) VALUES (_col_1, _col_2, _col_3);
  END LOOP;
EXCEPTION
  WHEN OTHERS THEN
    DECLARE
      tmp TEXT = exception_message();
    BEGIN
      INSERT INTO ERRORS VALUES (_col_1, tmp, current_timestamp());
    END;
END //

CREATE OR REPLACE PROCEDURE parse_procedure_1 (
  sample_table_1 query(
    a JSON,
    b JSON))
AS
BEGIN
  INSERT INTO TABLE_1 (COL_1, COL_2, COL_3)
  SELECT DISTINCT b::$col_1, b::$col_2, b::$col_3
  FROM sample_table_1;
EXCEPTION
  WHEN OTHERS THEN
    CALL log_errors(sample_table_1);
END //
delimiter ;

CREATE PIPELINE my_pipeline AS
LOAD DATA KAFKA '*:9092/'
INTO PROCEDURE parse_procedure_1
FORMAT JSON (
  a <- a default NULL,
  b <- b default NULL
);

Note: please ignore the exact syntax.

Since multiple transactions are not allowed in the Pipeline + Stored Procedure combination, please let me know an alternative approach for this.

Thanks
Arun


#2

Hi Arun

Thanks for posting a question on the MemSQL forums

I got a response from our engineers: this has to do with the fact that a pipeline stored procedure runs on a batch of many rows rather than on one row at a time. You'll need to rewrite your stored procedure to handle that.

Jacky.


#3

Thanks Jacky for the response.

If we run it on the batch, then all the messages get inserted into the Errors table even if only one was corrupted. May I know how to handle this while processing many rows?

Regards
Arun


#4

@Jacky, if the default behavior of the pipeline is to handle incoming Kafka events in batches, what is MemSQL's recommendation for error handling?
It appears from @arun.pradeep's comment that the current MemSQL behavior causes the whole batch to be discarded when an individual row within the batch fails.


#5

You have full SQL control, so you can do whatever you want. It's unclear from your question what sort of errors you're expecting or trying to filter, but by the time an SP has the row, you know it at least matches the schema of the QTV argument. Are the errors you're looking for single-row SQL constraint violations? One way you could consider doing this is:

insert into table_1(...)
select ...
from sample_table_1 ...
where <expression_that_tests_row_correctness>;

insert into errors(...)
select ...
from sample_table_1 ...
where <expression_that_tests_row_incorrectness>;
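
For concreteness, here is one possible shape of that filter, assuming "corrupt" simply means a required JSON field is missing. The table and column names come from your post, the error message string and timestamp are modeled on your log_errors insert, and the test itself is just an illustration, not a definitive rule:

-- valid rows: all required JSON fields are present
insert into table_1 (col_1, col_2, col_3)
select a::$col_1, a::$col_2, a::$col_3
from sample_table_1
where a::$col_1 is not null
  and a::$col_2 is not null
  and a::$col_3 is not null;

-- invalid rows: at least one required field is missing
insert into errors
select a::$col_1, 'missing required JSON field', current_timestamp()
from sample_table_1
where a::$col_1 is null
   or a::$col_2 is null
   or a::$col_3 is null;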

If you're worried about duplicate key violations, you can use insert ignore to ignore the dups or a join to track the dups, but it's very manual.
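
For the duplicate-key case specifically, the first insert above could become something like this (same placeholder filter as before; an untested sketch, not a drop-in):

-- rows that collide on a primary/unique key are silently skipped
insert ignore into table_1 (col_1, col_2, col_3)
select a::$col_1, a::$col_2, a::$col_3
from sample_table_1
where <expression_that_tests_row_correctness>;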

Why do you want to use a stored procedure, by the way? If you don't use a stored procedure, you can put skip constraint violations into your create pipeline statement, and error rows will end up in information_schema.pipelines_errors.
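
Roughly along these lines (syntax written from memory, so double-check the CREATE PIPELINE docs; the broker address, topic, and JSON subvalue paths are placeholders based on your example):

CREATE PIPELINE my_pipeline AS
LOAD DATA KAFKA '<broker-host>:9092/<topic>'
SKIP CONSTRAINT ERRORS   -- bad rows are recorded in information_schema.pipelines_errors
INTO TABLE TABLE_1
FORMAT JSON (
  COL_1 <- a::col_1 default NULL,
  COL_2 <- a::col_2 default NULL,
  COL_3 <- a::col_3 default NULL
);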

Or, if you don't mind every row being processed by the aggregator, you could do something similar to your collect approach, but wrap the singleton insert in its own block with an exception handler rather than putting the exception handler on the whole loop. This might be slow if you have a huge volume of data, though.
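
A minimal sketch of that last approach, reusing the names and the exception_message() call from your post (untested; the important part is that the BEGIN ... EXCEPTION ... END block sits inside the loop, so only the failing row is diverted):

delimiter //
CREATE OR REPLACE PROCEDURE parse_procedure_1(sample_table_1 query(a JSON, b JSON))
AS
DECLARE
  arr ARRAY(RECORD(a JSON, b JSON)) = COLLECT(sample_table_1);
  _col_1 VARCHAR(50);
  _col_2 VARCHAR(50);
  _col_3 VARCHAR(50);
BEGIN
  FOR r IN arr LOOP
    BEGIN
      _col_1 = r.a::$col_1;
      _col_2 = r.a::$col_2;
      _col_3 = r.a::$col_3;
      INSERT INTO TABLE_1 (COL_1, COL_2, COL_3) VALUES (_col_1, _col_2, _col_3);
    EXCEPTION
      WHEN OTHERS THEN
        -- only this row goes to ERRORS; the loop continues with the next row
        INSERT INTO ERRORS VALUES (_col_1, exception_message(), current_timestamp());
    END;
  END LOOP;
END //
delimiter ;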