Creating a data lineage in Apache Atlas from Spark logical plans (with some hacks)

Note:

The current implementation is a crude prototype aimed solely at demonstrating the ability to display a logical plan from Apache Spark V Apache Atlas. The Lfyysq prototype is essentially a “prototype of a prototype” and serves only as an initial starting point for deeper analysis and development.

In this paper the author does not aim to present a final or optimal solution. The main focus is to demonstrate the principle and outline the necessary methods for integrating logical plans with metadata in Apache Atlas.

The author does not encourage using this approach in a production environment in its current form. A full solution to the problem requires further development, including the creation of specialized libraries, improvement of the architecture. And all other other …

Purpose of Work:

The goal of this work is to create a prototype that demonstrates the feasibility of integrating Apache Spark logical plans with metadata in Apache Atlas just as it happens in this article with Apache NIFI .

Test task for illustration and parsing the plan into AST:

Let's define a small file cars.csv with the following content:

model,manufacturer
Model S,Tesla
Model 3,Tesla
Mustang,Ford
Civic,Honda

And we will write down its logical plan:

  val spark = SparkSession.builder()
    .appName("Logical Plan Example")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  val carsCSV = spark
    .read
    .option("header", "true")
    .csv("src/main/resources/cars.csv")

  val carsSeq = List(
    ("i8", "BMW"),
    ("A4", "Audi"),
    ("911", "Porsche"),
    ("Corolla", "Toyota")
  ).toDF("model", "manufacturer")

  val unioncars = carsCSV.union(carsSeq)

  val resDF = unioncars
    .where(col("manufacturer") =!= "Audi")
    .select("model", "manufacturer")
    .withColumn("processedDDTM", lit(LocalDateTime.now()))

  val logicalPlan = resDF.queryExecution.logical

  println(logicalPlan)
/* вывод
    Project [model#17, manufacturer#18, 2024-09-12 13:00:46.880141 AS processedDDTM#36]
      +- Project [model#17, manufacturer#18]
         +- Filter NOT (manufacturer#18 = Audi)
            +- Union false, false
               :- Relation [model#17,manufacturer#18] csv
               +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
                  +- LocalRelation [_1#23, _2#24]
   */
}

The logical plan is a tree, and for further work it must be transformed into a convenient form (AST).

For this we will define a class ASTwhich will reflect the structure of the plan in a format convenient for subsequent processing.

// Определение корневого класса или типа для всех узлов дерева
sealed trait Node {
  // Метод для получения имени узла на основе его типа
  def getName: String = this.getClass.toString
}

// Узел типа "Проект", содержащий последовательность столбцов
case class ProjectNode(columns: Seq[String]) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "Project"
}

// Узел типа "Фильтр", содержащий условие фильтрации
case class FilterNode(condition: String) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "Filter"
}

// Узел типа "Объединение", указывающий, следует ли объединять все записи и по какому признаку
case class UnionNode(isAll: Boolean, byName: Boolean) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "Union"
}

// Узел типа "Логическое отношение", содержащий последовательность столбцов
case class LogicalRelationNode(columns: Seq[String]) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "LogicalRelation"
}

case class LocalRelationNode(columns: Seq[String]) extends Node {
  override def getName: String = "LocalRelation"
}

// Узел типа "Локальное отношение", содержащий последовательность столбцов
case class LocalRelationNode(columns: Seq[String]) extends Node {
  // Переопределение метода getName для возврата конкретного имени узла
  override def getName: String = "LocalRelation"
}

// Класс для представления абстрактного синтаксического дерева (AST), где каждый узел имеет тип Node,
// список дочерних узлов, номер уровня и выражение уровня (необходим для индонтефикации нод на одном уровне)
case class AST(node: Node,
               children: Seq[AST],
               level_num: Int,
               levelExpr: String)

And we will write a parser from the logical plan in AST

// Объект для парсинга логических планов в AST
object ParserAST {

