TL;DR: You can do ZIO memoization in just a few lines of code. For more complex use cases, use zio-cache.
Recently I was working on fetching Avro schemas from a schema registry. Avro schemas are immutable and therefore perfectly cacheable. Also, the number of possible schemas is limited, so cache evictions are not needed. We can simply cache every schema forever in a plain hash map. In other words, we are doing memoization.
Since this was the first time I did this in a ZIO-based application, I looked around for existing solutions. What I wanted is something like this:
def fetchSchema(schemaId: String): Task[Schema] = {
  val fetchFromRegistry: Task[Schema] = ???
  fetchFromRegistry.memoizeBy(key = schemaId)
}
Frankly, I was a bit disappointed that ZIO does not already support this out of the box. However, as you'll see in this article, the proposed syntax only works for simple use cases. (Actually, there is ZIO.memoize but that is even simpler and only caches the result for a single ZIO instance, not for any instance that gives the same value.)
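To make the limitation concrete, here is a minimal sketch of the built-in memoize method on an effect, assuming ZIO 2.x. The fetch helper and the counter are made up for the illustration. Note that there is no key anywhere: the cached result belongs to the one effect that memoize returns.

```scala
import zio._

object MemoizeDemo extends ZIOAppDefault {
  // Hypothetical stand-in for an expensive fetch; it counts how often it really runs.
  def fetch(counter: Ref[Int]): UIO[String] =
    counter.update(_ + 1).as("schema")

  // Runs the memoized effect twice and returns how often `fetch` was executed.
  val program: UIO[Int] =
    for {
      counter  <- Ref.make(0)
      memoized <- fetch(counter).memoize // UIO[UIO[String]]
      _        <- memoized               // first use runs the fetch
      _        <- memoized               // second use reuses the stored result
      runs     <- counter.get
    } yield runs

  val run = program.flatMap(runs => Console.printLine(s"fetch ran $runs time(s)"))
}
```

Calling fetch(counter).memoize a second time would give a fresh effect with its own, separate result, which is why this does not help for a cache keyed by schema id.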
Let's continue anyway and implement it ourselves.
The idea is that method memoizeBy first looks in a map using the given key. If the value is not present, we get the result from the original ZIO and store it in the map. If the value is present, it is used and the original ZIO is not executed.
A map, yes, we also need to give the method a map! The map might be used and updated concurrently. I chose to wrap an immutable map in a Ref, but you could also use a ConcurrentMap.
Here we go:
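import zio._
import scala.collection.immutable.Map

// Note: in Scala 2 an implicit class must be nested inside an object, class or trait.
implicit class ZioMemoizeBy[R, E, A](zio: ZIO[R, E, A]) {
  def memoizeBy[B](cacheRef: Ref[Map[B, A]], key: B): ZIO[R, E, A] = {
    for {
      cache <- cacheRef.get
      value <- cache.get(key) match {
        // Cache hit: return the stored value, the original effect is not run
        case Some(value) => ZIO.succeed(value)
        // Cache miss: run the original effect and store its result
        case None => zio.tap(value => cacheRef.update(_.updated(key, value)))
      }
    } yield value
  }
}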
That is it, just a few lines of code to put in some corner of your application.
Here is a full example with memoizeBy using Service Pattern 2.0:
import org.apache.avro.Schema
import zio._
import scala.collection.immutable.Map

// Assumes the ZioMemoizeBy implicit class from above is in scope.

trait SchemaFetcher {
  def fetchSchema(schemaId: String): Task[Schema]
}

object SchemaFetcherLive {
  val layer: ZLayer[Any, Throwable, SchemaFetcher] = ZLayer {
    for {
      // Create the Ref and the Map:
      schemaCacheRef <- Ref.make(Map.empty[String, Schema])
    } yield SchemaFetcherLive(schemaCacheRef)
  }
}

case class SchemaFetcherLive(
  schemaCache: Ref[Map[String, Schema]]
) extends SchemaFetcher {
  def fetchSchema(schemaId: String): Task[Schema] = {
    val fetchFromRegistry: Task[Schema] = ???
    // Use memoizeBy to make fetchFromRegistry more efficient!
    fetchFromRegistry.memoizeBy(schemaCache, schemaId)
  }
}
Discussion
Note how we're using the default immutable Map. Because it is immutable, all threads can read from the map at the same time without synchronization. We only need some synchronization using Ref, to atomically replace the map after a new element was added.
When multiple requests for the same key come in at roughly the same time, both are executed, and both lead to an update of the map. This is not as advanced as e.g. zio-cache, which detects multiple simultaneous requests for the same key. In the presented use case this is not a problem and very unlikely to happen often anyway.
Can we improve further? Yes, we can! If you look at method fetchSchema in the example, you see that a fetchFromRegistry ZIO is constructed on every call, but we do not use it when the value is already present. Even worse, the value already being present is the common case! This is not very efficient. If efficiency is a problem, another API is needed. zio-cache does not have this problem: there the cache itself knows how to look up new values (it is a loading cache). So here is a trade-off: efficiency with a more complex API, or highly readable code.
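One possible shape for such an API is sketched below. Instead of an already constructed effect, the helper takes a function from key to effect and only calls it on a cache miss. The names MemoizedLookup and memoizedBy are made up for this sketch; they are not part of ZIO. The sketch keeps the same race as memoizeBy: two simultaneous misses for the same key still both fetch.

```scala
import zio._

object MemoizedLookup {
  // Hypothetical helper: `fetch` is only invoked (and its effect only built) on a cache miss.
  def memoizedBy[R, E, K, A](cacheRef: Ref[Map[K, A]], key: K)(
    fetch: K => ZIO[R, E, A]
  ): ZIO[R, E, A] =
    cacheRef.get.flatMap { cache =>
      cache.get(key) match {
        case Some(value) => ZIO.succeed(value)
        case None        => fetch(key).tap(value => cacheRef.update(_.updated(key, value)))
      }
    }

  // Small demonstration: two sequential lookups of the same key, returns the number of fetches.
  val demo: UIO[Int] =
    for {
      cacheRef <- Ref.make(Map.empty[String, String])
      calls    <- Ref.make(0)
      fetch     = (id: String) => calls.update(_ + 1).as(s"schema-$id")
      _        <- memoizedBy(cacheRef, "a")(fetch)
      _        <- memoizedBy(cacheRef, "a")(fetch)
      n        <- calls.get
    } yield n
}
```

In the schema example this would read something like memoizedBy(schemaCache, schemaId)(id => fetchFromRegistry(id)), at the cost of losing the postfix syntax on the effect.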
Using zio-cache
For completeness, here is (almost) the same example using zio-cache:
import org.apache.avro.Schema
import zio._
import zio.cache.{Cache, Lookup}

trait SchemaFetcher {
  def fetchSchema(schemaId: String): Task[Schema]
}

object ZioCacheSchemaFetcherLive {
  val layer: ZLayer[SomeService, Throwable, SchemaFetcher] = ZLayer {
    for {
      someService <- ZIO.service[SomeService]
      // the fetching logic can use someService:
      fetchFromRegistry = (schemaId: String) => (??? : Task[Schema])
      // create the cache:
      cache <- Cache.make(
        capacity = 1000,
        timeToLive = Duration.Infinity,
        lookup = Lookup(fetchFromRegistry)
      )
    } yield ZioCacheSchemaFetcherLive(cache)
  }
}

case class ZioCacheSchemaFetcherLive(
  cache: Cache[String, Throwable, Schema]
) extends SchemaFetcher {
  def fetchSchema(schemaId: String): Task[Schema] = {
    // use the loader cache:
    cache.get(schemaId)
  }
}
We now need a reference to fetchFromRegistry while constructing the layer. This complicates the code a bit; we can no longer define fetchFromRegistry in the case class. In the example we pull a SomeService so that we can put the definition of fetchFromRegistry into the for comprehension and stick to Service Pattern 2.0. Perhaps we should completely move it to another service so that we can write lookup = Lookup(someService.fetchFromRegistry). That, I'll leave as an exercise to the reader.
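For readers who want a starting point, here is one way that exercise could look. This is only a sketch: SchemaRegistryClient, its fetchFromRegistry method and the name CachedSchemaFetcher are assumptions made up for the illustration, and the code assumes ZIO 2.x with a matching zio-cache version.

```scala
import org.apache.avro.Schema
import zio._
import zio.cache.{Cache, Lookup}

// Hypothetical service that owns the registry call.
trait SchemaRegistryClient {
  def fetchFromRegistry(schemaId: String): Task[Schema]
}

trait SchemaFetcher {
  def fetchSchema(schemaId: String): Task[Schema]
}

final case class CachedSchemaFetcher(
  cache: Cache[String, Throwable, Schema]
) extends SchemaFetcher {
  // The cache loads missing values itself through the lookup function.
  def fetchSchema(schemaId: String): Task[Schema] = cache.get(schemaId)
}

object CachedSchemaFetcher {
  val layer: ZLayer[SchemaRegistryClient, Nothing, SchemaFetcher] = ZLayer {
    for {
      client <- ZIO.service[SchemaRegistryClient]
      cache  <- Cache.make(
                  capacity = 1000,
                  timeToLive = Duration.Infinity,
                  // An explicit lambda avoids relying on eta-expansion rules
                  lookup = Lookup((id: String) => client.fetchFromRegistry(id))
                )
    } yield CachedSchemaFetcher(cache)
  }
}
```

The fetching logic now lives in its own service, and the layer only wires the client into the cache.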
Conclusion
For simple use cases like fetching Avro schemas, this article presents an appropriately lightweight way to do memoization. If you need more features such as eviction and detection of concurrent invocations, I recommend zio-cache.
Update 2024-01-24
Here is a version of cachedBy that only fetches a value once, even when two fibers request it concurrently. The second fiber is semantically blocked until the first fiber has produced the value.
import zio._

object ZioCaching {
  implicit class ZioCachedBy[R, E, A](zio: ZIO[R, E, A]) {
    def cachedBy[B](cacheRef: Ref[Map[B, Promise[E, A]]], key: B): ZIO[R, E, A] = {
      for {
        newPromise <- Promise.make[E, A]
        actualPromise <- cacheRef.modify { cache =>
          cache.get(key) match {
            case Some(existingPromise) => (existingPromise, cache)
            case None => (newPromise, cache + (key -> newPromise))
          }
        }
        _ <- ZIO.when(actualPromise eq newPromise) {
          zio.intoPromise(newPromise)
        }
        value <- actualPromise.await
      } yield value
    }
  }
}
I love how easy it is to create a simple, almost bulletproof cache with just a few lines of code. The combination of refs, ZIO and immutable data structures is a godsend.
As an exercise to the reader: when we store the ZIO itself in the cache (after calling memoize on it), we fix concurrent fetches for the same key.
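The idea from this comment could be sketched as follows, assuming ZIO 2.x. The names MemoizedZioCache and cachedByMemoize are made up for the illustration. Every call memoizes its own effect, but only the memoized effect that wins the atomic insert into the map is ever run; all other callers run that same stored effect and therefore wait for its result.

```scala
import zio._

object MemoizedZioCache extends ZIOAppDefault {
  implicit class ZioCachedByMemoize[R, E, A](zio: ZIO[R, E, A]) {
    // Hypothetical variant: the map stores memoized effects instead of plain values.
    def cachedByMemoize[B](cacheRef: Ref[Map[B, ZIO[R, E, A]]], key: B): ZIO[R, E, A] =
      for {
        memoized <- zio.memoize
        actual   <- cacheRef.modify { cache =>
                      cache.get(key) match {
                        case Some(existing) => (existing, cache)
                        case None           => (memoized, cache + (key -> memoized))
                      }
                    }
        value    <- actual
      } yield value
  }

  // Ten fibers request the same key in parallel; returns how often the fetch really ran.
  val program: UIO[Int] =
    for {
      calls    <- Ref.make(0)
      cacheRef <- Ref.make(Map.empty[String, UIO[String]])
      fetch     = calls.update(_ + 1).as("schema")
      _        <- ZIO.foreachParDiscard(1 to 10)(_ => fetch.cachedByMemoize(cacheRef, "id-1"))
      n        <- calls.get
    } yield n

  val run = program.flatMap(n => Console.printLine(s"fetch ran $n time(s)"))
}
```

As with the promise-based version, a failed fetch stays in the map, so a failure is remembered just like a success.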