Z: Concurrency 2

LANG, Z

Following my previous post on the matter, I've had plenty of time to think about concurrency design in the context of zippy.

An overarching goal here is ease-of-use: it should be easy to make sequential things parallel, and it should be easy to do so safely. Finally, it should also be easy to understand what is going on.

Effect handlers and multitasking

With zippy, I intend to use one-shot effects pervasively to capture the idea of side effects. And with that comes effect handlers. An effect is a lot like an super-powered exception. A handler is some dynamically scoped piece of code which executes whenever an effect is performed, and it has access to the part of the call stack from that point to (and including) the handler itself in the form of a resumption.

A resumption is a lot like a function (it's a delimited continuation) and by calling it, you're effectively jumping back to where the effect was performed. The argument to the resumption is the return value of the corresponding effect.

Finally, resumptions in zippy are all affine functions, meaning they can be resumed once or not at all. Resuming multiple times could make effects a bit more expensive since the entire chunk of the call stack must, somehow, be copied. Also it makes control flow a bit unpredictable and hard to reason about, and doesn't interact well with resources or other affine/linear types.

type Logs where
  eff log(message: String)

let reverse_logger = Logs handler
  eff log message =
    let result = resume()
    print message
    result

fun main() =
  with reverse_logger
  Logs.log "first"
  Logs.log "second"

-- Output:
-- second
-- first

Effects are interesting, because they give you cooperative multi-tasking for free. Note how a performed effect essentially pauses the current computation and yields control to the caller. Since resume, the resumption is just an affine function, it can be stored away in a variable, list, or queue, and called later (leaning on a linearity checker to ensure such values are not used multiple times).

A simple example of such cooperation is that of an infinite stream. We can define a Yields T effect for functions which yield values of type T:

type Yields T where
  eff yield(value: T)

A stream is then just any function with this effect.

fun to_stream |T| (values: List T) / Yields T =
  for item in values do
    Yields.yield item

Now, whenever such a stream yields, what happens next is entirely in the hands of the handler. A custom stream-based for-loop could be implemented, which itself uses effects to break:

type Breaks T where
  eff break(value: T) : Nothing

fun (for*) |T, U, Ret, E|
  (iterand   : Unit -> U    / E & Yields T)
  (body      : T    -> Unit / E & Breaks Ret)
  -- The else clause, run if the loop doesn't break
  (otherwise : U    -> Ret  / E)
return Ret do E =
  with Breaks Ret handler
    eff break value = value
    return value = otherwise value

  with Yields T handler
    eff yield value = body value

  it()

This makes it easy to run code concurrently with a stream. Also note that anyone using for* is allowed to break whenever they want, and that by doing this, the stream itself should be cancelled.

fun main() =
  for* value: Int in to_stream [1, 2, -3, 4] do
    print value
    if value < 0 do
      break()

Since any handler is allowed to not resume, cancellation is by necessity supported everywhere in the language.

Joins

Although effects make it easy to do multi-tasking in a cooperative way, it doesn't really take advantage of parallelism.

First, note that running pure1 code in parallel is easy. There's no access to the outside world except through the function arguments and return value, so there's no danger of data races, deadlocks, or other issues.

A bigger challenge arises if, say, the main thread spawns a child thread, and that child thread has an effect which escapes. The intuitive way this would work is that this effect gets handled by the appropriate handler in the main thread (after all, that is where the child thread “comes from”), but just running such a handler in a child thread is an easy way to get data races and for handlers to become corrupted if their stack frame is returned from.

The basic solution I have in mind is, at its core, about limiting parallelism to only be allowed with pure code. This (rather strict) regime ensures data race safety, but is on its own pretty harsh. Like many other languages, there would be some kind of Thread library (probably part of the stdlib) with a function spawn that takes a function and returns a join. A join is itself a function you can call, at which point, the calling thread will block until the spawned thread finishes. The join returns whatever it is the spawned function returns.

import Standard.Thread
import Standard.Time.second

fun main() =
  let join = Thread.spawn () =>
    Thread.sleep (2.second)
    5

  print "working hard..."
  let value: Int = join()
  print "done!"
  print value

-- Output:
-- working hard... (2 second delay)
-- done!
-- 5
Effect safety of sleeping

Sleeping a thread is probably an effectful operation (after all, it probably does have to figure out the current time or do a syscall or something to actually do it). But it doesn't really need to be; a pure function will never return a different result whether a sleep happens or not. For that to actually be an observable thing2, the code would need to invoke some other effect (Times) to actually measure the time.

Anyway, I think a nice compromise (to ensure sleeps don't just get disappeared by the partial evaluator) is to keep sleeps effectful, but to have their effect be Threads. Then, the thread spawning function can take a function and handle its Threads effect.

It can be quite useful to allow effects to escape a thread (imagine if you weren't allowed to log from inside a thread!) so to regain this, the join function is augmented: instead of returning the result of the thread, it returns either the result or another join.

type Join Res Eff = Unit -> JoinResult Res (Join Res Eff) / Eff

type JoinResult Res Join where
  val done(result: Res)
  val paused(join: Join)

The idea here is that whenever an effect which isn't handled by the thread itself is performed, that thread is paused and the effect is saved. Then, whenever the appropriate join is called, that effect “re-thrown” in the calling thread. Assuming the effect handler does resume, the join then saves whatever it resumed with and returns another join. That join, in turn, represents the continuation of the thread. When a thread finishes, instead of returning another join, it just returns the result as normal.

This is the critical part of the design. Basically, what's happening is that a thread is free to run pure code in parallel. But the moment an effect escapes the thread, it must pause itself. It's then up to whatever thread has its join to actually handle the effect. And when the effect is handled, the thread can continue doing the same.

Imagine some function which logs something once if its argument is even and twice if its argument is odd before returning some result:

fun compute(x: Nat) : Nat =
  Logs.log "computing the result"
  if is_odd x do
    Logs.log "(for an odd argument)"
  x / 2

This can be spawned in a thread, which will immediately execute the function. However, as soon as it hits the effectful bit, it pauses execution and waits for the join.

import Standard.Thread

fun main() =
  with Logs handler
    let log = print

  let join = Thread.spawn () => compute 5
  -- Nothing has been printed yet!

At this point, main might be in the middle of some complicated computation itself. Nothing has happened, however: although the thread may have performed the effect, that effect is just patiently waiting for the main thread to call the join.

Only when it does call the join will the effect be performed:

  var result = join()
  -- Now the logger has been invoked

In this case, since the function prints, the result contains a join. We can help the thread make progress by calling the joins in a loop until it returns:

  let result = loop
    result := case result
      is JoinResult.finished (let value) => break final_result
      is JoinResult.paused   (let join)  => join()

  print("Final result: ", result)

This looks a lot like polling an asynchronous or cooperatively multitasked function, and that's because it is. This system completely abstracts away the concept of threads: indeed, they aren't actually required here. All that needs to happen is that the join returns whenever an effect happens or a value is returned. It's perfectly fine for the spawned thread to run synchronously during the call join. However, because the chunk of code that the join must wait for is pure, it is also completely okay for it to run that in parallel.

This actually makes it really easy to reason about multi-tasking code since it looks and behaves as if your code was synchronous.

Cancellation

What happens if you just never call the join? The thread, at that point, is unable to make any progress. It could just sit around forever, waiting for something that will never come, but a more reasonable approach is to cancel it whenever the join is dropped.

Since effects already support cancellation, this is completely okay. Further, this cancellation can only happen at effect boundaries. If a thread is waiting for an effect, but the join is dropped, then from the perspective of that thread, it's no different from if the resumption had been dropped in the handler – it has no way to tell the difference, so you don't need to worry about two kinds of cancellation.

Schedulers

Since a thread now needs to be polled to stay alive, there's some potential for fun here. It would probably get pretty annoying to write the join-until-done loop, so a library of schedulers could appear.

A scheduler would be used a bit like this:

let result1, result2, result3 = Thread.round_robin(task1, task2, task3)

And it would be responsible for spawning each task and polling them in a loop. A round robin scheduler could do the polling in a round-robin fashion (surprising, I know). A biased scheduler might poll the first task more often.

Since the choice of scheduler influences the order in which joins are called, they also influence the order in which effects happen. But importantly, since a scheduler is just normal code, they will always do it in the same order.3

Determinism and composability

This leads to a very important point: this multi-tasking system is completely deterministic! Since different threads can only interact using effects, and effects are explicitly order using joins and schedulers, the program itself will be completely deterministic.

This is really, really nice actually, because it makes debugging much easier. Non-interacting parts of your program might run in parallel (or sequentially, or interleaved – it doesn't matter), but the parts of your program where output happens will always have a predictable order.

It's worth mentioning that this system also gives you a kind of structured concurrency: if you want a thread to run to completion, you have to poll it. This places a bound on the lifetime of a thread. It's only alive as long as the join is alive.

But unlike many other systems for structured concurrency, this also gives you quite a bit of freedom. The lifetime of a thread is no longer limited to a fixed, lexical scope, but to the lifetime of a variable. And that variable can be passed to other functions, returned as a function, used inside other effects, moved into a thread of its own, and so on – the possibilities are endless! And the moment you're done with the thread and want it gone, you can just forget about the variable and it will clean up after itself, or you can give it to some scheduler and run it to completion.


During the design of zippy, I've also been quite focused on ensuring composability: basically, being able to take two independent programs/functions/expressions that, on their own, work, and easily combining them so the resulting thing also works.

Being able to handle arbitrary effects is important here: if you for whatever reason require purity or determinism, you can make anything pure and deterministic by handlings all its effects.

By forcing all of the actually parallel parts to be pure, this applies even in the context of multitasking. Although a function or library might use a non-deterministic scheduler, perhaps to achieve better performance, you can regain purity by handling the source of the effect.

Essentially, I think it is important to have a strong set of requirements – purity – which anyone can rely on if they need, with the ability to “undo” anything which prevents that requirement from being met.

Pitfalls

An issue I haven't touched on is what exactly happens if we never join. Naturally, if the join is dropped, the thread should be cancelled. This might be problematic in the presence of linear types however. An effect looks basically just like a function, so it really should be allowed to take a linear type as argument. And in single-threaded code, if an effect is performed with such a type as its argument, it is guaranteed to be used.

But the threading system allows (and probably even encourages) “forgetting” effects by ignoring the join and cancelling the thread. This is subtly different from single-threaded cancellation. There, cancellation can only happen inside an effect handler, at which point the linear value is in our hands and we must do something with it. With a join, the cancellation can happen before a handler, effectively keeping the linear value in a limbo of disuse.

It might be possible to make a join linear by looking at its effects; since the join somehow has to hold the effects it is parameterized by it is reasonable for the linearity of the join to be dependent on that. That way, if a thread performs effects which take linear arguments, anyone holding such a join is forced to use it. I'm not sure this is the best way to go about things, though.

It's also worth mentioning that this isn't a problem for affine types, since they, by definition, are allowed to be unused. So a join would only need to be linear if there were other linear types in the mix.

Also, this pseudo-cooperative tasking system is, in my opinion, quite nice to work with, it does come with overhead. Namely, there's synchronization involved every time an escaping effect is performed. That's pretty bad if you have some effect heavy workload you wish to run on another thread, even if it is great for determinism.

I think an actor-like system (such as the one described in Z: Concurrency) would be a worthy inclusion. A basic idea is that each actor has its own entry point and can only access the default handlers (leaving the issue of thread safety as a problem for the outside world).

Finally, there is the question of “what if I mutate an argument?” This doesn't really induce an effect, but it does change the type of the function. I haven't fully worked out the value semantics yet, but I imagine that if you somehow got a hold of a mutable reference, then mutating it would use the same mechanism to ensure a deterministic order of mutation and protect against data races. I might end up just enforcing copying on captured variables, in which case this is not an issue.