  // Функция для преобразования логического плана в AST
  // Возвращает Option[AST], где None означает, что план не может быть преобразован
  private def parseAST(plan: LogicalPlan): Option[AST] = {

    // Рекурсивная функция для обхода логического плана и создания узлов AST
    // Параметры:
    // - logicalPlan: текущий логический план для обработки
    // - levelnum: уровень в дереве AST
    // - levelExpr: строковое представление уровня и индекса
    // Возвращает Option[AST], где None означает, что логический план не может быть преобразован
    def loop(logicalPlan: LogicalPlan, levelnum: Int, levelExpr: String): Option[AST] = {

      // Определение узла на основе типа логического плана
      val node: Option[Node] = logicalPlan match {
        case p: Project =>
          // Обработка узла типа Project и создание узла AST с именем "Project"
          val columns = p.projectList.map(_.sql)
          Some(ProjectNode(columns))
          
        case f: Filter =>
          // Обработка узла типа Filter и создание узла AST с именем "Filter"
          val condition = f.condition.sql
          Some(FilterNode(condition))
          
        case u: Union =>
          // Обработка узла типа Union и создание узла AST с именем "Union"
          val isAll = u.allowMissingCol
          val byName = u.byName
          Some(UnionNode(isAll, byName))
          
        case lr: LocalRelation =>
          // Обработка узла типа LocalRelation и создание узла AST с именем "LocalRelation"
          val columns = lr.output.map(_.sql)
          Some(LocalRelationNode(columns))
          
        case lr: LogicalRelation =>
          // Обработка узла типа LogicalRelation и создание узла AST с именем "LogicalRelation"
          val columns = lr.output.map(_.sql)
          Some(LogicalRelationNode(columns))
          
        case _ =>
          // Если логический план не совпадает ни с одним из известных типов, возвращаем None
          None
      }

      // Если узел успешно создан, создаем AST и рекурсивно обрабатываем детей
      node.map { n =>
        // Создание списка дочерних узлов AST, рекурсивно обрабатывая каждый дочерний план
        val children = logicalPlan.children.zipWithIndex.flatMap {
          case (ch, i) => loop(ch, levelnum + 1, f"${levelnum + 1}_${i}")
        }.toList
        // Создание узла AST с текущим узлом и его дочерними узлами
        AST(n, children, levelnum, levelExpr)
      }
    }

    // Запуск рекурсивного обхода с начальным уровнем и строковым представлением
    loop(plan, 1, "1_0")
  }

  // Неявное преобразование для класса LogicalPlan, добавляющее метод для получения AST
  implicit class parser(lp: LogicalPlan) {
    def AST(): Option[AST] = {
      parseAST(lp)
    }
  }
}

now you can receive AST as follows logicalPlan.AST().get

Let's define entities in the Atlas to build Lianage:

Apche Atlas sequence table

Apche Atlas sequence table

Just like in Java-based programming languages, all classes inherit from Objectin Apache Atlas all entities inherit from ReferenceableHowever, the construction of lineage (linearity of data) occurs only for types Process And DataSet. If the type does not inherit from one of these classes (for example, if the inheritance is from Asset), then the “Lineage” button simply will not appear.

In addition, the lineage itself is built on the basis of fields inputs And outputs For Processsimilarly for DataSet. There's nothing you can do about it – you'll have to inherit from these types, although most of the fields will remain empty.

My original goal was to reflect the transformations taking place in Apache Sparkbut the structure Apache Atlas forces me to surround mine Process entities DataSet in the fields inputs And outputs. Although I was initially only interested in Processthese DataSet-s can be used to display the schemas of the data the process starts with and returns. However, at this stage I do not plan to parse the schemas and will leave each DataSet empty.

In Apache Atlas, custom entities can be described using the JSON format. It is important to follow the correct sequence of type definitions, otherwise a 404 error will occur when attempting to reference a type that does not yet exist in the system.

First, let's define the type for DataSet.

  {
    "enumDefs": [],
    "structDefs": [],
    "classificationDefs": [],
    "entityDefs": [
      {
        "name": "pico_spark_data_type", 
        "description": "A type inheriting from assets for Pico DataSet", 
        "superTypes": ["DataSet"],
        "attributeDefs": [],
        "relationshipDefs": [] 
      }
    ],
    "relationshipDefs": [],
    "businessMetadataDefs": []
  }

Comments:

  1. enumDefs, structDefs, classificationDefs:

    • Empty arrays because enumerations, structures and classifications are not used.

  2. entityDefs:

    • Defines entities in the system.

    • name: The name of the entity that represents the data type.

    • description: Description of the entity.

    • superTypes: Superclasses from which this entity inherits.

    • attributeDefs: Empty array because no attributes are specified.

    • relationshipDefs: Empty array because no relationships are defined.

  3. relationshipDefs, businessMetadataDefs:

