Error creating pipeline to Confluent cloud using sasl

Hi,

I’m using Confluent Cloud Kafka and I want to test a pipeline.

I have deployed MemSQL in AWS using CloudFormation.

I’m using the following query to create the pipeline:

CREATE OR REPLACE PIPELINE kafka_confluent_cloud
AS LOAD DATA KAFKA 'my-confluent-cloud:9092/test'
CONFIG '{"ssl.endpoint.identification.algorithm": "https",
"sasl.mechanism": "PLAIN",
"request.timeout.ms": "20000",
"retry.backoff.ms": "500",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"some-user\" password=\"some-password\";",
"security.protocol": "SASL_SSL" }'
INTO table items;

I get the following error:
11:04:58: ERROR 1933 ER_EXTRACTOR_EXTRACTOR_GET_LATEST_OFFSETS: Cannot get source metadata for pipeline kafka_confluent_cloud. Stderr:
2019-07-02 08:04:58.401 Waiting for next batch, 255 batches left until expiration.
2019-07-02 08:04:58.402 Batch starting with new consumer.

Are there more logs I can look at to understand where the problem is happening?

I have created a simple Kafka client on my laptop with the same parameters, and everything works fine.

Thanks,

Hello yohay! Thanks for using MemSQL pipelines.

At this time, pipelines do not accept Java-style/JAAS configs for Kafka clients, but you can still connect to Confluent Cloud using configuration meant for C (librdkafka) clients.

Here’s a link to the relevant Confluent Cloud documentation: https://docs.confluent.io/current/cloud/using/config-client.html#librdkafka-based-c-clients

Step 6 has the config variables you want; provide them according to our documentation and examples at SingleStoreDB Cloud · SingleStore Documentation.
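For example, the Step 6 variables translate into a CONFIG JSON along these lines (placeholder key, secret, and CA path — substitute your own). Since a stray quote in the CONFIG string is a common failure mode, a quick check that the JSON parses can save a round trip:

```shell
# Sketch only: "some-api-key", "some-api-secret", and the CA path are
# placeholders. These are the librdkafka property names from the Confluent
# doc's Step 6; note "sasl.mechanisms" (plural), unlike the Java client's
# "sasl.mechanism".
cat > /tmp/pipeline-config.json <<'EOF'
{
  "security.protocol": "SASL_SSL",
  "sasl.mechanisms": "PLAIN",
  "sasl.username": "some-api-key",
  "sasl.password": "some-api-secret",
  "ssl.ca.location": "/path/to/ca-bundle.pem"
}
EOF
# Validate before pasting the object into CREATE PIPELINE ... CONFIG '...'
python3 -c 'import json; json.load(open("/tmp/pipeline-config.json"))' && echo "CONFIG is valid JSON"
```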

Thank you!

This is the configuration I’m trying to use:

use test1;
CREATE OR REPLACE PIPELINE `kafka_confluent_cloud`
AS LOAD DATA KAFKA 'xxxxx.aws.confluent.cloud:9092/test'
CONFIG '{
"api.version.request": "true",
"broker.version.fallback": "0.10.0.0",
"api.version.fallback.ms": "0",
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"ssl.ca.location": "/usr/local/etc/openssl/cert.pem",
"sasl.username": "key",
"sasl.password": "secret"
}'
INTO table items;

According to the Confluent web site:

Confluent Cloud certificates are issued by large CAs (DigiCert etc.) that are installed as standard in most modern operating systems. As such, ssl.ca.location is typically not required.

So I have tried running the same configuration without ssl.ca.location, but I still get the same error.

Any assistance will be appreciated.

Typically, Kafka clients scan the PEM files in /usr/local/etc/openssl/ to find the correct CA for each connection. However, MemSQL pipelines do not do this automatically; you still need to provide the correct ssl.ca.location config.
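If you’re unsure where the CA bundle lives on a given host, you can probe the usual default locations. The paths below are typical defaults for Debian/Ubuntu, RHEL/CentOS, SUSE, and Homebrew OpenSSL — not guaranteed on your images, so adjust as needed:

```shell
# Probe common CA bundle locations; prints each one that exists on this host.
for f in /etc/ssl/certs/ca-certificates.crt \
         /etc/pki/tls/certs/ca-bundle.crt \
         /etc/ssl/ca-bundle.pem \
         /usr/local/etc/openssl/cert.pem; do
  [ -f "$f" ] && echo "found CA bundle: $f"
done
true  # keep a clean exit status even if nothing matched
```

Whichever path you find is what goes into ssl.ca.location, and it must resolve on every leaf node, not just the aggregator.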

If you are having trouble finding the correct CA in that folder, you can concatenate the pre-installed PEM files into a single file; keep in mind that the file must be present at the same path on every host in your MemSQL cluster.
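The concatenation step itself is just `cat` — PEM bundles are plain concatenations of certificates. A self-contained illustration with dummy certificates in a temp directory (in practice you would concatenate the real PEM files from your system’s certificate store and copy the bundle to the same path on every host):

```shell
# Demonstration only: the two "certificates" here are dummies, not real CAs.
workdir=$(mktemp -d)
printf -- '-----BEGIN CERTIFICATE-----\nAAA\n-----END CERTIFICATE-----\n' > "$workdir/ca1.pem"
printf -- '-----BEGIN CERTIFICATE-----\nBBB\n-----END CERTIFICATE-----\n' > "$workdir/ca2.pem"

# Concatenate the individual PEM files into one bundle.
cat "$workdir/ca1.pem" "$workdir/ca2.pem" > "$workdir/ca-bundle.pem"

# Sanity check: the bundle should contain both certificates.
grep -c 'BEGIN CERTIFICATE' "$workdir/ca-bundle.pem"   # prints 2
```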