Problems and solutions (a “crutch”)

object Example1 extends App with Configure {

	spark.udf.register("blue", udf(() => "0000FF")) // create and register the udf
	spark.sql("select blue()").show()
	/* output:
			+------+
			|blue()|
			+------+
			|0000FF|
			+------+
	 */

}

Let's consider a situation where the UDF function itself arrives in the program as a string. For example, suppose we have a variable code that contains the string "() => \"0000FF\"". At runtime we need to turn this string into an object of type Function0[String] and use it to create a UDF. So my goal is to write a program that can handle lambdas with an arbitrary number of arguments and arbitrary return types (a minimal sketch of the string-to-function step itself follows the examples below).

For example:

  • “(num: Int) => num * num”

  • “(str: String) => str.reverse”

  • “(num1: Double, str: String) => Math.pow(num1, str.size)”

  • etc
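
As a quick illustration of the string-to-function step in isolation (only a sketch, outside Spark; the object name and the sample lambda here are placeholders), the Scala 2 reflection ToolBox can compile such a string like this:

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object ToolBoxSketch extends App {

	val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()

	val code = "(num: Int) => num * num"                // one of the example strings above
	val compiled = toolbox.compile(toolbox.parse(code)) // parse and compile the string
	val square = compiled().asInstanceOf[Int => Int]    // cast the compiled value to the expected function type

	println(square(7)) // 49

}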

How it didn’t work out and what error occurred:

Consider the following code:

import org.apache.spark.sql.functions.udf

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object Example2 extends App with Configure {

	def udfRegister(labdaCode: String): Unit = {

		import universe._
		val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()

		val tree = toolbox.parse(labdaCode)
		val compiledCode = toolbox.compile(tree)
		val function = compiledCode().asInstanceOf[Function0[String]]
		// the string is successfully converted to Function0[String]


		println(function()) // output: 0000FF
		val udfBlue = udf(() => function()) // the UDF is created successfully


		spark.udf.register("blue", udfBlue)

	}

	udfRegister("() => \"0000FF\"")
	spark.sql("select blue()").show() // an error occurs here

}

When running the task, the following error occurs:
cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF

The error indicates that Spark received a java.lang.invoke.SerializedLambda where it expected a scala.Function1 (a function of one argument).

In Spark, functions must be serializable so that they can be shipped across the cluster. A SerializedLambda can show up when a function is passed whose serialization does not round-trip correctly.

SerializedLambda, in turn, is the class Java uses to serialize lambda expressions and method references; it is the serialized form the Java compiler generates for lambdas. (Judging by the error, this form is not something Spark can turn back into the function type it needs.)

What is especially confusing is that no error occurs when casting the compiled object to Function0[String] (a type Spark fully supports), nor when creating the UDF object. The error only appears at runtime, when the task is started on the cluster.

However, the following code does work:

import org.apache.spark.sql.functions.udf

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object Example3 extends App with Configure {

	val labdaCode = "() => \"0000FF\""

	import universe._
	val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()

	val tree = toolbox.parse(labdaCode)
	val compiledCode = toolbox.compile(tree)
	val function = compiledCode().asInstanceOf[Function0[String]]
	// the string is successfully converted to Function0[String]


	println(function()) // output: 0000FF
	val udfBlue = udf(() => function()) // the UDF is created successfully

	spark.udf.register("blue", udfBlue) // the UDF is registered successfully

	spark.sql("select blue()").show()
	/* output:

		+------+
		|blue()|
		+------+
		|0000FF|
		+------+

	 */

}

In essence, Example3 does exactly the same thing as Example2, yet Example3 works and Example2 does not, which is even more confusing. Here is one more example:

import org.apache.spark.sql.functions.udf

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object Example4 extends App with Configure {

	if (true) {

		val labdaCode = "() => \"0000FF\""

		import universe._
		val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()

		val tree = toolbox.parse(labdaCode)
		val compiledCode = toolbox.compile(tree)
		val function = compiledCode().asInstanceOf[Function0[String]]
		// the string is successfully converted to Function0[String]


		println(function()) // output: 0000FF
		val udfBlue = udf(() => function()) // the UDF is created successfully

		spark.udf.register("blue", udfBlue) // the UDF is registered successfully

	}

	spark.sql("select blue()").show() // an error occurs here

}

Example4 does the same thing as Example3, yet for some reason wrapping the code in an if block again leads to the error.
Using loops, monads, Try, etc. also leads to errors (I won't demonstrate that here).

So everything comes down to creating the function in the right way, so that it does not end up as a SerializedLambda, under the following restrictions:

  • lambdas/functions can only be created directly in the main method;

  • they cannot be created inside if, try, for blocks, monads, etc.

Although the examples above work with the very simple blue function, my task is to build a universal application that accepts any lambda (with 0 to 10 parameters, since UDF10 is the maximum) in string form.

Let's determine how the program receives information about functions:

I want the information to arrive in JSON format with the following structure:

[ // an array, since many functions may arrive, or none at all
  {
    "name": "blue", // the name of the udf
    "targetType": "String", // the data type to be returned
    "fun": "() => \"00ff00\"", // the lambda itself
    "imports": ["...", "..."] // optional, in case extra imports are needed
  },
  // the rest have the same structure
  {
    "name": "reverse",
    "targetType": "String",
    "fun": "(str:String) => str.reverse"
  },
  {
    "name": "plus",
    "targetType": "Int",
    "fun": "(i1:Int, i2:Int) => i1 + i2"
  },
  {
    "name": "sum3",
    "types": "Int,Int,Int",
    "targetType": "Int",
    "fun": "(i1:Int, i2:Int, i3:Int) => i1 + i2 + i3"
  }
]

Let's define a case class that mirrors this JSON structure:

// I don't think each field needs a separate explanation here
case class UDFConfig(name: String,
                     targetType: String,
                     fun: String,
                     imports: List[String] = List.empty)

To parse the JSON file I will use the json4s library and the following class:

package Util

import org.json4s.Formats
import org.json4s.native.JsonMethods.parse

// a small object for converting a JSON-formatted string into a list of objects of type JsonType
object JsonHelper {

	private def jsonToListConvert[JsonType](json: String)(implicit mf: Manifest[JsonType], formats: Formats): List[Either[Throwable, JsonType]] = {
		parse(json)
			.extract[List[JsonType]]
			.map(Right(_))
	}

	implicit class JsonConverter(json: String) {

		def jsonToList[JsonType]()(implicit mf: Manifest[JsonType], formats: Formats): List[Either[Throwable, JsonType]]= {
			jsonToListConvert[JsonType](json)
		}

	}

}
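
A rough usage sketch of this helper (the object name here is illustrative; UDFConfig lives in the config package, and in the real code the implicit Formats comes from the Configure trait below):

import Util.JsonHelper._
import config.UDFConfig
import org.json4s.{DefaultFormats, Formats}

object JsonHelperSketch extends App {

	implicit val formats: Formats = DefaultFormats

	val json = """[{"name": "reverse", "targetType": "String", "fun": "(str:String) => str.reverse"}]"""

	println(json.jsonToList[UDFConfig]())
	// expected output: List(Right(UDFConfig(reverse,String,(str:String) => str.reverse,List())))

}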

And let's create a Configure trait that defines the values needed for the rest of the work:

import config.UDFConfig
import org.apache.spark.sql.SparkSession
import org.json4s.DefaultFormats

import scala.io.Source

trait Configure {

	lazy val spark = SparkSession
		.builder()
		.appName("Example UDF runtime compile")
		.master("local[*]")
		.getOrCreate()
	lazy val sc = spark.sparkContext
	// create the Spark session and context

	lazy val pathUdfsJson = "./UDFs.json" // path to the JSON file with the functions

	implicit lazy val formats = DefaultFormats // formats for json4s; the defaults are enough in my case
	import Util.JsonHelper._

	lazy val udfJson = getJson(pathUdfsJson) // read the JSON itself as a string
	lazy val udfsConfigs: List[UDFConfig] = udfJson
		.jsonToList[UDFConfig]
		.filter(_.isRight)
		.map {
			case Right(udfconfig) => udfconfig
		} // keep all the valid UDFConfig entries
		  // this is the object I will need later

	// helper method for reading files
  	private def getJson(path: String): String = {
		val file = Source.fromFile(path)
		try {
			file.getLines().mkString("\n")
		} finally {
			file.close()
		}
	}

}

Let's define the last helper object, which translates a UDFConfig into a string of code that will later be dynamically compiled into a FunctionN:

package Util

import config.UDFConfig

object UDFHelper {

	private def configToStringConvert(udfConfig: UDFConfig): String = {

		// build all the imports for the final code string
		val imports = udfConfig
			.imports                                 // take the imports from the config
			.map(_.trim.stripPrefix("import ").trim) // strip the leading keyword if present
			.distinct                                // keep only unique entries
			.map(imp => f"import ${imp}")            // prepend the keyword again
			.mkString("\n")                          // join them with newlines

		val Array(params, functionBody) = udfConfig.fun.split("=>", 2).map(_.trim) // separate the function body from the parameter list

		val paramsTypes: Seq[(String, String)] = params
			.stripPrefix("(")  // drop the left parenthesis
			.stripSuffix(")")  // drop the right parenthesis
			.split(",")        // split into individual parameters
			.map(_.trim)       // trim extra whitespace
			.map {
				case "" => null
				case param =>
					val Array(valueName, valueType) = param.split(":").map(_.trim)
					(valueName, valueType)
			}                  // split each parameter's name from its type
			.filter(_ != null) // filter out the nulls

		val funcTypes: String = paramsTypes.size match {
			case 0 => udfConfig.targetType
			case _ => f"${List
				.fill(paramsTypes.size)("Any")
				.mkString("", ", ", ", ")}${udfConfig.targetType}"
		} // the comma-separated list of the Function's type parameters

		val anyParams = paramsTypes.map(_._1.trim + "_any").mkString(", ") // the lambda's parameter list

		val instances = paramsTypes.map {
			case (valueName, valueType) =>
				f"	val ${valueName}: ${valueType} = ${valueName}_any.asInstanceOf[${valueType}]"
		}.mkString("\n") // cast each lambda parameter from Any to its declared type

		// assemble everything into the final code string
		f"""
			 |${imports}
			 |
			 |val func: Function${paramsTypes.size}[${funcTypes}] = (${anyParams}) => {
			 |
			 |${instances}
			 |
			 |  (
			 |${functionBody}
			 |  ).asInstanceOf[${udfConfig.targetType}]
			 |
			 |}
			 |
			 |func
			 |""".stripMargin

	}

	implicit class Converter(udfConfig: UDFConfig) {

		def configToString(): String = {
			configToStringConvert(udfConfig)
		}

	}

}
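
For intuition, this is roughly (modulo blank lines) the code string that configToString produces for the plus config from the JSON above; the ToolBox will compile it later, and since func is the last expression, it is the value the compiled thunk returns:

val func: Function2[Any, Any, Int] = (i1_any, i2_any) => {

	val i1: Int = i1_any.asInstanceOf[Int]
	val i2: Int = i2_any.asInstanceOf[Int]

  (
i1 + i2
  ).asInstanceOf[Int]

}

func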

Regarding the use of Any: as I already mentioned, my task is to create universal logic, where a lambda passed via JSON can have any number of parameters (in practice, from 0 to 10). Since it is not possible to pick the concrete FunctionN type parameters dynamically, using Any is the only way out (it will become clear why later). Otherwise you would have to create a separate object for every possible combination of parameter and return types:

\sum_{i=0}^{10} {9^{i+1}}

which is quite a lot.
The base 9 is because I decided to limit myself to these data types: String, Int, Boolean, Byte, Short, Long, Float, Double, Date, Timestamp
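
For scale, evaluating the series as written (a plain geometric sum):

\sum_{i=0}^{10} 9^{i+1} = \frac{9^{12} - 9}{8} = 35\,303\,692\,059 \approx 3.5 \times 10^{10}

that is, tens of billions of objects.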

Creating the registrar objects:

Because I am not allowed to use constructs like if or match, I cannot determine the number of parameters dynamically inside a single object and adapt to it. As a result I had to create several separate UDFRegN objects, each supporting a specific number of parameters. For example, for the zero-parameter case there is a corresponding object UDFReg0.

This approach is forced by the restrictions that prevent the UDF logic from adapting dynamically to the number of parameters passed, so 11 separate UDFRegN objects are needed to cover all the cases.

import org.apache.spark.sql.functions.udf

import java.sql.{Date, Timestamp}
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object UDFReg0 extends App with Configure { // as mentioned, only the main method can be used

	val indexcode = Integer.parseInt(args(0)) // the only way to receive a parameter from outside; it is assumed to be an index into udfsConfigs
	
	import Util.UDFHelper._
	val udfConfig = udfsConfigs(indexcode) // get the UDFConfig (from Configure)
	val functionCode = udfConfig.configToString() // convert it into code
	
	val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()

	val tree = toolbox.parse(functionCode)
	val compiledCode = toolbox.compile(tree)
	val function = compiledCode().asInstanceOf[Function0[Any]] // get the function out of the code

	val myUDF = udfConfig.targetType match { // choose the asInstanceOf cast depending on the type the function must return
		case "String" => udf(() => function().asInstanceOf[String])
		case "Int" => udf(() => function().asInstanceOf[Int])
		case "Boolean" => udf(() => function().asInstanceOf[Boolean])
		case "Byte" => udf(() => function().asInstanceOf[Byte])
		case "Short" => udf(() => function().asInstanceOf[Short])
		case "Long" => udf(() => function().asInstanceOf[Long])
		case "Float" => udf(() => function().asInstanceOf[Float])
		case "Double" => udf(() => function().asInstanceOf[Double])
		case "Date" => udf(() => function().asInstanceOf[Date])
		case "Timestamp" => udf(() => function().asInstanceOf[Timestamp])
		case _ => throw new IllegalArgumentException("Unknown type")
	}

	spark.udf.register(udfConfig.name, myUDF) // register the UDF

}

Now it only remains to write 10 more nearly identical registrars,
and to avoid doing that by hand we will write one last script :)

import java.io.{File, PrintWriter}

object GenerateRegestrators extends App {

	(0 to 10).toList.map {

		num =>

			val types = (0 to num).toList.map(_ => "Any").mkString(", ")
			val lambdaVaues = (0 until  num).toList.map(n => f"value${n}: Any").mkString(", ")
			val functionValues = (0 until  num).toList.map(n => f"value${n}").mkString(", ")

			f"""
				 |import org.apache.spark.sql.functions.udf
				 |
				 |import java.sql.{Date, Timestamp}
				 |import scala.reflect.runtime.universe
				 |import scala.tools.reflect.ToolBox
				 |
				 |object UDFReg${num} extends App with Configure {
				 |
				 |	val indexcode = Integer.parseInt(args(0))
				 |
				 |	import Util.UDFHelper._
				 |	val udfConfig = udfsConfigs(indexcode)
				 |	val functionCode = udfConfig.configToString()
				 |
				 |	val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
				 |
				 |	val tree = toolbox.parse(functionCode)
				 |	val compiledCode = toolbox.compile(tree)
				 |	val function = compiledCode().asInstanceOf[Function${num}[${types}]]
				 |
				 | 	val myUDF = udfConfig.targetType match {
				 |		case "String" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[String])
				 |		case "Int" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Int])
				 |		case "Boolean" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Boolean])
				 |		case "Byte" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Byte])
				 |		case "Short" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Short])
				 |		case "Long" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Long])
				 |		case "Float" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Float])
				 |		case "Double" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Double])
				 |		case "Date" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Date])
				 |		case "Timestamp" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Timestamp])
				 |		case _ => throw new IllegalArgumentException("Unknown type")
				 |	}
				 |
				 |	spark.udf.register(udfConfig.name, myUDF)
				 |
				 |}
				 |""".stripMargin


	}
		.zipWithIndex
		.foreach {
			case (str, index) =>
				val file = new File(f"./src/main/scala/UDFReg${index}.scala")
				val writer = new PrintWriter(file)

				try {
					writer.write(str)
				} finally {
					writer.close()
				}
		}


}

There is nothing much to explain here: it simply generates the remaining registrar objects by analogy.
After running the script, the required UDFRegN objects appear in the project.

Putting it all together:

All that remains is to write one last example, in which dynamic compilation of a UDF from a string finally works:

object Example5 extends App with Configure {

	import Util.UDFHelper._
	udfsConfigs.zipWithIndex.foreach { // the index is needed so it can be passed through args
		case (udfConfig, index) => 
			udfConfig.fun.split("=>", 2)(0).count(_ == ':') match {
              // this is where we choose which registrar to dispatch to
				case 0 => UDFReg0.main(Array( index.toString ))
				case 1 => UDFReg1.main(Array( index.toString ))
				case 2 => UDFReg2.main(Array( index.toString ))
				case 3 => UDFReg3.main(Array( index.toString ))
				case 4 => UDFReg4.main(Array( index.toString ))
				case 5 => UDFReg5.main(Array( index.toString ))
				case 6 => UDFReg6.main(Array( index.toString ))
				case 7 => UDFReg7.main(Array( index.toString ))
				case 8 => UDFReg8.main(Array( index.toString ))
				case 9 => UDFReg9.main(Array( index.toString ))
				case 10 => UDFReg10.main(Array( index.toString ))
				case _ => ()
			}

	}

	spark.sql("select blue(), reverse('namoW tobor ociP ociP') AS rev, plus(1, 2), sum3(1, 1, 1)").show
    /* output
	
		+------+--------------------+----------+-------------+
		|blue()|                 rev|plus(1, 2)|sum3(1, 1, 1)|
		+------+--------------------+----------+-------------+
		|00ff00|Pico Pico robot W...|         3|            3|
		+------+--------------------+----------+-------------+
	
	 */

}

And it works.

P.S. You can look at the code here.
