DISABLE OUT_OF_ORDER OPTIMIZATION and sprocs

I have a Kafka pipeline feeding a stored procedure that does an upsert into a table.

I’ve tested my upsert logic repeatedly and it seems sound. No matter what I do, the pipeline ends up inserting duplicate rows instead of correctly following the sproc which would de-duplicate records within a batch.

I found the documentation has a blurb about out-of-order processing “optimization”, which follows:

Unless otherwise specified, Kafka records may be processed out of order if the MemSQL cluster is 1.5 * pipelines_max_partitions_per_batch behind in a single Kafka partition; however, records will be committed in order in all cases. This is an optimization specific to Kafka pipelines and is enabled by default. If you require the records to be processed in order (e.g. in upsert scenarios), alter your pipeline with DISABLE OUT_OF_ORDER OPTIMIZATION specified.

This blurb doesn’t actually make sense to me, partly due to the confusing names of the global settings, but moreover because the error messages I get when I set pipelines_max_partitions_per_batch refer to a number of leaf nodes. (So are the partitions in this case MemSQL nodes and not Kafka partitions? This has all kinds of deeply disturbing implications about what kinds of unwelcome “optimizations” the pipeline engine is actually trying to do here…) 1.5 * the number of leaf nodes is many orders of magnitude lower than the number of offsets in any Kafka topic, so it seems like it will always pass this threshold.

Furthermore, the blurb indicates to set the DISABLE OUT_OF_ORDER OPTIMIZATION, but the error message says this doesn’t work on pipelines that go into sprocs. So it seems I’m out of luck here.

I’ve messed with all of the global settings, the pipeline declaration, and the sproc, but MemSQL is still doing something that unravels the consistency and ordering of the Kafka topics. It is clear to me that MemSQL is processing the same Kafka partition in parallel at some point. What exactly is MemSQL even doing and how do I make it stop ‘helping’ me?

As a correction: I said something about max_partitions_per_batch being related to leaf nodes, but I should have said MemSQL partitions. So I have a MemSQL cluster running 16 partitions, reading from a Kafka topic with 2 partitions.

Is MemSQL dumb enough to assign 2 kafka partitions to 16 MemSQL partitions? That really seems like what happens.

Great questions! Let’s dig into this.

This blurb doesn’t actually make sense to me, partly due to the confusing names of the global settings,

Bug in the docs! We’ve opened a task to fix this; it should say pipelines_max_offsets_per_batch_partition.

Furthermore, the blurb indicates to set the DISABLE OUT_OF_ORDER OPTIMIZATION, but the error message says this doesn’t work on pipelines that go into sprocs. So it seems I’m out of luck here.

This is correct. It’s because the SP language doesn’t yet have the notion of an ordered QTV, so there is no way to have the argument to your SP be “ordered”. However, there are ways to work around this, and we intend to add more automated ways to handle it in future versions. More on this later.

Is MemSQL dumb enough to assign 2 kafka partitions to 16 MemSQL partitions? That really seems like what happens.

Yes, we are that “dumb”! But only in a backfill scenario, when processing the small number of Kafka partitions in parallel could be more efficient. In general, this is controlled by max_partitions_per_batch, which can be set in CREATE PIPELINE or ALTER PIPELINE.

What exactly is MemSQL even doing and how do I make it stop ‘helping’ me?

It’s hard to tell from this description. Likely when you select from the SP argument, it processes those rows in some order that’s different from what you expect. If you have something to order by, select with an ORDER BY clause. Otherwise we can add something to order by.
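To illustrate why ordering matters for an upsert, here is a minimal Python sketch (not MemSQL code). The `apply_upserts` helper, the `ts` column, and the sample rows are all hypothetical. The point is only that a last-write-wins upsert gives different results depending on the order in which rows are consumed, and that sorting by an explicit key first makes the outcome predictable.

```python
from operator import itemgetter


def apply_upserts(rows):
    """Last-write-wins upsert: a later row overwrites an earlier row with the same id."""
    table = {}
    for row in rows:
        table[row["id"]] = row
    return table


# Hypothetical batch; 'ts' stands in for whatever column defines the intended order.
batch = [
    {"id": 1, "ts": 2, "value": "new"},
    {"id": 1, "ts": 1, "value": "old"},
    {"id": 2, "ts": 1, "value": "only"},
]

# Consuming rows in arrival order lets the stale row (ts=1) win for id 1.
unordered = apply_upserts(batch)

# Sorting by the ordering column first makes the newest row win.
ordered = apply_upserts(sorted(batch, key=itemgetter("ts")))
```

Inside the stored procedure, the analogous step would be selecting from the procedure argument with an ORDER BY on such a column, assuming the data carries one.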

Do you mind sending me your

  • relevant create tables
  • create pipeline
  • stored procedure
  • output of select * from information_schema.pipelines_batches where batch_id=<batch_id_of_a_batch_that_you_feel_did_something_strange>.

I think there might be a bug but it’s not related to what I posted. The combination of setting MAX_PARTITIONS_PER_BATCH with the rest of the global settings on default on the pipeline removes 10,000s of duplicates from my pipeline. I had already discovered that when I posted. But I still had 116-120 dupes, non-deterministically with those settings, even when running the same exact batch sizes repeatedly. Also, changing the size of the batches made duplicates increase again.

I was really focused on concurrency problems because the number of duplicates varied even though all the problematic records turned out to be exact binary duplicates in Kafka. My SQL looked correct, and it functions correctly when I run it against a full-table dump of the entire Kafka topic.

What actually seems to be happening is that DISTINCT does not behave deterministically against JSON columns inside the pipeline/sproc. The query is like

select distinct (…fields) from T inner join (select ID, MAX(date) as max_date from T group by ID) latest on T.ID = latest.ID and T.date = latest.max_date
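For reference, here is a small runnable sketch of that query pattern. It uses Python’s built-in sqlite3 module purely as a stand-in, so it says nothing about how MemSQL evaluates DISTINCT on JSON columns inside a pipeline. The `payload` column, the sample rows, and the `latest` alias are assumptions made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# 'payload' holds JSON as text; SQLite has no dedicated JSON column type.
conn.execute("CREATE TABLE T (id INTEGER, date TEXT, payload TEXT)")
conn.executemany(
    "INSERT INTO T VALUES (?, ?, ?)",
    [
        (1, "2020-01-01", '{"v": 1}'),
        (1, "2020-01-02", '{"v": 2}'),
        (1, "2020-01-02", '{"v": 2}'),  # exact duplicate of the row above
        (2, "2020-01-01", '{"v": 3}'),
    ],
)

# Keep only the rows carrying the newest date per id, then collapse exact duplicates.
latest_rows = conn.execute(
    """
    SELECT DISTINCT src.id, src.date, src.payload
    FROM T AS src
    INNER JOIN (SELECT id, MAX(date) AS max_date FROM T GROUP BY id) AS latest
        ON src.id = latest.id AND src.date = latest.max_date
    ORDER BY src.id
    """
).fetchall()
```

Against a plain table this leaves one row per id, which matches what I saw when running the query against the full-table dump.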

This works against a table. But after learning about the default way MemSQL handles concurrency and Kafka topics, I developed a hunch that MemSQL may not be so great about handling a topic as advanced and sophisticated as JavaScript. Sure enough, when I remove all the JSON columns from the query, the DISTINCT works as expected.

To fix it, I will find a workaround that doesn’t use DISTINCT. Or, I expect dumping the sproc input to a temp table will probably make it function like querying a table (and will let me join on actual incrementing IDs, thank goodness).
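A rough sketch of that temp-table idea, again using Python’s sqlite3 as a stand-in rather than MemSQL itself. The `staging` table, its `seq` column, and the sample batch are hypothetical. Each incoming row gets an incrementing `seq`, and exactly one row per id is picked by newest date and then highest `seq`, so no DISTINCT over the JSON column is needed.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# 'seq' is an incrementing row id assigned in insertion order.
conn.execute(
    "CREATE TEMP TABLE staging ("
    "seq INTEGER PRIMARY KEY, id INTEGER, date TEXT, payload TEXT)"
)

# Hypothetical batch, standing in for the rows handed to the stored procedure.
batch = [
    (1, "2020-01-01", '{"v": 1}'),
    (1, "2020-01-02", '{"v": 2}'),
    (1, "2020-01-02", '{"v": 2}'),  # exact duplicate
    (2, "2020-01-01", '{"v": 3}'),
]
conn.executemany("INSERT INTO staging (id, date, payload) VALUES (?, ?, ?)", batch)

# For each id, keep the single row with the newest date, breaking ties by highest seq.
winners = conn.execute(
    """
    SELECT s.id, s.date, s.payload
    FROM staging AS s
    WHERE s.seq = (
        SELECT s2.seq
        FROM staging AS s2
        WHERE s2.id = s.id
        ORDER BY s2.date DESC, s2.seq DESC
        LIMIT 1
    )
    ORDER BY s.id
    """
).fetchall()
```

Because the tie-break is on a unique `seq`, the result has one row per id no matter how many exact duplicates arrive in the batch. Whether the equivalent statements are allowed inside a MemSQL pipeline sproc is something I still need to check.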

We are hitting the same issue. MemSQL does not maintain the message order, which is a basic requirement. Not sure when it is going to be fixed.

Can you explain a fix other than creating a temp table and processing the rows in sequence from there?