Flujos de planificación como Thomas Jefferson / Sudo Null IT News

Este artículo se centra en cómo distribuir tareas entre los canales de cola para minimizar el tiempo de procesamiento general y en la conexión inesperada entre este método de programación y el método de Thomas Jefferson.

Antecedentes y motivación

¿Sabe cómo las líneas de ensamblaje en los canales de fabricación y de instrucción en los procesadores crean una forma de paralelismo sin violar el determinismo (el orden de los datos de entrada en el canal se conserva en la salida)?

A este paralelismo implícito lo llamaremos vía canalización. Implícito, porque simplemente con dividir la tarea en etapas, automáticamente obtenemos paralelismo. Cuando esta técnica se aplica al software, nos permite escribir programas completamente secuenciales (para cada etapa) mientras hacemos un uso eficiente del hardware paralelo.

¿Cómo funciona esto? Cada etapa se ejecuta de forma independiente (en el mismo procesador/núcleo) y las etapas están conectadas a través de colas. Cuando el primer procesador/núcleo completa la primera etapa del primer elemento, la pasa a la segunda etapa y continúa trabajando en la primera etapa del segundo elemento. A medida que las colas entre etapas se llenan, logramos paralelismo manteniendo el determinismo (las salidas llegan en el mismo orden que las entradas).

A continuación se muestra un ejemplo de una cinta transportadora en una fábrica de botellas:

Cada etapa está conectada por cintas transportadoras (colas) y funciona en paralelo con las demás, por ejemplo, después de llenar la primera botella, se puede llenar la segunda mientras se cierra simultáneamente la primera botella, etc.

Si una etapa se ejecuta lentamente, la cola de entrada en esa etapa se puede dividir o dividir en partes agregando otro controlador a esa etapa. Esto enviará cada segundo elemento de entrada a un nuevo controlador, lo que prácticamente duplicará el rendimiento.

En este artículo me gustaría discutir la siguiente pregunta: ¿cuál es la mejor manera de distribuir procesadores/núcleos entre etapas? Por ejemplo, si solo tenemos dos procesadores/núcleos, pero tres etapas, entonces no tiene sentido asignar uno de ellos a la última etapa hasta que al menos algunos de los elementos se procesen en la segunda etapa.

Inicialmente quiero desarrollar una biblioteca para probar estos conceptos, pero a largo plazo me gustaría crear un lenguaje de programación que utilice estas ideas para ver si podemos crear algo que se amplíe a medida que aumente la cantidad de procesadores/núcleos disponibles.

Inspiración y trabajos anteriores

Aunque los ejemplos de fabricación en línea de montaje en el sector manufacturero son anteriores a Henry Ford, parece que fue entonces cuando se generalizó. Wikipedia lee:

La línea de montaje, impulsada por cintas transportadoras, redujo el tiempo de producción de un Modelo T a sólo 93 minutos al dividir el proceso en 45 pasos. Al producir automóviles más rápido de lo que se secaba la pintura de la época, tuvo una inmensa influencia en el mundo.

A modo de comparación, antes de la introducción de una cinta transportadora para la producción de automóviles. tomó alrededor de 12,5 horas.

Los procesadores son otro ejemplo del uso de canalizaciones para acelerar el procesamiento de instrucciones. Una canalización podría verse así: recibir una instrucción, recibir operandos, ejecutar la instrucción y finalmente escribir los resultados.

Dado el tremendo éxito de este concepto tanto en la fabricación como en el hardware, ¿esperaría que la canalización se hiciera popular también en el software? Sin embargo, por alguna razón esto no ha sucedido todavía, aunque esta idea tiene partidarios.

Jim Gray habló sobre el paralelismo en los canales de software y la separación de preocupaciones en su entrevista con motivo de recibir el Premio Turing. Lenguajes de programación basados ​​en flujo de datosEn particular programación basada en flujos de datos Paul Morrison usa esta idea. Patrón Disruptor LMAX También se basa en el paralelismo de canalización y admite lo que Jim llama paralelismo de partición. Una de las fuentes que cita Disruptor es el artículo SEDA: una arquitectura para servicios de Internet escalables y bien acondicionados (2001)que también habla sobre canalizaciones y distribución dinámica de subprocesos entre etapas. Recientemente, mientras estudiaba trabajar Jim, descubrí que los motores de bases de datos también implementan algo similar al paralelismo de canalizaciones. En el artículo se ofrece un ejemplo de motores de bases de datos que utilizan esta técnica: Paralelismo impulsado por bocados.

