OrderedColumnStoreScan segment scanning with CTEs

I’m seeing some unexpected behavior in the number of segments scanned when a query is in a CTE vs. when it’s not in a CTE. I have a query structured like so:

with
filtered_facts as (
  select
    sum(value) as value,
    epoch as epoch,
    f.metric_id as metric_id,
    f.app_id as app_id
  from facts_app_metrics f
  where f.metric_id in (...) and epoch = ... and f.app_id = ...
  group by epoch, f.metric_id, f.app_id
)
select
  sum(value)
from filtered_facts;

This is a much-simplified version of what I’m trying to do, but it demonstrates the issue. This query has a profile like so:

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PROFILE                                                                                                                                                                                                                                                                                                                                                                                                                                              |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Project [`sum(value)`] actual_rows: 1 exec_time: 0ms start_time: 00:00:00.089 network_traffic: 0.014000 KB network_time: 0ms                                                                                                                                                                                                                                                                                                                         |
| Aggregate [SUM(remote_0.`sum(value)`) AS `sum(value)`] actual_rows: 48 exec_time: 0ms start_time: 00:00:00.024                                                                                                                                                                                                                                                                                                                                       |
| Gather partitions:all est_rows:1 alias:remote_0 actual_rows: 48 exec_time: 0ms start_time: 00:00:00.089 end_time: 00:00:00.089                                                                                                                                                                                                                                                                                                                       |
| Project [`sum(value)`] est_rows:1 actual_rows: 48 exec_time: 0ms start_time: [00:00:00.023, 00:00:00.087] network_traffic: 0.057000 KB network_time: 0ms                                                                                                                                                                                                                                                                                             |
| Aggregate [SUM(filtered_facts.value) AS `sum(value)`] actual_rows: 12 exec_time: 0ms start_time: 00:00:00.053                                                                                                                                                                                                                                                                                                                                        |
| TableScan 0tmp AS filtered_facts storage:list stream:yes est_table_rows:62,371,410  exec_time: 0ms                                                                                                                                                                                                                                                                                                                                                   |
| Project [value] est_rows:62,371,410 actual_rows: 12 exec_time: 0ms start_time: 00:00:00.053                                                                                                                                                                                                                                                                                                                                                          |
| StreamingGroupBy [SUM(f.value) AS value] groups:[f.metric_id] actual_rows: 12 exec_time: 0ms start_time: 00:00:00.053 memory_usage: 0.000000 KB                                                                                                                                                                                                                                                                                                      |
| Filter [f.metric_id IN (...) AND f.epoch = ? AND f.app_id = ?] actual_rows: 1,956 exec_time: 2ms start_time: 00:00:00.051                                                                                                                                                                                                                                                                                                                            |
|  [actual_rows: 1,956 | max:1,956 at partition_40, average: 40.750000, std dev: 279.367925]                                                                                                                                                                                                                                                                                                                                                           |
| OrderedColumnStoreScan report_service.facts_app_metrics AS f, KEY app_id (app_id, epoch, metric_id, device_country_id, store_id) USING CLUSTERED COLUMNSTORE WITH(COLUMNSTORE_SEGMENT_ROWS=102400) est_table_rows:24,948,564,112 est_filtered:62,371,411 actual_rows: 14,745,600 exec_time: 83ms start_time: [00:00:00.001, 00:00:00.026] memory_usage: 37,748.734375 KB segments_scanned: 144 segments_skipped: 256,195 segments_fully_contained: 0 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

If I were to extract the CTE into its own query:

select
  sum(value) as value,
  epoch as epoch,
  f.metric_id as metric_id,
  f.app_id as app_id
from facts_app_metrics f
where f.metric_id in (...) and epoch = ... and f.app_id = ...
group by epoch, f.metric_id, f.app_id;

…you’ll notice it actually scans far fewer rows and segments:

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PROFILE                                                                                                                                                                                                                                                                                                                                                                                                                    |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Gather partitions:single alias:remote_0 actual_rows: 12 exec_time: 0ms                                                                                                                                                                                                                                                                                                                                                     |
| Project [value, f.epoch, f.metric_id, f.app_id] actual_rows: 12 exec_time: 0ms start_time: 00:00:00.018 network_traffic: 0.403000 KB network_time: 0ms                                                                                                                                                                                                                                                                     |
| StreamingGroupBy [SUM(f.value) AS value] groups:[f.metric_id] actual_rows: 12 exec_time: 1ms start_time: 00:00:00.018 memory_usage: 0.000000 KB                                                                                                                                                                                                                                                                            |
| Filter [f.metric_id IN (...) AND f.epoch = ? AND f.app_id = ?] actual_rows: 1,956 exec_time: 0ms start_time: 00:00:00.016                                                                                                                                                                                                                                                                                                  |
| OrderedColumnStoreScan report_service.facts_app_metrics AS f, KEY app_id (app_id, epoch, metric_id, device_country_id, store_id) USING CLUSTERED COLUMNSTORE WITH(COLUMNSTORE_SEGMENT_ROWS=102400) est_table_rows:24,948,564,112 est_filtered:62,371,411 actual_rows: 307,200 exec_time: 17ms start_time: 00:00:00.001 memory_usage: 786.432007 KB segments_scanned: 3 segments_skipped: 5,335 segments_fully_contained: 0 |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

That seems to indicate 144 segments scanned when the query is used in a CTE vs. only 3 segments when it is not. Am I reading that correctly?

Is there anything we can do in our queries to reduce the segments being scanned when this type of query is used in a CTE?

Thanks!

Actually, I wonder if the profile is just reported differently. I noticed that the segments scanned are 3 of 5,335 vs. 144 of 256,195, roughly a 48x difference in both counts, and in one case it says Gather partitions:single vs. Gather partitions:all. I think I may have answered my own question 🙂

Thinking about this more: it still leaves the question of why the query needs to gather from all partitions instead of a single partition when it is inside a CTE.

Yes, you are correct, and that appears to be the root question here. What is the shard key on this table? Can you share the DDL?

As a crude workaround, consider putting a LIMIT 1000000000 on the CTE’s SELECT; the LIMIT discourages query rewrites (such as merging the CTE into the outer query) from changing how it is processed.
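Applied to the example from the first post, the workaround would look roughly like this (the exact LIMIT value is arbitrary; it just needs to be larger than any result set the CTE could produce):

```sql
with
filtered_facts as (
  select
    sum(value) as value,
    epoch as epoch,
    f.metric_id as metric_id,
    f.app_id as app_id
  from facts_app_metrics f
  where f.metric_id in (...) and epoch = ... and f.app_id = ...
  group by epoch, f.metric_id, f.app_id
  -- effectively a no-op on the result set, but it can keep the
  -- optimizer from merging/rewriting the CTE into the outer query
  limit 1000000000
)
select
  sum(value)
from filtered_facts;
```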

Also, take a look at the visual profile in Studio to help understand more easily what is going on.

Sure! The DDL for this table is:

CREATE TABLE `facts_app_metrics` (
  `app_id` bigint(20) NOT NULL,
  `device_country_id` bigint(20) NOT NULL,
  `metric_id` bigint(20) NOT NULL,
  `store_id` bigint(20) NOT NULL,
  `epoch` bigint(20) NOT NULL,
  `value` decimal(20,5) NOT NULL,
  KEY `app_id` (`app_id`,`epoch`,`metric_id`,`device_country_id`,`store_id`) /*!90619 USING CLUSTERED COLUMNSTORE */ /*!90621 WITH(COLUMNSTORE_SEGMENT_ROWS=102400) */,
  /*!90618 SHARD */ KEY `foreign_key_shard_key` (`app_id`,`epoch`)
) /*!90621 AUTOSTATS_ENABLED=TRUE */

Interesting! I added the LIMIT and that did indeed make a significant difference in the query profile. For the example I wrote in the first post, it brings the performance of the CTE query right in line with the non-CTE query.

I’ll definitely take a look at the visual profile to get a better idea of what’s going on. Thank you!