Skipping rows while loading through a pipeline into a stored procedure

I am loading a rowstore table using file system pipelines and stored procedures. The syntax is similar to the following.

INSERT INTO table
SELECT * FROM batch_query
WHERE condition
ON DUPLICATE KEY UPDATE
       ROWS = values(ROWS);

Can we use a column name from the target table in the WHERE condition to skip some rows from being updated? Basically, I have a timestamp coming from the file, and I do not want to update the row in the table if the table row's timestamp is greater than the timestamp coming from the file.

When I tried, I got the error below.

Type: ER_BAD_FIELD_ERROR (1054)
Message: Unknown column 'MEMSQL_CRT_TS' in 'where clause'

If this is not possible, is there any other way to prevent certain rows in the table from being updated by the pipeline?

Thanks.

Can you share the full procedure code?

You can certainly skip rows in the SP the way you are describing. The error makes me think you just need to refer to it with table name or alias, table_name.MEMSQL_CRT_TS.

I tried prefixing with the table name, but that didn’t work either. Below is the SP.

account is the table and MEMSQL_CRT_TS is the column that I would like to include in the WHERE condition.

DELIMITER $$
CREATE OR REPLACE PROCEDURE `ACCOUNT_LOAD`(account_rows query(`INP_AC_NO` varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
  `INP_SC_NO` varchar(5) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
  `INP_LK_SHR_QY` decimal(18,5) NULL,
  `INP_XCL_IN` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
  `INP_UNLD_TS` datetime(6) NULL,
  `INP_TABLE_TY` char(01) CHARACTER SET utf8 COLLATE utf8_general_ci NULL
  )) RETURNS void AS
	  
  DECLARE
        db2_unld_ts QUERY(db2_unld_ts DATETIME(6));
        tmp_unld_ts DATETIME(6);
  BEGIN
			db2_unld_ts = SELECT INP_UNLD_TS
						    FROM account_rows
						   WHERE INP_TABLE_TY = 'E'
						   LIMIT 1;

			tmp_unld_ts = SCALAR(db2_unld_ts);

			INSERT INTO account
					(AC_NO        
					,SC_NO        
					,LK_SHR_QY
					,XCL_IN                                                                               
					,MEMSQL_CRT_TS
					)

			SELECT  INP_AC_NO        
					,INP_SC_NO        
					,INP_LK_SHR_QY
					,INP_XCL_IN                                                                     
					,CURRENT_TIMESTAMP(6)
			   FROM account_rows              
			WHERE INP_TABLE_TY = 'M'      
			  AND MEMSQL_CRT_TS < tmp_unld_ts  -- this condition raises ER_BAD_FIELD_ERROR (1054)
			ON DUPLICATE KEY            
			UPDATE AC_NO                 =  values(AC_NO      )
				 , SC_NO                 =  values(SC_NO      )
				 , LK_SHR_QY             =  values(LK_SHR_QY  )
				 , XCL_IN                =  values(XCL_IN     )
				 , MEMSQL_CRT_TS         =CURRENT_TIMESTAMP(6);                                                                                       

			DELETE FROM account
			 WHERE MEMSQL_CRT_TS < tmp_unld_ts;
END;$$

DELIMITER ;

Do you mind sharing the table DDL as well?

How about trying this out? (I haven’t tested it, though.)

DELIMITER $$
CREATE OR REPLACE PROCEDURE ACCOUNT_LOAD(account_rows query(INP_AC_NO varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
INP_SC_NO varchar(5) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
INP_LK_SHR_QY decimal(18,5) NULL,
INP_XCL_IN char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
INP_UNLD_TS datetime(6) NULL,
INP_TABLE_TY char(01) CHARACTER SET utf8 COLLATE utf8_general_ci NULL
)) RETURNS void AS

DECLARE
db2_unld_ts QUERY(db2_unld_ts DATETIME(6));
tmp_unld_ts DATETIME(6);
BEGIN
db2_unld_ts = SELECT INP_UNLD_TS
FROM account_rows
WHERE INP_TABLE_TY = 'E'
LIMIT 1;

  	tmp_unld_ts = SCALAR(db2_unld_ts);

  	INSERT INTO account
  			(AC_NO        
  			,SC_NO        
  			,LK_SHR_QY
  			,XCL_IN                                                                               
  			,MEMSQL_CRT_TS
  			)

  	SELECT  INP_AC_NO        
  			,INP_SC_NO        
  			,INP_LK_SHR_QY
  			,INP_XCL_IN                                                                     
  			,CURRENT_TIMESTAMP(6)
  	   FROM account_rows              
  	WHERE INP_TABLE_TY = 'M'      
  	ON DUPLICATE KEY            
  	UPDATE AC_NO                 =  if(MEMSQL_CRT_TS < tmp_unld_ts, values(AC_NO), AC_NO)
  		 , SC_NO                 =  if(MEMSQL_CRT_TS < tmp_unld_ts, values(SC_NO), SC_NO)
  		 , LK_SHR_QY             =  if(MEMSQL_CRT_TS < tmp_unld_ts, values(LK_SHR_QY), LK_SHR_QY)
  		 , XCL_IN                =  if(MEMSQL_CRT_TS < tmp_unld_ts, values(XCL_IN), XCL_IN)
  		 , MEMSQL_CRT_TS         =  if(MEMSQL_CRT_TS < tmp_unld_ts, CURRENT_TIMESTAMP(6), MEMSQL_CRT_TS);

  	DELETE FROM account
  	 WHERE MEMSQL_CRT_TS < tmp_unld_ts;

END;$$

DELIMITER ;

Hi anandh.nagarajan. Thanks for trying MemSQL Stored Procedures.

The error appears to be due to using the INSERT ... SELECT ... ON DUPLICATE KEY ... form. In short, the WHERE clause belongs to the SELECT part of the query, and account_rows is defined to have only the INP_ fields, whereas your intent seems to be to compare against the MEMSQL_CRT_TS field of the existing account row. You should be able to get the desired behavior using the syntax suggested in Saksham’s answer.
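To make the scoping point concrete, here is a minimal sketch outside of a stored procedure. The tables target and staging and their columns are hypothetical and only stand in for account and account_rows; the left-to-right evaluation of the UPDATE assignments is how MySQL behaves and is assumed here for MemSQL.

```sql
-- Hypothetical tables, for illustration only.
CREATE TABLE target  (id INT PRIMARY KEY, val INT, updated_ts DATETIME(6));
CREATE TABLE staging (id INT, val INT, src_ts DATETIME(6));

-- Fails with ER_BAD_FIELD_ERROR: the WHERE clause is part of the SELECT,
-- and staging has no updated_ts column.
-- INSERT INTO target (id, val, updated_ts)
-- SELECT id, val, src_ts FROM staging
--  WHERE updated_ts < src_ts
-- ON DUPLICATE KEY UPDATE val = VALUES(val);

-- Works: inside ON DUPLICATE KEY UPDATE, a bare column name refers to the
-- existing row in target, and VALUES(col) refers to the incoming value.
-- updated_ts is assigned last so the earlier comparison still sees the
-- stored timestamp (assumes left-to-right evaluation, as in MySQL).
INSERT INTO target (id, val, updated_ts)
SELECT id, val, src_ts FROM staging
ON DUPLICATE KEY UPDATE
       val        = IF(updated_ts < VALUES(updated_ts), VALUES(val), val),
       updated_ts = IF(updated_ts < VALUES(updated_ts), VALUES(updated_ts), updated_ts);
```

With this form, rows whose stored timestamp is already newer keep their current values, which is the same effect a WHERE filter on the target column would have had.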

Sorry for the late response, and thanks everybody for the feedback. I was able to achieve what I wanted by joining the input rows from the pipeline with the table on the cluster.

DELIMITER $$
CREATE OR REPLACE PROCEDURE ACCOUNT_LOAD(account_rows query(INP_AC_NO varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
INP_SC_NO varchar(5) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
INP_LK_SHR_QY decimal(18,5) NULL,
INP_XCL_IN char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
INP_UNLD_TS datetime(6) NULL,
INP_TABLE_TY char(01) CHARACTER SET utf8 COLLATE utf8_general_ci NULL
)) RETURNS void AS

DECLARE
db2_unld_ts QUERY(db2_unld_ts DATETIME(6));
tmp_unld_ts DATETIME(6);
BEGIN
	
	db2_unld_ts = SELECT INP_UNLD_TS
	FROM account_rows
	WHERE INP_TABLE_TY = 'E'
	LIMIT 1;

  	tmp_unld_ts = SCALAR(db2_unld_ts);

  	INSERT INTO account
  			(AC_NO        
  			,SC_NO        
  			,LK_SHR_QY
  			,XCL_IN                                                                               
  			,MEMSQL_CRT_TS
  			)

  	SELECT  INP_AC_NO        
  			,INP_SC_NO        
  			,INP_LK_SHR_QY
  			,INP_XCL_IN                                                                     
  			,CURRENT_TIMESTAMP(6)
  	   FROM account_rows inp_tab
	   LEFT OUTER JOIN
	        account acc_tab
	     ON inp_tab.INP_AC_NO = acc_tab.AC_NO
		AND inp_tab.INP_SC_NO = acc_tab.SC_NO		 
  	WHERE INP_TABLE_TY = 'M'      
	  AND ( acc_tab.MEMSQL_CRT_TS < tmp_unld_ts OR  
	        acc_tab.MEMSQL_CRT_TS IS NULL 
		   )
  	ON DUPLICATE KEY            
  	UPDATE AC_NO                 =  values(AC_NO)
  		 , SC_NO                 =  values(SC_NO)
  		 , LK_SHR_QY             =  values(LK_SHR_QY)
  		 , XCL_IN                =  values(XCL_IN)
  		 , MEMSQL_CRT_TS         =  CURRENT_TIMESTAMP(6);

  	DELETE FROM account
  	 WHERE MEMSQL_CRT_TS < tmp_unld_ts;
END;$$

DELIMITER ;