Saturday, July 22, 2017

redo-signals: Planting imperative trees in a declarative forest

redo-signals is a Scala library for functional reactive programming. With redo-signals, you write declarative code:

val x = new Source[Int](0)
val y = new Source[Int](0)

val z = tracking { implicit t =>
  x.track + y.track
}
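To make the declarative reading concrete, here is a toy model of dependency tracking in plain Scala. It is a sketch under stated assumptions, not redo-signals' implementation. The names Var, Tracker, Tracked and tracking are hypothetical stand-ins, dependencies are discovered only on the first evaluation, and recomputation is eager. What it shows is that z is defined once and stays equal to x + y as the sources change.

```scala
import scala.collection.mutable

// Hypothetical toy model of dependency tracking; NOT the redo-signals implementation.
// A Tracker collects every Var read through `track` during one evaluation.
final class Tracker {
  val deps: mutable.LinkedHashSet[Var[_]] = mutable.LinkedHashSet.empty
}

final class Var[A](private var value: A) {
  private val observers = mutable.Buffer.empty[() => Unit]
  def now: A = value
  // Reading through `track` records this Var as a dependency of the current computation.
  def track(implicit t: Tracker): A = { t.deps += this; value }
  // Enables the `x() = 3` syntax; notifies every observer after the write.
  def update(a: A): Unit = { value = a; observers.foreach(f => f()) }
  def subscribe(f: () => Unit): Unit = observers += f
}

final class Tracked[A](body: Tracker => A) {
  private val firstRun = new Tracker
  private var cached: A = body(firstRun)
  // Subscribe once to each dependency found on the first run
  // (assumes the set of dependencies never changes).
  firstRun.deps.foreach(_.subscribe(() => cached = body(new Tracker)))
  def now: A = cached
}

def tracking[A](body: Tracker => A): Tracked[A] = new Tracked(body)

val x = new Var(0)
val y = new Var(0)
val z = tracking { implicit t => x.track + y.track }

x() = 3
y() = 4
println(z.now) // 7
```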

But some behavior is poorly suited to declarative style. Consider the following set of rules:

  1. For every element of sourceList1, ensure there exists a Type1Worker for it in workerList.
  2. For every element of sourceList2, ensure there exists a Type2Worker for it in workerList.
  3. Don't remove or re-create workers.

You could write these rules declaratively, but they are more natural as imperative code:

sourceList1.zipWithStalenessFrom(Seq()).foreach { case (oldXs, newXs) =>
  workerList() = workerList.now ++ ((newXs.toSet -- oldXs.toSet) map (new Type1Worker(_)))
}

sourceList2.zipWithStalenessFrom(Seq()).foreach { case (oldXs, newXs) =>
  workerList() = workerList.now ++ ((newXs.toSet -- oldXs.toSet) map (new Type2Worker(_)))
}

But this introduces an unintended consequence. Suppose sourceList1 and sourceList2 originate from the same source. Then every change to the source data executes each foreach() once, resulting in two updates to workerList when there should be only one.
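The problem can be reproduced without redo-signals. The sketch below uses a hypothetical Cell class (not the library's API) that notifies its listeners synchronously on every write, and it simplifies workers to strings and drops the staleness diff. Unlike redo-signals' foreach, Cell does not fire for the initial value. One write to root reaches two listeners, each of which writes to workerList, so workerList's own listener sees two values.

```scala
import scala.collection.mutable

// Hypothetical minimal observable cell; NOT the redo-signals API.
// Every write immediately notifies every listener, in registration order.
final class Cell[A](private var value: A) {
  private val listeners = mutable.Buffer.empty[A => Unit]
  def now: A = value
  def update(a: A): Unit = { value = a; listeners.foreach(f => f(a)) }
  def foreach(f: A => Unit): Unit = listeners += f
}

val root = new Cell[Seq[String]](Seq())
val workerList = new Cell[Seq[String]](Seq())

// Record the size of every value workerList takes (a stand-in for println).
val seen = mutable.Buffer.empty[Int]
workerList.foreach(ws => seen += ws.length)

// Two independent imperative listeners, both derived from the same root.
root.foreach { xs => workerList() = workerList.now ++ xs.filter(_.startsWith("a")) }
root.foreach { xs => workerList() = workerList.now ++ xs.filter(_.startsWith("b")) }

root() = Seq("a 1", "b 1") // one root change...
println(seen.toList)       // ...but two notifications: List(1, 2)
```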

Consider the following complete example:

case class Payload(name: String)
trait Worker
class Type1Worker(val payload: Payload) extends Worker
class Type2Worker(val payload: Payload) extends Worker

val rootSourceList = new Source[Seq[Payload]](Seq())

val sourceList1 = rootSourceList map (_ filter (_.name.startsWith("a")))
val sourceList2 = rootSourceList map (_ filter (_.name.startsWith("b")))

val workerList = new Source[Seq[Worker]](Seq(), debugName = Some("workerList"))

sourceList1.zipWithStalenessFrom(Seq()).foreach { case (oldXs, newXs) =>
  workerList() = workerList.now ++ ((newXs.toSet -- oldXs.toSet) map (new Type1Worker(_)))
} (workerList.redoObserving)

sourceList2.zipWithStalenessFrom(Seq()).foreach { case (oldXs, newXs) =>
  workerList() = workerList.now ++ ((newXs.toSet -- oldXs.toSet) map (new Type2Worker(_)))
} (workerList.redoObserving)

workerList foreach { workers =>
  println(s"Now have ${workers.length} workers")
}

// One update
rootSourceList() = Seq(Payload("a 1"), Payload("b 1"))

This prints

Now have 0 workers
Now have 1 workers
Now have 2 workers

Two updates appear in the output: from 0 to 1, and from 1 to 2. How can we merge these into a single update?

The solution is to use foreachDelayed:

sourceList1.zipWithStalenessFrom(Seq()).foreachDelayed { xs => implicit u =>
  workerList.updateLater {
    xs map { case (oldXs, newXs) =>
      workerList.now ++ ((newXs.toSet -- oldXs.toSet) map (new Type1Worker(_)))
    }
  }
}(workerList.redoObserving)

sourceList2.zipWithStalenessFrom(Seq()).foreachDelayed { xs => implicit u =>
  workerList.updateLater {
    xs map { case (oldXs, newXs) =>
      workerList.now ++ ((newXs.toSet -- oldXs.toSet) map (new Type2Worker(_)))
    }
  }
}(workerList.redoObserving)

How does this work? In redo-signals, changes to the state of the world are made in two sweeps: the first sweep invalidates all signals, and the second sweep executes listeners, which in the process often re-evaluate signals, causing them to become valid again. foreachDelayed and updateLater let the user tap into this procedure by scheduling code to run on the first or the second sweep, respectively.
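A toy model of the two sweeps may help. This is a hypothetical sketch, not redo-signals' code: Node, Source, Derived, invalidate and set are made-up names. Sweep 1 marks downstream nodes invalid and collects the listeners it reaches without computing anything; sweep 2 runs each collected listener once, and derived values are recomputed lazily when read. In the "diamond" below, c depends on root through both a and b, yet its listener runs once and c is evaluated once per root change.

```scala
import scala.collection.mutable

// Hypothetical toy model of two-sweep propagation; NOT redo-signals' code.
trait Node {
  val dependents: mutable.Buffer[Node] = mutable.Buffer.empty
  val listeners: mutable.Buffer[() => Unit] = mutable.Buffer.empty
  def markInvalid(): Unit
  // Sweep 1: invalidate this node and everything downstream, collecting listeners.
  // The set de-duplicates listeners reached along more than one path.
  def invalidate(pending: mutable.LinkedHashSet[() => Unit]): Unit = {
    markInvalid()
    pending ++= listeners
    dependents.foreach(_.invalidate(pending))
  }
}

final class Source[A](private var value: A) extends Node {
  def markInvalid(): Unit = ()
  def get: A = value
  def set(a: A): Unit = {
    value = a
    val pending = mutable.LinkedHashSet.empty[() => Unit]
    invalidate(pending)       // sweep 1: invalidation only, no evaluation
    pending.foreach(f => f()) // sweep 2: run each reached listener once
  }
}

final class Derived[A](inputs: Seq[Node], compute: () => A) extends Node {
  private var cached: Option[A] = None
  var evaluations = 0 // counts recomputations, for demonstration
  inputs.foreach(_.dependents += this)
  def markInvalid(): Unit = cached = None
  // Lazy pull: recompute only if invalid, then cache until the next invalidation.
  def get: A = cached.getOrElse {
    evaluations += 1
    val v = compute()
    cached = Some(v)
    v
  }
}

val root = new Source(1)
val a = new Derived(Seq(root), () => root.get * 10)
val b = new Derived(Seq(root), () => root.get + 1)
val c = new Derived(Seq(a, b), () => a.get + b.get)

val observed = mutable.Buffer.empty[Int]
c.listeners += (() => { observed += c.get; () })

root.set(2)
println(observed.toList) // List(23): one listener run for one root change
println(c.evaluations)   // 1
```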

The body of the function passed to foreachDelayed runs on the first sweep -- the invalidation sweep. This means that it is guaranteed to run exactly once per update. However, because it is on the invalidation sweep, it's not allowed to look at the state of any signals, because any signal is at risk of being invalid.

So, it passes the work on to updateLater. updateLater is the complement of foreachDelayed: it schedules an update to run on the second sweep, the execution sweep, at which time it may safely evaluate xs and workerList.now.
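The foreachDelayed/updateLater split can be modeled the same way. Again, Root, Target, Txn and their methods are hypothetical, and one behavior is an assumption of this sketch rather than a documented property of redo-signals: queued updates to the same target are applied in order, so each sees the previous one's result, and the target notifies its listeners once after all of them. The phase-1 callbacks receive the new value only as a thunk and never call it themselves; they hand the read off to updateLater, which performs it in phase 2.

```scala
import scala.collection.mutable

// Hypothetical toy model of the foreachDelayed/updateLater split; NOT the redo-signals API.
final class Txn {
  private val updates = mutable.Buffer.empty[() => Unit]
  private val touched = mutable.LinkedHashSet.empty[Target[_]]
  // Called during phase 1: only records work; reads nothing.
  def updateLater[A](target: Target[A])(f: A => A): Unit = {
    updates += (() => target.value = f(target.value))
    touched += target
  }
  // Phase 2: apply queued updates in order, then notify each touched target once.
  def run(): Unit = {
    updates.foreach(u => u())
    touched.foreach(_.notifyListeners())
  }
}

final class Target[A](var value: A) {
  private val listeners = mutable.Buffer.empty[A => Unit]
  def foreach(f: A => Unit): Unit = listeners += f
  def notifyListeners(): Unit = listeners.foreach(f => f(value))
}

final class Root[A](private var value: A) {
  private val delayed = mutable.Buffer.empty[(() => A, Txn) => Unit]
  def foreachDelayed(f: (() => A, Txn) => Unit): Unit = delayed += f
  def update(a: A): Unit = {
    value = a
    val txn = new Txn
    delayed.foreach(f => f(() => value, txn)) // phase 1: each callback runs once
    txn.run()                                 // phase 2: reads happen here
  }
}

val root = new Root[Seq[String]](Seq())
val workerList = new Target[Seq[String]](Seq())
val seen = mutable.Buffer.empty[Int]
workerList.foreach(ws => seen += ws.length)

root.foreachDelayed { (xs, txn) =>
  // Do not call xs() here; the read is deferred to the update function.
  txn.updateLater(workerList)(ws => ws ++ xs().filter(_.startsWith("a")))
}
root.foreachDelayed { (xs, txn) =>
  txn.updateLater(workerList)(ws => ws ++ xs().filter(_.startsWith("b")))
}

root() = Seq("a 1", "b 1")
println(seen.toList) // List(2): both updates merged into one notification
```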

The result is that you can write imperative code that executes like declarative code: it runs no more often than it has to. The various parts are scheduled to run on the correct sweeps, and signals change their values exactly as many times as there are root changes to the system. And so imperative code mixes seamlessly into the declarative world.
