Problemas y soluciones (“muleta”) / Sudo Null IT News

Remolque:

Este artículo presenta cómo el autor resolvió su problema y no afirma que no podría haberse hecho mejor o más bellamente. Además, el autor no afirma que este problema o tarea deba resolverse mediante este método particular y no insiste en repetir este enfoque. Esta es sólo una de las posibles soluciones propuestas por el autor y se presenta aquí únicamente con fines informativos.

El autor señala que no pudo encontrar información sobre este problema ni en recursos en ruso ni en inglés, incluido Habr y otras fuentes. El artículo es un material de discusión y el autor estará encantado de ver soluciones alternativas en los comentarios.

Lo que estoy tratando de hacer:

Consideremos la creación de una UDF. blueque devuelve la cadena “0000FF” a la API de Scala”.

object Example1 extends App with Configure {

	spark.udf.register("blue", udf(() => "0000FF")) // создаем и регестрируем udf
	spark.sql("select blue()").show()
	/* вывод:
			+------+
			|blue()|
			+------+
			|0000FF|
			+------+
	 */

}

Consideremos una situación en la que la propia función UDF ingresa al programa como una cadena. Por ejemplo, tenemos una variable codeque contiene "() => "0000FF"". En tiempo de ejecución necesitas obtener un objeto de tipo. Function0(String) de esta cadena y utilícela para crear una UDF. Entonces, mi objetivo es escribir un programa que pueda manejar lambdas complejas con un número arbitrario de argumentos y salidas de cadenas arbitrarias.

Por ejemplo:

  • “(núm: Int) => núm * núm”

  • “(cadena: Cadena) => cadena.reversa”

  • “(num1: Doble, str: Cadena) => Math.pow(num1, str,tamaño)”

  • etc.

Cómo no funcionó y qué error ocurrió:

considere el siguiente código

import org.apache.spark.sql.functions.udf

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object Example2 extends App with Configure {

	def udfRegister(labdaCode: String): Unit = {

		import universe._
		val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()

		val tree = toolbox.parse(labdaCode)
		val compiledCode = toolbox.compile(tree)
		val function = compiledCode().asInstanceOf(Function0(String))
		//сторока успешно конвертируеться в Function0(String)


		println(function()) // вывод: 0000FF
		val udfBlue = udf(() => function()) // успешное создание udf


		spark.udf.register("blue", udfBlue)

	}

	udfRegister("() => \"0000FF\"")
	spark.sql("select blue()").show() // возникает ошибка

}

Al ejecutar la tarea, se produce el siguiente error:
cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF

El error indica que su código o trabajo con Spark intentó usar SerializedLambda en lugar del tipo esperado scala.Function1 (una función con un argumento).

En Spark, las funciones deben ser serializables para poder pasar a través del clúster. SerializedLambda puede ocurrir cuando se intenta pasar una función que no es serializable correctamente.

A su momento SerializedLambda soy Clase, utilizado en Java para serializar expresiones lambda y métodos de referencia. Esta interfaz la genera el compilador de Java al serializar expresiones lambda. (A juzgar por esto, esta clase no proporciona la herramienta de serialización necesaria para Spark)

Lo que es especialmente confuso es que este error no ocurre al convertir el objeto resultante a Function0(String), que Spark es totalmente compatible y tampoco ocurre al crear un objeto UDF. El error ya aparece en tiempo de ejecución al iniciar una tarea en el clúster.

Sin embargo, el siguiente código vuelve a funcionar:

import org.apache.spark.sql.functions.udf

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object Example3 extends App with Configure {

	val labdaCode = "() => \"0000FF\""

	import universe._
	val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()

	val tree = toolbox.parse(labdaCode)
	val compiledCode = toolbox.compile(tree)
	val function = compiledCode().asInstanceOf(Function0(String))
	//сторока успешно конвертируеться в Function0(String)


	println(function()) // вывод: 0000FF
	val udfBlue = udf(() => function()) // успешное создание udf

	spark.udf.register("blue", udfBlue) // успешное регестрирование udf

	spark.sql("select blue()").show()
	/*вывод:

		+------+
		|blue()|
		+------+
		|0000FF|
		+------+

	 */

}

En esencia Example3 todo pasa igual que en Example2pero el primero funciona, pero el segundo no, lo que genera aún más confusión

import org.apache.spark.sql.functions.udf

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object Example4 extends App with Configure {

	if (true) {

		val labdaCode = "() => \"0000FF\""

		import universe._
		val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()

		val tree = toolbox.parse(labdaCode)
		val compiledCode = toolbox.compile(tree)
		val function = compiledCode().asInstanceOf(Function0(String))
		//сторока успешно конвертируеться в Function0(String)


		println(function()) // вывод: 0000FF
		val udfBlue = udf(() => function()) // успешное создание udf

		spark.udf.register("blue", udfBlue) // успешное регестрирование udf

	}

	spark.sql("select blue()").show() // возникает ошибка

}

