NiFi – When Record-Oriented Doesn't Pay Off

{
  "type": "record",
  "name": "Mdm_Record",
  "namespace": "any.org",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "MessageInfo",
        "namespace": "any.org",
        "fields": [
          {
            "name": "ts",
            "type": {
              "type": "long",
              "logicalType": "timestamp-millis"
            }            
          }
        ]
      }
    },
    {
      "name": "data",
      "type": {
        "type": "record",
        "name": "MDM_Item",
        "fields": [
          {
            "name": "object",
            "type": {
              "type": "record",
              "name": "Node",
              "fields": [
                {
                  "name": "guid",
                  "type": [
                    "null",
                    "string"
                  ],               
                  "default": null
                },
                {
                  "name": "template",
                  "type": [
                    "null",
                    "string"
                  ],                 
                  "default": null
                },
                {
                  "name": "language",
                  "type": [
                    "null",
                    "string"
                  ],                 
                  "default": null
                },
                {
                  "name": "fields",
                  "type": {
                    "type": "map",
                    "values": [
                      "Node",
                      {
                        "type": "record",
                        "name": "Barcode",
                        "fields": [
                          {
                            "name": "provider",
                            "type": {
                              "type": "record",
                              "name": "Provider",
                              "fields": [
                                {
                                  "name": "name",
                                  "type": "string"                                 
                                },
                                {
                                  "name": "city",
                                  "type": [
                                    "null",
                                    "string"
                                  ],                                 
                                  "default": null
                                },
                                {
                                  "name": "taxIdNumber",
                                  "type": [
                                    "null",
                                    "string"
                                  ],
                                  "default": null
                                },
                                {
                                  "name": "taxRegistrationReasonCode",
                                  "type": [
                                    "null",
                                    "string"
                                  ],                                 
                                  "default": null
                                }
                              ]
                            }
                           
                          },
                          {
                            "name": "barcode",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "supplierArticle",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "MediaResource",
                        "fields": [
                          {
                            "name": "link",
                            "type": "string"
                          },
                          {
                            "name": "filename",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "mimetype",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "fileSize",
                            "type": [
                              "null",
                              "int"
                            ],
                           
                            "default": null
                          },
                          {
                            "name": "format",
                            "type": [
                              "null",
                              "string"
                            ],                            
                            "default": null
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "SofaSpecification",
                        "fields": [
                          {
                            "name": "seats",
                            "type": [
                              "null",
                              "int"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "bedLength",
                            "type": {
                              "type": "record",
                              "name": "QuantityValue",
                              "fields": [
                                {
                                  "name": "value",
                                  "type": "float"
                                },
                                {
                                  "name": "measureUnit",
                                  "type": [
                                    "null",
                                    {
                                      "type": "record",
                                      "name": "MeasureUnit",
                                      "fields": [
                                        {
                                          "name": "unitName",
                                          "type": "string"
                                        },
                                        {
                                          "name": "unitCode",
                                          "type": "string"
                                        }
                                      ]
                                    }
                                  ],
                                  "default": null
                                }
                              ]
                            }
                          },
                          {
                            "name": "bedWidth",
                            "type": "QuantityValue"
                          },
                          {
                            "name": "backHeightFolded",
                            "type": "QuantityValue"
                          },
                          {
                            "name": "backHeightFoldedOut",
                            "type": "QuantityValue"
                          },
                          {
                            "name": "fullLength",
                            "type": "QuantityValue"
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "MattressSpecification",
                        "fields": [
                          {
                            "name": "bedLength",
                            "type": "QuantityValue"
                          },
                          {
                            "name": "bedWidth",
                            "type": "QuantityValue"
                          },
                          {
                            "name": "height",
                            "type": "QuantityValue"
                          },
                          {
                            "name": "maximumBedLoad",
                            "type": "QuantityValue"
                          }
                        ]
                      },
                      "QuantityValue",
                      "MeasureUnit",
                      {
                        "type": "array",
                        "items": [
                          "Node",
                          "Barcode",
                          {
                            "type": "record",
                            "name": "ItemPack",
                            "fields": [
                              {
                                "name": "packageName",
                                "type": "string"
                              },
                              {
                                "name": "package",
                                "type": [
                                  "null",
                                  {
                                    "type": "record",
                                    "name": "Pack",
                                    "fields": [
                                      {
                                        "name": "packageGuid",
                                        "type": "string"
                                      },
                                      {
                                        "name": "packageType",
                                        "type": [
                                          "null",
                                          "string"
                                        ],
                                        "default": null
                                      },
                                      {
                                        "name": "packaging",
                                        "type": {
                                          "type": "map",
                                          "values": [
                                            {
                                              "type": "record",
                                              "name": "Box",
                                              "fields": [
                                                {
                                                  "name": "length",
                                                  "type": "QuantityValue"
                                                },
                                                {
                                                  "name": "width",
                                                  "type": "QuantityValue"
                                                },
                                                {
                                                  "name": "height",
                                                  "type": "QuantityValue"
                                                }
                                              ]
                                            },
                                            {
                                              "type": "record",
                                              "name": "SoftPackaging",
                                              "fields": [
                                                {
                                                  "name": "twist",
                                                  "type": "boolean"
                                                }
                                              ]
                                            }
                                          ]
                                        }
                                      }
                                    ]
                                  }
                                ],
                                "default": null
                              },
                              {
                                "name": "length",
                                "type": "QuantityValue"
                              },
                              {
                                "name": "width",
                                "type": "QuantityValue"
                              },
                              {
                                "name": "height",
                                "type": "QuantityValue"
                              },
                              {
                                "name": "volume",
                                "type": "QuantityValue"
                              },
                              {
                                "name": "weightNetto",
                                "type": "QuantityValue"
                              },
                              {
                                "name": "weightGross",
                                "type": "QuantityValue"
                              },
                              {
                                "name": "seatsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "packagedUnitsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "setSeatsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "priority",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "measureUnit",
                                "type": [
                                  "null",
                                  "MeasureUnit"
                                ],
                                "default": null
                              },
                              {
                                "name": "mainLogisticsPackaging",
                                "type": [
                                  "null",
                                  "boolean"
                                ],
                                "default": null
                              },
                              {
                                "name": "barcode",
                                "type": [
                                  "null",
                                  "string"
                                ],
                                "default": null
                              }
                            ]
                          },
                          "MediaResource",
                          "string",
                          "int",
                          "boolean"
                        ]
                      },
                      {
                        "type": "map",
                        "values": [
                          "string",
                          "int",
                          "boolean"
                        ]
                      },
                      "string",
                      "int",
                      "boolean"
                    ]
                  }
                }
              ]
            }
          },
          {
            "name": "stateName",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Since the Jolt transformation I had chosen earlier did not work, I fell back on the proven tool for extreme cases: a script. NiFi has a good processor for this, ScriptedTransformRecord, which processes one record at a time and hands the script an object of type Record.

The script turned out to be quite large, both because of the map types in the schema and because "null" can turn up even in required fields. The data format or the S2T may still change, so the script would have to be modified in the future, and its size makes every such change harder for the developer. So I decided to abandon the script and return to the Jolt specification that was already developed and debugged: it works correctly on plain JSON, and the error occurs only in record mode, when the data schema is applied.
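To give a feel for why such a script grows, here is a minimal sketch in plain Python. It is not the original script and does not use the NiFi Record API: it assumes a record has already been converted to a dict shaped like the schema above. The names `classify_value` and `summarize_node` are hypothetical, and telling record types apart by a characteristic key is an assumption made for illustration only. Only some of the union branches are covered.

```python
from typing import Any, Dict


def classify_value(value: Any) -> str:
    """Name the union branch that one value of the Node "fields" map falls into."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # bool must be tested before int: bool subclasses int
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        # Assumption: each record type is recognised by one characteristic key.
        if "guid" in value or "fields" in value:
            return "Node"
        if "provider" in value:
            return "Barcode"
        if "link" in value:
            return "MediaResource"
        if "value" in value:
            return "QuantityValue"
        if "unitName" in value:
            return "MeasureUnit"
        return "map"
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def summarize_node(node: Dict[str, Any]) -> Dict[str, str]:
    """Map each non-null entry of a Node's "fields" map to its branch name."""
    fields = node.get("fields") or {}  # tolerate a missing or null map
    return {
        name: classify_value(value)
        for name, value in fields.items()
        if value is not None
    }
```

Every branch here would need its own transformation logic in a real script, and each nullable field inside those branches adds another check, which is where the size comes from.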

That is, in this case I gave up record-oriented processing and switched to processing the entire FlowFile content, which increases the number of FlowFiles and, accordingly, RAM usage.

In NiFi, you can apply a Jolt specification to records with JoltTransformRecord, or use the JoltTransformJSON processor, which expects JSON as input and transforms it not record by record but as a single JSON document. Since the entire content is processed, it is better to feed it a single JSON object rather than an array, to reduce overhead. The incoming FlowFile therefore first had to be split into fragments, each containing one JSON object. For this I used SplitRecord, where the Reader read the Avro format and the RecordSetWriter was configured to write a single JSON object. This made the transformation of each individual object fast, but it also generated a large number of FlowFiles.
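The effect of that split can be sketched outside NiFi in a few lines of Python. This is only an illustration of the idea, not how SplitRecord is implemented: it assumes the batch arrives as a JSON array rather than Avro, and the function name `split_records` is hypothetical.

```python
import json
from typing import Iterator


def split_records(content: str) -> Iterator[str]:
    """Yield one serialized JSON object per element of a top-level JSON array."""
    records = json.loads(content)
    if not isinstance(records, list):
        records = [records]  # a lone object is treated as a batch of one
    for record in records:
        # Each yielded string plays the role of one fragment FlowFile's content.
        yield json.dumps(record)


batch = '[{"stateName": "active"}, {"stateName": "archived"}]'
fragments = list(split_records(batch))
```

Each fragment can then be transformed on its own as a single JSON object. The trade-off described above is visible here too: a batch of N records becomes N separate pieces of content.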
