Enabling Parallelization in Spark with Partition Pushdown


#1

We have a columnstore table that was created using a query like the one below:
CREATE TABLE key_metrics (
  source_id TEXT,
  date TEXT,
  metric1 FLOAT,
  metric2 FLOAT,
  SHARD KEY (source_id, date) USING CLUSTERED COLUMNSTORE
);

We have an application that uses Spark (with Spark Job Server) to query the MemSQL table. Below is a simplified form of the kind of DataFrame operations we are doing in Scala:

sparkSession
  .read
  .format("com.memsql.spark.connector")
  .options(Map("path" -> "dbName.key_metrics"))
  .load()
  .filter(col("source_id").equalTo("12345678"))
  .filter(col("date").isin("2019-02-01", "2019-02-02", "2019-02-03"))

By inspecting the physical plan constructed for the DataFrame, I have confirmed that these filter predicates are indeed being pushed down to MemSQL.

It is my understanding that with partition pushdown, during bulk loading, we can leverage parallelism across all the cores of the machines by creating as many Spark tasks as there are MemSQL database partitions. However, upon running the Spark pipeline and observing the Spark UI, it appears that only one Spark task is created, which issues a single query to the database and therefore uses only a single core.
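To check this, I'm looking at the number of partitions of the loaded DataFrame; a minimal sketch (with `df` standing for the DataFrame built by the read above):

```scala
// Sketch: the number of RDD partitions determines how many Spark tasks
// the scan runs. With partition pushdown working, this should match the
// number of MemSQL database partitions (8 in our case); I am seeing 1.
val numTasks = df.rdd.getNumPartitions
println(s"Spark will read with $numTasks task(s)")
```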

I have checked that the partitions in the table are fairly evenly distributed:
+---------------+-------------+--------------+--------+------------+
| DATABASE_NAME | TABLE_NAME  | PARTITION_ID | ROWS   | MEMORY_USE |
+---------------+-------------+--------------+--------+------------+
| dbName        | key_metrics | 0            | 784012 | 0          |
| dbName        | key_metrics | 1            | 778441 | 0          |
| dbName        | key_metrics | 2            | 671606 | 0          |
| dbName        | key_metrics | 3            | 748569 | 0          |
| dbName        | key_metrics | 4            | 622241 | 0          |
| dbName        | key_metrics | 5            | 739029 | 0          |
| dbName        | key_metrics | 6            | 955205 | 0          |
| dbName        | key_metrics | 7            | 751677 | 0          |
+---------------+-------------+--------------+--------+------------+
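(For reference, the distribution above comes from a query along these lines; the column names match MemSQL's `information_schema.TABLE_STATISTICS` view:)

```sql
-- Sketch: per-partition row counts for the table.
SELECT database_name, table_name, partition_id, rows, memory_use
FROM information_schema.TABLE_STATISTICS
WHERE database_name = 'dbName' AND table_name = 'key_metrics';
```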

I have made sure that the following properties are set as well:
spark.memsql.disablePartitionPushdown = false
spark.memsql.defaultDatabase = "dbName"

Is my understanding of partition pushdown incorrect? Is there some other configuration that I am missing?

Would appreciate your input on this.

Thanks!
Varun


#2

Hi @varun,

Can you verify that all MemSQL credentials are the same on all nodes per https://docs.memsql.com/memsql-and-spark/v6.7/spark-connector/?

Note that the MemSQL credentials have to be the same on all nodes to take advantage of partition pushdown, which queries the leaf nodes directly.

Just want to rule out any perms issues before debugging further.
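For example, something along these lines run on the master aggregator and on each leaf (a sketch; the user name and password are placeholders, and on MemSQL 6.x users are not automatically synced to the leaves):

```sql
-- Sketch: ensure the same user/password exists on every node so the
-- connector can query the leaves directly.
GRANT ALL ON dbName.* TO 'spark_user'@'%' IDENTIFIED BY 'spark_password';
```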

Thanks,
Dave


#3

Hi Dave,

Thank you for your response.

Currently my dev setup is using only one node and I am relying on the default configuration settings for most except the below two, which I’ve explicitly set:
spark.memsql.disablePartitionPushdown = false
spark.memsql.defaultDatabase = "dbName"

Thanks,
Varun


#4

Hi! We found an incompatibility between MemSQL Spark connector 2.0.6 and MemSQL 6.7+ with regard to partition pushdown. Please try MemSQL Spark connector 2.0.7 (released today) and let us know if that addresses the problem.


#5

Hi Lucy,

Thanks for your message.

I’ve tried using version 2.0.7 and I am still not seeing any parallelization in Spark when loading the DataFrame.

I have simplified the test code even further to eliminate other variables:

val conf = new SparkConf()
  .setAppName("Read from MemSQL query")
  .set("spark.memsql.host", "localhost")
  .set("spark.memsql.port", "3306")
  .set("spark.memsql.defaultDatabase", "dbName")
  .set("spark.memsql.disablePartitionPushdown", "false")
  .setMaster("local[*]")

implicit val sparkSession = SparkSession.builder
  .config(conf)
  .getOrCreate()

val table = "key_metrics"
val df = sparkSession
  .read
  .format("com.memsql.spark.connector")
  .options(Map("query" -> s"SELECT * FROM $table"))
  .load()

Thanks,
Varun