Showing posts with label scala. Show all posts
Showing posts with label scala. Show all posts

Monday, December 2, 2024

Zio-kafka, faster than java-kafka

TlDR: Concurrency and pre-fetching gives zio-kafka a higher consumer throughput than the default java Kafka client for most workloads.

Zio-kafka is an asynchronous Kafka client based on the ZIO async runtime. It allows you to consume from Kafka with a ZStream per partition. ZStreams are streams on steroids; they have all the stream operators known from e.g. RX-Java and Monix, and then some. ZStreams are supported by ZIO's super efficient runtime.

So the claim is that zio-kafka consumes faster than the regular Java client. But zio-kafka wraps the default java Kafka client. So how can it be faster? The trick is that with zio-kafka, processing (meaning your code) runs in parallel with the code that pulls records from the Kafka broker.

Obviously, there is some overhead in distributing the received records to the streams that need to process it. So the question then is, when is the extra overhead of zio-kafka less than the gain by parallel processing? Can we estimate this? In turns out we can! For this estimate we use the benchmarks that zio-kafka runs from GitHub. In particular, we look at these 2 benchmarks with runs from 2024-11-30:

  • the throughput benchmark, uses zio-kafka and takes around 592 ms
  • the kafkaClients benchmark, uses Kafka java client directly and takes around 544 ms

Both benchmarks:

  • run using JMH on a 4 core github runner with 16GB RAM
  • consume 50k records of ~512 bytes from 6 partitions
  • process the records by counting the number of incoming records, batch by batch
  • are using the same consumer settings, in particular max.poll.records is set to 1000
  • the broker runs in the same JVM as the consumer, so there is almost no networking overhead
  • do not commit offsets (but note that committing does not change the outcome of this article)

First we calculate the overhead of java-kafka. For this we assume that counting the number of records takes no time at all. This is reasonable, a count operation is just a few CPU instructions, nothing compared to fetching something over the network, even if it is on the same machine. Therefore, in the figure, the 'processing' rectangle collapses to a thick vertical line.


Polling and processing as blocks of execution on a thread/fiber.

We also assume that every poll returns the same amount of records, and takes the same amount of time. This is not how it works in practise, but when the records are already available we're probably not that far off. As the program consumes 50k records, and each poll returns a batch of 1k records, there are 50 polls. Therefore, the overhead of java-kafka is 544/50 ≈ 11 ms per poll.

Now we can calculate the overhead of zio-kakfa. Again we assume that processing takes no time at all, so that every millisecond that the zio-kafka benchmark takes longer can be attributed to zio-kafka's overhead. The zio-kakfa benchmark runs longer for 592-544 = 48 ms. Therefore zio-kafka's overhead is 48/50 ≈ 1ms per poll.

Now lets look at a more realistic scenario where processing does take time.


Polling and processing as blocks of execution on a thread/fiber.

As you can see a java-kafka program alternates between polling and processing, while the zio-kafka program processes the records in parallel distributed over 6 streams (1 stream per partition, each stream runs independently on its own ZIO fiber). In the figure we assume that the work is evenly distributed, but unless the load is really totally skewed, it won't matter that much in practice due to zio-kafka's per-partition pre-fetching. As you can see the zio-kafka program in this scenario is much faster. But is this a realistic example? When do zio-kafka programs become faster than java-kafka programs?

Observe that how long polling takes is not important for this question because polling time is the same for both programs. So we will only look at the time between polls. We use (p) for the processing time per batch for the java-kafka program. For the zio-kafka program time between polls is equal to zio-kafka's overhead (o) plus processing time per batch divided by the number of partitions (n). So we want to know for which p:

o + p n p

This solves to:

p o n n - 1

For the benchmark we fill in the zio-kakfa overhead (o = 1 ms) and the number of partitions (n = 6) and we get: p ≥ 1.2 ms. (Remember, this is processing time per batch of 1000 records.)

In the previous paragraph we assumed that processing is IO intensive. When the processing is compute intensive, ZIO can not actually run more fibers in parallel than the number of cores available. In our example we have 4 cores, using n = 4 gives p ≥ 1.3 ms which is still a very low tipping point.

Conclusion

For IO loads, or with enough cores available, even quite low processing times per batch, makes zio-kafka consume faster than the plain java Kafka library. The plain java consumer is only faster for trivial processing tasks like counting.

If you'd like to know what this looks like in practice, you can take a look at this real-world example application: Kafka Big-Query Express. This is a zio-kafka application recently open sourced by Adevinta. Here is the Kafka consumer code. (Disclaimer: I designed and partly implemented this application.)

Tuesday, May 28, 2024

Java plugins with isolating class loaders

My team's article on how to write Java plugins has been published on the Adevinta Tech Blog. Enjoy!

Wednesday, January 24, 2024

Scheduling tasks and sharing state with streams

Recently we built a system that needs to perform 2 tasks. Taks 1 runs every 15 minutes, task 2 runs every 2 minutes. Task 1 kicks off some background jobs (an upload to BigQuery), task 2 checks upon the results of these background jobs and does some cleanup when they are done (delete the uploaded files). The two tasks need to share information back and forth about what jobs are running in the background.

Now think to yourself (ignore the title for now 😉), what would be the most elegant way to implement this? I suspect that most developers will come with a solution that involves some locking, synchronisation and global state. For example by sharing the information through a transactional database, or by using a semaphore to prevent the two tasks from running at the same time plus sharing information in a global variable. This is understandable, most programming environments do not provide better techniques for these kinds of problems at all!

However, if your environment supports streams and has some kind of scheduling, here are two tricks you can use: one for the scheduling of the tasks, the second for sharing information without a global variable.

Here is an example for the first written in Scala using the ZIO streams library. Read on for an explanation.

import zio._ import zio.stream._ def performTask1: Task[Unit] = ??? def performTask2: Task[Unit] = ??? // An enumeration (scala 2 style) for our task. sealed trait BusinessTask object Task1 extends BusinessTask object Task2 extends BusinessTask ZStream.mergeAllUnbounded()( ZStream.fromSchedule(Schedule.fixed(15.minutes)).as(Task1), ZStream.fromSchedule(Schedule.fixed(2.minutes)).as(Task2) ) .mapZIO { case Task1 => performTask1 case Task2 => performTask2 } .runDrain

We create 2 streams, each stream contains sequential numbers, emitted upon a schedule. As you can see, the schedule corresponds directly with the requirements. We do not really care for the sequential numbers, so with stream operator as we convert the stream's emitted values to a value from the BusinessTask enumeration.

Then we merge the two streams. We now have a stream that emits the two enumeration values at the time the corresponding task should run. This is already a big win! Even when the two schedules produce an item at the same time, the tasks will run sequentially. This is because by default streams are evaluated without parallelism.

We are not there yet though. The tasks need to share information. They could access a shared variable but then we still have tightly coupled components and no guarantees that the shared variable is used correctly.

Also, wouldn't it be great if performTask1 and performTask2 are functions that can be tested in isolation? With streams this is possible.

Here is the second part of the idea. Again, read on for an explanation.

case class State(...) val initialState = State(...) def performTask1(state: State): Task[State] = ??? def performTask2(state: State): Task[State] = ??? ZStream.mergeAllUnbounded()( ZStream.fromSchedule(Schedule.fixed(15.minutes)).as(Task1), ZStream.fromSchedule(Schedule.fixed(2.minutes)).as(Task2) ) .scanZIO(initialState) { (state, task) => task match { case Task1 => performTask1(state) case Task2 => performTask2(state) } } .runDrain

We have changed the signatures of the performTask* methods. Also, the mapZIO operator has been replaced with scanZIO. The stream operator scanZIO works much like foldLeft on collections. Like foldLeft, it accepts an initial state, and a function that combines the accumulated state plus the next stream element (of type BusinessTask) and converts those into the next state.

Stream operator scanZIO also emits the new states. This allows further common processing. For example we can persist the state to disk, or collect custom metrics about the state.

Conclusion

Using libraries with higher level constructs like streams, we can express straightforward requirements in a straightforward way. With a few lines of code we have solved the scheduling requirement, and showed an elegant way of sharing information between tasks without global variables.

Sunday, November 26, 2023

Discovering scala-cli while fixing my digital photo archive

Over the years I built up a nice digital photo library with my family. It is a messy process. Here are some of the things that can go wrong:

  • Digital cameras that add incompatible exif metadata.
  • Some files have exif tag CreateDate, others DateTimeOriginal.
  • Images shared via Whatsapp or Signal do not have an exif date tag at all.
  • Wrong rotation.
  • Fuzzy, yet memorable jpeg images wich take 15MB because of their resolution and high quality settings.
  • Badly supported ancient movie formats like 3gp and RIFF AVI.
  • Old movie formats that need 3 times more disk space than h.265.
  • Losing almost all your photos because you thought you could copy an Iphoto library using tar and cp (hint: you can’t). (This took a low-level harddisk scan and months of manual de-duplication work to recover the photos.)
  • Another low-level scan of an SD card to find accidentally deleted photos.
  • Date in image file name corresponds to import date, not creation date.
  • Weird file names that order the files differently than from creation date.
  • Images from 2015 are stored in the folder for 2009.
  • etc.

I wrote countless bash scripts to mold the collection into order. Unfortunately, to various success. However, now that I am ready to import the library into Immich (please, do sponsor them, they are building a very nice product!), I decided to start cleaning up everything.

So there I was, writing yet another bash script, struggling with parsing a date response from exiftool. And then I remembered the recent articles about scala-cli and decided to try it out.

The experience was amazing! Even without proper IDE support, I was able to crank out scripts that did more, more accurately and faster than I could ever have accomplished in bash.

Here are some of the things that I learned:

  • Take the time to learn os-lib.
  • If the scala code gets harder to write, open a proper IDE and use code completion. Then copy the code over to your .sc file.
  • One well placed .par (using scala-parallel-collections) can more than quadruple the performance of your script!
  • You will still spend a lot of time parsing the output from other programs (like exiftoool).
  • Scala-cli scripts run very well from Github actions as well.

Conclusions

Next time you open your editor to write a bash file, think again. Perhaps you should really write some scala instead.

Sunday, October 8, 2023

Dependabot, Gradle and Scala

Due to a series of unfortunate circumstances we have to deal with a couple of projects that use Gradle as build tool at work. For these projects we wanted automatic PR generation for updated dependencies. Since we use Github Enterprise, using Dependabot seems logical. However, this turned out to be not very straightforward. This article documents one way that works for us.

As we were experimenting with Dependabot, we discovered the following rules:

  1. The scala version in the artifact name must not be a variable.
  2. A variable for the artifact version is fine, but it must be declared in the same file in the ext block.
  3. Versions should follow the Semver specification.
  4. You must not use Gradle’s + version range syntax anywhere, Maven’s version range syntax is fine.

In our projects the scala version comes from a plugin. In addition, we sometimes need to cross build for different scala versions, very much at odds with rule no. 1. We solved this with a switch statement.

With these rules and constraints we discovered that the following structure works for us and Dependabot:

ext { jacksonVersion = '2.15.2' scalaTestVersion = '3.0.8' } dependencies { switch(scalaMainVersion) { case "2.12": implementation "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion" testImplementation "org.scalatest:scalatest_2.12:$scalaTestVersion" break case "2.13": implementation "com.fasterxml.jackson.module:jackson-module-scala_2.13:$jacksonVersion" testImplementation "org.scalatest:scalatest_2.13:$scalaTestVersion" break default: break } // implementation 'com.example:library:0.8+' // Don't do this implementation 'com.example:library:[0.8,1.0[' // This is fine }

It took 3 people a month to slowly discover this solution (thank you!). I hope that you, dear reader, will spend your time more productive.

Thursday, April 20, 2023

Zio-kafka hacking day

Not long ago I contacted Steven (committer of the zio-kafka library) to get some better understanding of how the library works. April 12, not more than 2 months later I am a committer, and I was sitting in a room together with Steven, Jules Ivanic (another committer) and wildcard Pierangelo Cecchetto (contributor), hacking on zio-kafka.

The meeting was an idea of Jules who was ‘in the neighborhood’. He was traveling from Australia for his company (Conduktor). We were able to get a nice room in the Amsterdam office of my employer (Adevinta). Amsterdam turned out to be a nice middle ground for Steven, me and Pierangelo. (Special thanks to Go Data Driven who also had place for us.)

In the morning we spoke about current and new ideas on how to improve the library. Also, we shared detailed knowledge on ZIO and what Kafka expects from its users. After lunch we started hacking. Having someone to start an ad hoc discussion turned out to be very productive; we were able to move some tough issues forward.

Here are some highlights.

PR #788 — Wait for stream end in rebalance listener is important to prevent duplicates during a rebalance process. This PR was mostly finished for quite some time, but many details made the extensive test suite fail. We were able to solve many of these issues.

In the area of performance we implemented an idea to replace buffering (pre-fetching a fixed number of polls), with pre-fetching based on the stream’s queue size. This resulted in PR #803 — Alternative backpressure mechanism.

We also laid the seeds for another performance improvement implementation: PR #908 — Optimistically resume partitions early.

These last two PRs showed great performance improvements bringing us much closer to direct usage of the Java Kafka client. All 3 PRs are now in review.

All in all it was a lot of fun to meet fellow enthusiasts and hack on the complex machinery that is inside zio-kafka.

Sunday, December 4, 2022

ZIO service layer pattern

While reading about ZIO-config in 2.0.4, the following pattern to create services caught my eye. I am copying it here for easy lookup. Enjoy.

val myLayer: ZLayer[PaymentRepo, Nothing, MyService] = ZLayer.scoped { for { repo <- ZIO.service[PaymentRepo] config <- ZIO.config(MyServiceImpl.config) ref <- Ref.make(MyState.Initial) impl <- ZIO.succeed(new MyServiceImpl(config, ref, repo)) _ <- impl.initialize _ <- ZIO.addFinalizer(impl.destroy) } yield impl }

Saturday, November 5, 2022

Speed up ZIOs with memoization

TLDR: You can do ZIO memoization in just a few lines, however, use zio-cache for more complex use cases.

Recently I was working on fetching Avro schema's from a schema registry. Avro schema's are immutable and therefore perfectly cacheable. Also, the number of possible schema's is limited so cache evictions are not needed. We can simply cache every schema for ever in a plain hash-map. So, we are doing memoization.

Since this was the first time I did this in a ZIO based application, I looked around for existing solutions. What I wanted is something like this:

def fetchSchema(schemaId: String): Task[Schema] = { val fetchFromRegistry: Task[Schema] = ??? fetchFromRegistry.memoizeBy(key = schemaId) }

Frankly, I was a bit disappointed that ZIO does not already support this out of the box. However, as you'll see in this article, the proposed syntax only works for simple use cases. (Actually, there is ZIO.memoize but that is even simpler and only caches the result for a single ZIO instance, not for any instance that gives the same value.)

Let's continue anyway and implement it ourselves.

The idea is that method memoizeBy first looks in a map using the given key. If the value is not present, we get the result from the original Zio and store it in the map. If the value is present, it will be used and the original Zio is not executed.

A map, yes, we need also need to give the method a map! The map might be used and updated concurrently. I choose to wrap an immutable map in a Ref, but you could also use a ConcurrentMap.

Here we go:

import zio._ import scala.collection.immutable.Map implicit class ZioMemoizeBy[R, E, A](zio: ZIO[R, E, A]) { def memoizeBy[B](cacheRef: Ref[Map[B, A]], key: B): ZIO[R, E, A] = { for { cache <- cacheRef.get value <- cache.get(key) match { case Some(value) => ZIO.succeed(value) case None => zio.tap(value => cacheRef.update(_.updated(key, value))) } } yield value } }

That is it, just a few lines of code to put in some corner of your application.

Here is a full example with memoizeBy using Service Pattern 2.0:

import org.apache.avro.Schema import zio._ import scala.collection.immutable.Map trait SchemaFetcher { def fetchSchema(schemaId: String): Task[Schema] } object SchemaFetcherLive { val layer: ZLayer[Any, Throwable, SchemaFetcher] = ZLayer { for { // Create the Ref and the Map: schemaCacheRef <- Ref.make(Map.empty[String, Schema]) } yield SchemaFetcherLive(schemaCacheRef) } } case class SchemaFetcherLive( schemaCache: Ref[Map[String, Schema]] ) extends SchemaFetcher { def fetchSchema(schemaId: String): Task[Schema] = { val fetchFromRegistry: Task[Schema] = ... // Use memoizeBy to make fetchFromRegistry more efficient! fetchFromRegistry.memoizeBy(schemaCache, schemaId) } }

Discussion

Note how we're using the default immutable Map. Because it is immutable, all threads can read from the map at the same time without synchronization. We only need some synchronization using Ref, to atomically replace the map after a new element was added.

When multiple requests for the same key come in at roughly the same time, both are executed, and both lead to an update of the map. This is not as advanced as e.g. zio-cache, which detects multiple simultaneous requests for the same key. In the presented use case this is not a problem and very unlikely to happen often anyway.

Can we improve further? Yes, we can! If you look at method fetchSchema in the example, you see that a fetchFromRegistry ZIO is constructed, but we do not use it when the value is already present. And even worse, the value already being present is the common case! This is not very efficient. If efficiency is a problem, another API is needed. Zio-cache does not have this problem. In zio-cache the cache is aware of how to look up new values (it is a loading cache). So here is a trade off: efficiency with a more complex API, or highly readable code.

Using zio-cache

For completeness, here is (almost) the same example using zio-cache:

import org.apache.avro.Schema import zio._ import zio.cache.{Cache, Lookup} trait SchemaFetcher { def fetchSchema(schemaId: String): Task[Schema] } object ZioCacheSchemaFetcherLive { val layer: ZLayer[SomeService, Throwable, SchemaFetcher] = ZLayer { for { someService <- ZIO.service[SomeService] // the fetching logic can use someService: fetchFromRegistry: String => Task[Schema] = ??? // create the cache: cache <- Cache.make( capacity = 1000, timeToLive = Duration.Infinity, lookup = Lookup(fetchFromRegistry) ) } yield ZioCacheSchemaFetcherLive(cache) } } case class ZioCacheSchemaFetcherLive( cache: Cache[String, Throwable, Schema] ) extends SchemaFetcher { def fetchSchema(schemaId: String): Task[Schema] = { // use the loader cache: cache.get(schemaId) } }

We now need a reference to fetchFromRegistry while constructing the layer. This complicates the code a bit; we can no longer define fetchFromRegistry in the case class. In the example we pull a SomeService so that we can put the definition of fetchFromRegistry into the for comprehension and stick to Service Pattern 2.0. Perhaps we should completely move it to another service so that we can write lookup = Lookup(someService.fetchFromRegistry). That, I'll leave as an exercise to the reader.

Conclusion

For simple use cases like fetching Avro schema's, this article presents an appropriately light weight way to do memoization. If you need more features such as eviction and detection of concurrent invocations, I recommend zio-cache.

Update 2024-01-24

Here is a version of cachedBy that only fetches a value once, even when two fibers request it concurrently. The second fiber is semantically blocked until the first fiber has produced the value.

import zio._ object ZioCaching { implicit class ZioCachedBy[R, E, A](zio: ZIO[R, E, A]) { def cachedBy[B](cacheRef: Ref[Map[B, Promise[E, A]]], key: B): ZIO[R, E, A] = { for { newPromise <- Promise.make[E, A] actualPromise <- cacheRef.modify { cache => cache.get(key) match { case Some(existingPromise) => (existingPromise, cache) case None => (newPromise, cache + (key -> newPromise)) } } _ <- ZIO.when(actualPromise eq newPromise) { zio.intoPromise(newPromise) } value <- actualPromise.await } yield value } } }

Wednesday, June 8, 2022

Zigzag bytes

I was playing around with a goofy idea for which I needed zigzag encoding for bytes. Zigzag encoding is often used in combination with variable length encoding in things like Avro, Thrift and Protobuf.

In zigzag encoded integers, the least significant bit is used for sign. To convert from regular encoding (2-complement) to zigzag (and back) you can use the following Scala code:

def i32ToZigZag(n: Int): Int = (n << 1) ^ (n >> 31) def zigZagToI32(n: Int): Int = (n >>> 1) ^ - (n & 1) def i64ToZigZag(n: Long): Long = (n << 1) ^ (n >> 63) def zigZagToI64(n: Long): Long = (n >>> 1) ^ - (n & 1)

Translate this to Java and the expressions after the = look exactly the same.

Using these bit shifting tricks for bytes is a whole lot more difficult. The problem is that Scala (like Java) does not support bit operations on Bytes. They always convert them to an Int first.

After a lot of fiddling, I settled on the following:

private def b(i: Int): Byte = (i & 0xff).toByte def i8ToZigZag(n: Byte): Byte = (b(n << 1) ^ (n >> 7)).toByte def zigZagToI8(n: Byte): Byte = b(((n & 0xff) >>> 1) ^ (256 - (n & 1)))

Translated to Java it should look like this (not tested!):

private byte b(int i) { return (byte)(i & 0xff); } public byte i8ToZigZag(byte n) { return (byte)(b(n << 1) ^ (n >> 7)); } public byte zigZagToI8(byte n) { return b(((n & 0xff) >>> 1) ^ (256 - (n & 1))); }

Is there a better way to do this?

Wednesday, January 26, 2022

Having fun with Ordering in Scala

Challenge: sort a list of objects by name, but some names have priority. If these names appear, they should be ordered by the position they have in the priority list.

For example:

val priorityList = Seq("Willow", "James", "Ezra") val input = Seq("Olivier", "Charlotte", "Willow", "Declan", "Aurora", "Ezra") val ordered = ??? assert(ordered == Seq("Willow", "Ezra", "Aurora", "Charlotte", "Declan", "Olivier"))

Challenge accepted.

Luckily Scala has strong support for sorting in the standard library. All sequences have a sorted method which accepts an Ordering. The ordering is an implicit parameter which means that normally we don't need to provide it; it will be derived to the natural ordering of the items. However, we are going to provide this parameter explicitly. Let's build an Ordering!

The challenge explains we have 2 orderings:

  1. first order by the priority list
  2. failing that, order by alphabet

Let's focus on the first ordering. The idea is to assign an integer 'priority-value' to each possible string that is based on the position in the priority list. If the string is not in the list, we use some high integer. The first ordering will simply order by this priority-value. Lower numbers go before higher number, just like the natural ordering of integers.

// attempt 1 val priorityValue = priorityList.indexOf(name)

This works well for any name on the priority list. E.g. Willow gets 0 and Ezra gets 2. Unfortunately, all the other names get priority value -1 which orders them even before Willow. We need to convert the -1 to something higher.
Since I like to program without if statements whenever possible, I looked at math for a solution. Modulus can do the trick:

// attempt 2 val priorityValue = priorityList.indexOf(name) % priorityList.size

Oops, wrong modulus implementation: -1 % 3 == -1. Let's use floorMod:

// attempt 3 val priorityValue = Math.floorMod(priorityList.indexOf(name), priorityList.size)

Now -1 gets converted into priorityList.size which is definitely higher than the other priority-values. However, since we don't really care what the higher number is we can just use Int.MaxValue:

val priorityValue = Math.floorMod(priorityList.indexOf(name), Int.MaxValue)

Now we wrap that in an Ordering:

val priorityOrdering: Ordering[String] = Ordering.by(name => Math.floorMod(priorityList.indexOf(name), Int.MaxValue))

Unfortunately Scala can't derive the type. We either need to type the value directly, or add a type on the name parameter.

Now we need to add the second ordering. We can simply use Ordering.String from the library. We combine the orderings with orElse, available since Scala 2.13. The second ordering is used when priorityOrdering can't decide because the priority-value is the same.
Note that for the special case where we compare a priority value with itself, e.g. Willow with Willow, the second ordering is also applied. This is okay though, the outcome doesn't change because these values are the same for the alphabetic ordering also.

Here is the complete code:

val priorityList = Seq("Willow", "James", "Ezra") val priorityOrdering: Ordering[String] = Ordering.by(name => Math.floorMod(priorityList.indexOf(name), Int.MaxValue)) val combinedOrdering: Ordering[String] = priorityOrdering.orElse(Ordering.String) val input = Seq("Olivier", "Charlotte", "Willow", "Declan", "Aurora", "Ezra") val ordered = input.sorted(combinedOrdering) assert(ordered == Seq("Willow", "Ezra", "Aurora", "Charlotte", "Declan", "Olivier"))

We're almost there. The challenge was to work on any object. Lets wrap it up a bit and also make it work for any type of priority value:

def priorityOrdering[A, B : Ordering](priorityList: Seq[B], by: A => B): Ordering[A] = { def priorityValue(b: B): Int = Math.floorMod(priorityList.indexOf(b), Int.MaxValue) Ordering.by[A, Int](a => priorityValue(by(a))).orElseBy(by) }

We can use like this:

val ordered = input.sorted(priorityOrdering[String, String](priorityList, identity))

Or like this. Here we sort persons by birthdate, ordering today before the other days:

case class Person(name: String, birthdate: MonthDay) val persons: Seq[Person] = ??? val priorityDates = Seq(MonthDay.now()) persons.sorted(priorityOrdering(priorityDates, (_: Person).birthdate))

Some remarks

Note that in all these cases type derivation is quite awful. The compiler has problems finding the correct types, even though all the information is available.

You should also know that you can't use this approach if you are stuck on Scala 2.12 or earlier since Ordering.orElse is not available there.

Alternative

You can side-step all the type derivation problems by using the sortBy method. Give it a function that returns a tuple of Ints, Strings, or anything for which an Ordering is already defined. The sequence is then sorted on the first value of the tuple, then on the second value, etc.:

def priorityValue(name: String): Int = Math.floorMod(priorityList.indexOf(name), Int.MaxValue) val ordered = input.sortBy(name => (priorityValue(name), name))

Conclusion

Although I had fun learning all about Ordering, next time I'll avoid it and go directly for sortBy.

Wednesday, December 8, 2021

Akka graceful shutdown - continued

Some time ago I wrote on how to gracefully shutdown Akka HTTP servers, crucial to prevent aborted requests during re-deployments or in elastic (cloud) environments where instances come and go. Look at the previous post for more details on how graceful shutdown works and some common caveats in setting it up.

This post refreshes how this works for newer Akka versions, and it gives some tips on how to speed up a shutdown.

Coordinated shutdown

Newer Akka versions have more extensive support for graceful shutdown in the form of coordinated shutdown. Here we show an example that uses coordinated shutdown to configure a graceful shutdown.

import akka.actor.{ActorSystem, CoordinatedShutdown} import akka.http.scaladsl.Http import scala.concurrent.duration._ import scala.util.{Failure, Success} implicit val system: ActorSystem = ??? val logger = ??? val routes: Route = ??? val interface: String = "0.0.0.0" val port: Int = 80 val shutdownDeadline: FiniteDuration = 5.seconds Http() .newServerAt(interface, port) .bind(routes) .map(_.addToCoordinatedShutdown(httpShutdownTimeout)) // ← that simple! .foreach { server => logger.info(s"HTTP service listening on: ${server.localAddress}") server.whenTerminationSignalIssued.onComplete { _ => logger.info("Shutdown of HTTP service initiated") } server.whenTerminated.onComplete { case Success(_) => logger.info("Shutdown of HTTP endpoint completed") case Failure(_) => logger.error("Shutdown of HTTP endpoint failed") } }

The important line is where we use addToCoordinatedShutdown. What follows is just logging so we know what's going on.

Shutting down more components

You probably have more parts that would benefit from a proper shutdown, e.g. a database connection pool. Here is an example on how to hook into the coordinated shutdown:

// Add this code _before_ construction of the HTTP server CoordinatedShutdown(system).addTask( CoordinatedShutdown.PhaseBeforeClusterShutdown, "database connection pool shutdown" ) { () => val dbPoolShutdown: Future[Done] = shutdownDatabase() dbPoolShutdown.onComplete { case Success(_) => logger.info("Shutdown of database connection pool completed") case Failure(_) => logger.error("Shutdown of database connection pool failed") } logger.info("Shutdown of database connection pool was initiated") dbPoolShutdown }

Coordinated shutdown consists of multiple phases. Which phases exist is configurable but the default phases are fine for this post.

Our code runs in the phase called before-cluster-shutdown. This phase runs after phase service-unbind in which the HTTP service shuts down.

Tips to speed up shutdown

The default phases need to complete in 10 seconds. If this is challenging for your system, here are 2 tips that might help.

First of all, you need to make sure that any blocking/synchronous code is wrapped in a blocking construct. This will signal to the Akka execution context that it needs to extend its threadpool. This is especially relevant if you have many shutdown tasks but it is good practice anyway. For example:

import akka.Done import scala.concurrent.blocking def shutdownDatabase: Future[Done] = { Future { blocking { database.close() // blocking code logger.info("Database connection closed") } Done } }

The second thing you could do is only shutdown on a best-effort basis. Closing the connection to a read-only system is probably not essential.
The trick is to use coordinated shutdown as initiator, but then immediately report completion. For example:

import akka.Done import scala.concurrent.blocking def bestEffortShutdownDatabase: Future[Done] = { // Starts db close in a Future, but ignore the result Future { blocking { database.close() } } Future.successful(Done) }

Now, when closing the databse takes too long, Akka won't complain about this in the logs.

For more details see Akka's coordinated shutdown documentation.

Tuesday, July 7, 2020

Avoiding Scala's Option.fold

Scala's Option.fold is a bit nasty to use sometimes because type inference is not always great. Here is a stupid toy example:

val someValue: Option[String] = ??? val processed = someValue.fold[Either[Error, UserName]]( Left(Error("No user name given")) )( value => Right(UserName(value)) )

The [Either[Error, UserName]] on fold is necessary otherwise the scala compiler can not derive the type.

Here is a really small trick to avoid Option.fold when you need to convert an Option to an Either:

val someValue: Option[String] = ??? val processed = someValue .toRight(left = Error("No user name given")) .map(value => Right(UserName(value)))

Much nicer!

Friday, April 10, 2020

Akka-http graceful shutdown

Why?

By default, when you restart a service, the old instance is simply killed. This means that all current requests are aborted; the caller will be left with a read timeout. We can do better!

What?

A graceful shutdown looks as follows:

  1. The scheduler (Kubernetes, Nomad, etc.) sends a signal (usually SIGINT) to the service.
  2. The service gets the signal and closes all server-ports; it can no longer receive new request. This is very quickly picked up by the load-balancer. The load-balancer will no longer send new requests.
  3. All requests-in-progress complete one by one.
  4. When all requests are completed, or on a timeout, the service terminates.
Caveats

Getting the signal to your service is unfortunately not always trivial. I have seen the following problems:

  • The Nomad scheduler by default does not send an SIGINT signal to the service. You will have to configure this.
  • When the service runs in a Docker container, by default the init process (with PID 1) will ignore the signal. Back when every Unix installation had control over the entire computer this made lots of sense. In a container though, not so much. This may be fixed in newer Docker version. Otherwise you will have to use a special init process such as tini.
Akka-HTTP

Akka-http has excellent support for graceful shutdown. Unfortunately, the documentation is not very clear about it. Here follows an example which can be used as a template:

Update 2021-12-08: For newer Akka versions, please use the template in the follow-up article.

Just for reference, here is the old template:

import akka.http.scaladsl.Http import akka.http.scaladsl.server._ import scala.concurrent.duration._ val logger = ??? val route: Route = ??? val interface: String = "0.0.0.0" val port: Int = 80 val shutdownDeadline: FiniteDuration = 30.seconds // Don't use this, see follow-up article instead! Http() .bindAndHandle(route, interface, port) .map { binding => logger.info( "HTTP service listening on: " + s"http://${binding.localAddress.getHostName}:${binding.localAddress.getPort}/" ) sys.addShutdownHook { binding .terminate(hardDeadline = shutdownDeadline) .onComplete { _ => system.terminate() logger.info("Termination completed") } logger.info("Received termination signal") } } .onComplete { case Failure(ex) => logger.error("server binding error:", ex) system.terminate() sys.exit(1) case _ => }

Tuesday, March 3, 2020

Push Gauges

A colleague was complaining to me that Micrometer gauges didn't work the way he expected. This led to some interesting work.

What is a gauge?

In science a gauge is a device for making measurements. In computer systems a gauge is very similar: a 'metric' which tracks something in your system over time. For example, you could track the number of items in a job queue. Libraries like Micrometer and Dropwizard metrics make it easy to define gauges. Since the measurement in itself is not useful, those libraries also make it easy to send the measurements to a metric system such as Graphite or Prometheus. These systems are used for visualization and generating alerts.

Gauges are typically defined with a callback function that does the measurement. For example, using metrics-scala, the scala API for Dropwizard metrics, it looks like:

class JobQueue extends DefaultInstrumented { private val waitingJobsQueue = ??? // Defines a gauge metrics.gauge("queue.size") { // This code block is the callback which does a 'measurement'. waitingJobsQueue.size } }

Please note that the metric library determines when the callback function is invoked. For example, once every minute.

What is a push gauge?

My colleague had something else in mind. He didn't have access to the value all the time, but only when something was being processed. More like this:

class ExternalCacheUpdater extends DefaultInstrumented { def updateExternalCache(): Unit = { val items = fetchItemsFromDatabase() pushItemsToExternalCache(items) gauge.push(items.size) // Pushes a new measurement to the gauge. } }

In the example the application becomes responsible for pushing new measurements. The push gauge simply keeps track of the last value and reports that whenever the metrics library needs it. So under the covers the push gauge behaves like a normal gauge.

Push gauges like in this example are now made possible in this pull-request for metrics-scala. The only thing that was missing is the definition of the push gauge:

class ExternalCacheUpdater extends DefaultInstrumented { // Defines a push gauge private val gauge = metrics.pushGauge[Int]("cached.items", 0) def updateExternalCache(): Unit = // as above }
Push gauge with timeout

In some situations it may be misleading to report a very old measurement as the 'current' value. If the external cache in our example evicts items after 10 minutes, then the push gauge should not report measurements from more then 10 minutes ago. This is solved with a push gauge with timeout:

class ExternalCache extends DefaultInstrumented { // Defines a push gauge with timeout private val gauge = metrics.pushGaugeWithTimeout[Int]("cached.items", 0, 10.minutes) def updateExternalCache(): Unit = // as above }
Feedback wanted!

I have not seen this concept before in any metric library in the JVM ecosystem. Therefore I would like to collect as much feedback as possible before shipping this as a new feature of metrics-scala. If you have any ideas, comments or whatever, please leave a comment on the push-gauges pull-request or drop me an email!

Update 2020-03-05: The code example have been updated to reflect changes in the pull request.

Wednesday, May 20, 2015

5 problems in an hour

@svpino posted a nice chalenge: solve 5 problems within an hour or denounce your title of software engineer.

It took me 40 minutes, of which 10 minutes was fighting around a limitation of the scala REPL.

Here are my solutions. They can all be pasted as is in the Scala REPL.

///////////////////////////////////////// // Problem 1 (Yuc! This is not a Scala problem!) def p1_for(xs: Seq[Int]): Int = { var s = 0; for (x <- xs) s += x; s } def p1_while(xs: Seq[Int]): Int = { var s = 0; var i = 0; while (i < xs.length) {s += xs(i); i+=1}; s } def p1_rec(xs: Seq[Int]): Int = if (xs.isEmpty) 0 else xs.head + p1_rec(xs.tail) def p1_proper(xs: Seq[Int]): Int = xs.foldLeft(0)(_+_) // :) scala> p1_for(Seq(1,2,3)) res0: Int = 6 scala> p1_while(Seq(1,2,3)) res1: Int = 6 scala> p1_rec(Seq(1,2,3)) res2: Int = 6 ///////////////////////////////////////// // Problem 2 def p2[A,B](a: Seq[A], b: Seq[B]): Seq[Any] = a.zip(b).flatMap { case (a,b) => Seq(a,b) } scala> p2(Seq("a","b","c"), Seq(1,2,3)) res3: Seq[Any] = List(a, 1, b, 2, c, 3) ///////////////////////////////////////// // Problem 3 def fibonaci: Iterator[Int] = Iterator.iterate((0,1)) { case (a,b) => (b, a+b) }.map(_._1) scala> fibonaci.take(100).mkString(", ") res79: String = 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352, 24157817, 39088169, 63245986, 102334155, 165580141, 267914296, 433494437, 701408733, 1134903170, 1836311903, 2971215073, 4807526976, 7778742049, 12586269025, 20365011074, 32951280099, 53316291173, 86267571272, 139583862445, 225851433717, 365435296162, 591286729879, 956722026041, 1548008755920, 2504730781961, 4052739537881, 6557470319842, 10610209857723, 17167680177565, 27777890035288, 44945570212853, 72723460248141, 117669030460994, 190392490709135, 308061521170129, 498454011879264, 806515533049393, 1304969544928657, 2111485077978050, 3416454622906707, 55... ///////////////////////////////////////// // Problem 4 def p4(n: Seq[Int]): Int = n.map(_.toString).sorted.reverse.mkString("").toInt scala> p4(Seq(50,2,1,9)) res73: Int = 95021 ///////////////////////////////////////// // Problem 5 sealed trait Expr { def evaluate: Int def show: String } case class Plus(e1: Expr, e2: Expr) extends Expr { def evaluate: Int = e1.evaluate + e2.evaluate def show: String = e1.show + " + " + e2.show } case class Min(e1: Expr, e2: Expr) extends Expr { def evaluate: Int = e1.evaluate - e2.evaluate def show: String = e1.show + " - " + e2.show } case class Literal(v: Int) extends Expr { def evaluate: Int = v def show: String = v.toString } def appendToLastLiteral(e: Expr, append: Int): Expr = e match { case Plus(e1, e2) => Plus(e1, appendToLastLiteral(e2, append)) case Min(e1, e2) => Min(e1, appendToLastLiteral(e2, append)) case Literal(v) => Literal((v.toString + append.toString).toInt) } def loop(e: Expr, todo: Seq[Int]): Unit = { if (todo.isEmpty) { if (e.evaluate == 100) println(e.show) } else { loop(Plus(e, Literal(todo.head)), todo.tail) loop(Min(e, Literal(todo.head)), todo.tail) loop(appendToLastLiteral(e, todo.head), todo.tail) } } scala> loop(Literal(1), Seq(2, 3, 4, 5, 6, 7, 8, 9)) 1 + 2 + 3 - 4 + 5 + 6 + 78 + 9 1 + 2 + 34 - 5 + 67 - 8 + 9 1 + 23 - 4 + 5 + 6 + 78 - 9 1 + 23 - 4 + 56 + 7 + 8 + 9 12 + 3 + 4 + 5 - 6 - 7 + 89 12 + 3 - 4 + 5 + 67 + 8 + 9 12 - 3 - 4 + 5 - 6 + 7 + 89 123 + 4 - 5 + 67 - 89 123 + 45 - 67 + 8 - 9 123 - 4 - 5 - 6 - 7 + 8 - 9 123 - 45 - 67 + 89

Thanks @svpino, this was fun!

Sunday, June 30, 2013

Announcing Metrics-Scala 3.0.0

Metrics-scala 3.0.0 was just released and is available in Maven central. This is the first release against Metrics version 3.0.0, and the first release where the code is maintained by me instead of being a line for line copy of Coda Hales' original.

A special thanks goes to @scullxbones who started the 3.0.0 branch and ported the tests to ScalaTest.

Changes:

  • Code is no longer a copy from Coda Hale's sources and is now maintained by me.
  • Depends on Metrics-core 3.0.0.
  • Ported tests from original to ScalaTest (@scullxbones).
  • Much more documentation.

Although the metrics-scala API is mostly source compatible, there are breaking API changes which are mostly caused by changes in the metrics-core library:

  • All code moved to the nl.grons.metrics.scala package (changed at Coda Hale's request).
  • The class Instrumented must now be created in your project by extending InstrumentedBuilder.
  • All configuration for histograms, meters, and timers are gone. These are now configured in the reporter.
  • Dropped method clear on Histogram and Timer.

More ideas and pull requests are welcome!

Friday, April 5, 2013

Fixing code and binary incompatibilities for cross Scala version library development

Scala is a fantastic language that unfortunately has a tradition of having no binary compatibility between versions. The result is that library developers have to go through a lot of pain to release their software for multiple scala versions. Even though starting with scala 2.9 minor versions are binary compatible, with scala 2.10 the situation has worsened because there are now some code incompatibilities as well.

This post shows some techniques for library developers to build releases against multiple scala versions, taking care of binary and code incompatibilities.

SBT — Simple Build Tool

The only viable option I know to build cross scala versions is SBT (Simple Build Tool). I am going to assume you are somewhat familiar with SBT. The most important cross build settings in your build.sbt are (full version on Github):

scalaVersion := "2.10.1" crossScalaVersions := Seq("2.9.1", "2.9.1-1", "2.9.2", "2.10.1") crossVersion := CrossVersion.binary

Key scalaVersion sets the current scala version to use, key crossScalaVersions contains all scala versions to use during cross builds.

The last settings has the effect that the correct scala version is appended to the name of your artifact. ‘Correct’ in this case means the full version for scala versions 2.9.x and lower, or just the 2 highest numbers for 2.10.0 and later. So if you have a setting name := "libname", the generated artifacts will be named libname_2.9.1, libname_2.9.1-1, libname_2.9.2 and libname_2.10.

Kick of a cross build by prepending ‘+’ to your command. E.g. sbt +test.

Code incompatibilities

Scala 2.10 brings some nasty code incompatibilities. The popular Akka library for example has partly moved into the main scala library. The consequence is that code for scala 2.9 needs to depend on Akka and import akka.dispatch.Future, while code for scala 2.10 needs no additional dependencies and import scala.concurrent.Future.

Another example are the changes around concurrent maps. In 2.9 one needs to do new java.util.concurrent.ConcurrentHashMap[A, B](1024).asScala to get a scala.collection.mutable.ConcurrentMap. In Scala 2.10 you are better of with scala.collection.concurrent.TrieMap.empty to get a scala.collection.concurrent.Map. All interfaces stay the same while all names changed.

Dependency incompatibilities

To define dependencies based on the current scala version you can use the following trick:

libraryDependencies <++= (scalaVersion) { v: String => if (v.startsWith("2.10")) Seq("com.yammer.metrics" % "metrics-core" % "2.1.5", "org.specs2" %% "specs2" % "1.13" % "test") else if (v.startsWith("2.9")) Seq("com.yammer.metrics" % "metrics-core" % "2.1.5", "com.typesafe.akka" % "akka-actor" % "2.0.5", "org.specs2" %% "specs2" % "1.12.3" % "test") else Seq() }

Fixing code incompatibilities

If code needs to differ between scala versions, the easiest way is to have multiple source roots. E.g.:

libname/ build.sbt src/ main/ scala/ scala_2.9/ scala_2.10/ test/

Add the following to your build.sbt to make it possible:

// The following prepends src/main/scala_2.9 or src/main/scala_2.10 to the compile path. unmanagedSourceDirectories in Compile <<= (unmanagedSourceDirectories in Compile, sourceDirectory in Compile, scalaVersion) { (sds: Seq[java.io.File], sd: java.io.File, v: String) => val mainVersion = v.split("""\.""").take(2).mkString(".") val extra = new java.io.File(sd, "scala_" + mainVersion) (if (extra.exists) Seq(extra) else Seq()) ++ sds }

Example code for 2.9 (full version on Github):

package nl.grons.sentries.cross object Concurrent { type Future[+A] = akka.dispatch.Future[A] val Future = akka.dispatch.Future val Await = akka.dispatch.Await type CMap[A, B] = scala.collection.mutable.ConcurrentMap[A, B] def defaultConcurrentMap[A,B](): CMap[A,B] = new java.util.concurrent.ConcurrentHashMap[A, B](1024).asScala }

Example code for 2.10 (full version on Github):

package nl.grons.sentries.cross object Concurrent { type Future[+A] = scala.concurrent.Future[A] val Future = scala.concurrent.Future val Await = scala.concurrent.Await type CMap[A, B] = scala.collection.concurrent.Map[A, B] def defaultConcurrentMap[A,B](): CMap[A,B] = scala.collection.concurrent.TrieMap.empty }

The rest of the code can now use the type aliases and references from here. E.g. nl.grons.sentries.cross.Concurrent.Future refers to Akka for scala 2.9 and to the standard library for scala 2.10.

Conclusions

With some hackary SBT allows you to define dependencies and source roots based on the current scala version. This allows you to overcome scala’s incompatibilities if you are a library developer that builds releases for multiple scala versions.

The techniques described in this post were developed for Sentries. The code is on Github.

Wednesday, February 6, 2013

Breaking the Circuit Breaker

The circuit breaker is this wonderful pattern to protect your application against resources that fail slowly. The idea is that you stop trying to use a resource when it has too many failures. Regular retries test the resource and will make the resource available again. The benefit is that your application can react quickly to a failed resource instead of hogging CPU, threads, network, etc. while you are waiting to find out the resource is unavailable.

So what's wrong?

Its the metaphor. In the classical description a circuit breaker has 3 states: the open state, the closed state and the half-open state. So what does it mean when the circuit breaker is open? When is a bridge open? When you can drive over it, or when you can sail through it? Only when you look at the first image you may see that a traditional open circuit breaker stops flow of electricity. To us that translates to no usage of the resource. In the 'closed' state electricity flows, which translates to having access to our resource. Now read that again and see if you can remember that!

Then we have a half-open state? Again, look at the first image. For such a switch half-open is still open. (A half-open bridge lets no traffic trough at all but that is another topic.) Why do we need the half-open state anyway? In the classical description we attempt to use the resource once while in this state. If it fails just once, we go back to the open state. This seems like a good idea, but let us think of modern networked applications. In such applications many requests are done simultaneously. So as soon as we switch to the half-open state for a retry, many, maybe hundreds of request will immediately try to use the resource, even if it is still down. This is exactly what we were trying to prevent!

Stop!

Although the circuit breaker is a great invention, I think we need a new metaphor, or at least some new terminology.

No more half-open

The first thing we can do is get rid of the half-open state. Instead, when its time to retry, we just let 1 client through to the resource. While that check is in progress we keep denying access to the resource for other clients; we stay in the same state. Only when the single check succeeds, we switch to the state in which we allow full access to the resource.

No more open

The second thing we need to do is to end the confusion on what it means to be 'open'. Instead I propose we call this state the broken state. No further explanation required. Good. In this state we do the regular retries.

Finally, to make things symmetric, I propose to rename the 'closed' state to flow state as all requests are granted.

Metaphor

Above I proposed new terminology but I failed to provide a new metaphor. Unfortunately metaphors are hard to find and too easy to get wrong. Perhaps a good metaphor should be related to the fact that we are limiting the number of errors we tolerate from a resource. If you have an idea, please let me know in a comment. I hope you liked my little rant. Any comments are always welcome.

—   ❧   —

Postscript: Sentries and the circuit breaker

The Sentries library contains a highly optimized circuit breaker implementation for Scala programs. The ideas in this article developed while writing Sentries. Feel free to have a look. As you can see there are only 2 states, the FlowState and the BrokenState. Note that the retryAt in BrokenState is a val; it can not be changed after initialization. When it is time to retry we replace the broken state with a new instance (in method attemptResetBrokenState).

Sunday, September 2, 2012

Introducing Sentries

I recently needed a friendly to use circuit breaker in a Scala program. What I found was okay, but not nearly good enough for high volume, highly concurrent applications. So I set out to make it better.

After some iterations of changing I realized that the circuit breaker could be combined with other stuff like rate limiting, monitoring and such. Now, three months later, Sentries is ready for the world.

Here is an example usage: Please visit Sentries on Githib and let me know what you think.

Update 2012-09-04: Version 0.2 will be available in Maven central around 2012-09-04 18:00 GMT.

Update 2012-10-08: Version 0.3 has just landed in Maven central. Its only feature is the new 'clean()' method on the sentry registry which is useful during testing.

Update 2013-02-06: Version 0.5 is out. This is the first to support Scala 2.10. Breaking change is that all durations are now of type akka.util.Duration or scala.concurrent.duration.Duration (depending on the Scala version) instead of Long.

Tuesday, October 5, 2010

Why Functional Programming Matters - An exploration of functional Scala - Part 2

package nl.grons.whyfp
import scala.math.abs
/**

A translation of Miranda examples in chapter 4 of the paper Why Functional Programming Matters by John Hughes, 1984.

This article is the continuation of part 1 - Chapter 3 examples. I will show you lazy evaluation and how this can be useful.

@author Erik van Oosten
*/

object Chapter4_LazyList {

/*

Chapter 4’s examples make heavy use of Miranda’s lazy evaluation. In particular, it uses the list as defined in chapter 3 in a lazy way. This allows the creation of lists of unbounded length. Scala supports lazy lists as well. You can find an implementation in the Stream class. However, as I want to stay close to the original, I redefine the list from chapter 3 as follows:

*/
/** The list from chapter 3, redefined to make it a lazy list. */ sealed abstract class List[+A] { def head: A def tail: List[A] } /** Lazyly evaluate the argument of the constructor AND the value of tail. */ final class Cons[A](hd: A, tl: => List[A]) extends List[A] { override val head: A = hd override lazy val tail: List[A] = tl }
/*

If you compare this to the List implementation in the previous article, you note that parameter ‘tl’ of ‘Cons’ is passed as an by-name parameter (because of the =>). This means that the given expression is only evaluated when the parameter is used. Too prevent the tail expression to be evaluated again and again, the ‘tail’ value is declared as a val. Now ‘tl’ is evaluated exactly once. It is also declared as lazy so that the evaluation of the expression is delayed until the first time it is needed instead of during construction time.

These 3 things together allow the list to behave lazily.

You might also notice that ‘Cons’ is no longer a case class (not possible with by-name parameters). Therefore I define the following:

*/
object Cons { def apply[A](hd: A, tl: => List[A]) = new Cons(hd, tl) // Warning: using unapply leads to evaluation of the tail. def unapply[A](c: Cons[A]) = Some(c.head, c.tail) }
/*

‘Nil’ did not change:

*/
/** The empty list. */ case object Nil extends List[Nothing] { override def head: Nothing = { throw new NoSuchElementException("head of empty list") } override def tail: List[Nothing] = { throw new NoSuchElementException("tail of empty list") } }
/*

Just for fun: here is an example of how the list can be used to construct an infinite list.

*/
/** A list of all integer numbers starting at x and higher. */ // Miranda: nat x = cons x nat(x+1) def nat(x: Int): List[Int] = Cons(x, nat(x + 1))
/*

The Miranda version of ‘nat’ is limited by the available memory, the Scala version is limited to scala.Int.MaxValue.

Function ‘map’ from the previous chapter won’t work. It will try to completely iterate the list which will obviously fail for unbounded lists. Lets implement it again with some simple pattern matching:

*/
/** Create a new lazy list where each element a is replaced by f(a). */ def map_wrong[A, B](f: A => B)(l: => List[A]): List[B] = l match { case Nil => Nil case Cons(head, tail) => Cons(f(head), map(f)(tail)) }
/*

This implementation seemed to work for quite some time, even though everything was slow. If you look carefully, you will see that it evaluates ‘tail’ in ‘Cons.unapply’ which is invoked to match the pattern Cons(head, tail). The result is that exactly one more item in the list is evaluated then necessary. Though this is merely wasteful for most algorithms, it is catastrophic (as in stack overflow) when that evaluation contains another recursive call to ‘map’. This actually happens in the last example of this article.

Here is an implementation that does not have this problem:

*/
/** Create a new lazy list where each element a is replaced by f(a). */ def map[A, B](f: A => B)(l: => List[A]): List[B] = l match { case Nil => Nil case c: Cons[A] => Cons(f(c.head), map(f)(c.tail)) }
/*

Notice that the recursive expression, which includes c.tail, is now used in a place that is lazily evaluated.

On the console:

scala> import nl.grons.whyfp.Chapter4_LazyList._ scala> def printFirst[A](count: Int, l: List[A]) { if (count > 0 && l != Nil) { println(l.head); printFirst(count - 1, l.tail) } } printFirst: [A](count: Int,l: nl.grons.whyfp.Chapter4_LazyList.List[A])Unit scala> printFirst(3, map[Int,Int](_ * 2)(nat(3))) 6 8 10
*/
}

object Chapter4_1_NewtonRaphsonSquareRoots {

import Chapter4_LazyList._
/*

Here’s an imperative Scala implementation that does a numerical calculation of the square root of a number.

*/
/** Approximate the square root of n to within eps * starting with approximation a0. */ def imperative_squareroot(a0: Double, eps: Double, n: Double): Double = { // Warning: imperative scala! var x = a0 // The initial value of y does not matter so long as abs(x-y) > eps var y = a0 + 2 * eps while (abs(x - y) > eps) { y = x x = (x + n/x) / 2 } x }
/*

Here are my translations of the rest:

*/
/** Next approximation of the square root of n, * based on the previous approximation x. */ // Miranda: next N x = (x + N/x) / 2 def next(n: Double)(x: Double): Double = (x + n/x) / 2 /** Produce an infinite list that starts with a and where all other values * are the result of applying f to the previous value. */ // Miranda: repeat f a = cons a (repeat f (f a)) def repeat[A](f: A => A, a: A): List[A] = Cons(a, repeat(f, f(a))) /** Give the first value of the list that is within eps of its preceding value. */ // Miranda: // within eps (cons a (cons b rest)) = // = b, if abs(a-b) <= eps // = within eps (cons b rest), otherwise def within(eps: Double, l: List[Double]): Double = { val a = l.head if (a.isNaN) throw new RuntimeException("nan") val rest = l.tail val b = rest.head if(abs(a - b) <= eps) b else within(eps, rest) }
/*

When a calculation overflow or underflow occurs (not unlikely when working with approximations), ‘within’ goes amok and will loop forever. This is prevented by validating the head of the list against NaN, Not A Number.

*/
/** Approximate the square root of n to within eps * starting with approximation a0. */ // Miranda: sqrt a0 eps N = within eps (repeat (next N) a0) def sqrt(a0: Double, eps: Double, n: Double) = within(eps, repeat(next(n), a0)) /** Gives the first value of the list that of which the ratio of change * with its preceding value is lower then eps. */ // Miranda: // relative eps (cons a (cons b rest)) = // = b, if abs(a-b) <= eps*abs b // = relative eps (cons b rest), otherwise def relative(eps: Double, l: List[Double]): Double = { val a = l.head if (a.isNaN) throw new RuntimeException("nan") val rest = l.tail val b = rest.head if(abs(a - b) <= eps * abs(b)) b else relative(eps, rest) } /** Approximate the square root of n to within ratio eps starting with * approximation a0. */ // Miranda: relativesqrt a0 eps N = relative eps (repeat (next N) a0) def relativesqrt(a0: Double, eps: Double, n: Double) = relative(eps, repeat(next(n), a0))
/*

On the Console:

scala> import nl.grons.whyfp.Chapter4_LazyList._ scala> import nl.grons.whyfp.Chapter4_1_NewtonRaphsonSquareRoots._ scala> relativesqrt(0.1, 0.01, 4) res0: Double = 2.000010925778043
*/
}

object Chapter4_2_NumericalDifferentiation {

import Chapter4_LazyList._
import Chapter4_1_NewtonRaphsonSquareRoots._
/*

This chapter is about numerical differentiation.

*/
/** An approximation of the differentiation of f for x over h. */ // Miranda: easydiff f x h = (f(x+h)-f x) / h def easydiff(f: Double => Double, x: Double)(h: Double): Double = (f(x + h) - f(x)) / h // Miranda: // differentiate h0 f x = map (easydiff f x)(repeat halve h0) // halve x = x/2 def differentiate(h0: Double, f: Double => Double, x: Double) = map(easydiff(f, x))(repeat(halve, h0)) def halve(x: Double) = x / 2
/*

On the console:

scala> import nl.grons.whyfp.Chapter4_LazyList._ scala> import nl.grons.whyfp.Chapter4_1_NewtonRaphsonSquareRoots._ scala> import nl.grons.whyfp.Chapter4_2_NumericalDifferentiation._ scala> val f: Double => Double = math.pow(_, 3.0) f: (Double) => Double = <function1> scala> within(0.001, differentiate(1, f, 2)) res0: Double = 12.000732436776161

Some more examples:

*/
// Miranda: // elimerror n (cons a (cons b rest)) = // = cons ((b*(2**n)-a)/(2**n-1)) (elimerror n (cons b rest)) def elimerror(n: Double, l: List[Double]): List[Double] = { val a = l.head val bandrest = l.tail val b = bandrest.head Cons( (b * math.pow(2.0, n) - a) / (math.pow(2.0, n) - 1.0), elimerror(n, bandrest)) } /** Calculate the order of n to be used in elimerror. */ // Miranda: // order (cons a (cons b (cons c rest))) = // = round(log2( (a-c)/(b-c) - 1 )) // round x = x rounded to the nearest integer // log2 x = the logarithm of x to the base 2 def order(l: List[Double]): Double = { val LOG2: Double = math.log(2.0) def log2(x: Double) = math.log(x) / LOG2 val a = l.head val b = l.tail.head val c = l.tail.tail.head val o = math.round(log2((a-c)/(b-c) - 1)) if (o.isNaN || o == 0) 1 else o } // Miranda: improve s = elimerror (order s) s def improve(s: List[Double]): List[Double] = elimerror(order(s), s)
/*

Note that my implementation of ‘order’ is not only real (no vague descriptions of ‘round’ and ‘log2’), but it is more robust then the original. First, it protects against NaN for the case that b-c reaches 0. Secondly, it prevents a result of 0 as that leads to a division by 0 in ‘elimerror’.

On the console:

scala> within(0.001, improve(differentiate(1, f, 2))) res0: Double = 11.9998779296875 scala> // Improve can be improved recursively scala> within(0.001, improve(improve(improve(differentiate(1, f, 2))))) res1: Double = 12.0
*/
// Miranda: // super s = map second (repeat improve s) // second (cons a (cons b rest)) = b def superimprove(s: List[Double]): List[Double] = map(second)(repeat(improve, s)) def second(l: List[Double]): Double = l.tail.head def superdifferentiate(f: Double => Double, x: Double): Double = { val eps: Double = 0.00000001 within(eps, superimprove(differentiate(x + 2 * eps, f, x))) }
}

object Chapter4_3_NumericalIntegration {

import Chapter4_LazyList._
/*

This chapter is about numerical integration.

*/
// Miranda: easyintegrate f a b = (f a + f b)*(b-a)/2 def easyintegrate(f: Double => Double, a: Double, b: Double) = (f(a) + f(b)) * (b-a) / 2 // Miranda: integrate f a b = cons (easyintegrate f a b) // (map addpair (zip (integrate f a mid) // (integrate f mid b))) // where mid = (a+b)/2 def integrate_simple(f: Double => Double, a: Double, b: Double): List[Double] = { def addpair(pair: (Double, Double)) = pair._1 + pair._2 val mid = (a + b) / 2 Cons( easyintegrate(f, a, b), map(addpair)(zip(integrate(f, a, mid), integrate(f, mid, b))) ) } /** Convert two lists to a list of pairs. */ // Miranda: zip (cons a s) (cons b t) = cons (pair a b) (zip s t) def zip[A,B](as: => List[A], bs: => List[B]): List[(A,B)] = as match { case Nil => Nil case a: Cons[A] => bs match { case Nil => Nil case b: Cons[B] => Cons((a.head, b.head), zip(a.tail, b.tail)) } }
/*

Again note that ‘zip’ was defined such that a.tail and b.tail are defined as a lazily evaluated argument.

*/
// Miranda: // integrate f a b = integ f a b (f a) (f b) // integ f a b fa fb = cons ((fa+fb)*(b-a)/2) // (map addpair (zip (integ f a m fa fm) // (integ f m b fm fb))) // where m = (a+b)/2 // fm = f m def integrate(f: Double => Double, a: Double, b: Double): List[Double] = { def integ(a: Double, b: Double, fa: Double, fb: Double): List[Double] = { def addpair(pair: (Double, Double)) = pair._1 + pair._2 val mid = (a + b) / 2 val fm = f(mid) Cons( (fa + fb) * (b-a) / 2, map(addpair)(zip(integ(a, mid, fa, fm), integ(mid, b, fm, fb))) ) } integ(a, b, f(a), f(b)) }
/*

And of course, seeing is believing:

scala> import nl.grons.whyfp.Chapter4_LazyList._ scala> import nl.grons.whyfp.Chapter4_1_NewtonRaphsonSquareRoots._ scala> import nl.grons.whyfp.Chapter4_2_NumericalDifferentiation._ scala> import nl.grons.whyfp.Chapter4_3_NumericalIntegration._ scala> def f(x: Double): Double = 1/(1+x*x) f: (x: Double)Double scala> within(0.001, integrate(f, 1, 2)) res0: Double = 0.3218612363325556 scala> relative(0.001, integrate(f, 1, 2)) res1: Double = 0.3217782239721789 scala> relative(0.001, superimprove(integrate(f, 1, 2))) res2: Double = 0.3217525935931843
*/
}
/*

Conclusion

Scala is an excellent language for functional style programs and I really like that static typing prevents tons of errors. My only problem with Scala — in terms of the ‘Why Functional Programming matters’ paper — is that lazy evaluation is not the natural way of working. The way expressions are evaluation is more closely to the Java world. To make use of lazy evaluation you have to be very much aware of what your doing and carefully make use of the lazy keyword and by-name parameters. With Miranda this comes naturally though probably at the cost of some performance for the case laziness is not needed. Perhaps that the standard collections library (scala.collection.immutable.Stream) hides this sufficiently and makes life easy.

A more fundamental way to fix this problem would be to express the laziness nature of an expression in its type. I am out of the academic world for a looong time, so I have no idea if this is being researched at all. It will take some hard thoughts to make this practical and simple to use though.

The next challenge would be to repeat this experiment with the standard collections. But I’ll leave that to someone with more time...

*/