Python code generation for the benefit of ETL – part 2

Hey! In the first part, I shared my thoughts that prompted the creation of a python library convtools… Briefly about it: provides primitives (conversions), combining which, you can describe complex conversions for data processing. Conversion generates highly specialized code, compiles it and returns a function that solves a specific problem.

This time I would like to dwell on two points in more detail:

  • how pipe allows you to increase the reusability of the code

  • new part of the library: Table – streaming table data processor

1) Pipes & code reuse

I hope that reading the code with comments is more convenient than the tongue-tied narration torn from it, so let’s start:

from datetime import datetime
from decimal import Decimal

from convtools import conversion as c


# Допустим нам нужно прочесть некий отчет и поработать с ним.

# описываем конверсии, необходимые при чтении отчета
# сохраним то, как будут обрабатываться все входные строки
c_str = c.this().call_method("strip")
# то, как из обработанной строки получить Decimal (убиваем запятые -
# разделители групп разрядов)
c_decimal = c_str.call_method("replace", ",", "").as_type(Decimal)
c_parse_date = c.call_func(
    datetime.strptime, c.this(), "%Y-%m-%d"
).call_method("date")
c_date = c_str.pipe(c_parse_date)

c_optional_date = c_str.pipe(c.if_(c.this(), с_parse_date, None))
c_optional_decimal = c_str.pipe(c.if_(c.this(), c_decimal, None))

# разменяем входные строки на понятные типы через (тут должен быть
# Enum, но с диктом тоже красиво)
c_product_type = c.naive(
    {
        "phone": 1,
        "laptop": 2,
    }
).item(c_str.call_method("lower"))


# теперь определим, что нас может интересовать в данных, направляя
# каждое из полей в необходимую конверсию
schema = {
    "full_name": c.item("Full Name").pipe(c_str),
    "sku": c.item("SKU").pipe(c_str),
    "product_type": c.item("Product Type").pipe(c_product_type),
    "earnings": c.item("Earnings").pipe(c_decimal),
    "refunds": c.item("Refunds").pipe(c_optional_decimal),
    "date": c.item("Date").pipe(c_date),
    "date_of_birth": c.item("Date of Birth").pipe(c_optional_date),
}

# теперь можем забыть о том, как что и откуда берется.
# можно описать необходимую логику
converter = (
    # группируем по именам и датам
    c.group_by(schema["full_name"], schema["date"])
    .aggregate(
        {
            "full_name": schema["full_name"],
            "date": schema["date"],
            "date_of_birth": c.ReduceFuncs.First(schema["date_of_birth"]),
            "total_earnings": c.ReduceFuncs.Sum(schema["earnings"]),
        }
    )
    # отбросим "начинающих"
    .filter(c.item("total_earnings") > 1000)
    # сгруппируем по датам, соберем якобы полезную аналитику:
    #  - кол-во не "начинающих"
    #  - их средний заработок
    #  - их медианная дата рождения
    #  - dict: SKU -> сумма заработков (агрегация в агрегации)
    .pipe(
        c.group_by(c.item("date")).aggregate(
            {
                "date": c.item("date"),
                "number_of_workers": c.ReduceFuncs.Count(),
                "average_earnings": c.ReduceFuncs.Average(
                    c.item("total_earnings")
                ),
                "median_date_of_birth": c.ReduceFuncs.Median(
                    c.item("date_of_birth")
                ),
                "earnings_by_sku": c.ReduceFuncs.DictSum(
                    c.item("sku"),
                    c.item("total_earnings"),
                ),
            }
        )
    )
    # debug=True - выведет в stdout сгенерированный код.
    # если установлен black, он будет еще и отформатирован
    .gen_converter()
)

converter(input_data)
generated code
def pipe__cp(input__cp, _naive, _labels, _none):
    return (
        _naive["strptime_ox"](input__cp.strip(), _naive["v_7o"]).date()
        if input__cp
        else None
    )


class AggData__cg:
    __slots__ = ["v0", "v1"]

    def __init__(self, _none=__none__):
        self.v0 = _none
        self.v1 = _none


def group_by__cg(data_, _naive, _labels, _none):
    signature_to_agg_data__cg = defaultdict(AggData__cg)

    for row__cg in data_:
        agg_data__cg = signature_to_agg_data__cg[
            (
                row__cg["Full Name"].strip(),
                _naive["strptime_ox"](
                    row__cg["Date"].strip(), _naive["v_7o"]
                ).date(),
            )
        ]
        if agg_data__cg.v0 is _none:
            agg_data__cg.v0 = pipe__cp(
                row__cg["Date of Birth"].strip(), _naive, _labels, _none
            )
            agg_data__cg.v1 = (
                _naive["Decimal_4v"](
                    row__cg["Earnings"].strip().replace(",", "")
                )
                or 0
            )
        else:
            agg_data__cg.v1 = agg_data__cg.v1 + (
                _naive["Decimal_4v"](
                    row__cg["Earnings"].strip().replace(",", "")
                )
                or 0
            )

    result_ = (
        {
            "full_name": signature__cg[0],
            "date": signature__cg[1],
            "date_of_birth": (
                None if agg_data__cg.v0 is _none else agg_data__cg.v0
            ),
            "total_earnings": (
                0 if agg_data__cg.v1 is _none else agg_data__cg.v1
            ),
        }
        for signature__cg, agg_data__cg in signature_to_agg_data__cg.items()
    )
    filtered_result_ = [
        i_4z for i_4z in result_ if (i_4z["total_earnings"] > 1000)
    ]
    return filtered_result_


class AggData__gp:
    __slots__ = ["v0", "v1", "v2", "v3"]

    def __init__(self, _none=__none__):
        self.v0 = _none
        self.v1 = _none
        self.v2 = _none
        self.v3 = _none


def group_by__gp(data_, _naive, _labels, _none):
    signature_to_agg_data__gp = defaultdict(AggData__gp)

    for row__gp in data_:
        agg_data__gp = signature_to_agg_data__gp[row__gp["date"]]
        if agg_data__gp.v0 is _none:
            agg_data__gp.v0 = 1
            agg_data__gp.v2 = [row__gp["date_of_birth"]]
            agg_data__gp.v3 = _d = defaultdict(int)
            _d[row__gp["sku"]] = row__gp["total_earnings"] or 0
        else:
            agg_data__gp.v0 = agg_data__gp.v0 + 1
            agg_data__gp.v2.append(row__gp["date_of_birth"])
            agg_data__gp.v3[row__gp["sku"]] = agg_data__gp.v3[
                row__gp["sku"]
            ] + (row__gp["total_earnings"] or 0)
        if agg_data__gp.v1 is _none:
            if row__gp["total_earnings"] is not None:
                agg_data__gp.v1 = (1, row__gp["total_earnings"] * 1)
        else:
            if row__gp["total_earnings"] is not None:
                agg_data__gp.v1 = (
                    agg_data__gp.v1[0] + 1,
                    agg_data__gp.v1[1] + row__gp["total_earnings"] * 1,
                )

    return [
        {
            "date": signature__gp,
            "number_of_workers": (
                0 if agg_data__gp.v0 is _none else agg_data__gp.v0
            ),
            "average_earnings": (
                None
                if agg_data__gp.v1 is _none
                else (agg_data__gp.v1[1] / agg_data__gp.v1[0])
            ),
            "median_date_of_birth": (
                None
                if agg_data__gp.v2 is _none
                else _naive["median_59"](agg_data__gp.v2)
            ),
            "earnings_by_sku": (
                None if agg_data__gp.v3 is _none else (dict(agg_data__gp.v3))
            ),
        }
        for signature__gp, agg_data__gp in signature_to_agg_data__gp.items()
    ]


def converter_uk(data_):
    global __naive_values__, __none__
    _naive = __naive_values__
    _none = __none__
    _labels = {}
    return _naive["group_by__gp"](
        _naive["group_by__cg"](data_, _naive, _labels, _none),
        _naive,
        _labels,
        _none,
    )

Since all conversions are expressions, the method pipe allows you to combine almost any of them. pipe-we can reasonably be thought of as a performance-free abstraction, since in cases where you do not need to add label-s and input data are not used more than once, pipe does not result in an additional function call.

2) Table – streaming processing of tabular data

From time to time I had a need to work with tabular data (or similar to them). And I was always tempted by the idea of ​​”baking” the column indexes into the code, so that references to them would be cheap. And now, finally, having the convtools primitives at hand, it turned out to implement our plan, making a small number of gestures.

Work logic: Table works with iterators lazily, reading only the first line that may contain a header. Next, based on the title and the requested changes, code is written that will perform the necessary transformations on the existing iterator. Methods into_iter_rows and into_csv are finite.

from convtools.contrib.tables import Table
from convtools import conversion as c

(
    Table.from_csv(
        "tests/csvs/ac.csv",
        header=True,
        dialect=Table.csv_dialect(delimiter="t"),
    )
    # возьмем только колонки "a" и "c"
    .take("a", "c")
    # добавим две новых вычисляемых колонки
    .update(B=c.col("a") + c.col("c"), D=c.call_func(abs, c.col("c")))
    # переименуем
    .rename({"a": "A"})
    # отфильтруем строки
    .filter(c.col("c") < 10)
    # отбросим колонку "c"
    .drop("c").into_csv("tests/csvs/out.csv")
)


# объединим две таблицы по колонке "a"
list(
    Table.from_rows([(1, 2), (2, 3)], ["a", "b"])
    .join(
        Table.from_rows([(1, 3), (2, 4)], ["a", "c"]),
        how="inner",
        on=["a"],
    )
    .into_iter_rows(dict)  # поддерживаются list/tuple
)

Also added two rarely useful methods:

"""
Table 1      Table 2
| a | b |    | b | c |
| 1 | 2 |    | 3 | 4 |

>>> table1.chain(table2, fill_value=" ")

Result:
| a | b | c |
| 1 | 2 |   |
|   | 3 | 4 |
"""
"""
Table 1      Table 2
| a | b |    | b | c |
| 1 | 2 |    | 3 | 4 |
             | 5 | 6 |
>>> table1.zip(table2, fill_value=" ")

Result:
| a | b | b | c |
| 1 | 2 | 3 | 4 |
|   |   | 5 | 6 |
"""

Conclusion

Of course, I have no illusion that everyone will suddenly rush to use convtools, but I still wanted to share. And not even because codegen adds flexibility, it’s just that writing code in this style puts me on a different track. With this approach, I make significantly fewer errors, the stages of data processing become more obvious, and code testing becomes easier, because work is done with pure “near-functions” + convtools encourages work with iterators, which has a positive effect on memory consumption.

All success and, if I think of something or tell me, see you soon!

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *