Set batch size for each Kafka pipeline

I have multiple pipelines reading data from different Kafka topics. Is there a way to set a different batch size for each pipeline?
E.g., Pipeline 1: 100 messages per batch
Pipeline 2: 1000 messages per batch
I know there is a global setting, but how can a custom batch limit be defined per pipeline?

Unfortunately, the current version of MemSQL pipelines only has a global batch size. How critical is this requirement for you?

If the messages in both topics are produced at different rates and MemSQL can keep up with both (i.e., there is no backlog of messages to process), the max batch size rarely comes into play: the most recently ingested batch should be smaller than the global max size.
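
For reference, a minimal sketch of the global setting, assuming the variable name from the MemSQL/SingleStore docs (pipelines_max_offsets_per_batch_partition; verify it exists under this name in your version):

```sql
-- Global cap on source offsets (e.g. Kafka messages) extracted per
-- batch, per database partition, for ALL pipelines at once; there is
-- no per-pipeline override in this version. Variable name per the
-- MemSQL/SingleStore docs; confirm it for your version.
SET GLOBAL pipelines_max_offsets_per_batch_partition = 100;
```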

So my use case is reading from multiple Kafka topics.
Some topics have high volume but small messages (KBs), while others have lower volume but large messages.
The one with the bigger messages carries nested JSON (roughly 5 MB per message, about 100 messages at a time) and takes considerable time to finish batch processing, because it pulls all 100 messages in a single batch.

That is another interesting use case. How long does it take to finish batch processing? Are you putting it through a stored procedure?

In general, each pipeline batch has some overhead, so running 1 batch of 100 messages should be better in terms of total resources than 100 (or even 10) batches for the same number of messages. If the stored procedure is relatively light, the time delay you are experiencing is likely dominated by the network download of the roughly 500 MB payload.

It is worth checking whether the 100 large messages are arriving on the same Kafka partition or spread across several. In MemSQL pipelines, each database partition downloads messages from a different Kafka partition, which helps parallelize the workload. However, if the messages all arrive on a single partition, you are not getting the benefit of parallel execution.
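
One way to check this from the database side, as a sketch: group recent batch metadata by source partition. The column names below come from recent MemSQL/SingleStore docs (verify them for your version), and 'mydb' / 'my_pipeline' are placeholders:

```sql
-- If only one BATCH_SOURCE_PARTITION_ID ever shows up, the topic is
-- effectively single-partition and the pipeline cannot parallelize.
SELECT BATCH_SOURCE_PARTITION_ID,
       COUNT(*)                                         AS batches,
       SUM(BATCH_LATEST_OFFSET - BATCH_EARLIEST_OFFSET) AS offsets_ingested
FROM INFORMATION_SCHEMA.PIPELINES_BATCHES
WHERE DATABASE_NAME = 'mydb'          -- placeholder
  AND PIPELINE_NAME = 'my_pipeline'   -- placeholder
GROUP BY BATCH_SOURCE_PARTITION_ID;
```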

It took 3-4 hours for the batches to finish the insert.
The payload is nested JSON, and those 100 messages expand to about 2M records.

Could you post a few example records from INFORMATION_SCHEMA.PIPELINES_BATCHES for your pipeline, preferably all rows for a single batch ID? That should provide some clues about where the time is being spent.
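
For example, something along these lines (a sketch; 'my_pipeline' and the batch ID are placeholders):

```sql
-- All per-partition rows for one batch; the per-partition timing and
-- byte columns should show whether extraction, transformation, or the
-- insert itself dominates.
SELECT *
FROM INFORMATION_SCHEMA.PIPELINES_BATCHES
WHERE PIPELINE_NAME = 'my_pipeline'   -- placeholder
  AND BATCH_ID = 42                   -- placeholder batch id
ORDER BY BATCH_SOURCE_PARTITION_ID;
```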

The source is putting data in one partition only, which could be the reason for the slowness. But again, this is linked to the parsing of nested JSON mentioned in the other post; currently the code just loops through each array (header and details) for the insert.
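
If the per-row loop is the bottleneck, one alternative worth trying is a single set-based insert per batch that unnests the JSON array inside the stored procedure. A minimal sketch, assuming messages shaped like {"header": {...}, "details": [...]} and the TABLE(JSON_TO_ARRAY(...)) construct from the MemSQL/SingleStore docs; every table, column, and field name here is hypothetical:

```sql
DELIMITER //
CREATE OR REPLACE PROCEDURE ingest_batch(batch QUERY(payload JSON)) AS
BEGIN
    -- One set-based INSERT ... SELECT per batch instead of looping
    -- message by message: TABLE(JSON_TO_ARRAY(...)) unnests the
    -- "details" array of every message in the batch at once.
    INSERT INTO detail_records (order_id, item_id, qty)
    SELECT b.payload::header::$order_id,   -- ::$ extracts as string
           d.table_col::$item_id,
           d.table_col::%qty               -- ::% extracts as double
    FROM batch AS b
    JOIN TABLE(JSON_TO_ARRAY(b.payload::details)) AS d;
END //
DELIMITER ;
```

The pipeline would then point at the procedure with CREATE PIPELINE ... INTO PROCEDURE ingest_batch, so each batch becomes a handful of set-based inserts rather than millions of row-by-row ones.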