como el Ejemplo3, el Ejemplo4 no hace lo mismo, pero por alguna razón el bloque if volverá a generar un error
el uso de bucles y mónadas, Try, etc. también genera errores (pero no lo demostraré aquí)

entonces todo se reduce a crear la manera correcta function cual no seria SerializedLambda y existen las siguientes restricciones:

  • Puede crear lambdas/funciones solo en el método principal

  • no se puede crear en if, try, para bloques, mónadas, etc.

Aunque los ejemplos demuestran cómo trabajar con una función azul muy simple, mi tarea es crear una aplicación universal que acepte cualquier lambda (en realidad, de 0 a 10 ya que el máximo es UDF10) en formato de cadena.

Determinemos cómo el programa recibe información sobre funciones:

Quiero que la información se reciba en formato JSON y tendrá la siguiente estructura:

( //массив поскольку функций может прийти много или неприйти вообще
  {
    "name": "blue", //имя бля udf
    "targetType": "String", //тип данных для возращаемый
    "fun": "() => \"00ff00\"", //собственно сама лямбда
    "imports": ("...", "...") //опциональное, на случай если нужны дополнительные импорты
  },
  // остальные имеют такую же структуру
  {
    "name": "reverse",
    "targetType": "String",
    "fun": "(str:String) => str.reverse"
  },
  {
    "name": "plus",
    "targetType": "Int",
    "fun": "(i1:Int, i2:Int) => i1 + i2"
  },
  {
    "name": "sum3",
    "types": "Int,Int,Int",
    "targetType": "Int",
    "fun": "(i1:Int, i2:Int, i3:Int) => i1 + i2 + i3"
  }
)

Decidamos una clase que refleje esta estructura Json:

// думаю тут не стоит расписывать какое поле за что отвечает
case class UDFConfig(name: String,
  					 targetType: String,
					 fun: String,
					 imports: List(String) = List.empty)

Para analizar el archivo json usaré la biblioteca. json4s y la siguiente clase

package Util

import org.json4s.Formats
import org.json4s.native.JsonMethods.parse

//небольшой обьект для ковертации строки в формате json в список обьектов типа JsonType 
object JsonHelper {

	private def jsonToListConvert(JsonType)(json: String)(implicit mf: Manifest(JsonType), formats: Formats): List(Either(Throwable, JsonType)) = {
		parse(json)
			.extract(List(JsonType))
			.map(Right(_))
	}

	implicit class JsonConverter(json: String) {

		def jsonToList(JsonType)()(implicit mf: Manifest(JsonType), formats: Formats): List(Either(Throwable, JsonType))= {
			jsonToListConvert(JsonType)(json)
		}

	}

}

Y creemos un rasgo Configure en el cual se determinan las variables que serán necesarias para el trabajo posterior

Иimport config.UDFConfig
import org.apache.spark.sql.SparkSession
import org.json4s.DefaultFormats

import scala.io.Source

trait Configure {

	lazy val spark = SparkSession
		.builder()
		.appName("Example UDF runtime compile")
		.master("local(*)")
		.getOrCreate()
	lazy val sc = spark.sparkContext
	// создали спрак сессию и контекст

	lazy val pathUdfsJson = "./UDFs.json" //путь до json c функциями

	implicit lazy val formats = DefaultFormats // список форматов для json4s, в моем случае хватеает дефолтных форматов
	import Util.JsonHelper._

	lazy val udfJson = getJson(pathUdfsJson) // получаем сам json в виде строки
	lazy val udfsConfigs: List(UDFConfig) = udfJson
		.jsonToList(UDFConfig)
		.filter(_.isRight)
		.map {
			case Right(udfconfig) => udfconfig
		} // получаем все корректные udfconfig
		  // этот обект мне и понадобиться

	//метод для чтения файлов
  	private def getJson(path: String): String = {
		val file = Source.fromFile(path)
		try {
			file.getLines().mkString("\n")
		} finally {
			file.close()
		}
	}

}

Definamos el último objeto que udfconfig traducirá en una cadena que se convertirá dinámicamente en FunciónN con el tiempo.

package Util

import config.UDFConfig

object UDFHelper {