Estos ejemplos de paralelismo canalizado de software me inspiraron a pensar en este concepto. Sin embargo, solo comencé a pensar realmente en cómo diseñar un lenguaje de programación para hacer que la canalización sea más fácil de usar en el software después de leer la siguiente cita de Martin Thompson, uno de los desarrolladores de LMAX Disruptor:

Si hay algo que le diría a la gente de Erlang, es que tienen las cosas bien desde un alto nivel, pero necesitan invertir en su infraestructura de mensajería para que sea súper rápida, súper eficiente y obedezca todas las propiedades correctas para permitir que esto las cosas funcionan muy bien.

Audiencia broma Joe Armstrong que un programa Erlang sin modificar solo se ejecutaría 33 veces más rápido en una máquina de 64 núcleos, no las 64 veces que esperaba la gerencia de Ericsson, comencé a pensar en cómo se podría diseñar un lenguaje de programación para facilitar la canalización de programas.

Empecé a investigar este tema en dos anterior artículosy también escribió Se trata de escalar elásticamente una etapa hacia arriba y hacia abajo, pero en esta publicación adoptaremos un enfoque más global.

El panorama general

El sistema consta de tres partes: una tubería, manejadores y un planificador:

Programador monitorea la tubería estimando la longitud de las colas de entrada para cada etapa y el tiempo promedio de servicio en esa etapa. Utilizando estos datos, calcula cómo distribuir los controladores disponibles.

El algoritmo de distribución del controlador funciona de la siguiente manera:

  1. Se generan todas las configuraciones posibles para distribuir controladores entre etapas;

  2. Cada configuración se evalúa utilizando la siguiente fórmula:\sum_{s}\frac{l_{s} \cdot t_{s}}{w_{s} + 1}​, donde s – este es el escenario l_{s}​ es la longitud de la cola de entrada en la etapa s, t_ {s} es el tiempo promedio de procesamiento en la etapa s, y w_ {s} — el número de procesadores asignados a esta etapa.

  3. Se selecciona la configuración con el valor más bajo, es decir, aquella donde el tiempo total de procesamiento es mínimo.

Los controladores, generalmente uno por procesador/núcleo disponible, procesan un lote de datos de entrada en la etapa a la que el programador los dirige y luego lo informan al programador, después de lo cual el proceso se repite hasta que se completa el flujo de datos de entrada.

Si nos acercamos y observamos el pipeline con más detalle, podemos ver que consta de un nodo de origen, N etapas y un nodo de destino:

La fuente puede ser un archivo, un socket de red, una lista de elementos proporcionada por el usuario, etc., a partir del cual se generan los datos de entrada para la cola de la primera etapa. Los datos de entrada pueden ser bytes con prefijo de longitud, bytes sin formato, bytes delimitados por nueva línea, etc. Del mismo modo, un nodo de destino también puede ser un archivo, una salida estándar o un socket. Entre el nodo de origen y el de destino, el procesamiento principal se produce en diferentes etapas.

Prototipo de implementación

Espero que de la imagen de arriba quede claro que la mayor parte del código será “cableado” (conectando componentes mediante colas). La parte más interesante: cómo el programador determina qué tarea asignar al controlador una vez que termina de trabajar en la anterior.

Comencemos presentando la configuración de los controladores distribuidos a lo largo del pipeline. Cada etapa tiene un nombre o identificador, por lo que la configuración se puede representar como un par de identificadores y la cantidad de controladores asignados a esta etapa.

newtype Config = Config (Map StageId NumOfWorkers)
  deriving Show
type NumOfWorkers = Int

Configuración inicial: a todas las etapas se les asignan controladores cero:

initConfig :: (StageId) -> Config
initConfig stageIds =
  Config (Map.fromList (zip stageIds (replicate (length stageIds) 0)))

La implementación de la asignación de controladores comienza generando todas las opciones de configuración posibles, descartando aquellas que asignan controladores a etapas completadas (“completada” es una etapa que ya no recibirá datos), evaluando todas las configuraciones y seleccionando la que tenga la puntuación más baja.

