Recently we built a system that needs to perform two tasks. Task 1 runs every 15 minutes,
task 2 runs every 2 minutes. Task 1 kicks off some background jobs (an upload to
BigQuery); task 2 checks on the results of these background jobs and does some cleanup
when they are done (deleting the uploaded files). The two tasks need to share information
back and forth about which jobs are running in the background.
Now think to yourself (ignore the title for now 😉): what would be the most elegant
way to implement this? I suspect that most developers will come up with a solution that
involves some locking, synchronisation and global state. For example, sharing the
information through a transactional database, or using a semaphore to prevent the two
tasks from running at the same time while sharing information in a global variable.
This is understandable; most programming environments simply do not provide better
techniques for these kinds of problems!
However, if your environment supports streams and has some kind of scheduling, here are
two tricks you can use: one for the scheduling of the tasks, the second for sharing
information without a global variable.
Here is an example of the first trick, written in Scala using the ZIO streams library. Read on for an explanation.
import zio._
import zio.stream._

def performTask1: Task[Unit] = ???
def performTask2: Task[Unit] = ???

// An enumeration (Scala 2 style) for our tasks.
sealed trait BusinessTask
case object Task1 extends BusinessTask
case object Task2 extends BusinessTask

ZStream.mergeAllUnbounded()(
  ZStream.fromSchedule(Schedule.fixed(15.minutes)).as(Task1),
  ZStream.fromSchedule(Schedule.fixed(2.minutes)).as(Task2)
)
  .mapZIO {
    case Task1 => performTask1
    case Task2 => performTask2
  }
  .runDrain
We create two streams; each stream emits sequential numbers according to its schedule.
As you can see, the schedules correspond directly to the requirements. We do not really
care about the sequential numbers, so with the stream operator as we convert the stream's
emitted values to a value from the BusinessTask enumeration.
Then we merge the two streams. We now have a stream that emits the two
enumeration values at the time the corresponding task should run. This is already a big
win! Even when the two schedules produce an item at the same time, the tasks will run
sequentially. This is because by default streams are evaluated without parallelism.
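To see what the merged stream emits and in which order, here is a plain-Scala model of the two schedules over one hour (an illustration only; the names and the one-hour horizon are made up for this sketch, and the real program uses ZIO's Schedule and ZStream instead of precomputed tick lists):

```scala
sealed trait BusinessTask
case object Task1 extends BusinessTask
case object Task2 extends BusinessTask

// List each task's tick times (in minutes) over a one-hour horizon.
val horizonMinutes = 60
val task1Ticks = (15 to horizonMinutes by 15).map(t => (t, Task1: BusinessTask))
val task2Ticks = (2 to horizonMinutes by 2).map(t => (t, Task2: BusinessTask))

// Sorting by tick time mirrors the order in which the merged stream emits
// the tasks. Ties (e.g. minute 30, where both schedules fire) still run
// one after the other, because the stream is evaluated without parallelism.
val merged = (task1Ticks ++ task2Ticks).sortBy(_._1)
```

At minute 30, for example, both schedules fire, yet the two tasks simply appear as two consecutive elements of the merged stream.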
We are not there yet though. The tasks need to share information. They could access a
shared variable, but then we would still have tightly coupled components and no
guarantees that the shared variable is used correctly.
Also, wouldn't it be great if performTask1 and performTask2 were functions that can
be tested in isolation? With streams this is possible.
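Because the task implementations only map a state to a new state, they can be unit-tested without any scheduling or stream machinery. A minimal sketch (the JobState class and the startJob/cleanupFinished functions are invented here for illustration; the real tasks would also run effects such as the BigQuery upload):

```scala
// Hypothetical shared state: the set of background jobs still running.
case class JobState(runningJobs: Set[String])

// What task 1 does to the state: record a newly started job.
def startJob(state: JobState, jobId: String): JobState =
  state.copy(runningJobs = state.runningJobs + jobId)

// What task 2 does to the state: drop the jobs that have finished.
def cleanupFinished(state: JobState, finished: Set[String]): JobState =
  state.copy(runningJobs = state.runningJobs -- finished)
```

Each function is a pure state transition, so a test is just a direct call with an input state and an assertion on the output state.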
Here is the second part of the idea. Again, read on for an explanation.
case class State(...)

val initialState = State(...)

def performTask1(state: State): Task[State] = ???
def performTask2(state: State): Task[State] = ???

ZStream.mergeAllUnbounded()(
  ZStream.fromSchedule(Schedule.fixed(15.minutes)).as(Task1),
  ZStream.fromSchedule(Schedule.fixed(2.minutes)).as(Task2)
)
  .scanZIO(initialState) { (state, task) =>
    task match {
      case Task1 => performTask1(state)
      case Task2 => performTask2(state)
    }
  }
  .runDrain
We have changed the signatures of the performTask* methods, and the mapZIO operator
has been replaced with scanZIO. The stream operator scanZIO works much like
foldLeft on collections: like foldLeft, it accepts an initial state and a function
that combines the accumulated state with the next stream element (of type BusinessTask)
to produce the next state.
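The foldLeft analogy can be made concrete with plain collections. In this sketch (the Counts state and the event list are made up for illustration, and the enumeration is repeated so the snippet is self-contained), scanLeft plays the role of scanZIO:

```scala
sealed trait BusinessTask
case object Task1 extends BusinessTask
case object Task2 extends BusinessTask

// Toy state: count how often each task ran.
case class Counts(task1Runs: Int, task2Runs: Int)

// The state-transition function, analogous to the scanZIO body.
def step(counts: Counts, task: BusinessTask): Counts = task match {
  case Task1 => counts.copy(task1Runs = counts.task1Runs + 1)
  case Task2 => counts.copy(task2Runs = counts.task2Runs + 1)
}

// A possible sequence of emitted tasks.
val events = List(Task2, Task2, Task1, Task2)

// foldLeft yields only the final state...
val finalCounts = events.foldLeft(Counts(0, 0))(step)

// ...while scanLeft, like scanZIO, also emits every intermediate state.
val allCounts = events.scanLeft(Counts(0, 0))(step)
```

The difference is exactly the point made below: scanZIO does not stop at the final state, it emits each intermediate state downstream.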
The stream operator scanZIO also emits the new states, which allows further common
processing. For example, we can persist each state to disk, or collect custom metrics
about it.
Conclusion
Using libraries with higher-level constructs like streams, we can express straightforward
requirements in a straightforward way. With a few lines of code we have solved the
scheduling requirement and shown an elegant way of sharing information between tasks
without global variables.