	private def configToStringConvert(udfConfig: UDFConfig): String = {

		//создадим все импорты для итоговой строки
		val imports = udfConfig
			.imports 																 // достаем импорты из конфига
			.map(_.trim.stripPrefix("import ").trim) // удаляем с лева ключевое слово если оно есть
			.distinct 															 // оставляем уникальные
			.map(imp => f"import ${imp}")            // конкатенируем слева ключевое слово
			.mkString("\n")                          // все соеденяем через отступ

		val Array(params, functionBody) = udfConfig.fun.split("=>", 2).map(_.trim) // Отделяем функциональную часть от переменных

		val paramsTypes: Seq((String, String)) = params
			.stripPrefix("(") 			// Убираем с лева скобку
			.stripSuffix(")") 			// Убираем с права скобку
			.split(",")       			// Разделяем параметры
			.map(_.trim)      			// Убираем лишние пробелы
			.map {
				case "" => null
				case param =>
					val Array(valueName, valueType) = param.split(":").map(_.trim)
					(valueName, valueType)
			}                 			// разделяем имя переменой от типа данных этой переменной
			.filter(_ != null) 			// отвильтровываем null-ы

		val funcTypes: String = paramsTypes.size match {
			case 0 => udfConfig.targetType
			case _ => f"${List
				.fill(paramsTypes.size)("Any")
				.mkString("", ", ", ", ")}${udfConfig.targetType}"
		} //здесь получаем перечисление через запятую типов данных Function

		val anyParams = paramsTypes.map(_._1.trim + "_any").mkString(", ") //парметры лямда вырожения

		val instances = paramsTypes.map {
			case (valueName, valueType) =>
				f"	val ${valueName}: ${valueType} = ${valueType}_any.asInstanceOf(${valueType})"
		}.mkString("\n") // тут определяем конвертации парметров люмбды в необхадимые типы

		// собираем все вместе в итоговый стринг
		f"""
			 |${imports}
			 |
			 |val func: Function${paramsTypes.size}(${funcTypes}) = (${anyParams}) => {
			 |
			 |${instances}
			 |
			 |  (
			 |${functionBody}
			 |  ).asInstanceOf(${udfConfig.targetType})
			 |
			 |}
			 |
			 |func
			 |""".stripMargin

	}

	implicit class Converter(udfConfig: UDFConfig) {

		def configToString(): String = {
			configToStringConvert(udfConfig)
		}

	}

}

En cuanto al uso del tipo Any: como ya mencioné, mi tarea es crear una lógica universal, donde una lambda o función transmitida vía JSON pueda tener cualquier número de parámetros (en la práctica, de 0 a 10). En condiciones donde no es posible convertir dinámicamente una cadena a FunctionNtipo de uso Any es la única salida (más adelante quedará claro por qué). De lo contrario, tendría que crear muchos objetos para cada número posible de parámetros.

\suma_{i=0}^{10} {9^{i+1}}

que es bastante.
9 porque decidí limitarme a 9 tipos de datos: String, Int, Boolean, Byte, Short, Long, Float, Double, Date, Timestamp

Creamos objetos de grabadora:

Porque no tengo permitido usar construcciones como if o matchno puedo determinar dinámicamente la cantidad de parámetros en un objeto UDFRegN y ajústelo en consecuencia. Como resultado tuve que crear varios objetos separados. UDFRegN, cada uno de los cuales admite una cierta cantidad de parámetros. Por ejemplo, para el caso con cero parámetros, creé el objeto correspondiente UDFRegN.

Este enfoque es necesario debido a las limitaciones que impiden que la lógica UDF se adapte dinámicamente según la cantidad de parámetros pasados. Entonces en este caso sería necesario crear 11 objetos separados. UDFRegN para procesar todas las combinaciones posibles de parámetros.

import org.apache.spark.sql.functions.udf

import java.sql.{Date, Timestamp}
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object UDFReg0 extends App with Configure { // как говорилось можно только функцию main использовать

	val indexcode = Integer.parseInt(args(0)) // это единственная возможность получить парметр извне и предполагается что это тндекс в списке UDFsConfig
	
	import Util.UDFHelper._
	val udfConfig = udfsConfigs(indexcode) //получаем udfConfig (из Configure)
	val functionCode = udfConfig.configToString() // преобразовываем его в код
	
	val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()

	val tree = toolbox.parse(functionCode)
	val compiledCode = toolbox.compile(tree)
	val function = compiledCode().asInstanceOf(Function0(Any)) // получаем функцию из кода

	val myUDF = udfConfig.targetType match { // выбираем instanceOF в зависимости от типа данных который должна возращать функция
		case "String" => udf(() => function().asInstanceOf(String))
		case "Int" => udf(() => function().asInstanceOf(Int))
		case "Boolean" => udf(() => function().asInstanceOf(Boolean))
		case "Byte" => udf(() => function().asInstanceOf(Byte))
		case "Short" => udf(() => function().asInstanceOf(Short))
		case "Long" => udf(() => function().asInstanceOf(Long))
		case "Float" => udf(() => function().asInstanceOf(Float))
		case "Double" => udf(() => function().asInstanceOf(Double))
		case "Date" => udf(() => function().asInstanceOf(Date))
		case "Timestamp" => udf(() => function().asInstanceOf(Timestamp))
		case _ => throw new IllegalArgumentException(f"Неизвестный тип")
	}

	spark.udf.register(udfConfig.name, myUDF) // регистраця UDF

}

