Scheduling threads like Thomas Jefferson

This article is about how to distribute handlers among the queues of a pipeline so as to minimize overall processing time, and about the unexpected connection between this kind of scheduling and Thomas Jefferson's method of apportioning seats.

Background and motivation

Do you know how assembly lines in manufacturing and instruction pipelines in processors create a form of parallelism without violating determinism (the order of the input data into the pipeline is preserved at the output)?

We'll call this implicit parallelism via pipelining. Implicit, because simply by dividing the task into stages, we automatically obtain parallelism. When this technique is applied to software, it allows us to write fully sequential programs (for each stage) while making efficient use of parallel hardware.

How does this work? Each stage runs independently, on its own processor/core, and the stages are connected through queues. When the first processor/core completes the first stage for the first element, it passes the element on to the second stage and continues with the first stage for the second element. As the queues between stages fill up, we get parallelism while maintaining determinism (outputs arrive in the same order as inputs).
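To make the idea concrete, here is a minimal sketch using only base's Control.Concurrent. It is not the library discussed in this article: the names stage and runPipeline, and the two toy stage functions, are made up for illustration. Each stage is an ordinary sequential function running on its own thread, and the channels play the role of the queues.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_, forever, replicateM)

-- | Run one stage on its own thread: read from the input queue, apply a
-- purely sequential function, and write the result to a fresh output queue.
-- (Hypothetical helper, for illustration only.)
stage :: (a -> b) -> Chan a -> IO (Chan b)
stage f input = do
  output <- newChan
  _ <- forkIO $ forever $ do
    x <- readChan input
    writeChan output $! f x
  return output

-- | A toy two-stage pipeline: double each number, then render it.
-- With one handler per stage and FIFO queues, output order equals input order.
runPipeline :: [Int] -> IO [String]
runPipeline xs = do
  source   <- newChan
  doubled  <- stage (* 2) source
  rendered <- stage show doubled
  forM_ xs (writeChan source)
  replicateM (length xs) (readChan rendered)
```

While the second stage renders the first element, the first stage can already be doubling the second one, which is exactly the implicit parallelism described above.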

Here is an example of a conveyor belt in a bottle factory:

The stages are connected by conveyor belts (queues) and work in parallel with each other. For example, after filling the first bottle we can fill the second while the first is being capped, and so on.

If a stage is running slowly, the input queue at that stage can be partitioned (sharded) by adding another handler to that stage. Every second input element then goes to the new handler, which almost doubles the throughput of that stage.
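As a purely illustrative model of sharding (a sketch over lists, not over real concurrent queues), round-robin partitioning and the matching order-preserving merge can be written as follows. The names shard and unshard are my own for this example.

```haskell
import Data.List (transpose)

-- | Split a queue round-robin over n handlers: element i goes to
-- handler (i `mod` n).
shard :: Int -> [a] -> [[a]]
shard n xs =
  [ [ x | (i, x) <- zip [0 :: Int ..] xs, i `mod` n == k ]
  | k <- [0 .. n - 1]
  ]

-- | Merge the per-handler results by taking one element from each shard in
-- turn, which restores the original input order.
unshard :: [[a]] -> [a]
unshard = concat . transpose
```

For instance, shard 2 [1,2,3,4,5] gives [[1,3,5],[2,4]], and unshard turns that back into [1,2,3,4,5]. The merge only restores the order if results are consumed in the same round-robin fashion, which a plain shared concurrent queue does not guarantee.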

In this article I would like to discuss the following question: what is the best way to distribute processors/cores between stages? For example, if we have only two processors/cores, but three stages, then it makes no sense to allocate one of them to the last stage until at least some of the elements are processed in the second stage.

Initially I want to develop a library to test these concepts, but in the long term I would like to create a programming language that uses these ideas to see if we can create something that will scale as the number of processors/cores available increases.

Inspiration and previous works

Although examples of assembly-line manufacturing predate Henry Ford, it seems to have become widespread in his era. Wikipedia says:

The assembly line, driven by conveyor belts, reduced production time for a Model T to just 93 minutes by dividing the process into 45 steps. Producing cars quicker than paint of the day could dry, it had an immense influence on the world.

For comparison, before the assembly line was introduced, producing a car took about 12.5 hours.

Processors are another example of using pipelines to speed up instruction processing. A pipeline might look like this: fetch the instruction, fetch the operands, execute the instruction, and finally write back the results.

Given the tremendous success of this concept in both manufacturing and hardware, one would expect pipelining to be popular in software as well. For some reason this has not happened yet, although the idea has its supporters.

Jim Gray talked about parallelism in software pipelines and separation of concerns in his interview on the occasion of receiving the Turing Award. Dataflow programming languages, in particular Paul Morrison's flow-based programming, use this idea. The LMAX Disruptor pattern is also based on pipeline parallelism and supports what Jim calls partitioning parallelism. One of the sources the Disruptor cites is the paper SEDA: An Architecture for Well-Conditioned, Scalable Internet Services (2001), which also talks about pipelines and dynamic allocation of threads to stages. Recently, while studying Jim's work, I discovered that database engines also implement something similar to pipeline parallelism. One example of database engines using this technique is given in the paper Morsel-Driven Parallelism.

These examples of software pipelined parallelism inspired me to think about this concept. However, I only really began to think about how to design a programming language to make pipelining easier to use in software after I read the following quote from Martin Thompson, one of the developers of LMAX Disruptor:

If there's one thing I'd say to the Erlang folks, it's you got the stuff right from a high-level, but you need to invest in your messaging infrastructure so it's super fast, super efficient and obeys all the right properties to let this stuff work really well.

That quote, together with Joe Armstrong's anecdote about an unmodified Erlang program running only 33 times faster on a 64-core machine, not the 64 times Ericsson management expected, got me thinking about how a programming language could be designed to make pipelining easier in programs.

I started exploring this topic in two previous articles, and also wrote about elastically scaling a single stage up and down, but in this post we'll take a more global approach.

The big picture

The system consists of three parts: a pipeline, handlers and a scheduler:

The scheduler monitors the pipeline, tracking the length of each stage's input queue and the average service time at that stage. Using this data, it calculates how to distribute the available handlers.

The handler distribution algorithm works as follows:

  1. All possible configurations for distributing handlers across stages are generated;

  2. Each configuration is scored using the formula \sum_{s}\frac{l_{s} \cdot t_{s}}{w_{s} + 1}, where s ranges over the stages, l_{s} is the length of the input queue at stage s, t_{s} is the average processing time at stage s, and w_{s} is the number of handlers allocated to stage s;

  3. The configuration with the lowest value is selected, that is, the one where the total processing time is minimal.
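As a small worked example of the scoring step, take hypothetical values: two handlers and two stages A and B, with l_{A} = 1, l_{B} = 2 and t_{A} = t_{B} = 1. There are three ways to distribute the handlers, and their scores come out as:

```latex
\begin{align*}
(w_A, w_B) = (2, 0) &: \quad \frac{1 \cdot 1}{2 + 1} + \frac{2 \cdot 1}{0 + 1} = \tfrac{1}{3} + 2 \approx 2.33 \\
(w_A, w_B) = (1, 1) &: \quad \frac{1 \cdot 1}{1 + 1} + \frac{2 \cdot 1}{1 + 1} = \tfrac{1}{2} + 1 = 1.5 \\
(w_A, w_B) = (0, 2) &: \quad \frac{1 \cdot 1}{0 + 1} + \frac{2 \cdot 1}{2 + 1} = 1 + \tfrac{2}{3} \approx 1.67
\end{align*}
```

The lowest score is 1.5, so with these numbers the scheduler would give one handler to each stage.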

Handlers, usually one per available processor/core, process a batch of input data at the stage the scheduler directs them to and then report back to the scheduler. This repeats until the input stream is exhausted.

If we zoom in and look at the pipeline in more detail, we can see that it consists of a source, N stages, and a destination node:

The source can be a file, a network socket, a list of items provided by the user, etc., from which the input data for the first stage queue is created. Input data can be length-prefixed bytes, raw bytes, newline-delimited bytes, etc. Likewise, a destination node can also be a file, standard output, or a socket. Between the source and the destination node, the main processing occurs at different stages.

Implementation prototype

I hope the picture above makes it clear that most of the code will be “wiring” (connecting components using queues). The most interesting part is how the scheduler decides which task to give a handler once it has finished the previous one.

Let's start by representing a configuration of handlers across the pipeline. Each stage has a name or identifier, so a configuration can be represented as a map from stage identifier to the number of handlers assigned to that stage.

newtype Config = Config (Map StageId NumOfWorkers)
  deriving Show
type NumOfWorkers = Int
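The snippets in this section refer to StageId and QueueStats without showing their definitions. The following is a guess at what they could look like, inferred only from how they are used in the scoring code and the REPL session (string stage names, a queue length, and a list of service times in picoseconds); the actual definitions may differ. The snippets also appear to assume qualified imports of Data.Map as Map and Data.Set as Set, plus Data.List, Data.Ord (comparing) and Control.Exception (assert).

```haskell
import Data.Word (Word64)

-- Assumed definition: the REPL examples use string literals as stage names.
type StageId = String

-- Assumed definition, reconstructed from the field names used in `scores`.
data QueueStats = QueueStats
  { queueLength       :: Int       -- ^ Number of items waiting in the input queue.
  , serviceTimesPicos :: [Word64]  -- ^ Observed service times, in picoseconds.
  }
  deriving Show
```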

The initial configuration assigns zero handlers to every stage:

initConfig :: [StageId] -> Config
initConfig stageIds =
  Config (Map.fromList (zip stageIds (repeat 0)))

Handler allocation starts by generating all possible configurations, discarding those that assign handlers to completed stages (a stage is “completed” when it will receive no more data), scoring the remaining configurations, and selecting the one with the lowest score.

allocateWorkers :: Int -> Map StageId QueueStats -> Set StageId -> Maybe Config
allocateWorkers cpus qstats done = case result of
  []                -> Nothing
  (cfg, _score) : _ -> Just cfg
  where
    result = sortBy (comparing snd)
               [ (cfg, sum (Map.elems (scores qstats cfg)))
               | cfg <- possibleConfigs cpus (Map.keys qstats)
               , not (allocatesDoneStages cfg done)
               ]

All possible configurations are generated as follows:

possibleConfigs :: Int -> [StageId] -> [Config]
possibleConfigs cpus stages =
  map (Config . Map.fromList . zip stages) $
    -- Keep only candidates that actually hand out all `cpus` handlers.
    filter ((== cpus) . sum)
      [ foldl' (\ih i -> update i succ ih) (replicate (length stages) 0) slot
      | choice <- combinations [0.. (cpus + length stages - 1)] cpus
        -- The i-th chosen position c gives one handler to stage index c - i.
      , let slot = [ c - i | (i, c) <- zip [0.. ] choice ]
      ]
  where
    -- All n-element subsequences of xs (exponential in the length of xs).
    combinations :: [a] -> Int -> [[a]]
    combinations xs n = filter ((== n) . length) (subsequences xs)
    -- update i f xs = xs[i] := f (xs[i]); an out-of-range i leaves xs unchanged.
    update :: Int -> (a -> a) -> [a] -> [a]
    update i f = go [] i
      where
        go acc _ []       = reverse acc
        go acc 0 (x : xs) = reverse acc ++ f x : xs
        go acc n (x : xs) = go (x : acc) (n - 1) xs
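The generator above goes through subsequences, which gets expensive as the number of handlers and stages grows. A more direct recursive formulation is sketched below. It is an alternative for illustration, not the prototype's code, and the names compositions and configsFor are hypothetical. It works on plain lists to stay self-contained; wrapping each result in Config . Map.fromList would recover the prototype's type.

```haskell
-- | All ways to split n identical handlers over k stages.
compositions :: Int -> Int -> [[Int]]
compositions n k
  | k <= 0    = [ [] | n == 0 ]  -- no stages left: valid only if nothing remains
  | otherwise = [ w : rest
                | w    <- [0 .. n]                        -- handlers for this stage
                , rest <- compositions (n - w) (k - 1)    -- split the remainder
                ]

-- | Pair every split with the stage names.
configsFor :: Int -> [String] -> [[(String, Int)]]
configsFor cpus stages = map (zip stages) (compositions cpus (length stages))
```

For two handlers and stages A and B this yields the three assignments (0,2), (1,1) and (2,0). Note that the enumeration order may differ from the original generator, which matters when two configurations tie on score, since the first one encountered wins.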

Scoring is then done as follows:

scores :: Map StageId QueueStats -> Config -> Map StageId Double
scores qss (Config cfg) = joinMapsWith score qss cfg
  where
    score :: QueueStats -> Int -> Double
    score qs workers =
      (fromIntegral (queueLength qs) * fromIntegral avgServiceTimePicos)
      /
      (fromIntegral workers + 1)
      where
        avgServiceTimePicos :: Word64
        avgServiceTimePicos
          | len == 0  = 1 -- XXX: What's the right value here?
          | otherwise = sum (serviceTimesPicos qs) `div` len
          where
            len :: Word64
            len = genericLength (serviceTimesPicos qs)

A small helper function for joining two maps that have the same keys:

joinMapsWith :: Ord k => (a -> b -> c) -> Map k a -> Map k b -> Map k c
joinMapsWith f m1 m2 = assert (Map.keys m1 == Map.keys m2) $
  Map.fromList
    [ (k, f x (m2 Map.! k))
    | (k, x) <- Map.toList m1
    ]

The last thing we need is a way to detect configurations that assign handlers to completed stages:

allocatesDoneStages :: Config -> Set StageId -> Bool
allocatesDoneStages (Config cfg) done =
  any (\(stageId, numWorkers) -> stageId `Set.member` done && numWorkers > 0)
      (Map.toList cfg)

Prototype launch

Let's wrap up with a couple of examples in the REPL. Say we have two handlers and two stages (A and B). Stage A has three elements in its input queue (all the input it will ever receive), and no stage has completed yet (that's the last argument, S.empty):

>>> allocateWorkers 2 
                    (M.fromList [ ("A", QueueStats 3 [])
                                , ("B", QueueStats 0 [])]) 
                    S.empty

(The QueueStats constructor takes the length of the input queue as its first argument and a list of processing times as its second. In this session, M and S are presumably qualified imports of Data.Map and Data.Set.)

If we run the above code, we will get:

Just (Config (fromList [("A",2),("B",0)]))

This means that both handlers should be allocated to stage A. Assume we have made this allocation and that after one time unit both handlers have finished. Stage A now has one element left in its input queue, while the second stage (B) has two elements in its input queue. Since both handlers are free again, we run the allocation function once more:

>>> allocateWorkers 2 
                    (M.fromList [ ("A", QueueStats 1 [1,1])
                                , ("B", QueueStats 2 [])]) 
                    S.empty
Just (Config (fromList [("A",1),("B",1)]))

This means that we should allocate one handler to each stage. If we again imagine that we've done this and both handlers finish after one time unit, all three items have been processed by the first stage (A), so we can mark it as completed, while the second stage has two elements in its input queue:

>>> allocateWorkers 2 
                    (M.fromList [ ("A", QueueStats 0 [1,1,1])
                                , ("B", QueueStats 2 [1])]) 
                    (S.fromList ["A"])
Just (Config (fromList [("A",0),("B",2)]))

Here both handlers are allocated to the second stage. Once they have finished working on these elements, the second stage has also processed all of its input, and we are done:

>>> allocateWorkers 2 
                    (M.fromList [ ("A", QueueStats 0 [1,1,1])
                                , ("B", QueueStats 0 [1,1,1])]) 
                    (S.fromList ["A", "B"])
Nothing

An unexpected connection with Thomas Jefferson

When I came up with the scheduling idea described above, I discussed it with my friend Daniel Gustafsson, who immediately replied: “It's a bit like Jefferson's method” (for apportioning seats in parliaments).

Here's how it works:

“Once all the votes have been counted, successive quotients are calculated for each party. The party with the highest quotient wins one seat, and its quotient is recalculated. This is repeated until the required number of seats is filled. The formula for the quotient is:

quot = \frac{V}{s + 1}

Where:

  • V is the total number of votes the party received, and

  • s is the number of seats that the party has already won, initially 0 for all parties.”

The analogy is as follows:

  • parties: stages in a pipeline

  • seats for the party: handlers allocated to the stage

  • votes: the stage's “score” (input queue length times average processing time)

  • rounds: total number of handlers

Let's redo the second step of the example we looked at earlier, where stages A and B had queue lengths of 1 and 2 respectively, but this time using Jefferson's method:

  1. In the first round, party/stage A has 1 vote while party/stage B has 2 votes, so the quotients are \frac{1}{0 + 1} and \frac{2}{0 + 1} respectively, which means that stage B wins the round and gets one seat.

  2. In the second round the quotients are \frac{1}{0 + 1} = 1 and \frac{2}{1 + 1} = 1 (note that here s = 1, since party/stage B already won a seat in the previous round). This is a tie, in which case I suppose we can arbitrarily pick the first party (A), so that our example matches the implementation.
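The rounds above can be sketched in a few lines of Haskell. This is a direct transcription of the seat-by-seat procedure, not part of the prototype. The name jefferson is mine, and the tie-breaking rule (earliest party wins) is the arbitrary choice made in the example.

```haskell
-- | Jefferson's method: hand out `seats` one at a time, each to the party
-- with the highest quotient votes / (seatsWon + 1).
-- Ties go to the earliest party in the list (an arbitrary choice).
jefferson :: Int -> [Double] -> [Int]
jefferson seats votes = go seats (replicate (length votes) 0)
  where
    go :: Int -> [Int] -> [Int]
    go n won
      | n <= 0 || null votes = won
      | otherwise            = go (n - 1) (awardSeat (winner won) won)

    quotients :: [Int] -> [Double]
    quotients won = [ v / fromIntegral (s + 1) | (v, s) <- zip votes won ]

    -- Index of the highest quotient; negating lets `minimum` break ties
    -- in favour of the smallest index.
    winner :: [Int] -> Int
    winner won = snd (minimum [ (negate q, i) | (i, q) <- zip [0 ..] (quotients won) ])

    awardSeat :: Int -> [Int] -> [Int]
    awardSeat i won = [ if j == i then s + 1 else s | (j, s) <- zip [0 ..] won ]
```

With two handlers and votes 1 and 2 for A and B, jefferson 2 [1, 2] gives [1, 1], one handler per stage. With votes 3 and 0 it gives [2, 0], both handlers on A.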

Daniel also explained that although Jefferson proposed this method, it is not actually used in the US. In Europe, where it is better known as the D'Hondt method, it is used by most countries, including for the EU Parliament.

Conclusion and future work

We looked at a strategy to elastically scale the number of processors/cores allocated to a single stage in a pipeline. The ability to do this will be useful in the following cases:

  • When the load on the system changes and one stage suddenly becomes slower than another. Thanks to elasticity, we can reallocate cores and maintain throughput.

  • When the load decreases, we can scale down and use cores in other parts of the system.

We also saw how Thomas Jefferson's method of apportioning seats in a parliament could be used to solve the same problem. This unexpected connection makes me wonder where else this algorithm might pop up.

We are still far from implementing the runtime of a parallel programming language using these ideas. In particular, the current implementation uses simple concurrent queues to connect stages, which means that scaling up a stage does not preserve deterministic output. This can be solved using Disruptors, as I described in my old article. I have collected many other tasks that need to be completed in a separate file. If you are interested in any of this, feel free to contact me.
