A query optimization case for Greenplum

Hi all! My name is Andrey. I work as a data analyst in the Product Data Team of Dialog.X5/Insights at X5 Tech. We provide analytics on sales and purchasing behavior based on X5 Group data. To process large volumes of data, the product uses the Greenplum DBMS (database management system).

Greenplum is a distributed DBMS with a massively parallel processing (MPP) architecture. It is built on PostgreSQL and is well suited for storing and processing large amounts of data.

In this article we will look at an operation that is resource-intensive for distributed systems, COUNT(DISTINCT), and two methods of optimizing it. For a preliminary dive into query plans, this good article is a useful starting point.

List of definitions used in the article:

Segment. Segments in Greenplum are PostgreSQL instances: each segment is an independent PostgreSQL database that stores a portion of the data. A segment processes its local data and returns the results to the master. The master, in turn, is the server hosting the main PostgreSQL instance, to which clients connect to send SQL queries. The data itself is stored on the segment servers.

Data redistribution. An operation in the query plan (Redistribute Motion) in which each Greenplum segment re-hashes its data and sends rows to other segments according to the hash key.

Distribution of a table by a field/list of fields. Storing the table across different segments of the cluster; the segment that stores a record is selected based on a hash calculated from the specified fields.
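To make the distribution idea concrete, here is a minimal Python sketch of mapping rows to segments by hashing a distribution key. Greenplum uses its own internal hash function, so the md5-plus-modulo scheme below is only an illustration of the principle, not the real algorithm:

```python
import hashlib

NUM_SEGMENTS = 4  # toy cluster size


def segment_for(key) -> int:
    # Stand-in for Greenplum's internal hash: hash the distribution
    # key and map the result onto one of the segments.
    digest = hashlib.md5(str(key).encode()).hexdigest()
    return int(digest, 16) % NUM_SEGMENTS


# The same distribution key always lands on the same segment, which is
# what lets a join on that key run locally on each segment.
rows = [{"receipt_id": i, "store_id": i % 3} for i in range(10)]
placement = {row["receipt_id"]: segment_for(row["receipt_id"]) for row in rows}
print(placement)
```
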

Data structure

The receipt fact table:

fct_receipts (
  receipt_id    - receipt identifier
, receipt_dttm  - receipt date + time
, calendar_dk   - numeric representation of the receipt date, e.g. 20240101
, store_id      - store identifier
, plu_id        - product identifier
)

The table is distributed fairly evenly by the receipt_id field and partitioned by the receipt_dttm field. The data volume runs to terabytes.

A little about the nature of the data:

  • receipt_dttm for a receipt is unique;

  • receipt_id refers to only one store;

  • Based on the above statements, the "number of receipts" metric is additive over time and across store groups.
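The additivity claim is easy to sanity-check in Python: since each receipt_id belongs to exactly one day, per-day distinct counts can be summed without double counting (toy data, not the article's):

```python
# Toy receipt lines: (receipt_id, calendar_dk). A receipt may have many
# lines, but each receipt_id occurs on exactly one day.
lines = [
    ("r1", 20230801), ("r1", 20230801),  # two items in the same receipt
    ("r2", 20230801),
    ("r3", 20230802),
]

# Distinct receipts over the whole period.
total_distinct = len({rid for rid, _ in lines})

# Distinct receipts per day, then summed.
per_day = {}
for rid, day in lines:
    per_day.setdefault(day, set()).add(rid)
sum_of_daily = sum(len(s) for s in per_day.values())

print(total_distinct, sum_of_daily)  # both equal 3
```
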

A query to calculate the number of receipts

Let's consider calculating the number of receipts for groups of stores (e.g., segmentation by region, chain, etc.) and for groups of products (e.g., by brand, manufacturer, etc.).

The query takes the following list of parameters as input:

  • Period (August 2023 is used throughout the article);

  • A parameter table selected_stores with store groups;

  • A parameter table selected_plu with product groups.

Since the parameter tables are small relative to the receipt fact table, the REPLICATED distribution type is selected for them. Tables with REPLICATED distribution are duplicated in full on all segments of the cluster, so a JOIN to them happens locally.

The query to calculate the number of receipts by store/product group looks like this:

INSERT INTO receipts_cnt_baskets_draft
SELECT
  sest.store_group_id
, COALESCE(sepl.plu_group_id, 0::INT4) AS plu_group_id
, COUNT(DISTINCT fcre.receipt_id)      AS cnt_baskets
FROM fct_receipts            AS fcre
  INNER JOIN selected_stores AS sest
    USING (store_id)
  INNER JOIN selected_plu    AS sepl
    USING (plu_id)
WHERE 1 = 1
  AND fcre.receipt_dttm >= '2023-08-01 00:00:00'::TIMESTAMP
  AND fcre.receipt_dttm <  '2023-09-01 00:00:00'::TIMESTAMP
GROUP BY
  GROUPING SETS (
    (store_group_id, plu_group_id)
  , (store_group_id              )
  )
;

Some context on the query:

  • DISTINCT is needed, since different plu_ids from the same product group can appear in one receipt.

  • GROUPING SETS is used to avoid multiple passes over the receipt fact table when calculating the different groupings.

Query analysis

We will pass the following parameters as the query input:

5 store groups:

Store group    Number of stores
1              22287
2              1209
3              1001
4              162
5              14

35 product groups (top 5 shown for brevity):

Product group    Number of products
1                25702
2                65
3                31
4                27
5                26

Let's look at the query plan built by the GPORCA optimizer:

EXPLAIN ANALYZE
INSERT INTO receipts_cnt_baskets
SELECT
  sest.store_group_id
, COALESCE(sepl.plu_group_id, 0::INT4) AS plu_group_id
, COUNT(DISTINCT fcre.receipt_id)      AS cnt_baskets
-- Part 1 of the query
FROM fct_receipts            AS fcre
  INNER JOIN selected_stores AS sest
    USING (store_id)
  INNER JOIN selected_plu    AS sepl
    USING (plu_id)
WHERE 1 = 1
  AND fcre.receipt_dttm >= '2023-08-01 00:00:00'::TIMESTAMP
  AND fcre.receipt_dttm <  '2023-09-01 00:00:00'::TIMESTAMP
-- Part 2 of the query
GROUP BY
  GROUPING SETS (
    (store_group_id, plu_group_id)
  , (store_group_id              )
  )
;

A simplified query plan follows. Comments on plan nodes are numbered, and the plan is read from the bottom up:

Part 1 of the plan (data retrieval)

Result:
The data is prepared and stored on every segment
according to the fct_receipts distribution key

Shared Scan (share slice:id 4:0)
 3) Joins with the parameter tables (local JOIN)
->  Hash Join
    Hash Cond: (fct_receipts.plu_id = selected_plu.plu_id)

->  Hash Join
    Hash Cond: (fct_receipts.store_id = selected_stores.store_id)

    2) Selection of 1 partition according to the date condition
->  Partition Selector for fct_receipts
       Partitions selected: 1

    1) Hashing of the parameter tables
->  Hash
    ->  Seq Scan on selected_stores 
->  Hash 
    ->  Seq Scan on selected_plu
Part 2 of the plan: computing COUNT(DISTINCT receipt_id)

    Combining the results
->  Append

Grouping key (store_group_id)
    3) COUNT(receipt_id)
    ->  HashAggregate
      Group Key: share0_ref2.store_group_id
      
    2) DISTINCT on grouping key + receipt_id
    ->  HashAggregate
        Group Key: share0_ref2.store_group_id, share0_ref2.receipt_id
          
      1) Redistribution of data by the grouping key
      ->  Redistribute Motion
          Hash Key: share0_ref2.store_group_id
          Reading data from part 1 of the plan
            ->  Shared Scan (share slice:id 1:0)


Grouping key (store_group_id, plu_group_id)  
  3) COUNT(receipt_id)
  ->  HashAggregate
      Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
      
     2) DISTINCT on grouping key + receipt_id
     ->  HashAggregate
          Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id, share0_ref3.receipt_id
        
        1) Redistribution of data by the grouping key
        ->  Redistribute Motion
            Hash Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
            Reading data from part 1 of the plan
            ->  Shared Scan (share slice:id 2:0) 

In total, judging by the query plan, the number of receipts is calculated in three steps:

  1. Redistribution of data by grouping key.

  2. DISTINCT on grouping key + receipt_id.

  3. COUNT(receipt_id).

The product groups and store groups passed to the query are clearly non-uniform. After the data is redistributed (step 1), one or more segments may end up with too much data: a so-called skew occurs. Those segments will be more heavily loaded, and query execution time will be bound by data processing on them.
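A quick Python sketch of why a few, uneven grouping keys cause skew after redistribution (the row counts are illustrative, not the article's data, and a plain modulo stands in for the real segment hash):

```python
from collections import Counter

NUM_SEGMENTS = 8
# Five store groups with very uneven row counts, as in the tables above.
group_rows = {1: 9000, 2: 400, 3: 300, 4: 200, 5: 100}

# Redistribute Motion sends every row of a group to one segment chosen
# by hashing the grouping key; here, group % NUM_SEGMENTS.
load = Counter()
for group, n in group_rows.items():
    load[group % NUM_SEGMENTS] += n

print("segments used:", len(load), "of", NUM_SEGMENTS)      # 5 of 8
print("max segment load:", max(load.values()))              # 9000 rows
print("avg segment load:", sum(load.values()) // NUM_SEGMENTS)
```

With only five distinct keys, at most five of the eight segments receive any data at all, and the dominant group lands entirely on one segment.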

To see how many rows a segment received or processed, you can enable the option SET gp_enable_explain_allstat = ON; before EXPLAIN ANALYZE.

Then additional information will appear in the plan under each node.

By simple parsing of this output, you can get a per-segment list in which the last value of each list item equals the number of rows processed by that segment.
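As an illustration, a small Python sketch of such parsing. The fragment and regex below are hypothetical: the exact allstat line format depends on the Greenplum version, so adapt the pattern to your actual plan output:

```python
import re

# Hypothetical EXPLAIN ANALYZE fragment with per-segment row counts
# (the numbers echo the two segments discussed below).
plan_fragment = """
(seg129) 31000000 rows
(seg179) 269000000 rows
"""

rows_per_segment = {
    int(seg): int(cnt)
    for seg, cnt in re.findall(r"\(seg(\d+)\)\s+(\d+)\s+rows", plan_fragment)
}
skew_ratio = max(rows_per_segment.values()) / min(rows_per_segment.values())
print(rows_per_segment)
print(f"skew ratio: {skew_ratio:.1f}")  # ~8.7 for these two segments
```
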


The grouping key is distributed over 58 segments, and a clear imbalance is visible on one of them: segment 179 received about 269 million rows, while segment 129 received about 31 million. Segment 179 thus received almost 9 times more rows, and compared with other segments the difference is even more pronounced.

For a one-month period, the query above runs for about a minute, depending on the load on the cluster.

Query optimization

Let's consider a couple of options for optimizing such a query.

Option 1. Using a parameter.

On the current version of our cluster, the parameter optimizer_force_multistage_agg is set to off. The value may change from version to version. To view the parameter value, use the command:

SHOW optimizer_force_multistage_agg;

The documentation says that this parameter forces the GPORCA optimizer to choose a multi-stage aggregate plan for operations like COUNT(DISTINCT).
When the value is off (the default), GPORCA chooses between one-stage and two-stage aggregate plans based on the cost of the SQL query.

Enable the parameter with SET optimizer_force_multistage_agg = on; — this tells the optimizer to choose a two-stage aggregate plan.

The plan for the grouping key (store_group_id, plu_group_id):

Grouping key (store_group_id, plu_group_id)

4) COUNT(receipt_id)
->  HashAggregate
    Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id

    3) Redistribution of data by the grouping key
    ->  Redistribute Motion 
        Hash Key: share0_ref3.store_group_id, share0_ref3.plu_group_id

        2) DISTINCT on grouping key + receipt_id
        ->  HashAggregate
            Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id, share0_ref3.receipt_id

            1) Redistribution of data by the grouping key + receipt_id
            ->  Redistribute Motion
                Hash Key: share0_ref3.store_group_id, share0_ref3.plu_group_id, share0_ref3.receipt_id, share0_ref3.receipt_id
                ->  Shared Scan (share slice:id 3:0)

In this plan, the number of receipts is calculated in four steps:

  1. Redistribution by grouping key + receipt_id.
    This reduces skew, since the number of unique receipt_id values is large.

  2. DISTINCT by grouping key + receipt_id.
    This reduces the amount of data for the next redistribution statement.

  3. Redistribution by grouping key.

  4. COUNT(receipt_id).

The two-stage approach shows up in steps 1 and 2: an additional redistribution and DISTINCT by grouping key + receipt_id.
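The effect of the two extra steps can be sketched in Python: redistributing by (grouping key, receipt_id) first spreads the hot group across all segments, and only the deduplicated pairs are then moved by the grouping key alone. This is a simplified model of the plan above, not Greenplum's actual executor:

```python
from collections import defaultdict

NUM_SEGMENTS = 4
# (store_group_id, receipt_id) pairs; group 0 dominates heavily.
pairs = [(0, rid) for rid in range(1000)] + \
        [(1, rid) for rid in range(1000, 1100)]

# Step 1: redistribute by grouping key + receipt_id. No segment becomes
# hot, because receipt_id has many unique values.
stage1 = defaultdict(set)
for pair in pairs:
    stage1[hash(pair) % NUM_SEGMENTS].add(pair)  # step 2: local DISTINCT

# Steps 3-4: move only the deduplicated pairs by grouping key and count.
counts = defaultdict(int)
for seg_pairs in stage1.values():
    for group, _ in seg_pairs:
        counts[group] += 1

print(sorted(counts.items()))  # [(0, 1000), (1, 100)]
print("rows per segment after step 1:", sorted(len(s) for s in stage1.values()))
```
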

The above query runs 3.5 to 4.5 times faster, depending on the load on the cluster.

You should not enable the parameter for the entire database, as this may change the behavior of other queries. Instead, speed up the problematic query locally at the session level, and then return the parameter to its original value with RESET optimizer_force_multistage_agg;.

Conclusion

When the parameter (hint) is used, an additional redistribution with a more optimal distribution key appears, and all segments of the cluster begin to participate in the calculation. This increases network load in the cluster but significantly reduces query execution time.

Option 2. Algorithmic: grouping key expansion.

The "number of receipts" metric is additive over time, so you can count receipts by day and then aggregate further. Adding the day to the grouping key increases the number of grouping keys up to 30-fold. If the data in the key is still not uniform enough, you can use other table fields that are consistent with the additivity of the metric.
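A Python sketch of the effect: extending the key with the day multiplies the number of distinct keys, so the dominant group no longer lands on a single segment (a toy model with a plain modulo standing in for the segment hash; the row counts are illustrative):

```python
from collections import Counter

NUM_SEGMENTS = 8
DAYS = [20230801 + k for k in range(31)]  # calendar_dk values for August 2023

# (store_group_id, calendar_dk) rows: group 0 has 9x more receipts.
rows = [(0, DAYS[i % 31]) for i in range(9000)] + \
       [(1, DAYS[i % 31]) for i in range(1000)]


def seg(*key):
    # Toy segment hash: sum-and-modulo stands in for Greenplum's hashing.
    return sum(key) % NUM_SEGMENTS


narrow = Counter(seg(g) for g, _ in rows)   # key = store_group_id only
wide = Counter(seg(g, d) for g, d in rows)  # key extended with the day

print("narrow key:   segments used =", len(narrow), " max load =", max(narrow.values()))
print("extended key: segments used =", len(wide), " max load =", max(wide.values()))
```

With the narrow key only two segments receive data and one of them holds all 9000 rows of the big group; with the extended key all eight segments participate and the maximum load drops by several times.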

The rewritten query:

INSERT INTO receipts_cnt_baskets
  WITH draft AS (
    SELECT
      sest.store_group_id
    , fcre.calendar_dk
    , COALESCE(sepl.plu_group_id, 0::INT4) AS plu_group_id
    , COUNT(DISTINCT fcre.receipt_id)      AS cnt_baskets
    FROM fct_receipts            AS fcre
      INNER JOIN selected_stores AS sest
        USING (store_id)
      INNER JOIN selected_plu    AS sepl
        USING (plu_id)
    WHERE 1 = 1
      AND fcre.receipt_dttm >= '2023-08-01 00:00:00'::TIMESTAMP
      AND fcre.receipt_dttm <  '2023-09-01 00:00:00'::TIMESTAMP
   GROUP BY
     GROUPING SETS (
       (store_group_id, calendar_dk, plu_group_id)
     , (store_group_id, calendar_dk              )
  )
)
SELECT
  store_group_id
, plu_group_id
, SUM(cnt_baskets)
FROM draft
GROUP BY
  store_group_id
, plu_group_id
;

For this query, the optimizer chose a plan like the one at the beginning of the article (shown for the grouping key (store_group_id, calendar_dk, plu_group_id)):

3) COUNT(receipt_id)
->  HashAggregate
    Group Key: share1_ref3.store_group_id, share1_ref3.calendar_dk, share1_ref3.plu_group_id

    2) DISTINCT on grouping key + receipt_id
    ->  HashAggregate
        Group Key: share1_ref3.store_group_id, share1_ref3.calendar_dk, share1_ref3.plu_group_id, share1_ref3.receipt_id

        1) Redistribution of data by the grouping key
        ->  Redistribute Motion
            Hash Key: share1_ref3.store_group_id, share1_ref3.calendar_dk, share1_ref3.plu_group_id
            ->  Shared Scan (share slice:id 2:1)

There is no significant skew: the extended grouping key uses all segments of the cluster, and the load becomes more uniform.

This query runs 7 to 9 times faster compared to the original query without a hint, depending on the load on the cluster.

If we assume that each redistribution operator moves 100% of its rows, then this query redistributes less data than the query with the hint, and network load is reduced by up to a factor of two.
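A back-of-envelope comparison of the two plans' network traffic under that assumption. The deduplication fraction d is an assumed illustrative value, not a measured one:

```python
# N rows enter the aggregation; after the local DISTINCT in the hint
# plan, a fraction d survives (receipts rarely repeat a
# (grouping key, receipt_id) pair, so d is close to 1).
N = 100_000_000
d = 0.9  # assumed fraction of rows surviving deduplication

hint_plan_moved = N + d * N  # two Redistribute Motions: all rows, then dedup'd rows
extended_key_plan_moved = N  # one Redistribute Motion by the extended grouping key

print(f"network traffic ratio: {hint_plan_moved / extended_key_plan_moved:.1f}x")
```

With d near 1 the ratio approaches 2, which matches the "up to two times" estimate above.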

Conclusion

By expanding the grouping key, exploiting the additivity of the "number of receipts" metric over time, we reduce the skew in the grouping key data and use all segments of the cluster for the calculation.

Results

  • When developing, it is important to understand the nature of the data: a good algorithm is in most cases better than relying on optimizer parameters.

  • If the grouping key differs from the table distribution key, then the grouping operation leads to data redistribution on the cluster. Grouping key data may be skewed, and it is important to be able to diagnose such cases.

  • A small number of grouping keys leads to incomplete use of cluster segments. By expanding the grouping key, you can increase segment utilization and reduce calculation time.

We dived a little into the world of Greenplum and looked at how the DBMS executes queries. We learned about skew and methods to combat it. I hope it was useful and interesting.

I want to thank Daniil Nedumov for his advice and Anton Denisov for help in preparing the article.
