Kafka Pipelines - configure Kafka consumer

Hi everyone!

I am very eager to try out the Pipelines functionality in MemSQL to load some JSON-formatted data from Kafka.

However, it is quite important to me to be able to configure Kafka consumer properties, e.g. to define group.id, auto.offset.reset and certain other properties for each pipeline.

Could you advise if there is any way to do that?

Regards,
Denis

Hello Denis! Thanks for trying out MemSQL Pipelines.

MemSQL Pipelines does not use group.id or auto.offset.reset; instead, it has its own logic for tracking the offsets of each topic. This has a few implications (for example, only one topic per pipeline), but in general it allows MemSQL to retry batches when needed and enforce exactly-once semantics.
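To illustrate why tracking offsets alongside the data makes retries safe, here is a minimal toy model in Python. It is not MemSQL's implementation: the class ToyPipeline, its fields, and the in-memory list standing in for a topic partition are all hypothetical. The only idea it shows is that the rows and the next offset advance together, so a failed batch can be retried without losing or duplicating messages.

```python
class ToyPipeline:
    """Toy model (hypothetical): rows and the next offset are committed together."""

    def __init__(self, log):
        self.log = log          # in-memory stand-in for one topic partition
        self.next_offset = 0    # offset tracked by the pipeline itself
        self.table = []         # stand-in for the destination table

    def run_batch(self, batch_size, fail=False):
        start = self.next_offset
        end = min(start + batch_size, len(self.log))
        rows = self.log[start:end]  # read an exact offset range
        if fail:
            # Simulated failure before the commit: neither rows nor offset change.
            raise RuntimeError("batch failed")
        # "Commit": the data and the offset advance in the same step.
        self.table.extend(rows)
        self.next_offset = end
        return len(rows)


log = [f"msg-{i}" for i in range(10)]
pipeline = ToyPipeline(log)

pipeline.run_batch(4)                 # loads offsets 0..3
try:
    pipeline.run_batch(4, fail=True)  # fails; nothing is committed
except RuntimeError:
    pass
while pipeline.run_batch(4):          # retries from offset 4 until the log is drained
    pass
```

After the retry loop, the toy table holds each of the ten messages exactly once, because the failed batch never moved the stored offset.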

Could you explain why you need pipelines to use consumer groups? It’s possible that we can find a workable solution.

Hello @m_k!

Thank you for a quick reply.

I guess, if MemSQL manages offsets all by itself, then there is no reason to keep track of the data ingestion process from my end.

Cheers,
Denis

Out of curiosity …
Any application that connects to Kafka has to specify a producer/consumer group name.
I am aware that MemSQL handles this internally; however, I am curious how this works on Kerberos-enabled Kafka brokers. Kerberos requires configuring producer and consumer group names on the topic so that applications can connect using a consumer name and a service account. In MemSQL, how would this work with just the service account and no consumer name to configure on the Kafka topic?

Pipelines uses the simple library client (sometimes called legacy). It sets group.id to null, which makes the broker treat it as part of the default group. However, MemSQL Pipelines does not use the Kafka subscribe() API; instead, it reads the available offsets metadata directly and then calls consume() with precise offsets.

Thanks for the clarification, mkobyakov; things are much clearer now.

One question: if I create two pipelines with the same Kafka broker and topic to perform different actions, and my Kafka topic has 10 messages, how would this work? Would both read 10 messages each, or would they read 10 messages between them? (I think I know the answer but would like to confirm.)

Both pipelines would read 10 messages. In this case, pipelines as Kafka consumers are independent of each other.
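As a rough illustration of that independence, here is a small Python sketch. It is a toy model with hypothetical names (topic, offsets, pipeline_a, pipeline_b), not how pipelines are implemented; it only shows that readers which each keep their own offset do not share or split the messages.

```python
# In-memory stand-in for a topic holding 10 messages.
topic = [f"msg-{i}" for i in range(10)]

# Each (hypothetical) pipeline keeps its own offset; nothing is shared.
offsets = {"pipeline_a": 0, "pipeline_b": 0}
received = {}

for name in offsets:
    batch = topic[offsets[name]:]   # read from this pipeline's own offset
    received[name] = batch
    offsets[name] += len(batch)     # advancing one offset leaves the other untouched

for name, batch in received.items():
    print(name, "read", len(batch), "messages")
```

Each reader ends up with all ten messages, which matches the answer above.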

Hi @m_k

I am running into an issue that I think would be solved if we were able to set the consumer group (group.id) in the Kafka consumer config. We are trying to use the same topic with multiple pipelines to populate new tables, and we are seeing data corruption and lost data. When a new pipeline is started on the same topic, the topic still contains the data for the previous table (this is expected), but the new pipeline starts from the earliest offset rather than from the most recently committed message (which should just be new table data). Starting from the earliest offset is what causes the corruption and failed batches. This is likely because SingleStore sets a new consumer group each time a pipeline is created (my assumption). If we could set a default consumer group for each pipeline, I think we would be able to use one topic and one consumer group and maintain our data integrity. Please let me know if we are able to set the consumer group for a pipeline.

Hello there,

Regarding setting the consumer group (group.id) for each pipeline, I understand the importance of maintaining data integrity across multiple pipelines. However, in Kafka, the consumer group is typically set at the application level rather than per pipeline. This means that all instances of one consumer application subscribing to a topic belong to the same consumer group.

To address your specific use case, where multiple pipelines need to consume data from the same topic but maintain separate offsets, there are a few approaches we can consider:

Partitioning: Ensure that the topic is partitioned appropriately, so each pipeline can consume from its own partition. This way, each pipeline can maintain its own offset within its partition without interfering with other pipelines.

Custom Offset Management: Implement custom offset management within your pipelines. Instead of relying on Kafka’s automatic offset management, you can store and manage offsets externally, allowing each pipeline to start consuming from the last committed offset for its specific purpose.

Topic Naming Convention: Consider using a naming convention for topics that reflects the purpose or target table of each pipeline. This way, you can create separate topics for each pipeline, which can simplify offset management and ensure data isolation.

Dynamic Consumer Group Assignment: Investigate if your Kafka client library supports dynamic consumer group assignment or if there are configuration options that allow more control over consumer group assignment. This might involve exploring different Kafka client libraries or configurations that offer more flexibility in managing consumer groups.

Before proceeding with any changes, it’s essential to evaluate the impact on your existing infrastructure and data processing flow. Additionally, testing these solutions in a staging environment can help ensure they meet your requirements for data integrity and reliability.

Hope this will be helpful.