Y queda por escribir 10 más del mismo tipo, casi exactamente los mismos “registros”
y para ello escribiremos el último script, para no hacerlo manualmente)

import java.io.{File, PrintWriter}

object GenerateRegestrators extends App {

	(0 to 10).toList.map {

		num =>

			val types = (0 to num).toList.map(_ => "Any").mkString(", ")
			val lambdaVaues = (0 until  num).toList.map(n => f"value${n}: Any").mkString(", ")
			val functionValues = (0 until  num).toList.map(n => f"value${n}").mkString(", ")

			f"""
				 |import org.apache.spark.sql.functions.udf
				 |
				 |import java.sql.{Date, Timestamp}
				 |import scala.reflect.runtime.universe
				 |import scala.tools.reflect.ToolBox
				 |
				 |object UDFReg${num} extends App with Configure {
				 |
				 |	val indexcode = Integer.parseInt(args(0))
				 |
				 |	import Util.UDFHelper._
				 |	val udfConfig = udfsConfigs(indexcode)
				 |	val functionCode = udfConfig.configToString()
				 |
				 |	val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
				 |
				 |	val tree = toolbox.parse(functionCode)
				 |	val compiledCode = toolbox.compile(tree)
				 |	val function = compiledCode().asInstanceOf(Function${num}(${types}))
				 |
				 | 	val myUDF = udfConfig.targetType match {
				 |		case "String" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(String))
				 |		case "Int" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(Int))
				 |		case "Boolean" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(Boolean))
				 |		case "Byte" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(Byte))
				 |		case "Short" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(Short))
				 |		case "Long" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(Long))
				 |		case "Float" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(Float))
				 |		case "Double" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(Double))
				 |		case "Date" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(Date))
				 |		case "Timestamp" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf(Timestamp))
				 |		case _ => throw new IllegalArgumentException(f"Неизвестный тип")
				 |	}
				 |
				 |	spark.udf.register(udfConfig.name, myUDF)
				 |
				 |}
				 |""".stripMargin


	}
		.zipWithIndex
		.foreach {
			case (str, index) =>
				val file = new File(f"./src/main/scala/UDFReg${index}.scala")
				val writer = new PrintWriter(file)

				try {
					writer.write(str)
				} finally {
					writer.close()
				}
		}


}

No hay nada que explicar aquí, simplemente genera otros objetos grabadores por analogía.
después de ejecutar el script, aparecen clases innecesarias en el proyecto UDFRegN.

poniendolo todo junto:

Así que todo lo que queda es escribir un último ejemplo donde finalmente funcionará la compilación dinámica de UDF a partir de una cadena:

object Example5 extends App with Configure {

	import Util.UDFHelper._
	udfsConfigs.zipWithIndex.foreach { //индекс нужен чтоб передать в args
		case (udfConfig, index) => 
			udfConfig.fun.split("=>", 2)(0).count(_ == ':') match { 
              //тут и происходит выбор какому регестратору передать
				case 0 => UDFReg0.main(Array( index.toString ))
				case 1 => UDFReg1.main(Array( index.toString ))
				case 2 => UDFReg2.main(Array( index.toString ))
				case 3 => UDFReg3.main(Array( index.toString ))
				case 4 => UDFReg4.main(Array( index.toString ))
				case 5 => UDFReg5.main(Array( index.toString ))
				case 6 => UDFReg6.main(Array( index.toString ))
				case 7 => UDFReg7.main(Array( index.toString ))
				case 8 => UDFReg8.main(Array( index.toString ))
				case 9 => UDFReg9.main(Array( index.toString ))
				case 10 => UDFReg10.main(Array( index.toString ))
				case _ => Unit
			}

	}

	spark.sql("select blue(), reverse('namoW tobor ociP ociP') AS rev, plus(1, 2), sum3(1, 1, 1)").show
    /* вывод
	
		+------+--------------------+----------+-------------+
		|blue()|                 rev|plus(1, 2)|sum3(1, 1, 1)|
		+------+--------------------+----------+-------------+
		|00ff00|Pico Pico robot W...|         3|            3|
		+------+--------------------+----------+-------------+
	
	 */

}

y sucedió.

PD: puedes mirar el código. aquí

Publicaciones Similares

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *