The art of ETL. Writing our own SQL engine in Spark [part 4 of 5]


In this series of articles, I talk in detail about how to write your own object-oriented SQL interpreter in Java using the Spark RDD API, tailored to the task of preparing and transforming data sets.

A brief summary of the previous installments of the series, devoted to implementing the language spec in code:

  • A note on using prior art
  • Data sets in the execution context
  • Variables, execution context settings, and parameter metadata of pluggable functions
  • The interpreter, execution context, and expression operators

Having dealt with all the contexts and the structure of the interpreter kernel, we can move on to describing the API of the extension points, the launch modes, and the technicalities of assembling the executable artifacts.

M for Mature rating warning

The difficulty level of this series is high. Basic concepts are not explained at all, and not all of the advanced ones are either. So if you are not a developer already familiar with big data terminology and data engineering jargon, these articles will be difficult to read and even harder to understand. You have been warned.

We have three and a half types of pluggable functions: storage adapters (input and output), transforms, and operations.

We have already talked about function parameter metadata. In fact, it is the common part of the more specific metadata the engine uses to register functions in the execution context at startup, and it also feeds the automatic documentation generator at project build time.

I don’t know about you, but I personally find it quite annoying when maintainers of most open source projects limit themselves to assembling “documentation” from Javadoc, in which some property getter is described as "Getter for <property>", and so on 600 times for 600 getters in 100 classes, with no specifics at all about what kind of property it is, why it has this type, and what values can actually be set there. Not to mention real-life use cases, of which there are slightly fewer than none. For some reason I like it when documentation is written for people and reads as coherent text. So you should think about such things from the very beginning, and make sure the metadata is useful not only to the engine, but to the end user as well. But we will talk about that later.

Our functions can live anywhere on the classpath, so first the engine must somehow know where to look for them. An annotation is used for this purpose: @RegisteredPackage:

package io.github.pastorgl.datacooker;

public @interface RegisteredPackage {
    String value() default "";
}

In addition to setting a label on a package, we give it a human-readable description:

@RegisteredPackage("Basic Transforms of DataStreams from one record type to another")
package io.github.pastorgl.datacooker.commons.transform;

import io.github.pastorgl.datacooker.RegisteredPackage;

With the help of the wonderful classgraph library, at startup we walk all the packages on the classpath and put every package annotated with our marker into a cache, thereby registering them in our execution context.
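A sketch of such a startup scan might look like the following. This is a hypothetical, simplified version, not the engine's actual code: the cache shape and the helper method name are my assumptions, while ClassGraph, ScanResult, PackageInfo, and AnnotationInfo are classgraph's real public API:

```java
import io.github.classgraph.AnnotationInfo;
import io.github.classgraph.ClassGraph;
import io.github.classgraph.PackageInfo;
import io.github.classgraph.ScanResult;

import java.util.HashMap;
import java.util.Map;

public class PackageDirectory {
    // hypothetical cache: package name -> human-readable description
    private static final Map<String, String> REGISTERED = new HashMap<>();

    public static void scanClasspath() {
        // walk the classpath once at startup, annotation info enabled
        try (ScanResult scan = new ClassGraph().enableAnnotationInfo().scan()) {
            for (PackageInfo pkg : scan.getPackageInfo()) {
                AnnotationInfo ann = pkg.getAnnotationInfo(RegisteredPackage.class.getName());
                if (ann != null) {
                    // remember the package together with its description
                    Object descr = ann.getParameterValues().getValue("value");
                    REGISTERED.put(pkg.getName(), String.valueOf(descr));
                }
            }
        }
    }
}
```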

After a package is registered, we can look in it for implementations of the corresponding pluggable function classes. They all implement the interface Configurable:

public interface Configurable<M extends ConfigurableMeta> extends Serializable {
    M meta();
}

And their metadata classes, accordingly, extend ConfigurableMeta:

public abstract class ConfigurableMeta {
    public final String verb;
    public final String descr;

    public final Map<String, DefinitionMeta> definitions;

    protected ConfigurableMeta(String verb, String descr, Map<String, DefinitionMeta> definitions) {
        this.verb = verb;
        this.descr = descr;
        this.definitions = definitions;
    }
}

Here, verb is the name of the function as used in language operators, descr is its textual description, and definitions is the map of function parameters we already discussed earlier.

Now we can go through the function subclasses. For adapters we have:

public abstract class AdapterMeta extends ConfigurableMeta {
    public final StreamType[] type;
    public final String[] paths;

    public AdapterMeta(String verb, String descr, String[] paths, StreamType[] type, Map<String, DefinitionMeta> meta) {
        super(verb, descr, meta);

        this.type = type;
        this.paths = paths;
    }
}

Here the metadata adds the type(s) of data sets the adapter can work with and examples of the paths it understands. The adapters themselves inherit from StorageAdapter and its specialized halves (a pity that Java has no traits):

public abstract class StorageAdapter<C extends AdapterMeta> implements Configurable<C> {
    public final C meta;

    protected JavaSparkContext context;
    protected String path;

    protected Configuration resolver;

    public StorageAdapter() {
        this.meta = meta();
    }

    public void initialize(JavaSparkContext ctx, Configuration config, String path) throws InvalidConfigurationException {
        context = ctx;
        resolver = config;
        this.path = path;

        configure();
    }

    abstract protected void configure() throws InvalidConfigurationException;
}

public abstract class InputAdapter extends StorageAdapter<InputAdapterMeta> {
    public abstract Map<String, DataStream> load(int partCount, Partitioning partitioning);
}

public abstract class OutputAdapter extends StorageAdapter<OutputAdapterMeta> {
    public abstract void save(String sub, DataStream rdd);
}

This is worth commenting on. We always initialize an adapter with a Spark context, a parameter instance, and a path. Then, depending on the adapter type, we call either load() with the desired number of partitions and the partitioning algorithm (PARTITION <N> BY <Algorithm>), or save() with a substring (for COPY with the star syntax) and the corresponding data set.
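To make the calling convention concrete, here is a heavily simplified, self-contained sketch of how an engine might drive an input adapter. All the types here are stubs standing in for the Spark and engine classes, not the real API:

```java
import java.util.Collections;
import java.util.Map;

public class AdapterLifecycle {
    // stubs standing in for the real engine/Spark types
    static class Configuration { }
    static class DataStream { }

    abstract static class InputAdapter {
        protected Configuration resolver;
        protected String path;

        // same call order as described above: initialize first, then load
        void initialize(Configuration config, String path) {
            this.resolver = config;
            this.path = path;
            configure();
        }

        abstract void configure();

        abstract Map<String, DataStream> load(int partCount);
    }

    static class DummyInput extends InputAdapter {
        void configure() {
            System.out.println("configured for " + path);
        }

        Map<String, DataStream> load(int partCount) {
            System.out.println("loading " + partCount + " partition(s)");
            return Collections.singletonMap("dummy", new DataStream());
        }
    }

    public static void main(String[] args) {
        InputAdapter adapter = new DummyInput();
        adapter.initialize(new Configuration(), "file:/tmp/source");
        Map<String, DataStream> loaded = adapter.load(4);
        System.out.println("loaded: " + loaded.keySet());
    }
}
```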

I won’t give an example of an adapter implementation here, since the standard ones are too complicated for a tutorial, while a minimal one would be non-functional and wouldn’t really illustrate anything. If you’re still interested, I recommend looking at the JDBC adapter classes in Data Cooker Dist, a separate utility accompanying Data Cooker ETL; they are relatively simple, especially the input one (owing to the primitiveness of Spark’s JdbcRDD).

Transforms require slightly more specific metadata:

public class TransformMeta extends ConfigurableMeta {
    public final StreamType from;
    public final StreamType to;

    public final TransformedStreamMeta transformed;
    private final Boolean keyAfter;

    public boolean keyAfter() {
        return (keyAfter != null) ? keyAfter : (from == StreamType.PlainText);
    }
}

Here we need to know from which record type to which we are converting the records of the data set, what the transformed set will be, and whether the record keys need to be recalculated afterwards. But operations (which, as we remember, are an extended analogue of SELECT FROM … INTO …) need to know nothing special at all:

public class OperationMeta extends ConfigurableMeta {
    public final DataStreamsMeta input;
    public final DataStreamsMeta output;
}

What unites these function types into one metadata group is the presence of data set metadata, be it transformed, or input/output. It is described by the following family of classes:

public class DataStreamsMeta {
}

public class TransformedStreamMeta extends DataStreamsMeta {
    public final DataStreamMeta streams;
}

public class NamedStreamsMeta extends DataStreamsMeta {
    public final Map<String, DataStreamMeta> streams;
}

public class PositionalStreamsMeta extends DataStreamsMeta {
    public final int count;

    public final DataStreamMeta streams;
}

public class DataStreamMeta {
    public final String descr;

    public final StreamType[] type;

    public final Origin origin;

    public final List<String> ancestors;

    public final Map<String, String> generated;

    public final boolean optional;
}

That is, for any data set related to a function, we always know its:

  • description,
  • list of allowed types,
  • origin (filtered, generated, augmented),
  • list of ancestors (in case not all of them participate),
  • a set of names of newly appearing attributes for generated and augmented sets,
  • sign of optionality.

And depending on what kind of set it is – named, positional, or simply transformed – the wrapper classes add internal names, a limit on the count, or nothing at all.

We’re done with the metadata. Now we can look at the base function classes themselves:

public abstract class Transform implements Configurable<TransformMeta> {
    private final TransformMeta meta;

    public Transform() {
        meta = meta();
    }

    public abstract StreamConverter converter();
}

public interface StreamConverter {
    DataStream apply(DataStream ds, Map<String, List<String>> newColumns, Configuration params);
}

There’s nothing complicated here at all. A single lambda that receives the data set, the new columns extracted from SET COLUMNS, and the parameters. The minimal implementation looks like this:

public class PassthruTransform extends Transform {
    public TransformMeta meta() {
        return new TransformMeta("passthru", StreamType.Passthru, StreamType.Passthru,
                "Doesn't change a DataStream in any way",
                null, null
        );
    }

    public StreamConverter converter() {
        return (ds, newColumns, params) -> new DataStream(ds.streamType, ds.rdd, ds.accessor.attributes());
    }
}

A very necessary and important transform that does nothing to the set itself. It is needed to re-key and/or repartition a data set somewhere in the middle of the ETL process:

TRANSFORM signals passthru() PARTITION $parts * 10;

It accepts sets of the Passthru type. This is a special data set type that tells the engine the transform is not actually transformative and eats anything, unlike the other transforms, which convert sets strictly from one real type to another (in different ways, so even in the standard set their total number runs into the tens).
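As an aside, the converter-as-lambda pattern can be reproduced outside the engine with self-contained stubs, to show how little a custom transform actually needs. The types below are illustrative stand-ins, not the real engine classes:

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class TransformSketch {
    // stub for DataStream: just a list of text records
    record DataStream(List<String> records) { }

    // analogue of StreamConverter: data set + params in, data set out
    interface Converter extends BiFunction<DataStream, String, DataStream> { }

    public static void main(String[] args) {
        // a trivial "transform" that uppercases every record
        Converter upper = (ds, params) -> new DataStream(
                ds.records().stream().map(String::toUpperCase).collect(Collectors.toList()));

        DataStream out = upper.apply(new DataStream(List.of("foo", "bar")), "");
        System.out.println(out.records());
    }
}
```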

Now let’s look at operations, which inherit from the following base class:

public abstract class Operation implements Configurable<OperationMeta> {
    public final OperationMeta meta;

    protected ListOrderedMap<String, DataStream> inputStreams;
    protected Configuration params;
    protected ListOrderedMap<String, String> outputStreams;

    public Operation() {
        this.meta = meta();
    }

    public void initialize(ListOrderedMap<String, DataStream> input, Configuration params, ListOrderedMap<String, String> output) throws InvalidConfigurationException {
        this.inputStreams = input;
        this.params = params;
        this.outputStreams = output;

        configure();
    }

    abstract protected void configure() throws InvalidConfigurationException;

    abstract public Map<String, DataStream> execute() throws InvalidConfigurationException;
}

At the input there is a map of input data sets, a parameter instance, and a map of names for the output sets. At the output, the implementation produces a map of data sets associated with those names. The minimal operation is the following:

public class CountByKeyOperation extends Operation {
    static final String GEN_COUNT = "_count";

    public OperationMeta meta() {
        return new OperationMeta("countByKey", "Count values under the same key in all given DataStreams",

                new PositionalStreamsMetaBuilder()
                        .input("Source KeyValue DataStream",
                                new StreamType[]{StreamType.KeyValue}
                        )
                        .build(),

                null,

                new PositionalStreamsMetaBuilder()
                        .output("KeyValue DataStream with unique source keys",
                                new StreamType[]{StreamType.Columnar}, Origin.GENERATED, null
                        )
                        .generated(GEN_COUNT, "Count of values under each key in the source DataStream")
                        .build()
        );
    }

    protected void configure() throws InvalidConfigurationException {
    }

    public Map<String, DataStream> execute() {
        if (inputStreams.size() != outputStreams.size()) {
            throw new InvalidConfigurationException("Operation '" + meta.verb + "' requires same amount of INPUT and OUTPUT streams");
        }

        final List<String> indices = Collections.singletonList(GEN_COUNT);

        Map<String, DataStream> output = new HashMap<>();
        for (int i = 0, len = inputStreams.size(); i < len; i++) {
            DataStream input = inputStreams.getValue(i);
            JavaPairRDD<Object, Record<?>> count = input.rdd
                    .mapToPair(t -> new Tuple2<>(t._1, 1L))
                    .reduceByKey(Long::sum)
                    .mapToPair(t -> new Tuple2<>(t._1, new Columnar(indices, new Object[]{t._2})));

            output.put(outputStreams.get(i), new DataStream(StreamType.Columnar, count, Collections.singletonMap(OBJLVL_VALUE, indices)));
        }

        return output;
    }
}

This is also a very important and necessary thing, because calling it like

CALL countByKey() INPUT data OUTPUT keys_counted;

we get an analogue of the SQL SELECT _key, COUNT(*) … GROUP BY _key, where _key is the record key. (Our language itself has no GROUP BY and no COUNT(*), because ETL processes rarely need to do such things. And then suddenly we did need it, and here it is, along with a demonstration of how it is implemented at the Spark RDD API level.)
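Outside of Spark, the same per-key counting can be expressed in a few lines of plain Java, which may help to see what the RDD pipeline above computes. This is just an illustration with made-up data, not engine code:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CountByKeyDemo {
    public static void main(String[] args) {
        // (key, value) records, keyed like a KeyValue DataStream
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("a", "x"),
                Map.entry("a", "y"),
                Map.entry("b", "z"));

        // analogue of SELECT _key, COUNT(*) ... GROUP BY _key
        Map<String, Long> counts = records.stream()
                .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));

        System.out.println(counts.get("a") + " " + counts.get("b"));
    }
}
```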

Other operations can implement whatever analysts ask for. Including deeply analytical algorithms that go far beyond ETL…

However, for now it was only important for us to find all subclasses of the corresponding base classes of pluggable functions and add their metadata to the cache. From this cache, the interpreter takes a class by its verb, instantiates it, initializes it with parameters, and transfers control to it.
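The verb-to-instance dispatch can be sketched in a few lines of plain Java (again, simplified stubs rather than the engine's real cache and reflection machinery):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class VerbRegistry {
    // stub for a pluggable function: one method to transfer control to
    interface Operation {
        String execute();
    }

    // cache filled at startup from the classpath scan: verb -> factory
    private static final Map<String, Supplier<Operation>> CACHE = new HashMap<>();

    public static void main(String[] args) {
        // registration (the real engine discovers these via classgraph)
        CACHE.put("countByKey", () -> () -> "counted");

        // the interpreter takes the class by verb, instantiates, and transfers control
        Operation op = CACHE.get("countByKey").get();
        System.out.println(op.execute());
    }
}
```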

So with the extensibility points, everything is in perfect order.

You can take off.

The need to integrate an ETL tool into several different environments requires it to have different launch modes, including remote and interactive debug ones. Here is the table we ended up with (listing the command line switches for the supported launch modes; quoting from the documentation):

There are many modes, but “batch on a Spark cluster” will be the main one in any case. In production, processes usually work from the depths of some CI/CD service – it doesn’t matter whether the task is launched according to a schedule, by pressing a button on a dashboard, or through a call to some remote API via REST – but automation in any case requires batching. When a task arrives, we start, work, and then wait until the next task, and so on in a circle.

In our particular case, all roads lead to TeamCity, which runs CloudFormation, which deploys an EMR cluster, which raises Livy on that cluster, through which the JAR with the Data Cooker Dist application is submitted to Spark with launch parameters passed via the command line, which accesses the S3 storage to copy the data to HDFS, after which Data Cooker ETL itself is submitted with the path to the script and a file of top-level context variables, and then the heat begins. And once the calculations are done, the result is taken and the cluster is torn down so it doesn’t burn money in vain.

And so on as many times as processes need to be performed. Sometimes there are thousands of processes a day.

However, when introducing new processes, it quickly becomes clear that debugging them on samples directly on a cluster somewhere in the cloud on the other side of the globe is oh, how inconvenient. So it would be nice to be able to run on physical hardware nearby, and ideally on a local machine, without a Spark cluster.

This is by no means impossible.

The Spark context runs fine in local mode as a regular JVM application. True, if you pack it into FatJAR with all the dependencies, such an assembly will weigh about 200 megabytes.

This in itself is not a problem. The problem is when the FatJAR needs to be deployed to the cloud (on the other side of the globe) many times a day. In that case, you just need to learn how to build the application with external dependencies for the target cloud environment, that is, to depend on the JARs of exactly those versions that are deployed there, in the cloud.

The downside here will be just the versioning: Spark cloud clusters, as a rule, lag significantly behind the current ones, and even more so from bleeding edge. Officially, Java 8 is still running in the Amazon cloud, not even 11 (although it is there, and you can still use it unofficially). But somehow we will survive, we will overcome.

We use Apache Maven as our build system; it supports build profiles.


Therefore, first, let’s look at what is included in EMR (let’s say the current version is 6.9), and record in pom.xml the dependencies on the lib versions available to us.

<!-- EMR 6.9 -->

<!-- end EMR deps -->

Next, we find the libs whose cloud versions don’t suit us, and use maven-shade-plugin to relocate their packages under a private namespace. Given how minimal our dependencies are, there should be few such libraries; ideally, none. I’ll let you in on a secret: in the latest versions we have reached this goal, so there’s nothing to show here.

In the last step, we use different profiles to include or exclude libraries from the resulting FatJAR, and thereby achieve our goal. On CI/CD we build, with a stripped-down profile, an artifact with external dependencies:


After excluding Spark from the FatJAR, the artifact size is 7 megabytes, which is already reasonable. In the local profile, we simply don’t specify the <artifactSet> section, and thus get a complete artifact with the entire classpath included.
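For reference, such a profile might look roughly as follows. This is a sketch, not the project's actual pom.xml: the profile id and the excluded artifacts are placeholders, while the <artifactSet>/<excludes> elements are the real maven-shade-plugin configuration syntax:

```xml
<profile>
    <id>cluster</id>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <artifactSet>
                        <!-- keep Spark and friends out of the FatJAR; the cluster provides them -->
                        <excludes>
                            <exclude>org.apache.spark:*</exclude>
                            <exclude>org.apache.hadoop:*</exclude>
                        </excludes>
                    </artifactSet>
                </configuration>
            </plugin>
        </plugins>
    </build>
</profile>
```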

Actually, all that remains is to launch it. According to the table at the beginning of this section, running in batch mode takes the following set of switches:

java -jar ./datacooker-etl-cli.jar -l -s ./scripts/script.tdl

The Main class, mentioned in the Maven config above, is a dispatcher that does the following things:

  • parses command line switches, creating a mode handler configuration instance,
  • finds out which of the many launch modes the user has in mind,
  • if necessary, raises the local Spark context and configures it,
  • creates any of the 4 possible mode handler classes, initializing them with the Spark configuration and context (if requested),
  • and transfers control to them in the correct order.

The mode handlers are:

  • Runner – syntax analysis and, if necessary, interpretation of the script file in a local or cluster environment,
  • Local – a REPL server and client in one package, for local interactive debugging,
  • Server – a Jersey-based REPL server that listens for HTTP requests and executes the commands sent to it; it can be launched on a cluster too,
  • Client – accordingly, a JAX-RS client for that server, which does not require a Spark context at all.

And we will achieve all the launch modes we need – for any conceivable case of operating our tool – by combining parameters and dispatching control between these handlers.

Moreover, to run JUnit tests during the build, we can easily inherit a TestRunner from the Runner handler, in which Spark is set to run locally, and the paths to the scripts are intercepted in a TestDataContext and redirected to the build resources. This way, a test run truly interprets the actual scripts from the source tree, each in its own isolated context. For example,

public class CountByKeyOperationTest {
    @Test
    public void mapCountTest() {
        try (TestRunner underTest = new TestRunner("/test.countByKey.tdl")) {
            Map<String, JavaPairRDD<Object, Record<?>>> ret = underTest.go();

            JavaPairRDD<Object, Record<?>> left = ret.get("left");

            Map<Object, Record<?>> result = ret.get("counted").collectAsMap();

            for (Record<?> l : result.values()) {
                assertEquals(2L, l.asLong("_count").longValue());
            }
        }
    }
}

…which runs the following script from the resources:

CREATE "left" hadoopText() FROM 'data/bar.csv' PARTITION 1;

TRANSFORM "left" textToColumnar(@delimiter=",")
    VALUE (foo,"2",_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,"21") KEY foo;

CALL countByKey() INPUT "left" OUTPUT counted;

COPY counted hadoopText() INTO 'counted';

These, of course, are not unit tests in their true sense; on the contrary, they are quite integration tests. But the coverage is more than remarkable – after all, each test actually launches the entire engine in batch mode.

The script, by the way, may well serve as an example of using the corresponding function, therefo… Stop. Wait just a minute.

In fact, the idea of using real scripts from the tests as an addition to the documentation did not come to my mind right away, but as soon as it did, I immediately moved the invocation of a small additional utility that assembles docs from the metadata into the test scope, and thus got my “Dream Auto Documenter”.

However, today’s article is too long even for a Friday one, so we’ll continue after the weekend. Don’t switch!
