Bootstrap in PySpark

  • Our product currently runs on PySpark 2.4.4. Some teams have already moved to the PySpark 3.x + Hadoop 3 combination, but for us this is still only in the plans.

  • We write our production code using our own framework, PySparkPipeline. Within the team we follow an approach where, whenever possible, a single process contains only lazy transformations (that is, no computations are actually triggered inside it). The process returns a Spark DataFrame, i.e. just the computation plan, and the only action ever called is the write to the database.

    This approach has its pros and cons, but we will not delve into them in this article. Suffice it to say that it has taken root well in our product. A rough sketch of this pattern is shown below.
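
To make the idea more concrete, here is a hypothetical sketch of a process in this style (the function and input names are invented and this is not the actual PySparkPipeline API): only lazy transformations inside, a DataFrame returned, and the write as the single action:

from pyspark.sql import DataFrame, functions as F

def build_campaign_stats(clients_df: DataFrame, purchases_df: DataFrame) -> DataFrame:
    """Only lazy transformations: nothing is executed inside this function."""
    return (
        clients_df
        .join(purchases_df, on='user_id', how='left')
        .groupBy('camp_id', 'control_group_flg', 'user_id')
        .agg(F.sum('revenue').alias('revenue'),
             F.sum('margin').alias('margin'),
             F.max('conversion').alias('conversion'))
    )

# The only action in the whole process is the final write, e.g.:
# build_campaign_stats(clients_df, purchases_df).write.saveAsTable('campaign_statistics')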

Data Description

To better understand the approaches described below, it is worth briefly describing the structure of the data on which the calculations are performed.

Before calculating the effect of a campaign, we first need to collect per-participant statistics into a dataframe; this dataframe is the input to each of the approaches described below.

Below is a stripped down and slightly transformed example of the data we use.

Field descriptions:

  • camp_id: campaign identifier;

  • control_group_flg: flag that the client belongs to the campaign control group;

  • revenue: revenue that the client brought to us during the campaign;

  • margin: the margin that the client brought to us during the campaign;

  • conversion: flag that the client responded to the offer.
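
For illustration, a tiny synthetic dataframe with this structure could be built as follows (the values are invented; user_id here stands for the internal client identifier that is used later for ordering):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Invented example rows: one row per campaign participant
statistics_df = spark.createDataFrame(
    [
        (1, 'camp_1', 0, 120.0, 35.0, 1),
        (2, 'camp_1', 1, 0.0, 0.0, 0),
        (3, 'camp_2', 0, 15.5, 4.2, 1),
        (4, 'camp_2', 1, 80.0, 22.1, 1),
    ],
    ['user_id', 'camp_id', 'control_group_flg', 'revenue', 'margin', 'conversion']
)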

What are the options?

So, after searching for solutions on the Internet, I realized that when people ran into a similar problem, they usually optimized by applying different statistical tests to different metrics and sample sizes. I really wanted a single universal tool that I could run automatically every day without having to think about anything else.

1. Bootstrap in UDF

The very first option that came to mind was to write a UDF applied to each group after a groupBy. Thus, if, for example, we have 100 workers allocated, we can calculate 100 campaigns in parallel.

It was clear from the start that this option was unlikely to work, because our campaigns can include anywhere from a thousand clients to tens of millions. At the same time, neither we nor Spark know in advance which campaign will land on which worker. It follows that if even one group requires a large amount of memory, then every worker must be able to hold that volume, and such an allocation of resources can hardly be called optimal.

In theory, this problem could be solved by combining small campaigns into batches so that each worker receives at least a reasonably sized chunk of data, but that would increase the execution time and still would not solve the problem of campaigns with a large number of participants.

Despite these concerns, I had never used UDFs on grouped data before, so I was interested in giving it a try.

The code turned out to be relatively simple; essentially I just added a schema definition and reformatted the logic a little to work with pandas instead of NumPy:

import os

import pandas as pd
from pyspark.sql import functions as F, types as T

# List of key columns
KEY_COLS = ['camp_id']
# List of metrics for which we calculate statistics
STATISTICS_COLS = ['revenue', 'margin', 'conversion']
ALPHA = 0.05
BS_ITERS = 5000

# Schema of the dataframe returned by the pandas_udf
schema = T.StructType(
    # Key fields
    [T.StructField(i, T.StringType()) for i in KEY_COLS]
    +
    # Left and right confidence interval bounds for every metric
    [T.StructField(metric, T.ArrayType(T.DoubleType()))
     for metric in STATISTICS_COLS]
)

# Create the udf that calculates the statistics according to our schema
@F.pandas_udf(schema, functionType=F.PandasUDFType.GROUPED_MAP)
def bs_on_executor(pdf: pd.DataFrame) -> pd.DataFrame:
    """Calculate the confidence intervals for several metrics at once"""

    # Set the ARROW_PRE_0_15_IPC_FORMAT variable on every worker so that
    # the newer PyArrow works correctly with our version of PySpark
    os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'
    # Grab the KEY_COLS values so that we can return them in the result
    keys = pdf[KEY_COLS].iloc[0].tolist()

    # Control group
    a = (pdf.loc[pdf['control_group_flg'] == 1]
         .reset_index(drop=True)[STATISTICS_COLS])
    # Pilot group
    b = (pdf.loc[pdf['control_group_flg'] == 0]
         .reset_index(drop=True)[STATISTICS_COLS])
    len_a = len(a)
    len_b = len(b)

    diff_list = []
    # The bootstrap itself
    for _ in range(BS_ITERS):
        a_boot = a.sample(len_a, replace=True).mean()
        b_boot = b.sample(len_b, replace=True).mean()
        diff_list.append(b_boot - a_boot)
    bs_result = pd.concat(diff_list)

    # Confidence interval calculation
    ci_res = bs_result.groupby(bs_result.index).quantile([ALPHA/2, 1-ALPHA/2])

    return pd.DataFrame(
        [keys + [ci_res[metric].tolist() for metric in STATISTICS_COLS]])


# The grouping itself and the bootstrap call
result = statistics_df.groupBy(*KEY_COLS).apply(bs_on_executor)

And for small campaigns it worked quite quickly and well.

But as soon as I tried to test it on a campaign with two million clients, I immediately got the error “org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer”.

The reason is not that there is not enough memory on the worker, but that the older version of PyArrow that works with our version of Spark cannot transfer more than 2 GB of uncompressed data to the UDF (there is a corresponding bug in the Apache Jira).

So, after this check, the option was definitively rejected.

2. Naive approach (explode in Spark)

The next approach, which is the one we use now (in a slightly modified form, see the “Final version” section below), is the “naive approach”. The basic concept is as follows:

  1. Let's select from the original dataframe the fields with the campaign identifier and the control group flag (keeping the original number of rows).

  2. We copy it N times, storing the copy number in the “Iteration number” column. N is the desired number of bootstrap iterations (for us it is 5000).

  3. For each row of the resulting dataframe, we need to randomly select a record from the original one, preserving the group affiliation (meaning that the campaign ID and the control group flag must match).

  4. We group by campaign ID, group and iteration number and calculate the metric values.

Right away we face the problem of how to sample the data. Dataframes have a sample method, but it cannot produce a dataframe larger than the original one, so we would have to call it N times in a loop, which Spark will not like very much…
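
For illustration, the rejected loop would look roughly like this (a hypothetical sketch, assuming the statistics_df and BS_ITERS defined earlier); each iteration adds another sample plus union to the query plan, so with 5000 iterations the plan and the number of jobs explode:

from functools import reduce
from pyspark.sql import functions as F

samples = []
for i in range(BS_ITERS):
    samples.append(
        statistics_df
        # with replacement, roughly the same size as the original (not exactly the same row count)
        .sample(withReplacement=True, fraction=1.0, seed=i)
        .withColumn('iter_num', F.lit(i))
    )
bootstrap_df = reduce(lambda x, y: x.unionByName(y), samples)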

I decided to go the following way:

  1. Using a window function, go through all the rows within each group and assign them a sequential number, using a sort that always returns the same result (essentially sorting by the internal user ID).

  2. For each row of our dataframe with statistics, generate an array with the sequence of numbers from 1 to N and explode it. These arrays give us the iteration number, and each iteration contains a number of rows equal to the number of rows in the original dataframe.

  3. Next, for each row we draw a random number from 1 to the number of rows in the group, which corresponds to the sequential number from step 1.

  4. As a result, we get a dataframe with N iterations and, for each iteration, the list of row identifiers that fell into it.

  5. Then all that remains is to join it back to the dataframe from step 1.

With this scheme we read the source dataframe with campaign client data twice, so it is better either to cache it in advance or to write it out to the database.

This could be optimized by building the bootstrap dataframe without touching the original data at all, simply creating a dataframe with the required number of rows based on the number of clients in each campaign. However, in that case we would have to store the group keys and the number of rows in each group, which no longer fits the PySparkPipeline concept of having no intermediate computations, and the gain would not be large anyway.

Accordingly, the code can be written as follows:

from pyspark.sql import Window, functions as F

# List of group key columns
KEY_COLS = ['camp_id', 'control_group_flg']
# List of metrics for which we calculate statistics
STATISTICS_COLS = ['revenue', 'margin', 'conversion']
# Number of bootstrap iterations
BS_ITERS = 5000

# Calculate the number of rows in each group and the sequential row number
window_for_bs_stats = Window.partitionBy(*KEY_COLS)
statistics_df = (
    statistics_df
    .withColumn('rn',
                F.row_number().over(window_for_bs_stats.orderBy('user_id')))
    .withColumn('group_cnt', F.count(F.lit(1)).over(window_for_bs_stats))
    # repartition is needed so that the subsequent explode runs on all workers
    # and not on the limited number of partitions produced by window_for_bs_stats
    .repartition('rn')
).cache()

iterations_df = (
    statistics_df
    # For each record in our dataframe generate BS_ITERS rows;
    # the sequence number becomes the iteration number
    .select(KEY_COLS + ['group_cnt'])
    .withColumn('iter_num', F.explode(F.sequence(F.lit(1), F.lit(BS_ITERS))))
    # For each row of the iteration randomly pick a record number
    # from the original dataframe
    .withColumn('rn',
                (F.floor(F.rand() * F.col('group_cnt')) + 1).cast('int'))
    .select(KEY_COLS + ['iter_num', 'rn'])
)

# Join the bootstrap dataframe back to the original one by the random row number
result_df = (
    iterations_df
    .join(
        statistics_df.select(KEY_COLS + STATISTICS_COLS + ['rn']),
        on=KEY_COLS + ['rn'],
        how='inner'
    )
    # Group by the key fields and the iteration number
    .groupby(KEY_COLS + ['iter_num'])
    .agg(*[F.avg(stat_col).alias(stat_col) for stat_col in STATISTICS_COLS])
)

Then we simply group by campaign ID, calculate the difference between the pilot and control groups for each iteration, and take the required percentiles to obtain the confidence interval.
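
This final step is not shown in the original code, so here is a minimal sketch of one way it could look (assuming the result_df produced above and ALPHA = 0.05):

ALPHA = 0.05

# Per-iteration means of the pilot (control_group_flg = 0) and control (= 1) groups
pilot = result_df.filter(F.col('control_group_flg') == 0)
control = result_df.filter(F.col('control_group_flg') == 1)

# Difference between the groups for every bootstrap iteration
diff_df = (
    pilot.alias('p')
    .join(control.alias('c'), on=['camp_id', 'iter_num'], how='inner')
    .select(
        'camp_id', 'iter_num',
        *[(F.col('p.' + m) - F.col('c.' + m)).alias(m) for m in STATISTICS_COLS]
    )
)

# Percentiles over the iterations give the confidence interval bounds
ci_df = diff_df.groupBy('camp_id').agg(
    *[F.expr('percentile_approx({m}, array({lo}, {hi}))'.format(
          m=m, lo=ALPHA / 2, hi=1 - ALPHA / 2)).alias(m + '_ci')
      for m in STATISTICS_COLS]
)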

This approach works quite well and is quite fast. 50 million rows (that is, 20-30 campaigns) are calculated in 30-40 minutes on a free cluster.

But there is one significant drawback: at a certain point in the calculation the intermediate result contains 50 million * 5000 rows. That is a lot of data, so you need to give the workers enough memory, and how much depends on the number of metrics and additional fields you use.

3. Poisson or Binomial bootstrap

While I was thinking about how to build the naive bootstrap, colleagues from another product described, at one of the weekly analyst meetings, the bootstrap approach they had recently developed.

They used the Poisson bootstrap. It is well explained in this video: How I Stopped Worrying and Loved Poisson-Bootstrap | Webinar by Valery Babushkin | karpov.courses

The basic concept is that instead of sampling clients, for each client we “sample” the number of times it appears in each bootstrap iteration. The number of times a client appears in a single bootstrap iteration follows a binomial distribution with n = the number of clients in the campaign and p = 1/n.

Accordingly, for each client we can generate the number of occurrences of its row in every bootstrap iteration. This gives us a matrix whose columns are iteration numbers, whose rows are clients, and whose cells contain the number of times that record appears in that iteration.

Moreover, with a sufficiently large number of objects (more than 100 or so), the binomial distribution converges to the Poisson distribution with lambda = 1.
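
This is easy to check numerically; for example (a quick illustrative sketch using scipy, not part of the original article), the probability mass functions are already very close at n = 100:

from scipy import stats

n = 100
for k in range(5):
    # Probability of a client appearing k times in one bootstrap iteration
    print(k,
          round(stats.binom.pmf(k, n, 1 / n), 4),
          round(stats.poisson.pmf(k, 1), 4))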

Once the distributions converge, we can use the Poisson distribution, which does not require knowing the number of clients in each campaign. Compared with the naive approach, the performance difference is that the extra join disappears and all calculations happen within a single dataframe.

In its final form, the implementation looks as follows:

import numpy as np
from pyspark.sql import functions as F, types as T

# List of group key columns
KEY_COLS = ['camp_id', 'control_group_flg']
# List of metrics for which we calculate statistics
STATISTICS_COLS = ['revenue', 'margin', 'conversion']
# Number of bootstrap iterations
BS_ITERS = 5000

# udf that builds an array with the number of occurrences of a row in each iteration
pois_udf = F.udf(lambda size: np.random.poisson(1, size).tolist(),
                 T.ArrayType(T.IntegerType()))

result_df = (
    statistics_df
    .withColumn('iter_cnt', F.lit(BS_ITERS))
    # Build the array with the number of occurrences of the row in every iteration
    .withColumn('poisson_array', pois_udf(F.col('iter_cnt')))
    # posexplode keeps both the iteration number and the number of occurrences
    .select(
        KEY_COLS
        + [F.posexplode('poisson_array').alias('iter_num', 'poisson')]
        + STATISTICS_COLS
    )
    # Drop the rows that do not take part in a given bootstrap iteration
    .filter(F.col('poisson') != 0)
    # Group by the key fields and the iteration number
    .groupBy(KEY_COLS + ['iter_num'])
    # Sum the metrics and count the clients in each iteration
    .agg(*(
        [F.sum(F.col('poisson') * F.col(stat_col)).alias(stat_col)
         for stat_col in STATISTICS_COLS]
        +
        [F.sum(F.col('poisson')).alias('total_cnt')]
    ))
)

# Calculate the mean of every metric for each iteration
for stat_col in STATISTICS_COLS:
    result_df = (result_df
                 .withColumn(stat_col, F.col(stat_col) / F.col('total_cnt')))

Disadvantages of this approach:

  • The problem of large amounts of data remains, since at some point the rows are still multiplied by the number of bootstrap iterations.

  • A debatable disadvantage: because the number of rows in each bootstrap iteration varies, the approach can produce a noticeably larger variance on small groups of no more than a few hundred records.

Final version

Since the Poisson/Binomial bootstrap still has at least a small drawback in the accuracy of estimates on small samples, I decided to use the naive bootstrap, but to add to it one of my colleagues' ideas: an iterative (batched) approach.

The essence of the iterative approach is that the N iterations are not generated all at once but are split into several batches (for example, 50), so within each batch the number of rows is multiplied not by 5000 but only by 100. A separate computation stage is created for each batch, which means the number of stages and tasks grows, but Spark schedules them so that some run in parallel and some sequentially, depending on the resources allocated to the current session.

This makes it possible to allocate fewer resources to the process. An example of the difference in execution plans is given below. Since a join on the iteration number happens right after the dataframe is expanded, the memory load can be roughly estimated from the shuffle volume:

Bootstrap with one iteration:

Bootstrap with two iterations: the amount of data at each stage has decreased, so at any given moment there is less data on the workers:

The code for the final approach looks as follows:

from functools import reduce

from pyspark.sql import DataFrame, Window, functions as F

# List of group key columns
KEY_COLS = ['camp_id', 'control_group_flg']
# List of metrics for which we calculate statistics
STATISTICS_COLS = ['revenue', 'margin', 'conversion']
# Number of bootstrap iterations
BS_ITERS = 5000
# Number of iterations within a single batch
BS_BATCH_ITERS = 100

def naive_bs_batch(sdf: DataFrame, batch_num: int, iter_cnt: int) -> DataFrame:
    """Calculate one batch of the bootstrap"""
    iterations_df = (
        sdf
        # For each record in the dataframe generate iter_cnt rows
        # and assign the real (global) iteration number
        .select(KEY_COLS + ['group_cnt'])
        .withColumn('iter_num',
                    F.explode(F.sequence(F.lit(batch_num * iter_cnt + 1),
                                         F.lit((batch_num + 1) * iter_cnt))))
        # For each row of the iteration randomly pick a record number
        # from the original dataframe
        .withColumn('rn',
                    (F.floor(F.rand() * F.col('group_cnt')) + 1).cast('int'))
        .select(KEY_COLS + ['iter_num', 'rn'])
    )

    # Join the bootstrap dataframe back to the original one by the random row number
    batch_result = (
        iterations_df
        .join(
            sdf.select(KEY_COLS + STATISTICS_COLS + ['rn']),
            on=KEY_COLS + ['rn'],
            how='inner'
        )
        # Group by the key fields and the bootstrap iteration number
        # and calculate the statistics for all iterations
        .groupby(KEY_COLS + ['iter_num'])
        .agg(*[F.avg(stat_col).alias(stat_col)
               for stat_col in STATISTICS_COLS])
        # Without coalesce we end up with
        # number of partitions = shuffle.partitions * number of batches,
        # which can make the union very slow
        .coalesce(1)
    )
    return batch_result

# Number of batches
batches_cnt = BS_ITERS // BS_BATCH_ITERS

# Calculate the number of rows in each group and the sequential row number
window_for_bs_stats = Window.partitionBy(*KEY_COLS)
statistics_df = (
    statistics_df
    .withColumn('rn',
                F.row_number().over(window_for_bs_stats.orderBy('user_id')))
    .withColumn('group_cnt', F.count(F.lit(1)).over(window_for_bs_stats))
    .repartition('rn')
).cache()

# Calculate each batch and union the results
result_df = reduce(
    lambda x, y: x.union(y),
    [naive_bs_batch(sdf=statistics_df, batch_num=i, iter_cnt=BS_BATCH_ITERS)
     for i in range(batches_cnt)]
)

Conclusion

Bootstrap in PySpark is possible and easy to implement. All three approaches described here work, each under its own conditions, and do so quite stably.

  • If you have small campaigns and PySpark 3.x or newer, then the UDF-based approach is perfectly usable, especially since writing such processing in plain Python is simply more familiar.

  • If you have consistently large campaigns, then you should use the Poisson/Binomial bootstrap.

  • If the size varies greatly from campaign to campaign, then, as I did, go for the naive approach.

Moreover, for the last two approaches, when the amount of data is large enough, the batched optimization from the final version can be applied to increase stability and reduce the amount of data held in memory at any given moment.

We run this process daily, at night, when the cluster is free. The entire calculation, including assembling the input data, takes from 30 minutes to 1.5 hours. The results feed a dashboard with the list of campaigns and the calculated metrics.