allocateWorkers :: Int -> Map StageId QueueStats -> Set StageId -> Maybe Config
allocateWorkers cpus qstats done = case result of
  ()                -> Nothing
  (cfg, _score) : _ -> Just cfg
  where
    result = sortBy (comparing snd)
               ( (cfg, sum (Map.elems (scores qstats cfg)))
               | cfg <- possibleConfigs cpus (Map.keys qstats)
               , not (allocatesDoneStages cfg done)
               )

Todas las configuraciones posibles se generan de la siguiente manera:

possibleConfigs :: Int -> (StageId) -> (Config)
possibleConfigs cpus stages = map (Config . Map.fromList . zip stages) $ filter ((== cpus) . sum)
  ( foldl' (\ih i -> update i succ ih) (replicate (length stages) 0) slot
  | choice <- combinations (0.. (cpus + length stages - 1)) cpus
  , let slot = ( c - i | (i, c) <- zip (0.. ) choice )
  )
  where
    combinations :: (a) -> Int -> ((a))
    combinations xs n = filter ((== n) . length) (subsequences xs)
    -- update i f xs = xs(i) := f (xs(i))
    update :: Int -> (a -> a) -> (a) -> (a)
    update i f = go () i
      where
        go acc _ ()       = reverse acc
        go acc 0 (x : xs) = reverse acc ++ f x : xs
        go acc n (x : xs) = go (x : acc) (n - 1) xs

Luego la evaluación se realiza de la siguiente manera:

scores :: Map StageId QueueStats -> Config -> Map StageId Double
scores qss (Config cfg) = joinMapsWith score qss cfg
  where
    score :: QueueStats -> Int -> Double
    score qs workers =
      (fromIntegral (queueLength qs) * fromIntegral avgServiceTimePicos)
      /
      (fromIntegral workers + 1)
      where
        avgServiceTimePicos :: Word64
        avgServiceTimePicos
          | len == 0  = 1 -- XXX: What's the right value here?
          | otherwise = sum (serviceTimesPicos qs) `div` len
          where
            len :: Word64
            len = genericLength (serviceTimesPicos qs)

Una pequeña función auxiliar para unir tablas hash:

joinMapsWith :: Ord k => (a -> b -> c) -> Map k a -> Map k b -> Map k c
joinMapsWith f m1 m2 = assert (Map.keys m1 == Map.keys m2) $
  Map.fromList
    ( (k, f x (m2 Map.! k))
    | (k, x) <- Map.toList m1
    )

Lo último que debemos hacer es definir las configuraciones que asignan controladores a las etapas completadas:

allocatesDoneStages :: Config -> Set StageId -> Bool
allocatesDoneStages (Config cfg) done =
  any (\(stageId, numWorkers) -> stageId `Set.member` done && numWorkers > 0)
      (Map.toList cfg)

Lanzamiento del prototipo

Terminemos con un par de ejemplos en REPL. Digamos que tenemos dos controladores y dos etapas (A y B). La etapa A tiene tres elementos en su cola de entrada (y esa será toda la entrada que recibirá) y ninguna etapa se ha completado aún (ese es el último argumento S.empty):

>>> allocateWorkers 2 
                    (M.fromList ( ("A", QueueStats 3 ())
                                , ("B", QueueStats 0 ()))) 
                    S.empty

(Constructor QueueStats toma la longitud de la cola de entrada como primer argumento y una lista de tiempos de procesamiento como segundo argumento).

Si ejecutamos el código anterior, obtendremos:

Just (Config (fromList (("A",2),("B",0))))

Esto significa que ambos manejadores deben ser asignados a la etapa A. Supongamos que hemos realizado esta asignación y después de una unidad de tiempo ambos manejadores han completado su trabajo. Esto significa que ahora queda un elemento en la cola de entrada, mientras que la segunda etapa (B) ahora tiene dos elementos en su cola de entrada. Dado que ambos controladores han completado su trabajo, ejecutamos la función de distribución nuevamente:

>>> allocateWorkers 2 
                    (M.fromList ( ("A", QueueStats 1 (1,1))
                                , ("B", QueueStats 2 ()))) 
                    S.empty
Just (Config (fromList (("A",1),("B",1))))

Esto significa que debemos asignar un controlador para cada etapa. Si volvemos a imaginar que hemos hecho esto y ambos se han completado después de una unidad de tiempo, terminamos en una situación en la que los tres elementos se han procesado en el primer paso (A) y podemos marcarlo como completado mientras estamos en la segunda etapa tendrá dos elementos en su cola de entrada:

