How to alter an FS pipeline to change its columns when the columns in the file change

Hi,
I am trying to alter an FS pipeline in MemSQL 6.8.12.
The scenario is that the columns in the input file change according to user input.
I want to alter the pipeline's column selection dynamically through a Python script.

I am trying to find the syntax for this change, but have not been able to get it working.
Can you please advise?

Below is the scenario.

Initial pipeline:

CREATE PIPELINE test_memsql_PIPELINE
AS LOAD DATA FS '/base/location/test_file_memsql.csv'
BATCH_INTERVAL 2500
ENABLE OUT_OF_ORDER OPTIMIZATION
SKIP ALL ERRORS
INTO TABLE test_memsql
FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY ''
IGNORE 1 LINES
(
COB_DATE,
BU_CATEGORY,
BU_AREA,
PNL
);

Pipeline structure needed after ALTER PIPELINE:

CREATE PIPELINE test_memsql_PIPELINE
AS LOAD DATA FS '/base/location/test_file_memsql.csv'
BATCH_INTERVAL 2500
ENABLE OUT_OF_ORDER OPTIMIZATION
SKIP ALL ERRORS
INTO TABLE test_memsql
FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY ''
IGNORE 1 LINES
(
BU_CATEGORY,
COB_DATE,
DIVISION,
PNL,
BU_AREA,
TOTAL_AMOUNT
);

I tried the statements below, but it looks like they are not syntactically correct.

alter pipeline test_memsql_PIPELINE columns (BU_CATEGORY,COB_DATE,DIVISION,PNL,BU_AREA,TOTAL_AMOUNT);

alter pipeline test_memsql_PIPELINE set fields (BU_CATEGORY,COB_DATE,DIVISION,PNL,BU_AREA,TOTAL_AMOUNT);

alter pipeline test_memsql_PIPELINE set columns (BU_CATEGORY,COB_DATE,DIVISION,PNL,BU_AREA,TOTAL_AMOUNT);

ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'columns (BU_CATEGORY,COB_DATE,DIVISION,PNL,BU_AREA,TOTAL_AMOUNT)' at line 1

ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'fields (BU_CATEGORY,COB_DATE,DIVISION,PNL,BU_AREA,TOTAL_AMOUNT)' at line 1

PS: I also tried CREATE OR REPLACE PIPELINE to modify the pipeline, and it appears to work as expected:

  • The pipeline columns changed to new columns.
  • The pipeline didn’t stop and remained in the running state.
  • Only the new files are read.

Can the REPLACE PIPELINE option be safely used in this scenario?

Thanks

REPLACE PIPELINE is in fact the right tool here. It’ll “atomically” replace the pipeline, preserving the pipeline’s position in the data stream.
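Since you mentioned driving this from a Python script, here is a minimal sketch of how the CREATE OR REPLACE PIPELINE statement could be generated from a column list. The function name build_replace_pipeline and the identifier check are my own assumptions, not part of any MemSQL client API; the pipeline options are copied from your original definition.

```python
import re

# Conservative identifier check (an assumption of this sketch): only plain
# unquoted names are accepted, so user input cannot inject SQL.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_replace_pipeline(pipeline, table, path, columns):
    """Return a CREATE OR REPLACE PIPELINE statement for the given columns."""
    for name in [pipeline, table, *columns]:
        if not _IDENT.fullmatch(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    if "'" in path or "\\" in path:
        raise ValueError(f"unsafe path: {path!r}")
    column_list = ",\n".join(columns)
    return (
        f"CREATE OR REPLACE PIPELINE {pipeline}\n"
        f"AS LOAD DATA FS '{path}'\n"
        "BATCH_INTERVAL 2500\n"
        "ENABLE OUT_OF_ORDER OPTIMIZATION\n"
        "SKIP ALL ERRORS\n"
        f"INTO TABLE {table}\n"
        # The doubled backslashes below render as '\\' and '\n' in the SQL text.
        "FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '\\\\'\n"
        "LINES TERMINATED BY '\\n' STARTING BY ''\n"
        "IGNORE 1 LINES\n"
        f"(\n{column_list}\n);"
    )
```

The returned string can then be passed to the execute method of whatever MySQL-compatible DB-API cursor your script already uses.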

However, you’d need to make sure that the change happens after the pipeline has finished processing all instances of the old “file schema” and before it processes any instances of the new schema. We don’t make it particularly easy to do that. You could check information_schema.pipelines_files to see whether all known files have been loaded, though even that might have issues, as I think there’s a problematic window between the time that a file gets added to the data source and the time it shows up in pipelines_files as a “known file”.
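A rough sketch of that check from Python, assuming a DB-API cursor that uses the %s parameter style. The column names pipeline_name and file_state and the 'Unloaded' state value are what I recall for information_schema.pipelines_files, so please verify them against your version. The helper name all_known_files_loaded is hypothetical, and the check inherits the window problem described above.

```python
# Counts files the pipeline knows about but has not loaded yet.
# Column names and the 'Unloaded' value are assumptions to verify.
UNLOADED_FILES_QUERY = (
    "SELECT COUNT(*) FROM information_schema.pipelines_files "
    "WHERE pipeline_name = %s AND file_state = 'Unloaded'"
)


def all_known_files_loaded(cursor, pipeline):
    """Return True if no known file of the pipeline is still unloaded."""
    cursor.execute(UNLOADED_FILES_QUERY, (pipeline,))
    (unloaded,) = cursor.fetchone()
    return unloaded == 0
```

If several databases contain a pipeline with the same name, the query would also need a filter on the database name.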

If it’s possible to change the input format, this seems like a good use case for JSON pipelines, where order doesn’t matter and fields can be missing. You can use a per-field DEFAULT clause to specify a value to be loaded when the field is missing.
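As an illustration, the JSON column mapping could be generated from Python along these lines. This is a sketch under assumptions: the "column <- path DEFAULT literal" form follows the LOAD DATA ... FORMAT JSON mapping syntax as I remember it, each JSON key is assumed to have the same name as its column, and build_json_mapping and sql_literal are hypothetical helper names.

```python
import re

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def sql_literal(value):
    """Render None, a number, or a string as a SQL literal."""
    if value is None:
        return "NULL"
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return repr(value)
    if isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return "'" + escaped + "'"
    raise TypeError(f"unsupported default: {value!r}")


def build_json_mapping(defaults):
    """Build a mapping clause from a {column: default value} dict."""
    clauses = []
    for column, default in defaults.items():
        if not _IDENT.fullmatch(column):
            raise ValueError(f"unsafe identifier: {column!r}")
        clauses.append(f"{column} <- {column} DEFAULT {sql_literal(default)}")
    return "(\n" + ",\n".join(clauses) + "\n)"
```

The resulting clause would go after FORMAT JSON in the pipeline definition. Because every field has a default, a file that lacks DIVISION or TOTAL_AMOUNT would still load without the pipeline having to be replaced.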

The options for CREATE PIPELINE AS LOAD DATA ... FORMAT JSON are identical to LOAD DATA ... FORMAT JSON:

Thanks, Sasha… this is working well!