{
    "enumDefs": [],
    "structDefs": [],
    "classificationDefs": [],
    "entityDefs": [
      {
        "name": "pico_spark_process_type",
        "description": "A type inheriting from assets for Pico Spark abstraction",
        "superTypes": ["Process"],
        "attributeDefs": [
          {
            "name": "inputs",
            "description": "List of inputs for the process",
            "typeName": "array<pico_spark_data_type>",
            "isOptional": true
          },
          {
            "name": "outputs",
            "description": "List of outputs for the process",
            "typeName": "array<pico_spark_data_type>",
            "isOptional": true
          }
        ],
        "relationshipDefs": []
      }
    ],
    "relationshipDefs": [],
    "businessMetadataDefs": []
  }

Comments:

  1. enumDefs, structDefs, classificationDefs:

    • Empty arrays because enumerations, structures, and classifications are not used in this definition.

  2. entityDefs:

    • Contains definitions of entities.

    • name: The name of the entity that defines the data type in the Pico Spark context.

    • description: Description of the entity.

    • superTypes: Superclasses from which the entity inherits.

    • attributeDefs: Empty array because no attributes were added.

    • relationshipDefs: Empty array because no relationships are specified.

  3. relationshipDefs, businessMetadataDefs:

For type pico_spark_process_type I also create descendants for all node types (Filter, Project, Union, etc.) in AST. However, I will omit it here, as it would take up too much space and be too monotonous.

These JSONs contain a lot of empty entities, but they are necessary because Apache Atlas does not create types without them.

Interaction with Apache Atlas via REST:

Simply describing entities is not enough – they need to be transferred to Apache AtlasAtlas has an extensive REST API to interact with the system. Specifically, the process of creating a new type looks like this:

curl -X POST "http://<atlas-server-url>/api/atlas/v2/types/typedefs" \
     -H "Content-Type: application/json" \
     -H "Accept: application/json" \
     -d '{
           "enumDefs": [],
           "structDefs": [],
           "classificationDefs": [],
           "entityDefs": [
             {
               "name": "pico_spark_data_type",
               "description": "A type inheriting from assets for Pico DataSet",
               "superTypes": ["DataSet"],
               "attributeDefs": [],
               "relationshipDefs": []
             }
           ],
           "relationshipDefs": [],
           "businessMetadataDefs": []
         }'

I create JSON file where the request bodies for all required custom types will be listed under the name EntityTypes.json
and I will create a method that reads this file and makes a request for each EntityType

  val atlasServerUrl = "http://localhost:21000/api/atlas/v2"
  val authHeader: String = "Basic " + java.util.Base64.getEncoder.encodeToString("admin:admin".getBytes)

def generatePicoSparkTypes(): Unit = {

  // Функция для чтения содержимого файла из ресурсов
  def readFileFromResources(fileName: String): String = {
    val source = Source.fromResource(fileName)
    try source.mkString
    finally source.close()
  }

  // Чтение JSON из файла ресурсов
  val jsonString = readFileFromResources("EntityTypes.json")

  // Попытка разобрать строку JSON в структуру данных
  val parsedJson: Either[ParsingFailure, Json] = parse(jsonString)

  // Преобразование разобранного JSON в список объектов JSON
  val jsonObjects: Option[List[Json]] = parsedJson match {
    case Right(json) =>
      json.as[List[Json]] match {
        case Right(jsonArray) =>
          Some(jsonArray)
        case Left(error) =>
          // Обработка ошибки разбора массива JSON
          println(s"Error parsing JSON array: $error")
          None
      }
    case Left(error) =>
      // Обработка ошибки разбора JSON
      println(s"Error parsing JSON: $error")
      None
  }

  // Отправка каждого объекта JSON на сервер Atlas
  jsonObjects match {
    case Some(jsonArray) =>
      jsonArray.foreach { jsonBody =>
        // Создание POST-запроса для создания типа в Apache Atlas
        val createTypeRequest = basicRequest
          .method(Method.POST, uri"$atlasServerUrl/types/typedefs") // Метод POST и URL для запроса
          .header("Authorization", authHeader) // Заголовок авторизации
          .header("Content-Type", "application/json") // Заголовок типа содержимого
          .header("Accept", "application/json") // Заголовок для принятия ответа в формате JSON
          .body(jsonBody.noSpaces) // Тело запроса с JSON-данными
          .response(asString) // Ожидание ответа в формате строки

        // Отправка запроса и вывод результата
        val response = createTypeRequest.send(backend)
        println(response.body) // Печать тела ответа
        println(response.code) // Печать кода ответа
      }
    case None =>
      // Сообщение, если JSON-объекты не были найдены
      println("No JSON objects found.")
  }

}