>>> allocateWorkers 2 
                    (M.fromList ( ("A", QueueStats 0 (1,1,1))
                                , ("B", QueueStats 2 (1)))) 
                    (S.fromList ("A"))
Just (Config (fromList (("A",0),("B",2))))

En este caso, la asignación de controladores significará que ambos controladores serán asignados a la segunda etapa. Una vez que los manejadores hayan terminado de trabajar en estos elementos, la segunda etapa también procesará todos los elementos y terminaremos con el proceso:

>>> allocateWorkers 2 
                    (M.fromList ( ("A", QueueStats 0 (1,1,1))
                                , ("B", QueueStats 0 (1,1,1)))) 
                    (S.fromList ("A", "B"))
Nothing

Una conexión inesperada con Thomas Jefferson

Cuando desarrollé la idea de planificación descrita anteriormente, la hablé con mi amigo Daniel Gustafsson, quien inmediatamente respondió: “Es un poco como método jefferson“(por distribución de escaños en los parlamentos).

Así es como funciona:

Una vez que se han contado todos los votos, se calculan las probabilidades secuenciales para cada partido. El partido con mayores probabilidades gana un escaño y sus probabilidades se vuelven a calcular. Esto se repite hasta llenar el número requerido de asientos. La fórmula para calcular el coeficiente se ve así:

quot = \frac{V}{s + 1}

Dónde:

  • V es el número total de votos que recibió el partido, y

  • s es el número de escaños que el partido ya ha ganado, inicialmente 0 para todos los partidos”.

La analogía es la siguiente:

  • fiestas: etapas en una tubería

  • asientos para el partido: manejadores asignados al escenario

  • votar: “puntuación” (longitud de la cola de entrada multiplicada por el tiempo medio de procesamiento)

  • redondo: número total de manipuladores

Intentemos repetir el ejemplo que vimos anteriormente, donde la Etapa A y la Etapa B tenían longitudes de cola de 3 y 2 respectivamente, pero usando el método de Jefferson:

  1. En el juego/etapa de primera ronda A recibe 1 voto mientras el partido/escenario B obtiene 2 votos, por lo que las probabilidades son \frac{1}{0 + 1} Y \frac{2}{0 + 1} en consecuencia, lo que significa que la etapa B gana la ronda y obtiene un lugar.

  2. En la segunda ronda obtenemos las probabilidades: \frac{1}{0 + 1} = 1 Y \frac{2}{1 + 1} = 1 (tenga en cuenta que aquí s = 1desde etapa/lote B ya ganó un lugar en la ronda anterior). Esto significa que obtenemos un empate, en cuyo caso supongo que podemos elegir aleatoriamente el primer juego para que nuestro ejemplo coincida con la implementación*.

Daniel también explicó que aunque Jefferson propuso este método, en realidad no se utiliza en los EE. UU., pero la mayoría de los países de Europa, incluido el Parlamento de la UE, utilizan este método.

Conclusión y trabajo futuro.

Analizamos una estrategia para escalar elásticamente la cantidad de procesadores/núcleos asignados a una sola etapa en una tubería. La capacidad de hacer esto será útil en los siguientes casos:

  • Cuando la carga en el sistema cambia y una etapa de repente se vuelve más lenta que otra; Gracias a la elasticidad, podemos reasignar núcleos y mantener el rendimiento.

  • Cuando la carga disminuye, podemos reducir la escala y usar núcleos en otras partes del sistema.

También vimos cómo el método de Thomas Jefferson para distribuir escaños en el Parlamento podría usarse para resolver el mismo problema. Esta conexión inesperada me hace preguntarme dónde más podría aparecer este algoritmo.

Todavía estamos lejos de implementar el tiempo de ejecución de un lenguaje de programación paralelo utilizando estas ideas. En particular, la actual realización utiliza colas concurrentes simples para conectar etapas, lo que significa que ampliar una etapa no conserva la salida determinista. Esto se puede resolver usando disruptores, como describí en mi artículo antiguo. He recopilado muchas otras tareas que deben completarse por separado. archivo. Si estás interesado en algo de esto, no dudes en contactar a mí.

Publicaciones Similares

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *