Handling failure messages in a batch using MemSQL Pipeline

Hi,

I’m trying to store messages posted to a Kafka topic through a Pipeline and a Stored Procedure.
a) The Pipeline is subscribed to the Kafka topic.
b) The MemSQL Pipeline triggers the stored procedure.

Everything works fine in the positive scenarios. However, if a corrupted message is received from the Kafka topic, the rest of the valid messages in the same batch get ignored.

I tried executing the batch in a loop only when I receive a corrupt message, but that didn’t work out either; it still ignores the remaining valid messages.

For example:
If I receive 3 messages in a batch, of which 1 is invalid, I expect the two valid records to be inserted into TABLE_1 and the remaining one to go to the Errors table.

But what actually happens is that the one invalid message goes to the Errors table while the remaining two messages are not inserted into any table.

Queries:

delimiter //
CREATE OR REPLACE PROCEDURE log_errors(sample_table_1 query(a JSON, b JSON))
AS
DECLARE
  arr ARRAY(RECORD(a JSON, b JSON)) = COLLECT(sample_table_1);
  _col_1 VARCHAR(50);
  _col_2 VARCHAR(50);
  _col_3 VARCHAR(50);
BEGIN
  FOR r IN arr LOOP
    _col_1 = r.a::$col_1;
    _col_2 = r.a::$col_2;
    _col_3 = r.a::$col_3;

    INSERT INTO TABLE_1 (COL_1, COL_2, COL_3) VALUES (_col_1, _col_2, _col_3);
  END LOOP;
EXCEPTION
  WHEN OTHERS THEN
    DECLARE
      tmp TEXT = exception_message();
    BEGIN
      INSERT INTO ERRORS VALUES (_col_1, tmp, current_timestamp());
    END;
END //

CREATE OR REPLACE PROCEDURE parse_procedure_1 (
  sample_table_1 query(
    a JSON,
    b JSON))
AS
BEGIN
  INSERT INTO TABLE_1 (COL_1, COL_2, COL_3)
  SELECT DISTINCT b::$col_1, b::$col_2, b::$col_3
  FROM sample_table_1 tble;
EXCEPTION
  WHEN OTHERS THEN
    CALL log_errors(sample_table_1);
END //
delimiter ;

CREATE PIPELINE my_pipeline AS
LOAD DATA KAFKA '*:9092/'
INTO PROCEDURE parse_procedure_1
FORMAT JSON (
  a <- a default NULL,
  b <- b default NULL
);

Note: Please Ignore the syntax.

Since multi-statement transactions are not allowed in the Pipeline + Stored Procedure combination, please let me know of an alternative for this.

Thanks
Arun

Hi Arun,

Thanks for posting a question on the MemSQL forums.

I got a response from our engineer, and this has to do with the fact that stored procedures run on many rows at a time instead of running on one row. You’ll need to rewrite your stored procedure to handle that.

Jacky.

Thanks Jacky for the response.

If we run it on the whole batch, then all of the messages get inserted into the Errors table even if only one was corrupted. May I know how to handle this while processing many rows?

Regards
Arun

@Jacky, if the default behavior of the pipeline is to handle incoming Kafka events in batches, what is MemSQL’s recommendation for error handling?
It appears from @arun.pradeep’s comment that the current MemSQL behavior causes the whole batch to be discarded on individual errors within the batch.

You have full SQL control, so you can do whatever you want. It’s unclear from your question what sort of errors you’re expecting or trying to filter, but by the time an SP has the row, you know it at least matches the schema of the QTV argument. Are the errors you’re looking for single-row SQL constraint violations? One way you could consider doing this is:

insert into table_1(...) 
select ... 
from sample_table_1 ... 
where <expression_that_tests_row_correctness>;

insert into errors(...) 
select ... 
from sample_table_1 ... 
where <expression_that_tests_row_incorrectness>;

If you’re worried about duplicate-key violations, you can use INSERT IGNORE to ignore the dups, or a join to track the dups, but it’s very manual.
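For the duplicate-key case, a minimal sketch against the TABLE_1 columns from the original post (assuming a unique key on COL_1, which is an assumption, not something stated in the thread):

```sql
-- Sketch only: assumes TABLE_1 has a unique key on COL_1.
-- INSERT IGNORE silently skips rows that would violate the key
-- instead of failing the whole statement.
INSERT IGNORE INTO TABLE_1 (COL_1, COL_2, COL_3)
SELECT b::$col_1, b::$col_2, b::$col_3
FROM sample_table_1;
```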

Why do you want to use a stored procedure, by the way? If you don’t use a stored procedure, you can put SKIP CONSTRAINT ERRORS into your CREATE PIPELINE statement and error rows will end up in information_schema.PIPELINES_ERRORS.
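A sketch of that direct-to-table variant (the broker address, topic name, and column mappings are placeholders, and the exact placement of the SKIP clause should be checked against the docs for your version):

```sql
CREATE PIPELINE my_pipeline_direct AS
LOAD DATA KAFKA 'broker-host:9092/my_topic'  -- placeholder broker/topic
SKIP CONSTRAINT ERRORS                       -- bad rows are logged, not fatal
INTO TABLE TABLE_1
FORMAT JSON (
  COL_1 <- col_1 default NULL,
  COL_2 <- col_2 default NULL,
  COL_3 <- col_3 default NULL
);

-- Skipped rows can then be inspected with:
SELECT * FROM information_schema.PIPELINES_ERRORS;
```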

Or, if you don’t mind every row being processed by the aggregator, you could do something similar to your COLLECT approach, but wrap the singleton INSERT in a BEGIN ... EXCEPTION block rather than putting the exception handler on the whole loop. This might be slow if you have a huge volume of data, though.
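Following that last suggestion, here is a sketch of the earlier procedure reworked so the EXCEPTION block sits inside the loop (the procedure name parse_per_row is made up; table and column names are taken from the original post):

```sql
DELIMITER //
CREATE OR REPLACE PROCEDURE parse_per_row(sample_table_1 query(a JSON, b JSON))
AS
DECLARE
  arr ARRAY(RECORD(a JSON, b JSON)) = COLLECT(sample_table_1);
BEGIN
  FOR r IN arr LOOP
    BEGIN
      -- One row per INSERT so a failure only affects this row.
      INSERT INTO TABLE_1 (COL_1, COL_2, COL_3)
      VALUES (r.b::$col_1, r.b::$col_2, r.b::$col_3);
    EXCEPTION
      WHEN OTHERS THEN
        -- Divert just the failing row; the loop then continues.
        INSERT INTO ERRORS VALUES (r.b::$col_1, exception_message(), current_timestamp());
    END;
  END LOOP;
END //
DELIMITER ;
```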

CREATE OR REPLACE PROCEDURE prc_tweet(tweet_BATCH query(tweet_JSON json))
AS
BEGIN
  INSERT INTO cdr(org_id, time_zone, node_type, version, cluster_id, cluster_name, host_name, time_stamp, ip_address)
  SELECT json_extract_string(tweet_JSON, 'org_id'),
         json_extract_string(tweet_JSON, 'time_zone'),
         json_extract_string(tweet_JSON, 'node_type'),
         json_extract_string(tweet_JSON, 'version'),
         json_extract_string(tweet_JSON, 'cluster_id'),
         json_extract_string(tweet_JSON, 'cluster_name'),
         json_extract_string(tweet_JSON, 'host_name'),
         json_extract_double(tweet_JSON, 'time_stamp'),
         json_extract_string(tweet_JSON, 'ip_address')
  FROM tweet_BATCH;
EXCEPTION
  WHEN OTHERS THEN
    -- How to handle error code and error type into this table?
    INSERT INTO pipeline_error_data(pipeline_name, error_msg)
    SELECT 'pipeline_tweet', tweet_JSON
    FROM tweet_BATCH;
END;

We are using the ‘prc_tweet’ procedure to push data from a Kafka pipeline into a table through a stored procedure. We are on MemSQL version 7.0. In case any JSON record raises an exception, we successfully insert the pipeline name and the error JSON into a custom table called “pipeline_error_data”. But please let me know how to capture the error code, error type, and details into this table. I have gone through several links but had no luck. It would be very helpful if you could share sample code that captures the error code, error type, and details into the custom “pipeline_error_data” table.
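One workaround, as a sketch only: as far as I know the handler exposes the error text via exception_message(), so a coarse error type can be derived from that text yourself. This assumes pipeline_error_data has an extra error_type column (not in your current table), and the LIKE patterns below are guesses at typical message wording, not documented values:

```sql
-- Fragment of the exception handler in prc_tweet (sketch only).
EXCEPTION
  WHEN OTHERS THEN
    DECLARE
      msg TEXT = exception_message();
      -- Classify by message text; the patterns are examples only.
      err_type VARCHAR(30) = CASE
        WHEN msg LIKE '%Duplicate entry%' THEN 'DUPLICATE_KEY'
        WHEN msg LIKE '%cannot be null%'  THEN 'NOT_NULL'
        ELSE 'OTHER'
      END;
    BEGIN
      INSERT INTO pipeline_error_data(pipeline_name, error_type, error_msg)
      VALUES ('pipeline_tweet', err_type, msg);
    END;
```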

Hi rjsv,

Have you gone through this document link?

Let us know if the above was helpful or not.

Thanks,