Saturday, May 12, 2012

Even More Concurrency; Or, What You Can't Find By Searching Google Scholar

(Continuing the series of Schrodinger's Equation of Software, First Class Everything: Loops, Tail-Calls, and Continuations, and Automatic Concurrency)

      Our last version of cps_map_eval gave us concurrent evaluation of arguments, and without even needing any locks or other kinds of synchronization. That's pretty impressive... except that it only works as long as a) every argument returns at least once and b) no argument returns more than once before all have returned at least once.
      Those may not seem like very strict restrictions, except that the whole point of continuations is to break them. Consider this program:

((vau () % (+ (return 2) 2)))

      With the last version of cps_map_eval, it will cause an error, because when the first argument thread has terminated, it will not have filled in its slot- it jumped to a different continuation. The parent thread will try to evaluate (+ None 2) and throw an exception, when what we really wanted was for it to not run at all! A similar problem can be seen in this program:

(+  ((vau () % (par (return 1) (return 2))))
    (do-long-computation))

      In this case, no arguments fail to return. But the first argument activates its continuation twice, and from different threads. If both of those continuation calls execute before (do-long-computation) completes, the first one will fill in the argument slot and disappear as intended, but the second will go ahead and execute the function body with an incomplete argument list, because we have assumed erroneously that a continuation can only be called a second time after the function body has begun executing. This is false as soon as argument expressions can themselves contain multiple threads, and even more false if you allow mutation so that external threads might get hold of the continuation.
      I searched far and wide for existing research on combining continuations and concurrency. But it seems that on this particular topic, nothing exists. Fortress does not have continuations, and no experimental LISP dialect has automatic concurrency! There is a lot of interesting work on what continuations mean in concurrent systems, and different ways to deal with them when you have explicit language constructs for parallelism like spawn or pcall, but none of it is easily applicable to a situation where all of the parallelism is implicit. There are some very interesting ideas out there, like this work on subcontinuations and this on inter-thread communication with ports, which I may come back to in later posts, but for now it looks like I, too, can be on the cutting edge of functional programming research.
      What we want is not for all argument threads to terminate before executing the function body, but for all argument slots to be filled, whether it's by the same threads that were started for that purpose or not. In order to do that, we'll have to introduce a counter for the number of argument positions that's decremented whenever an argument slot is filled; when the counter reaches zero, then we can run the function body. A version of cps_map_eval that does that looks like this:

def cps_map_eval(k,v,*x):
    """
    Evaluates the elements of an argument list,
    creating continuations that will assign values
    to the correct indices in the evaluated list.
    """
    arglen = len(x)
    if arglen == 0: return k([])

    counter = [arglen]
    finished = Event()
    flock = Lock()
    argv = [None]*arglen

    def assign_val(i,val):
        argv[i] = val
        flock.acquire()
        counter[0] -= 1
        flock.release()
        if counter[0] == 0:
            finished.set()

    def reassign(i,val):
        finished.wait()
        new_argv = argv[:]
        new_argv[i] = val
        return k(new_argv)

    def arg_thread(i,ax):
        eval(ax,v,ArgK( lambda val: assign_val(i,val),
                        lambda val: reassign(i,val)))

    threads = [Thread(target=arg_thread,args=(i,ax))
                for i, ax in enumerate(x[:-1])]

    for t in threads: t.start()

    def arg_k(val):
        argv[-1] = val
        flock.acquire()
        counter[0] -= 1
        flock.release()
        if counter[0] == 0:
            finished.set()
        else:
            finished.wait()
        for t in threads: t.join()
        return k(argv)

    return Tail(x[-1],v,
                ArgK(arg_k,lambda val: reassign(-1,val)))

      Again, we're using a 1-element array because Python won't let us mutate variables in closure scopes, but will let us alter the contents of collections. Initial continuations fill in values and decrement the counter; non-initial continuations and the parent thread that will execute the function body wait until all of the slots are filled (every initial continuation has been called). When all of the slots are filled, any waiting threads are unblocked and the function body can be executed for as many times as continuations containing it have been activated. This almost works. Except for one rather important problem. This program will never terminate:

((vau () % (+ (return 1) 1)))

      The argument thread will send 1 to the outer continuation, never giving anything to (+ [] 1), and the parent thread will hang forever waiting for that slot to be filled. So, we can't allow the parent thread to block. To fix this, we don't privilege the parent thread. It becomes just another argument evaluator, whose sole speciality is the ability to call join() to cleanup all the other threads eventually. Instead, every initial continuation checks to see whether it filled in the last argument slot, and if so, it evaluates the function body. This means that the thread that began a computation may not be the same thread that completes it- the continuation can get picked up by a different thread, and the parent may end up swapped out for one of its children. This version of cps_map_eval takes care of that:

def cps_map_eval(k,v,*x):
    """
    Evaluates the elements of an argument list,
    creating continuations that will assign values
    to the correct indices in the evaluated list.
    """
    arglen = len(x)
    if arglen == 0: return k([])

    counter = [arglen]
    finished = Event()
    flock = Lock()
    argv = [None]*arglen

    def assign_val(i,val):
        argv[i] = val
        flock.acquire()
        counter[0] -= 1
        flock.release()
        if counter[0] == 0:
            finished.set()
            return k(argv)

    def reassign(i,val):
        finished.wait()
        new_argv = argv[:]
        new_argv[i] = val
        return k(new_argv)

    def arg_thread(i,ax):
        eval(ax,v,ArgK( lambda val: assign_val(i,val),
                        lambda val: reassign(i,val)))

    threads = [Thread(target=arg_thread,args=(i,ax))
                 for i, ax in enumerate(x[:-1])]

    for t in threads: t.start()

    def arg_k(val):
        r = assign_val(-1,val)
        for t in threads: t.join()
        return r

    return Tail(x[-1],v,
            ArgK(arg_k, lambda val: reassign(-1,val)))
      We're getting closer. All of the previous examples will now behave properly. But this one won't:

((vau () % (+ (return 1)
              ((vau () % (par (return 1)
                              (return 2)))))))

      The first argument slot is never filled. The first thread to return to the second argument slot fills in its value and terminates. And the second thread to return to the second argument slot then waits for the first one to be filled. And it will wait forever, that one blocked thread preventing the program from terminating.
      We could consider having some kind of thread garbage collector that periodically checks to see if all existing threads are blocked and if so terminates the program, or cleans up threads that can never be unblocked because the necessary continuations aren't held by anyone anymore. But what we really want is to simply not block in the first place. The choice of the name Event for the notification objects we've been using from Python's threading library so far is apt: we'd like something like an event loop that can store up computations and then execute them when an event (filling all the argument slots) occurs, or not if that event never occurs.
      We can achieve the same effect without actually building an event loop, though. Everything can still live nicely inside of cps_map_eval, and it looks like this:

def cps_map_eval(k,v,*x):
    """
    Evaluates the elements of an argument list,
    creating continuations that will assign values
    to the correct indices in the evaluated list.
    """
    arglen = len(x)
    if arglen == 0: return k([])

    counter = [arglen]
    flock = Lock()
    argv = [None]*arglen
    blocked = set()

    def assign_val(i,val):
        argv[i] = val
        flock.acquire()
        counter[0] -= 1
        if counter[0] == 0:
            flock.release()
            for t in blocked: t.start()
            r = k(argv)
            for t in blocked: t.join()
            blocked.clear()
            return r
        flock.release()

    def reassign(i,val):
        new_argv = argv[:]
        new_argv[i] = val
        return k(new_argv)

    def reactivate(i,val):
        flock.acquire()
        if counter[0] == 0:
            flock.release()
            return reassign(i,val)
        blocked.add(Thread(target=reassign,args=(i,val)))
        flock.release()

    def arg_thread(i,ax):
        eval(ax,v,ArgK( lambda val: assign_val(i,val),
                        lambda val: reactivate(i,val)))

    threads = [Thread(target=arg_thread,args=(i,ax))
                 for i, ax in enumerate(x[:-1])]

    for t in threads: t.start()

    def arg_k(val):
        r = assign_val(-1,val)
        for t in threads: t.join()
        return r

    return Tail(x[-1],v,
            ArgK(arg_k, lambda val: reactivate(-1,val)))

      If a continuation call cannot immediately continue, it's dumped into a blocked set and the thread terminates. If the argument slots are ever all filled, that set is checked, and everything in it is re-activated. If they aren't, the set just gets garbage collected when the necessary continuations are no longer held by anybody else. Note that thread join()s only occur after a parent thread has finished all possible useful work, and cannot do anything but return a final value to the top-level eval loop; thus, they do not inhibit concurrency. We could even get along without them, and let Python deal with the cleanup, but if we were using raw POSIX pthreads that wouldn't be an option, so we might as well cleanup our garbage. We did end up having to use 1 lock, to ensure atomic access to the counter, but that's pretty darn small; most of the time, it will never be contended. Note that we need to keep the lock around additions to the blocked set, just in case, but we don't need the lock when iterating over it; that's because we only get to that point if the counter has just been set to zero, and if the counter is zero, we know that no other thread will ever access the blocked set again.
      So there it is. Automatic concurrency that plays nice with continuations, without any need to modify the core vau evaluator.

As usual, working code can be found at https://github.com/gliese1337/schrodinger-lisp/.