The art of ETL. Writing our own SQL engine in Spark [part 6]


In the previous installments (FAQ • 1 • 2 • 3 • 4 • 5) we looked in great detail at how to write, in Java, our own interpreter of an object-oriented SQL dialect on top of the Spark RDD API, tailored to the tasks of preparing and transforming data sets.

In this part, we'll talk about how to add function support to SQL expressions. For example,

SELECT
    MAX(score1, score2, score3, score4, score5) AS max_score,
    MIN(score1, score2, score3, score4, score5) AS min_score,
    MEDIAN(score1, score2, score3, score4, score5) AS median_score,
    score1 + score2 + score3 + score4 + score5 AS score_sum
FROM raw_scores INTO final_scores
WHERE ABS(score1 + score2 + score3 + score4 + score5) > $score_margin;

– here the functions MAX, MIN, and MEDIAN accept any number of arguments of type Double and return a Double, while ABS takes exactly one such argument.

In general, besides basic math, any self-respecting SQL dialect should at least have functions for manipulating dates and times, and for working with strings and arrays. We will definitely add those too, as classes on the classpath, so that the engine can load them from there. On top of that, operators like >= or LIKE, which we have already implemented, but as hardcode, will be made just as pluggable.

Difficulty warning

The difficulty level of this series is generally high. Basic concepts are not explained in the text at all, and not all of the advanced ones are, either. This part, however, is somewhat easier to understand than the previous ones. Still, it will be easier to follow if you have already gone through the rest, even if only by skimming.

First, let's define what a “function” is in the context of writing expressions like mathematical ones, and how it differs from an “operator”.

In fact, it’s almost no different.

One might even say that, up to syntax conventions, the notation sum(a, b) is exactly the same thing as a + b. And if we write it as +(a, b), then the symbol + simply becomes a shorter alias for the name sum. With unary operators it is even more obvious: ~HEX versus bitwise_negation(HEX).

That is, from the point of view of at least syntax, a function and an operator in expressions are the same thing, and they can be interpreted uniformly. The differences begin only at the level of semantics, but more on that later.

For now, let's see how we understand expressions.

Firstly, there is a lexer/parser written in ANTLR, and it defines the following rule:

expression
 : ( is_op | between_op | in_op | comparison_op | var_name | property_name | L_NUMERIC | L_STRING | S_NULL | S_TRUE | S_FALSE | S_OPEN_PAR | S_CLOSE_PAR | expression_op | digest_op | random_op | bool_op | default_op )+
;

This rule tells us that an “expression” looks to us like a kind of “soup” of tokens, including:
• string and numeric literals,
• predefined symbols for booleans and NULL,
• names of variables and record properties,
• a whole bunch of subexpressions for all sorts of different groups of operators, including SQL IS/BETWEEN/IN, comparisons, logic, and a couple more things that only we need,
• and finally, parentheses – opening and closing.

Okay, a great soup has been cooked. To disentangle it into something interpretable, one can successfully use the classic Shunting Yard algorithm, whose output is the expression rewritten in Reverse Polish Notation (RPN). And any expression in RPN can be executed directly by a stack-based state machine that is even simpler than Shunting Yard itself.

In Java code, these two components would look something like this (beware, Java is very verbose):

Shunting Yard

An excerpt from TDL4Interpreter.java:

    private List<Expression<?>> expression(List<ParseTree> exprChildren, ExpressionRules rules) {
        List<Expression<?>> items = new ArrayList<>();

        Deque<ParseTree> whereOpStack = new LinkedList<>();
        List<ParseTree> predExpStack = new ArrayList<>();
        int i = 0;
        // doing Shunting Yard
        for (; i < exprChildren.size(); i++) {
            ParseTree child = exprChildren.get(i);

            if ((child instanceof TDL4.Expression_opContext)
                    || (child instanceof TDL4.Comparison_opContext)
                    || (child instanceof TDL4.Bool_opContext)
                    || (child instanceof TDL4.In_opContext)
                    || (child instanceof TDL4.Is_opContext)
                    || (child instanceof TDL4.Between_opContext)
                    || (child instanceof TDL4.Digest_opContext)
                    || (child instanceof TDL4.Random_opContext)
                    || (child instanceof TDL4.Default_opContext)) {
                while (!whereOpStack.isEmpty()) {
                    ParseTree peek = whereOpStack.peek();

                    if (peek instanceof TerminalNode) {
                        TerminalNode tn = (TerminalNode) peek;
                        int tt = tn.getSymbol().getType();
                        if (tt == TDL4Lexicon.S_OPEN_PAR) {
                            break;
                        }
                    }
                    if (isHigher(child, peek)) {
                        predExpStack.add(whereOpStack.pop());
                    } else {
                        break;
                    }
                }

                whereOpStack.push(child);
                continue;
            }

            if (child instanceof TerminalNode) {
                TerminalNode tn = (TerminalNode) child;
                int tt = tn.getSymbol().getType();
                if (tt == TDL4Lexicon.S_OPEN_PAR) {
                    whereOpStack.add(child);
                    continue;
                }

                if (tt == TDL4Lexicon.S_CLOSE_PAR) {
                    while (true) {
                        if (whereOpStack.isEmpty()) {
                            throw new RuntimeException("Mismatched parentheses at query token #" + i);
                        }
                        ParseTree pop = whereOpStack.pop();
                        if (!(pop instanceof TerminalNode)) {
                            predExpStack.add(pop);
                        } else {
                            break;
                        }
                    }
                    continue;
                }
            }

            // expression
            predExpStack.add(child);
        }

        while (!whereOpStack.isEmpty()) {
            predExpStack.add(whereOpStack.pop());
        }

        for (ParseTree exprItem : predExpStack) {
            if (exprItem instanceof TDL4.Property_nameContext) {
                switch (rules) {
                    case QUERY: {
                        String propName = resolveName(((TDL4.Property_nameContext) exprItem).L_IDENTIFIER());

                        items.add(Expressions.propItem(propName));
                        continue;
                    }
                    case AT: {
                        String propName = resolveName(((TDL4.Property_nameContext) exprItem).L_IDENTIFIER());

                        items.add(Expressions.stringItem(propName));
                        continue;
                    }
                    default: {
                        throw new InvalidConfigurationException("Attribute name is not allowed in this context: " + exprItem.getText());
                    }
                }
            }

            if (exprItem instanceof TDL4.Var_nameContext) {
                TDL4.Var_nameContext varNameCtx = (TDL4.Var_nameContext) exprItem;

                String varName = resolveName(varNameCtx.L_IDENTIFIER());

                items.add(Expressions.varItem(varName));
                continue;
            }

            if (exprItem instanceof TDL4.Between_opContext) {
                TDL4.Between_opContext between = (TDL4.Between_opContext) exprItem;

                items.add(Expressions.stackGetter(1));

                double l = resolveNumericLiteral(between.L_NUMERIC(0)).doubleValue();
                double r = resolveNumericLiteral(between.L_NUMERIC(1)).doubleValue();
                items.add((between.S_NOT() == null)
                        ? Expressions.between(l, r)
                        : Expressions.notBetween(l, r)
                );

                continue;
            }

            if (exprItem instanceof TDL4.In_opContext) {
                TDL4.In_opContext inCtx = (TDL4.In_opContext) exprItem;
                if (inCtx.array() != null) {
                    items.add(Expressions.setItem(resolveArray(inCtx.array(), ExpressionRules.QUERY)));
                }
                if (inCtx.var_name() != null) {
                    items.add(Expressions.arrItem(resolveName(inCtx.var_name().L_IDENTIFIER())));
                }
                if (inCtx.property_name() != null) {
                    items.add(Expressions.propItem(resolveName(inCtx.property_name().L_IDENTIFIER())));
                }

                items.add(Expressions.stackGetter(2));

                boolean not = inCtx.S_NOT() != null;
                items.add(not ? Expressions.notIn() : Expressions.in());

                continue;
            }

            // column_name IS NOT? NULL
            if (exprItem instanceof TDL4.Is_opContext) {
                items.add(Expressions.stackGetter(1));

                items.add((((TDL4.Is_opContext) exprItem).S_NOT() == null) ? Expressions.isNull() : Expressions.nonNull());

                continue;
            }

            if ((exprItem instanceof TDL4.Expression_opContext)
                    || (exprItem instanceof TDL4.Comparison_opContext)
                    || (exprItem instanceof TDL4.Bool_opContext)
                    || (exprItem instanceof TDL4.Digest_opContext)
                    || (exprItem instanceof TDL4.Random_opContext)
                    || (exprItem instanceof TDL4.Default_opContext)) {
                Operator eo = Operator.get(exprItem.getText());
                if (eo == null) {
                    throw new RuntimeException("Unknown operator token " + exprItem.getText());
                } else {
                    items.add(Expressions.stackGetter(eo.ariness));
                    items.add(Expressions.opItem(eo));
                }

                continue;
            }

            TerminalNode tn = (TerminalNode) exprItem;
            int type = tn.getSymbol().getType();
            if (type == TDL4Lexicon.L_NUMERIC) {
                items.add(Expressions.numericItem(resolveNumericLiteral(tn)));
                continue;
            }
            if (type == TDL4Lexicon.L_STRING) {
                items.add(Expressions.stringItem(resolveStringLiteral(tn)));
                continue;
            }
            if (type == TDL4Lexicon.S_NULL) {
                items.add(Expressions.nullItem());
                continue;
            }
            if ((type == TDL4Lexicon.S_TRUE) || (type == TDL4Lexicon.S_FALSE)) {
                items.add(Expressions.boolItem(Boolean.parseBoolean(tn.getText())));
                continue;
            }
        }

        return items;
    }

The stack-based RPN execution machine

An excerpt from Operator.java:

    public static Object eval(AttrGetter props, List<Expression<?>> item, VariablesContext vc) {
        if (item.isEmpty()) {
            return null;
        }

        Deque<Object> stack = new LinkedList<>();
        Deque<Object> top = null;
        for (Expression<?> ei : item) {
            // these all push to expression stack
            if (ei instanceof Expressions.PropItem) {
                stack.push(((Expressions.PropItem) ei).get(props));
                continue;
            }
            if (ei instanceof Expressions.VarItem) {
                stack.push(((Expressions.VarItem) ei).get(vc));
                continue;
            }
            if (ei instanceof Expressions.StringItem) {
                stack.push(((Expressions.StringItem) ei).get());
                continue;
            }
            if (ei instanceof Expressions.NumericItem) {
                stack.push(((Expressions.NumericItem) ei).get());
                continue;
            }
            if (ei instanceof Expressions.NullItem) {
                stack.push(((Expressions.NullItem) ei).get());
                continue;
            }
            if (ei instanceof Expressions.BoolItem) {
                stack.push(((Expressions.BoolItem) ei).get());
                continue;
            }
            if (ei instanceof Expressions.SetItem) {
                stack.push(((Expressions.SetItem) ei).get());
                continue;
            }
            if (ei instanceof Expressions.OpItem) {
                stack.push(((Expressions.OpItem) ei).eval(top));
                continue;
            }
            if (ei instanceof Expressions.IsExpr) {
                stack.push(((Expressions.IsExpr) ei).eval(top.pop()));
                continue;
            }
            if (ei instanceof Expressions.InExpr) {
                stack.push(((Expressions.InExpr) ei).eval(top.pop(), top.pop()));
                continue;
            }
            if (ei instanceof Expressions.BetweenExpr) {
                stack.push(((Expressions.BetweenExpr) ei).eval(top.pop()));
                continue;
            }
            // and this one pops from it
            if (ei instanceof Expressions.StackGetter) {
                top = ((Expressions.StackGetter) ei).get(stack);
                continue;
            }
        }

        if (stack.size() != 1) {
            throw new RuntimeException("Invalid SELECT item expression");
        }

        return stack.pop();
    }

There is a lot of code, but it simply sorts tokens into the right places on the stack, turning them into primitives like
• “put a constant of such and such type on the stack”,
• “put the record property on the stack”,
• “take N elements from the stack”,
• “put on the stack the result of calling the operator with previously taken elements”,
and so on. Then it runs through the resulting list, executing the primitives in order of appearance, and at the end obtains a computed result, of which there should be exactly one. If more than one is left on the stack, then either the expression has unbalanced parentheses somewhere, or some operator got the wrong number of operands.
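To make those primitives concrete, here is a tiny standalone sketch of such a stack machine. It is deliberately simplified and is not the engine's actual Expressions/Operator machinery: plain doubles instead of typed items, and a hardcoded "take 2 and apply" step instead of the real StackGetter/OpItem pair.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.BinaryOperator;

// A toy RPN evaluator illustrating the primitives described above:
// "put a constant on the stack", "take N elements from the stack",
// "put the operator's result on the stack".
public class RpnDemo {
    // An item is either a constant to push, or a binary operator to apply
    static class Item {
        final Double value;                 // set for constants
        final BinaryOperator<Double> op;    // set for operators

        Item(Double value) { this.value = value; this.op = null; }
        Item(BinaryOperator<Double> op) { this.value = null; this.op = op; }
    }

    static double eval(List<Item> rpn) {
        Deque<Double> stack = new ArrayDeque<>();
        for (Item it : rpn) {
            if (it.op == null) {
                stack.push(it.value);                    // "put a constant on the stack"
            } else {
                double r = stack.pop(), l = stack.pop(); // "take 2 elements from the stack"
                stack.push(it.op.apply(l, r));           // "push the operator's result"
            }
        }
        if (stack.size() != 1) {
            // unbalanced parentheses or a wrong operand count upstream
            throw new IllegalStateException("Invalid expression");
        }
        return stack.pop();
    }

    public static void main(String[] args) {
        // (1 + 2) * 3 in RPN is: 1 2 + 3 *
        List<Item> rpn = Arrays.asList(
                new Item(1.0), new Item(2.0), new Item(Double::sum),
                new Item(3.0), new Item((a, b) -> a * b));
        System.out.println(eval(rpn)); // prints 9.0
    }
}
```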

Great. But how do we fit function call syntax into this soup? In a function call, the opening and closing parentheses not only change the order in which operators are evaluated, but also set the number of operands, which are not even really operands anymore, but arguments.

Very simple. We need to add recursion to the soup. (We'll leave the starred question "how would you solve it non-recursively?*" on the readers' conscience, because the author is utterly unscrupulous and can't be bothered to find the answer himself. Forgive me.)

Let's rewrite the rule:

expression
 : ( is_op | between_op | in_op | comparison_op | var_name | L_NUMERIC | L_STRING | S_NULL | S_TRUE | S_FALSE | expression_op | digest_op | bool_op | default_op | func_call )+
 ;

attr_expr
 : ( is_op | between_op | in_op | comparison_op | var_name | L_NUMERIC | L_STRING | S_NULL | S_TRUE | S_FALSE | expression_op | digest_op | bool_op | default_op | func_attr | attr )+
 ;

func_call
 : func S_OPEN_PAR expression ( S_COMMA expression )* S_CLOSE_PAR
 | func S_OPEN_PAR S_CLOSE_PAR
 | S_OPEN_PAR expression S_CLOSE_PAR
 ;

func_attr
 : func S_OPEN_PAR attr_expr ( S_COMMA attr_expr )* S_CLOSE_PAR
 | func S_OPEN_PAR S_CLOSE_PAR
 | S_OPEN_PAR attr_expr S_CLOSE_PAR
 ;

What you should pay attention to is that there are now two rules for the expressions themselves. One is needed for contexts that may access record attributes, that is, expressions executed in SELECT expression, expression, ... FROM ... WHERE boolean_expression. The other is accordingly required for contexts inside statements of the form LET $variable = expression, which are executed at the script level and cannot access any records.

By making this distinction in the parser, we also get rid of execution context checks in the interpreter, which had been completely forgotten in the previous version. Working on one's mistakes is important!

The rules for calling functions describe three different syntaxes:
func(expression[, expression]...)
func()
(expression)

That is, a piece of an expression in parentheses, without a name in front of it, is now also a "function" for us, albeit a nameless one that simply returns its single argument. The remaining two variants describe a function with no arguments, or with one or more arguments.

This approach allows us to completely remove bracket processing from Shunting Yard:

Shunting Yard without bracket processing

An excerpt from TDL4Interpreter.java:

    private List<ParseTree> doShuntingYard(List<ParseTree> exprChildren) {
        Deque<ParseTree> whereOpStack = new LinkedList<>();
        List<ParseTree> predExpStack = new ArrayList<>();
        int i = 0;
        // doing Shunting Yard
        for (; i < exprChildren.size(); i++) {
            ParseTree child = exprChildren.get(i);

            if ((child instanceof TDL4.Expression_opContext)
                    || (child instanceof TDL4.Comparison_opContext)
                    || (child instanceof TDL4.Bool_opContext)
                    || (child instanceof TDL4.In_opContext)
                    || (child instanceof TDL4.Is_opContext)
                    || (child instanceof TDL4.Between_opContext)
                    || (child instanceof TDL4.Digest_opContext)
                    || (child instanceof TDL4.Default_opContext)) {
                while (!whereOpStack.isEmpty()) {
                    ParseTree peek = whereOpStack.peek();

                    if (isHigher(child, peek)) {
                        predExpStack.add(whereOpStack.pop());
                    } else {
                        break;
                    }
                }

                whereOpStack.push(child);
                continue;
            }

            if (child instanceof TDL4.Func_callContext) {
                TDL4.Func_callContext funcCall = (TDL4.Func_callContext) child;
                if (funcCall.expression() != null) {
                    for (TDL4.ExpressionContext e : funcCall.expression()) {
                        predExpStack.addAll(doShuntingYard(e.children));
                    }
                }

                if (funcCall.func() != null) {
                    predExpStack.add(funcCall);
                }
                continue;
            }

            if (child instanceof TDL4.Func_attrContext) {
                TDL4.Func_attrContext funcAttr = (TDL4.Func_attrContext) child;
                if (funcAttr.attr_expr() != null) {
                    for (TDL4.Attr_exprContext e : funcAttr.attr_expr()) {
                        predExpStack.addAll(doShuntingYard(e.children));
                    }
                }

                if (funcAttr.func() != null) {
                    predExpStack.add(funcAttr);
                }
                continue;
            }

            // expression
            predExpStack.add(child);
        }

        while (!whereOpStack.isEmpty()) {
            predExpStack.add(whereOpStack.pop());
        }

        return predExpStack;
    }

    private List<Expressions.ExprItem<?>> expression(List<ParseTree> exprChildren, ExpressionRules rules) {
        List<Expressions.ExprItem<?>> items = new ArrayList<>();

        List<ParseTree> predExpStack = doShuntingYard(exprChildren);

        for (ParseTree exprItem : predExpStack) {
            if (exprItem instanceof TDL4.AttrContext) {
                switch (rules) {
                    case QUERY: {
                        String propName = resolveName(((TDL4.AttrContext) exprItem).L_IDENTIFIER());

                        items.add(Expressions.attrItem(propName));
                        continue;
                    }
                    case AT: {
                        String propName = resolveName(((TDL4.AttrContext) exprItem).L_IDENTIFIER());

                        items.add(Expressions.stringItem(propName));
                        continue;
                    }
                    default: {
                        throw new InvalidConfigurationException("Attribute name is not allowed in this context: " + exprItem.getText());
                    }
                }
            }

            if (exprItem instanceof TDL4.Var_nameContext) {
                TDL4.Var_nameContext varNameCtx = (TDL4.Var_nameContext) exprItem;

                String varName = resolveName(varNameCtx.L_IDENTIFIER());

                items.add(Expressions.varItem(varName));
                continue;
            }

            if (exprItem instanceof TDL4.Between_opContext) {
                TDL4.Between_opContext between = (TDL4.Between_opContext) exprItem;

                items.add(Expressions.stackGetter(1));

                double l = resolveNumericLiteral(between.L_NUMERIC(0)).doubleValue();
                double r = resolveNumericLiteral(between.L_NUMERIC(1)).doubleValue();
                items.add((between.S_NOT() == null)
                        ? Expressions.between(l, r)
                        : Expressions.notBetween(l, r)
                );

                continue;
            }

            if (exprItem instanceof TDL4.In_opContext) {
                TDL4.In_opContext inCtx = (TDL4.In_opContext) exprItem;
                if (inCtx.array() != null) {
                    items.add(Expressions.setItem(resolveArray(inCtx.array(), ExpressionRules.QUERY)));
                }
                if (inCtx.var_name() != null) {
                    items.add(Expressions.arrItem(resolveName(inCtx.var_name().L_IDENTIFIER())));
                }
                if (inCtx.attr() != null) {
                    items.add(Expressions.attrItem(resolveName(inCtx.attr().L_IDENTIFIER())));
                }

                items.add(Expressions.stackGetter(2));

                boolean not = inCtx.S_NOT() != null;
                items.add(not ? Expressions.notIn() : Expressions.in());

                continue;
            }

            // column_name IS NOT? NULL
            if (exprItem instanceof TDL4.Is_opContext) {
                items.add(Expressions.stackGetter(1));

                items.add((((TDL4.Is_opContext) exprItem).S_NOT() == null) ? Expressions.isNull() : Expressions.nonNull());

                continue;
            }

            if ((exprItem instanceof TDL4.Expression_opContext)
                    || (exprItem instanceof TDL4.Comparison_opContext)
                    || (exprItem instanceof TDL4.Bool_opContext)
                    || (exprItem instanceof TDL4.Digest_opContext)
                    || (exprItem instanceof TDL4.Default_opContext)) {
                Operator<?> eo = Operators.get(exprItem.getText());
                if (eo == null) {
                    throw new RuntimeException("Unknown operator token " + exprItem.getText());
                } else {
                    int arity = eo.arity();
                    items.add(Expressions.stackGetter(arity));
                    items.add(Expressions.opItem(eo));
                }

                continue;
            }

            if (exprItem instanceof TDL4.Func_callContext) {
                TDL4.Func_callContext funcCall = (TDL4.Func_callContext) exprItem;
                TDL4.FuncContext funcCtx = funcCall.func();
                Function<?> ef = Functions.get(resolveName(funcCtx.L_IDENTIFIER()));
                if (ef == null) {
                    throw new RuntimeException("Unknown function token " + exprItem.getText());
                } else {
                    int arity = ef.arity();
                    if (arity == Function.ARBITR_ARY) {
                        items.add(Expressions.stackGetter(funcCall.expression().size()));
                    } else if (arity > 0) {
                        items.add(Expressions.stackGetter(arity));
                    }
                    items.add(Expressions.funcItem(ef));
                }

                continue;
            }

            if (exprItem instanceof TDL4.Func_attrContext) {
                TDL4.Func_attrContext funcAttr = (TDL4.Func_attrContext) exprItem;
                TDL4.FuncContext funcCtx = funcAttr.func();
                Function<?> ef = Functions.get(resolveName(funcCtx.L_IDENTIFIER()));
                if (ef == null) {
                    throw new RuntimeException("Unknown function token " + exprItem.getText());
                } else {
                    int arity = ef.arity();
                    switch (arity) {
                        case Function.KEY_LEVEL: {
                            items.add(Expressions.keyItem());
                            break;
                        }
                        case Function.RECORD_LEVEL: {
                            items.add(Expressions.recItem());
                            break;
                        }
                        case Function.ARBITR_ARY: {
                            items.add(Expressions.stackGetter(funcAttr.attr_expr().size()));
                            break;
                        }
                        case Function.NO_ARGS: {
                            break;
                        }
                        default: {
                            items.add(Expressions.stackGetter(arity));
                        }
                    }
                    items.add(Expressions.funcItem(ef));
                }

                continue;
            }

            TerminalNode tn = (TerminalNode) exprItem;
            int type = tn.getSymbol().getType();
            if (type == TDL4Lexicon.L_NUMERIC) {
                items.add(Expressions.numericItem(resolveNumericLiteral(tn)));
                continue;
            }
            if (type == TDL4Lexicon.L_STRING) {
                items.add(Expressions.stringItem(resolveStringLiteral(tn)));
                continue;
            }
            if (type == TDL4Lexicon.S_NULL) {
                items.add(Expressions.nullItem());
                continue;
            }
            if ((type == TDL4Lexicon.S_TRUE) || (type == TDL4Lexicon.S_FALSE)) {
                items.add(Expressions.boolItem(Boolean.parseBoolean(tn.getText())));
                continue;
            }
        }

        return items;
    }

As for the finite state machine for executing RPN, a tiny special case will be added at the end:

            // special case
            if (ei instanceof ObjItem) {
                top = new LinkedList<>();
                top.add(((ObjItem) ei).recOrKey() ? rec : key);
                continue;
            }

– it is needed so that functions in the SELECT context can work with the entire record, and not just its attributes. Oops, it seems I have gotten a little ahead of myself. Well, let's correct that oversight.

Let us recall from the previous parts that in the SQL engine we are developing, all entities, except for the top-level SQL statements implemented directly in the interpreter, are pluggable.

That is, functions, as well as expression operators, must be implementations of certain interfaces accessible via the classpath and loaded into the interpreter when the execution context is launched.

Let's think about what the general interface should look like. Let's call it, for example, Evaluator:

public interface Evaluator<R> extends Serializable {
    R call(Deque<Object> args);

    String name();
    String descr();

    int arity();

    // There are also a number of static helper methods for working with the stack,
    // such as popDouble(Deque<Object> args), but we won't dwell on them here
}

What do we have here? A generic result type, returned when the call method is invoked over the list of arguments/operands; a name (or symbol) together with a human-readable description; and the number of arguments/operands itself, that is, the arity.
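For example, a minimal sketch of an Evaluator implementation, an ABS-style function, might look like this. It is not the engine's real code: the interface is re-declared here in trimmed form purely so the example compiles on its own, whereas in the engine it would be loaded from the classpath.

```java
import java.io.Serializable;
import java.util.Deque;
import java.util.LinkedList;

// A hypothetical ABS function implementing the Evaluator contract sketched above
public class AbsDemo {
    // Trimmed copy of the interface, only for self-containment of this example
    interface Evaluator<R> extends Serializable {
        R call(Deque<Object> args);
        String name();
        String descr();
        int arity();
    }

    static class Abs implements Evaluator<Double> {
        @Override
        public Double call(Deque<Object> args) {
            // exactly one argument, per arity() below
            return Math.abs(((Number) args.pop()).doubleValue());
        }

        @Override
        public String name() { return "ABS"; }

        @Override
        public String descr() { return "Absolute value of the sole argument"; }

        @Override
        public int arity() { return 1; }
    }

    public static void main(String[] args) {
        Deque<Object> operands = new LinkedList<>();
        operands.add(-4.2);
        System.out.println(new Abs().call(operands)); // prints 4.2
    }
}
```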

There's something else needed for operators. We describe them in the general case as an abstract class:

public abstract class Operator<R> implements Evaluator<R> {
    public abstract int prio();

    public boolean rightAssoc() {
        return false;
    }

    public boolean handleNull() {
        return false;
    }

    protected abstract R op0(Deque<Object> args);

    public Operator() {
    }

    @Override
    public R call(Deque<Object> args) {
        if (!handleNull()) {
            for (Object a : args) {
                if (a == null) {
                    return null;
                }
            }
        }

        return op0(args);
    }

    public static abstract class Unary<R, T> extends Operator<R> {
        @Override
        public int arity() {
            return 1;
        }
    }

    public static abstract class Binary<R, T1, T2> extends Operator<R> {
        @Override
        public int arity() {
            return 2;
        }
    }

    public static abstract class Ternary<R, T1, T2, T3> extends Operator<R> {
        @Override
        public int arity() {
            return 3;
        }
    }
}

Firstly, we now have priority semantics. And also a flag that marks the operator as right-associative, changing the order in which the entire expression is evaluated.

And one more flag that allows an operator to process operands with the value NULL. Without it, according to the established rules of SQL, any expression that a NULL gets into must yield NULL in the end; this logic is implemented by default. But some special operators may wish to handle NULL differently, and we are obliged to give them that opportunity.

Finally, we have unary, binary, and ternary operators.
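Putting these pieces together, a hypothetical binary '+' operator against the contracts above could be sketched like this. The base types are re-declared in trimmed form so the example is self-contained, and the prio value is an arbitrary assumption, not the engine's actual priority table.

```java
import java.io.Serializable;
import java.util.Deque;
import java.util.LinkedList;

// A sketch of a '+' operator with default SQL NULL semantics
public class PlusDemo {
    // Trimmed copies of the contracts, only for self-containment of this example
    interface Evaluator<R> extends Serializable {
        R call(Deque<Object> args);
        String name();
        int arity();
    }

    static abstract class Operator<R> implements Evaluator<R> {
        public abstract int prio();
        public boolean handleNull() { return false; }
        protected abstract R op0(Deque<Object> args);

        @Override
        public R call(Deque<Object> args) {
            if (!handleNull()) {
                for (Object a : args) {
                    if (a == null) {
                        return null; // SQL rule: NULL in, NULL out
                    }
                }
            }
            return op0(args);
        }
    }

    static class Plus extends Operator<Double> {
        @Override public int prio() { return 10; } // assumed value, for illustration only
        @Override public int arity() { return 2; }
        @Override public String name() { return "+"; }

        @Override
        protected Double op0(Deque<Object> args) {
            double l = ((Number) args.pop()).doubleValue();
            double r = ((Number) args.pop()).doubleValue();
            return l + r;
        }
    }

    public static void main(String[] args) {
        Deque<Object> operands = new LinkedList<>();
        operands.add(2.0);
        operands.add(3.0);
        System.out.println(new Plus().call(operands)); // prints 5.0

        Deque<Object> withNull = new LinkedList<>();
        withNull.add(2.0);
        withNull.add(null);
        System.out.println(new Plus().call(withNull)); // prints null
    }
}
```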

Functions are cut from similar cloth, just a slightly different color (that is, semantically they are much simpler):

public abstract class Function<R> implements Evaluator<R> {
    public static final int KEY_LEVEL = -3;
    public static final int RECORD_LEVEL = -2;
    public static final int ARBITR_ARY = -1;
    public static final int NO_ARGS = 0;

    public Function() {
    }

    public static abstract class KeyLevel<R> extends Function<R> {
        @Override
        public int arity() {
            return KEY_LEVEL;
        }
    }

    public static abstract class RecordLevel<R, REC extends Record<?>> extends Function<R> {
        @Override
        public int arity() {
            return RECORD_LEVEL;
        }
    }

    public static abstract class ArbitrAry<R, TA> extends Function<R> {
        @Override
        public int arity() {
            return ARBITR_ARY;
        }
    }

    public static abstract class NoArgs<R> extends Function<R> {
        @Override
        public int arity() {
            return NO_ARGS;
        }
    }

    public static abstract class Unary<R, T> extends Function<R> {
        @Override
        public int arity() {
            return 1;
        }
    }

    public static abstract class Binary<R, T1, T2> extends Function<R> {
        @Override
        public int arity() {
            return 2;
        }
    }

    public static abstract class Ternary<R, T1, T2, T3> extends Function<R> {
        @Override
        public int arity() {
            return 3;
        }
    }
}
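A concrete function in the spirit of this hierarchy, say an arbitrary-arity MAX, might be sketched as follows. This is a simplified standalone version: the interpreter side, which pushes exactly as many arguments as the call site listed, is simulated by hand in main().

```java
import java.util.Deque;
import java.util.LinkedList;

// A sketch of an arbitrary-arity MAX over Double arguments
public class MaxDemo {
    static final int ARBITR_ARY = -1; // mirrors the Function constant above

    static class Max {
        public int arity() { return ARBITR_ARY; }
        public String name() { return "MAX"; }

        // The interpreter hands over however many arguments the call site
        // listed, gathered into a single Deque; with no arguments at all
        // this returns negative infinity
        public Double call(Deque<Object> args) {
            double max = Double.NEGATIVE_INFINITY;
            while (!args.isEmpty()) {
                max = Math.max(max, ((Number) args.pop()).doubleValue());
            }
            return max;
        }
    }

    public static void main(String[] args) {
        Deque<Object> scores = new LinkedList<>();
        scores.add(3.0);
        scores.add(7.5);
        scores.add(1.2);
        System.out.println(new Max().call(scores)); // prints 7.5
    }
}
```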

But here, in addition to the usual ones, special arity values appear as if out of nowhere:
• zero, for functions without arguments,
• minus one, for functions with an arbitrary number of arguments,
• "record level", for SELECT context functions that accept an entire record object,
• "record key level", also for the SELECT context, for functions that access the record key, which, generally speaking, cannot be reached in any other way.

The last two make sense only for an object-oriented SQL like ours. Let me remind you that its data sets are Spark RDDs, and each record of a set is a pair of separate objects: the key (easiest to think of as a set of bytes, though it can hold anything, even a string, even a direct partition number) and the record object itself (a descendant of Record<?>, defined in terms of the engine API). Not at all like RDBMS tables.

The key of a record in a data set is therefore a somewhat alien thing for SQL: in the general case it does not have to coincide with a record attribute, or even with a set of attributes, and can be specified externally as the result of evaluating some expression. At the same time, its role is very important, because it is the key that determines the partition into which the record will fall. Getting to it in the usual way is not easy either, since it lives outside the record object. But if you really want to (and yes, you do), you can create a function

REC_KEY

public static class KEY extends KeyLevel<Object> {
    @Override
    public Object call(Deque<Object> args) {
        return args.pop();
    }

    @Override
    public String name() {
        return "REC_KEY";
    }

    @Override
    public String descr() {
        return "Returns the Key of DS Record";
    }
}

which by itself does nothing, and support it in the interpreter by replacing eval() with something like this:

    public static boolean boolLoose(List<ExprItem<?>> item, VariablesContext vc) {
        return boolAttr(null, null, item, vc);
    }

    public static Object evalAttr(Object key, Record<?> rec, List<ExprItem<?>> item, VariablesContext vc) {
//...
    }

and accordingly passing to evalAttr() from the data context not only the record object itself, as before, but also the key. For expressions evaluated in the context of a script, where there are no records, we call evalLoose() instead. This is a targeted change, though, and nothing particularly impactful.
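To make that dispatch concrete, here is a minimal, purely illustrative sketch of how the interpreter loop might feed the key to a KEY_LEVEL function instead of popping operands off the expression stack. All names and the KEY_LEVEL value are assumptions for the example:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch only: how an interpreter might supply the record key
// to a KEY_LEVEL function. Class and constant names are assumptions.
public class KeyDispatch {
    static final int KEY_LEVEL = -3;

    interface Fn {
        Object call(Deque<Object> args);
    }

    static Object invoke(Fn fn, int arity, Object key, Deque<Object> stack) {
        if (arity == KEY_LEVEL) {
            // For KEY_LEVEL functions, push the key from the data context
            // instead of popping operands from the expression stack
            Deque<Object> args = new ArrayDeque<>();
            args.push(key);
            return fn.call(args);
        }
        // ...ordinary functions would pop 'arity' operands from the stack...
        return fn.call(stack);
    }

    public static void main(String[] ignored) {
        Fn recKey = Deque::pop;  // mirrors the REC_KEY function above
        System.out.println(invoke(recKey, KEY_LEVEL, "part-42", new ArrayDeque<>()));
        // prints: part-42
    }
}
```

The real evalAttr() of course does much more, but the key-forwarding part is just this small.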

Also, it should be remembered that the Data Cooker ETL tool is designed exclusively for ETL tasks, and is not an attempt at a parody of an analytical DBMS. So if you need aggregation functions, reducers, and other window machinery, you should pick a more suitable tool. Spark can do all of that, but our engine exists only to prepare data more efficiently for subsequent analysis, and has no intention of doing that analysis itself.

Therefore, we definitely won't have functions like COUNT(*) that collapse the entire data set into one record. All our functions work in the context of either a single record or none at all.

So, once the types for operators and functions are described, adding them is not difficult. Let me give you a few examples:

String concatenation operator ||

    public static class CAT extends Binary<String, Object, Object> {
        @Override
        public int prio() {
            return 125;
        }

        @Override
        public String name() {
            return "||";
        }

        @Override
        public String descr() {
            return "String concatenation";
        }

        @Override
        protected String op0(Deque<Object> args) {
            return args.stream().map(String::valueOf).collect(Collectors.joining());
        }
    }

The inequality operator that handles NULL

    public static class NEQ extends Binary<Boolean, Object, Object> {
        @Override
        public int prio() {
            return 40;
        }

        @Override
        public String name() {
            return "!=";
        }

        @Override
        public String descr() {
            return "Checks for inequality. String checks are case-sensitive";
        }

        @Override
        public boolean handleNull() {
            return true;
        }

        @Override
        protected Boolean op0(Deque<Object> args) {
            Object a = args.pop();
            Object b = args.pop();
            if ((a == null) || (b == null)) {
                return false;
            }
            if (a instanceof Number) {
                return ((Number) a).doubleValue() != Utils.parseNumber(String.valueOf(b)).doubleValue();
            }
            if (a instanceof Boolean) {
                return (boolean) a != Boolean.parseBoolean(String.valueOf(b));
            }
            return !Objects.equals(a, b);
        }
    }
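The null-handling contract of NEQ can be exercised in isolation. This standalone snippet reimplements the op0() logic above, with the engine's Utils.parseNumber simplified to Double.parseDouble for the sketch:

```java
import java.util.Objects;

public class NeqDemo {
    // Mirrors NEQ.op0(): NULL on either side yields FALSE, numbers are
    // compared as doubles, booleans via parseBoolean, everything else
    // by case-sensitive equals
    static boolean neq(Object a, Object b) {
        if (a == null || b == null) {
            return false;
        }
        if (a instanceof Number) {
            return ((Number) a).doubleValue() != Double.parseDouble(String.valueOf(b));
        }
        if (a instanceof Boolean) {
            return (boolean) a != Boolean.parseBoolean(String.valueOf(b));
        }
        return !Objects.equals(a, b);
    }

    public static void main(String[] args) {
        System.out.println(neq(1, "1.0"));  // false: compared numerically
        System.out.println(neq(null, 5));   // false: NULLs are never unequal here
        System.out.println(neq("a", "A"));  // true: strings are case-sensitive
    }
}
```

Note the deliberate deviation from three-valued SQL logic: NULL != x evaluates to plain FALSE rather than NULL.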

Digest calculation operator

    public static class DIGEST extends Binary<String, Object, String> {
        @Override
        public int prio() {
            return 40;
        }

        @Override
        public boolean rightAssoc() {
            return true;
        }

        @Override
        public String name() {
            return "DIGEST";
        }

        @Override
        public String descr() {
            return "Right argument is treated as Java digest provider in the format of provider-whitespace-algorithm" +
                    " or only algorithm, and left argument's hash is calculated and converted to hexadecimal string";
        }

        @Override
        protected String op0(Deque<Object> args) {
            String r = Evaluator.popString(args);

            String digest = Evaluator.popString(args);

            final String[] d = digest.split(" ", 2);

            MessageDigest md;
            try {
                md = (d.length > 1) ? MessageDigest.getInstance(d[1], d[0]) : MessageDigest.getInstance(d[0]);
            } catch (Exception e) {
                throw new InvalidConfigurationException("Unknown DIGEST algorithm '" + digest + "'");
            }
            return Hex.encodeHexString(md.digest(r.getBytes()));
        }
    }
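Under the hood this is just java.security.MessageDigest. A self-contained equivalent of the left-argument hashing, with commons-codec's Hex.encodeHexString replaced by a manual hex loop, looks like:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class DigestDemo {
    // Equivalent of "'hello' DIGEST 'MD5'": hash the left argument with the
    // named algorithm and render the resulting bytes as lowercase hex
    static String digest(String input, String algorithm) throws Exception {
        MessageDigest md = MessageDigest.getInstance(algorithm);
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest(input.getBytes(StandardCharsets.UTF_8))) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(digest("hello", "MD5"));
        // prints: 5d41402abc4b2a76b9719d911017c592
    }
}
```

The two-part "provider algorithm" form maps onto the MessageDigest.getInstance(algorithm, provider) overload, exactly as in the op0() above.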

MAX function over any number of arguments

    public static class MAX extends ArbitrAry<Double, Double> {
        @Override
        public Double call(Deque<Object> args) {
            if (args.isEmpty()) {
                return null;
            }

            return args.stream().mapToDouble(o -> Utils.parseNumber(String.valueOf(o)).doubleValue()).max().getAsDouble();
        }

        @Override
        public String name() {
            return "MAX";
        }

        @Override
        public String descr() {
            return "Returns a mathematical max value from given arguments";
        }
    }
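The varargs math functions all reduce to a DoubleStream over the argument deque. Stripped of the engine's Utils.parseNumber helper (replaced here with Double.parseDouble), MAX's core is just:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class MaxDemo {
    // Core of MAX: parse every argument as a double and take the maximum;
    // an empty argument list yields null, as in the engine's version
    static Double max(Deque<Object> args) {
        if (args.isEmpty()) {
            return null;
        }
        return args.stream()
                .mapToDouble(o -> Double.parseDouble(String.valueOf(o)))
                .max()
                .getAsDouble();
    }

    public static void main(String[] ignored) {
        Deque<Object> args = new ArrayDeque<>();
        args.push(3);
        args.push("7.5");  // mixed numeric representations are fine
        args.push(2.0);
        System.out.println(max(args));  // prints: 7.5
    }
}
```

MIN and MEDIAN differ only in the terminal operation, which is why the ArbitrAry base type pays for itself so quickly.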

Function for slicing an array

    public static class Slice extends Ternary<Object[], Object[], Integer, Integer> {
        @Override
        public Object[] call(Deque<Object> args) {
            Object[] a = Evaluator.popArray(args);
            return Arrays.copyOfRange(a, Evaluator.popInt(args), Evaluator.popInt(args));
        }

        @Override
        public String name() {
            return "ARR_SLICE";
        }

        @Override
        public String descr() {
            return "Return a slice of ARRAY given as 1st argument starting with index from 2nd and to index in 3rd (exclusive)";
        }
    }
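A subtlety worth calling out: in the call() above, Java's left-to-right evaluation of arguments guarantees the first popInt consumes the from-index and the second the to-index, and the to-index is exclusive, exactly like Arrays.copyOfRange. A standalone equivalent:

```java
import java.util.Arrays;

public class SliceDemo {
    // Equivalent of ARR_SLICE(array, from, to): the to-index is exclusive,
    // mirroring Arrays.copyOfRange semantics
    static Object[] slice(Object[] a, int from, int to) {
        return Arrays.copyOfRange(a, from, to);
    }

    public static void main(String[] args) {
        Object[] a = {"a", "b", "c", "d"};
        System.out.println(Arrays.toString(slice(a, 1, 3)));  // prints: [b, c]
    }
}
```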

Function for calculating the hashcode of a record

    public static class HASHCODE extends RecordLevel<Integer, Record<?>> {
        @Override
        public Integer call(Deque<Object> args) {
            return Objects.hashCode(args.pop());
        }

        @Override
        public String name() {
            return "REC_HASHCODE";
        }

        @Override
        public String descr() {
            return "Returns Java .hashCode() of DS Record";
        }
    }

Function for calculating the perimeter of a polygon on the Earth's surface

    public static class PolyPerimeter extends RecordLevel<Double, PolygonEx> {
        @Override
        public Double call(Deque<Object> args) {
            PolygonEx poly = (PolygonEx) args.pop();

            PolygonArea pArea = new PolygonArea(Geodesic.WGS84, false);

            for (Coordinate c : poly.getExteriorRing().getCoordinates()) {
                pArea.AddPoint(c.y, c.x);
            }

            PolygonResult pRes = pArea.Compute();
            return pRes.perimeter;
        }

        @Override
        public String name() {
            return "POLY_PERIMETER";
        }

        @Override
        public String descr() {
            return "The perimeter of the Polygon's outline in meters";
        }
    }

That is, to add a new function or expression operator, you need to write literally a few lines implementing the class, and then the automatic magic kicks in.

We certainly won't forget the date/time functions: quite a lot of them are needed to implement handling of timestamps in different time zones. Previously we had separate operations for this (that is, extensions of the language itself); now they are all brought under one umbrella. (Browse them at the link; there are already enough walls of code in this article.)

Moreover, along with them, a type specific to this family of functions is added to our data type system: a DateTime-like Object, represented either as a Numeric holding seconds or milliseconds of the Epoch, or as an unholy ISO8601-like string that doesn't actually conform to any standard but is often found in data dumps from vendors. Since such a thing exists in the wild, we have to support it too.
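How exactly the engine normalizes such values is out of scope here, but one plausible heuristic (purely my sketch, not the engine's actual parser) distinguishes epoch seconds from milliseconds by magnitude and falls back to a lenient ISO-like parse for everything else:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class LooseDateTime {
    // Illustrative heuristic, NOT the engine's actual code: numbers with
    // absolute value above ~10^11 are taken as epoch milliseconds, smaller
    // ones as epoch seconds; non-numeric input is tried as a space- or
    // T-separated "yyyy-MM-dd HH:mm:ss" pseudo-ISO string (assumed UTC)
    static Instant parse(Object v) {
        String s = String.valueOf(v).trim();
        try {
            long n = Long.parseLong(s);
            return (Math.abs(n) > 99_999_999_999L)
                    ? Instant.ofEpochMilli(n)
                    : Instant.ofEpochSecond(n);
        } catch (NumberFormatException ignore) {
            DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            return LocalDateTime.parse(s.replace('T', ' '), f).toInstant(ZoneOffset.UTC);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1700000000"));     // treated as seconds
        System.out.println(parse("1700000000000"));  // same instant, millis
        System.out.println(parse("2023-11-14T22:13:20"));
    }
}
```

A real implementation would also need fractional seconds, zone offsets, and the other mutations vendors inflict on timestamps, which is exactly why this family of functions is so numerous.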

To a first approximation, as the title screenshot of the article shows, we ended up with as many as 55 functions. And 33 operators (although there is some overlap among them).

Well then. The bottom layer of the API is done in one go, and is now ready for use.

But besides the bottom layer of the engine, which on the whole is an almost bare interpreter with minimal tricks, we have a few more front layers: the REPL, which can work both locally in-process and connect via REST, and, finally, an auto-generator of documentation, which collects docs from, among other things, the metadata of the code itself – that is, from the same generic types in abstract classes like Function.Binary<R, T1, T2>.

But all this machinery has already been implemented more than once for other components of the engine – “operations”, “transforms”, “storage adapters”, the data and variable contexts – so most of the code is copy-paste of the corresponding components. Rather than briefly rehashing the contents of the previous episodes in this article, I'll simply refer you to the previous parts. The additions to the front layers are too trivial.

However, the code for the operator registry can be given, because a couple of small notes on it are still necessary:

Operators.java

public class Operators {
    public final static Map<String, Operator<?>> OPERATORS;

    static {
        Map<String, Operator<?>> operators = new HashMap<>();

        for (Map.Entry<String, String> pkg : RegisteredPackages.REGISTERED_PACKAGES.entrySet()) {
            try (ScanResult scanResult = new ClassGraph().acceptPackages(pkg.getKey()).scan()) {
                ClassInfoList operatorClasses = scanResult.getSubclasses(Operator.class.getTypeName());
                List<Class<?>> operatorClassRefs = operatorClasses.loadClasses();

                for (Class<?> operatorClass : operatorClassRefs) {
                    try {
                        if (!Modifier.isAbstract(operatorClass.getModifiers())) {
                            Operator<?> operator = (Operator<?>) operatorClass.getDeclaredConstructor().newInstance();
                            operators.put(operator.name(), operator);
                        }
                    } catch (Exception e) {
                        System.err.println("Cannot instantiate Operator class '" + operatorClass.getTypeName() + "'");
                        e.printStackTrace(System.err);
                    }
                }
            }
        }

        OPERATORS = Collections.unmodifiableMap(operators.entrySet()
                .stream()
                .sorted(Comparator.comparingInt(o -> -o.getValue().prio()))
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (oldValue, newValue) -> oldValue, LinkedHashMap::new)));
    }

    public static Map<String, EvaluatorInfo> packageOperators(String pkgName) {
        Map<String, EvaluatorInfo> ret = new LinkedHashMap<>();

        for (Map.Entry<String, Operator<?>> e : OPERATORS.entrySet()) {
            if (e.getValue().getClass().getPackage().getName().startsWith(pkgName)) {
                ret.put(e.getKey(), EvaluatorInfo.bySymbol(e.getValue().name()));
            }
        }

        return ret;
    }

    static Operator<?> get(String symbol) {
        return OPERATORS.get(symbol);
    }

    // Those three are in fact peculiar language constructs, so we make sure they
    // won't go into the list of real Expression Operators. Therefore, this class
    // is not included in @RegisteredPackage, and we directly reference them in
    // the interpreter instead.
    public static Operator<Boolean> IN = new IN();
    public static Operator<Boolean> IS = new IS();
    public static Operator<Boolean> BETWEEN = new BETWEEN();

    public static class IN extends Binary<Boolean, Object, Object[]> {
        @Override
        public int prio() {
            return 35;
        }

        @Override
        public boolean rightAssoc() {
            return true;
        }

        @Override
        public String name() {
            return "IN";
        }

        @Override
        public String descr() {
            return "TRUE if left argument is present in the right, casted to array, FALSE otherwise." +
                    " Vice versa for NOT variant";
        }

        @Override
        protected Boolean op0(Deque<Object> args) {
            throw new RuntimeException("Direct operator IN call");
        }
    }

    public static class IS extends Binary<Boolean, Object, Void> {
        @Override
        public int prio() {
            return 35;
        }

        @Override
        public boolean rightAssoc() {
            return true;
        }

        @Override
        public String name() {
            return "IS";
        }

        @Override
        public String descr() {
            return "TRUE if left argument is NULL, FALSE otherwise. Vice versa for NOT variant";
        }

        @Override
        public boolean handleNull() {
            return true;
        }

        @Override
        protected Boolean op0(Deque<Object> args) {
            throw new RuntimeException("Direct operator IS call");
        }
    }

    public static class BETWEEN extends Ternary<Boolean, Double, Double, Double> {
        @Override
        public int prio() {
            return 35;
        }

        @Override
        public boolean rightAssoc() {
            return true;
        }

        @Override
        public String name() {
            return "BETWEEN";
        }

        @Override
        public String descr() {
            return "TRUE if left argument is inclusively between min and max, casted to numerics, FALSE otherwise." +
                    " For NOT variant, TRUE if it is outside the range, excluding boundaries";
        }

        @Override
        protected Boolean op0(Deque<Object> args) {
            throw new RuntimeException("Direct operator BETWEEN call");
        }
    }
}

Here we have two things that differ from the template.

Firstly, for better human readability, the list of operators found in the classpath is sorted not by symbol but by priority, from highest to lowest. In this form it is easier to take in, both in the REPL and in the docs.

Secondly, SQL has three very, hmmm… peculiar operators: IN, IS, and BETWEEN. It turned out to be easier to hardcode them in the interpreter itself than to try to handle them the normal way. They have NOT variants, and in general they are strange:

is_op
 : S_IS S_NOT? S_NULL
 ;

between_op
 : S_NOT? S_BETWEEN L_NUMERIC S_AND L_NUMERIC
 ;

in_op
 : S_NOT? S_IN array
 | S_NOT? S_IN var_name
 | S_NOT? S_IN attr
 ;

For IS the second operand is always NULL; for IN you can pass an array constructor; and in BETWEEN only Numeric literals can be specified. In short, as the classic sang, “spiritual people are special people, and if you are not an adulterer, it's better not to touch spiritual people… dey-dey-dey…” And I'm inclined to agree with him that these operators are more constructs of the language than real comrades. So we'll provide stubs of a sort for them, making them invisible to the auto-documenter at the same time, and describe all this deviant behavior as a separate item in the language guide.
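To pin down what the interpreter actually computes behind the BETWEEN stub (whose op0() deliberately throws if called directly), here is an illustrative sketch of the semantics from the descr() string above; this is my rendering, not the engine's code:

```java
public class BetweenDemo {
    // BETWEEN is inclusive on both boundaries; NOT BETWEEN is true strictly
    // outside the range, i.e. the boundaries are excluded from the NOT variant
    static boolean between(double value, double min, double max, boolean not) {
        boolean inRange = (value >= min) && (value <= max);
        return not ? !inRange : inRange;
    }

    public static void main(String[] args) {
        System.out.println(between(5, 1, 10, false));  // true: inside range
        System.out.println(between(10, 1, 10, true));  // false: boundary, NOT variant
        System.out.println(between(11, 1, 10, true));  // true: strictly outside
    }
}
```

IN and IS get similar hardcoded branches, dispatching on the presence of the NOT keyword captured by the grammar rules above.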

As for the rest, there’s not much to tell.

At the same time, the resulting change still warrants a full major release, because extending the syntax of an SQL dialect is no trifle. It's pretty serious.

I hope that for some readers the topic of “how to write and extend interpreters” is as interesting as it is to me. Or maybe someone out there needs an ETL engine with SQL syntax. Leave your comments.

Demo site: https://dcetl.ru
Sources: https://github.com/PastorGL/datacooker-etl
Official Telegram group: https://t.me/data_cooker_etl