comments:

  1. readFileFromResources: Function to read the contents of a JSON file from resources.

  2. jsonString: Getting a JSON string from a file.

  3. parsedJson: Trying to parse a JSON string into a data structure Json.

  4. jsonObjects: Convert parsed JSON to a list of JSON objects.

  5. jsonArray.foreach: For each JSON object, a POST request is created and sent to the Atlas server.

  6. createTypeRequest: Creating a POST request with JSON data to create types in Apache Atlas.

  7. response: Send a request and output the result, including the response body and response code.

now to create all entities in Apache Atlas it is enough to call the method
generatePicoSparkTypes()

Because DataSet entities have already been created, you can start creating them right away Process entities with filled fields inputs And outputs. This is important because when we tried to update entities via the API, nothing worked. Let's start by defining a set of methods:

EntityTypes in Apache Atlas

EntityTypes in Apache Atlas

as we all see EntityType created

Create a DataSet Entity:

Before creating the process entities, you need to create the DataSet entities, since the former refer to the latter.

at this point it has already been determined pico_spark_data_type which is responsible for the input/output data patterns
First, let's define two auxiliary methods

/**
 * Создает функцию для отправки JSON данных на указанный эндпоинт в Apache Atlas.
 *
 * @param postfix Строка, добавляемая к базовому URL для формирования полного URL эндпоинта.
 * @return Функция, принимающая JSON строку и отправляющая ее на сервер через HTTP POST запрос.
 */
def senderJsonToAtlasEndpoint(postfix: String): String => Unit = {

  jsonBody => {
    // Создание HTTP POST запроса для отправки JSON данных на сервер
    val createTypeRequest = basicRequest
      .method(Method.POST, uri"$atlasServerUrl/${postfix}")
      .header("Authorization", authHeader)
      .header("Content-Type", "application/json")
      .header("Accept", "application/json")
      .body(jsonBody)
      .response(asString)

    // Отправка запроса и получение ответа
    val response = createTypeRequest.send(backend)
    
    // Вывод тела ответа и кода статуса
    println(response.body)
    println(response.code)
  }
}

/**
 * Генерирует и отправляет сущности данных Spark в Apache Atlas для указанного домена.
 *
 * @param domain Домен, который будет использоваться в атрибутах сущностей.
 * @param execJsonAtlas Функция для отправки JSON данных в Apache Atlas.
 * @return Функция, принимающая AST и создающая JSON для каждой дочерней сущности.
 */
def generateSparkDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {

  // Локальная функция для генерации и отправки сущностей данных Spark
  def generateEntities(ast: AST): Unit = {
    ast.children.foreach { inast =>
      // Формирование JSON тела для сущности данных Spark
      val jsonBody =
        f"""
           |{
           |  "entity": {
           |    "typeName": "pico_spark_data_type",
           |    "attributes": {
           |      "domain": "${domain}",
           |      "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
           |      "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
           |      "description": "A description for the spark_data"
           |    }
           |  }
           |}
           |""".stripMargin

      // Отправка сформированного JSON тела на сервер
      execJsonAtlas(jsonBody)
      
      // Рекурсивный вызов для обработки дочерних узлов
      generateEntities(inast)
    }
  }

  // Возвращаем функцию для генерации сущностей
  generateEntities
}

Explanations:

  • senderJsonToAtlasEndpoint: This function creates and returns another function that sends JSON data to the specified endpoint in Apache Atlas. The comments explain the parameters, creating the request, sending, and handling the response.

  • generateSparkDataEntities: This function generates Spark data entities, forms the corresponding JSON, and sends it to Apache Atlas using the passed send function. Comments describe the parameters and internal logic of the function, including the recursive call to process all child nodes.

Write 2 more methods to start the formation of Linage in Atlas

/**
 * Преобразует AST (абстрактное синтаксическое дерево) в сущности Apache Atlas и отправляет их на сервер.
 *
 * @param ast Абстрактное синтаксическое дерево, представляющее структуру данных.
 * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
 * @param topLevelExpr Выражение уровня, используемое для определения уровня в AST. В данном случае не используется.
 */
def ASTToAtlasEntity(ast: AST, domain: String, topLevelExpr: String): Unit = {

  // Создание функции для отправки JSON данных на эндпоинт "entity" в Apache Atlas
  val entitySender = senderJsonToAtlasEndpoint("entity")
  
  // Создание функции для генерации сущностей данных Spark и отправки их в Apache Atlas
  val sparkDataEntityGenerator = generateSparkDataEntities(domain, entitySender)

  // Создание базовых сущностей вывода и отправка их на сервер
  //ее реализацию опущу
  createBaseOutput(domain, entitySender)
  
  // Создание базовых сущностей ввода и отправка их на сервер
  //ее реализацию опущу
  createBaseInput(domain, entitySender)
  
  // Генерация и отправка сущностей данных Spark на основе AST
  sparkDataEntityGenerator(ast)
}

/**
 * Имплементация расширения для преобразования AST в сущности Apache Atlas.
 *
 * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
 */
implicit class converter(ast: AST) {

  /**
   * Преобразует текущее AST в сущности Apache Atlas и отправляет их на сервер.
   *
   * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
   */
  def EntityToAtlas(domain: String): Unit = {
    ASTToAtlasEntity(ast, domain, "")
  }
}

Explanations:

  • ASTToAtlasEntity: This method converts the passed AST into Apache Atlas entities and sends them to the server. It uses helper functions to create base entities and generate Spark data entities and sends them to the server via the created function entitySender.

  • EntityToAtlas: This is an extension method (implicit class) for a type ASTwhich simplifies the method call ASTToAtlasEntity with default value for topLevelExpr. This method provides a convenient way to transform an AST into Apache Atlas entities using the specified domain.

Now when you start up ast.EntityToAtlas("picoDomain")A data entity appears in the atlas

screenshot from web UI

screenshot from web UI

because DataSet Entity have already been created, you can create Process Entity immediately with the loaded ones inputs And outputsthis is important because no matter how much I poked around in the API to update Entuty nothing worked.

Let's start by defining a bunch of methods:

  // Создает функцию для отправки сущностей в Apache Atlas
  // Использует функцию преобразования AST в JSON и функцию отправки JSON
  def senderEntity(nodeToAtlasCreateEntityJson: (AST, String) => String, execJsonAtlas: String => Unit): (AST, String) => Unit = {
    // Возвращает функцию, которая преобразует AST в JSON и отправляет его в Atlas
    (ast: AST, topLevelExpr: String) => {
      val jsonBody = nodeToAtlasCreateEntityJson(ast, topLevelExpr)
      execJsonAtlas(jsonBody)
    }
  }

  // Генерирует JSON для сущностей в Atlas на основе AST и уровня
  // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
  def generatotrProcessEntity(domain: String, qualifiedName: (Node, String) => String): (AST, String) => String = {
    (ast: AST, topLevelExpr: String) => {
      val node = ast.node

      // Создает список входных сущностей, если есть дочерние элементы
      val inputs = if (ast.children.nonEmpty) {
        ast.children.map(_.levelExpr).map { expr =>
          f"""
             |
             |{
             |  "typeName": "pico_spark_data_type",
             |  "uniqueAttributes": {
             |    "qualifiedName": "pico_spark_data_${ast.levelExpr}-${expr}@${domain}"
             |  }
             |}
             |
             |""".stripMargin
        }.mkString(", ")
      } else {
        f"""
           | {
           |  "typeName": "pico_spark_data_type",
           |   "uniqueAttributes": {
           |    "qualifiedName": "pico_spark_data_input@${domain}"
           |   }
           | }
           |""".stripMargin
      }

      // Создает JSON для выходных сущностей, если задан topLevelExpr
      val output = if (topLevelExpr.nonEmpty) {
        f"""
           | {
           |  "typeName": "pico_spark_data_type",
           |   "uniqueAttributes": {
           |      "qualifiedName": "pico_spark_data_${topLevelExpr}-${ast.levelExpr}@${domain}"
           |   }
           | }
           |""".stripMargin
      } else {
        f"""
           | {
           |  "typeName": "pico_spark_data_type",
           |   "uniqueAttributes": {
           |    "qualifiedName": "pico_spark_data_output@${domain}"
           |   }
           | }
           |""".stripMargin
      }

      // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
      node match {
        case p: ProjectNode =>
          f"""
             |{
             |"entity": {
             |      "typeName": "pico_spark_project_type",
             |      "attributes": {
             |        "qualifiedName": "${qualifiedName(node, ast.levelExpr)}",
             |        "name": "pico_project_${ast.levelExpr}",
             |        "description": "This is an project for the pico_spark_project_type",
             |        "columns": [${p.columns.map(col => "\"" + col + "\"").mkString(", ")}],
             |        "inputs":[ ${inputs} ],
             |        "outputs":[ ${output} ]
             |      }
             |    }
             |}
             |""".stripMargin
        case ...

      }
    }
  }

  // Создает функцию для генерации и отправки сущностей в Apache Atlas
  // Использует предоставленные функции для создания JSON и отправки его в Atlas
  def generatorDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {

    def sparkDataEntitys(ast: AST): Unit = {
      ast.children.foreach { inast =>
        val jsonBody =
          f"""
             |{
             |  "entity": {
             |    "typeName": "pico_spark_data_type",
             |    "attributes": {
             |      "domain": "${domain}",
             |      "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
             |      "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
             |      "description": "A description for the spark_data"
             |    }
             |  }
             |}
             |""".stripMargin

        execJsonAtlas(jsonBody)
        sparkDataEntitys(inast)
      }
    }

    // Возвращает функцию, которая генерирует и отправляет сущности данных для Spark
    sparkDataEntitys
  }

Explanations:

  • senderEntity: A function that creates and sends JSON for entities in Apache Atlas using the provided transformation and sending functions.

  • generatotrProcessEntity: A function that generates JSON for different node types in the AST and converts them into a format suitable for Apache Atlas.

  • generatorDataEntities: A function that creates and sends entity data to Spark by recursively processing the children of nodes in the AST.

And we update the methods for working with AST

   // Преобразует AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
  def ASTToAtlasEntity(ast: AST, domain: String): Unit = {

    // Создает функцию отправки JSON-данных для сущностей в Apache Atlas
    val entitySender = senderJsonToAtlasEndpoint("entity")

    // Создает функцию для генерации квалифицированного имени
    val qualifiedName = generatorQualifiedName(domain)

    // Создает функцию для генерации JSON-сущностей для процессов
    val generatorProcessEntity = generatotrProcessEntity(domain, qualifiedName)

    // Создает функцию для отправки JSON-данных сущностей в Atlas
    val sendEntity = senderEntity(generatorProcessEntity, entitySender)

    // Создает функцию для генерации данных сущностей и отправки их в Atlas
    val generateDataEntity = generatorDataEntities(domain, entitySender)

    // Обрабатывает один узел AST, отправляя его как сущность в Atlas
    def processNode(ast: AST, intopLevelExpr: String): Unit = {
      sendEntity(ast, intopLevelExpr)
    }

    // Рекурсивно проходит по всему дереву AST, обрабатывая каждый узел
    def traverseAST(ast: AST, intopLevelExpr: String): Unit = {
      processNode(ast, intopLevelExpr)
      ast.children.foreach(ch => traverseAST(ch, ast.levelExpr))
    }

    // Создает базовые выходные и входные сущности для указанного домена и отправляет их в Atlas
    createBaseOutput(domain, entitySender)
    createBaseInput(domain, entitySender)

    // Генерирует данные сущностей для AST и отправляет их в Atlas
    generateDataEntity(ast)

    // Запускает рекурсивное прохождение AST
    traverseAST(ast, "")
  }

  // Обогащает класс AST функцией для преобразования его в сущности Apache Atlas
  implicit class converter(ast: AST) {

    // Преобразует текущий узел AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
    def EntityToAtlas(domain: String): Unit = {
      ASTToAtlasEntity(ast, domain)
    }

  }

Explanations:

  • ASTToAtlasEntity: The main method that:

    • Creates functions to convert AST to JSON and send it to Apache Atlas.

    • Defines helper functions for processing AST nodes and recursively traversing the tree.

    • Creates and sends base entities (input and output) to Atlas.

    • Recursively traverses the AST tree and sends each entity to Atlas.

  • implicit class converter(ast: AST): Enriches the classroom ASTadding a method to transform AST into Apache Atlas entities.

Now after launching Apache Atlas Linage will appear

Well, it seems to be similar to the original logical plan

Project [model#17, manufacturer#18, 2024-09-12 16:57:34.046609 AS processedDDTM#36]
+- Project [model#17, manufacturer#18]
   +- Filter NOT (manufacturer#18 = Audi)
      +- Union false, false
         :- Relation [model#17,manufacturer#18] csv
         +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
            +- LocalRelation [_1#23, _2#24]

P.S. You can look at the code here
PPS docker files for launch Apache Atlas you can take it here

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *