Monday, December 2, 2024

Zio-kafka, faster than java-kafka

TL;DR: Concurrency and pre-fetching give zio-kafka a higher consumer throughput than the default Java Kafka client for most workloads.

Zio-kafka is an asynchronous Kafka client based on the ZIO async runtime. It allows you to consume from Kafka with a ZStream per partition. ZStreams are streams on steroids; they have all the stream operators known from e.g. RxJava and Monix, and then some. ZStreams are backed by ZIO's super efficient runtime.
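To make this concrete, here is a minimal sketch of a zio-kafka consumer that gets one stream per partition and processes the partitions in parallel. The broker address, group id and topic name are placeholders, and exact method names may differ slightly between zio-kafka versions.

import zio._
import zio.kafka.consumer.{Consumer, ConsumerSettings, Subscription}
import zio.kafka.serde.Serde

object PartitionedConsumerSketch extends ZIOAppDefault {

  // Placeholder broker address and group id.
  private val consumerLayer = ZLayer.scoped(
    Consumer.make(ConsumerSettings(List("localhost:9092")).withGroupId("example-group"))
  )

  override val run =
    Consumer
      .partitionedStream(Subscription.topics("example-topic"), Serde.byteArray, Serde.byteArray)
      // One inner stream per partition; run them all in parallel, each on its own fiber.
      .flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
        partitionStream.chunks.map(batch => topicPartition.partition() -> batch.size)
      }
      // 'Processing' is trivial here: log the batch size per partition.
      .tap { case (partition, count) => ZIO.logInfo(s"partition $partition: batch of $count records") }
      .runDrain
      .provideLayer(consumerLayer)
}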

So the claim is that zio-kafka consumes faster than the regular Java client. But zio-kafka wraps the default java Kafka client. So how can it be faster? The trick is that with zio-kafka, processing (meaning your code) runs in parallel with the code that pulls records from the Kafka broker.

Obviously, there is some overhead in distributing the received records to the streams that need to process them. So the question is: when is the extra overhead of zio-kafka smaller than the gain from parallel processing? Can we estimate this? It turns out we can! For this estimate we use the benchmarks that zio-kafka runs from GitHub. In particular, we look at these 2 benchmarks with runs from 2024-11-30:

  • the throughput benchmark, which uses zio-kafka and takes around 592 ms
  • the kafkaClients benchmark, which uses the Kafka Java client directly and takes around 544 ms

Both benchmarks:

  • run using JMH on a 4-core GitHub runner with 16 GB RAM
  • consume 50k records of ~512 bytes from 6 partitions
  • process the records by counting the number of incoming records, batch by batch (see the sketch after this list)
  • use the same consumer settings, in particular max.poll.records is set to 1000
  • run the broker in the same JVM as the consumer, so there is almost no networking overhead
  • do not commit offsets (but note that committing does not change the outcome of this article)
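The kafkaClients benchmark essentially boils down to a poll-then-count loop. A rough sketch (this is not the actual benchmark code; broker address, topic name and poll timeout are placeholders):

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "bench")
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000")

val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
consumer.subscribe(java.util.List.of("example-topic"))

var count = 0L
while (count < 50000L) {
  val records = consumer.poll(Duration.ofMillis(100)) // blocks until records arrive (or timeout)
  count += records.count()                            // 'processing': just count the batch
}
consumer.close()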

First we calculate the overhead of java-kafka. For this we assume that counting the number of records takes no time at all. This is reasonable: a count operation is just a few CPU instructions, nothing compared to fetching data over the network, even if the broker is on the same machine. Therefore, in the figure, the 'processing' rectangle collapses to a thick vertical line.


Polling and processing as blocks of execution on a thread/fiber.

We also assume that every poll returns the same number of records and takes the same amount of time. This is not how it works in practice, but when the records are already available we're probably not that far off. As the program consumes 50k records, and each poll returns a batch of 1k records, there are 50 polls. Therefore, the overhead of java-kafka is 544/50 ≈ 11 ms per poll.

Now we can calculate the overhead of zio-kafka. Again we assume that processing takes no time at all, so that every millisecond that the zio-kafka benchmark takes longer can be attributed to zio-kafka's overhead. The zio-kafka benchmark runs 592 − 544 = 48 ms longer. Therefore zio-kafka's overhead is 48/50 ≈ 1 ms per poll.

Now let's look at a more realistic scenario where processing does take time.


Polling and processing as blocks of execution on a thread/fiber.

As you can see, a java-kafka program alternates between polling and processing, while the zio-kafka program processes the records in parallel, distributed over 6 streams (1 stream per partition, each stream running independently on its own ZIO fiber). In the figure we assume that the work is evenly distributed, but unless the load is really totally skewed, it won't matter that much in practice due to zio-kafka's per-partition pre-fetching. In this scenario the zio-kafka program is clearly much faster. But is this a realistic example? When do zio-kafka programs become faster than java-kafka programs?

Observe that how long polling takes is not important for this question, because polling time is the same for both programs. So we only look at the time between polls. We use (p) for the processing time per batch of the java-kafka program. For the zio-kafka program, the time between polls is equal to zio-kafka's overhead (o) plus the processing time per batch divided by the number of partitions (n). So we want to know for which p:

o + p/n ≤ p

Rearranging gives o ≤ p · (n − 1)/n, which solves to:

p ≥ o · n/(n − 1)

For the benchmark we fill in the zio-kafka overhead (o = 1 ms) and the number of partitions (n = 6) and we get: p ≥ 1.2 ms. (Remember, this is the processing time per batch of 1000 records.)

In the previous paragraphs we assumed that processing is IO intensive. When the processing is compute intensive, ZIO cannot actually run more fibers in parallel than the number of cores available. In our example we have 4 cores; using n = 4 gives p ≥ 1.3 ms, which is still a very low tipping point.
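As a sanity check, here is the tipping point formula from above as a tiny Scala snippet (the function name is just for illustration):

// Processing time per batch (ms) above which zio-kafka wins:
// p >= o * n / (n - 1), with o = zio-kafka overhead per poll, n = parallel streams.
def tippingPointMs(overheadMs: Double, parallelism: Int): Double =
  overheadMs * parallelism / (parallelism - 1)

tippingPointMs(1.0, 6) // ≈ 1.2 ms (IO-bound processing, 6 partitions)
tippingPointMs(1.0, 4) // ≈ 1.33 ms (CPU-bound processing, 4 cores)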

Conclusion

For IO loads, or with enough cores available, even quite low processing times per batch make zio-kafka consume faster than the plain Java Kafka library. The plain Java consumer is only faster for trivial processing tasks like counting.

If you'd like to know what this looks like in practice, you can take a look at this real-world example application: Kafka Big-Query Express. This is a zio-kafka application recently open sourced by Adevinta. Here is the Kafka consumer code. (Disclaimer: I designed and partly implemented this application.)

Sunday, August 25, 2024

MavenGate gets it all wrong and hurts open source

MavenGate claims that some Maven namespaces (for example nl.grons, the namespace I control) are vulnerable to hijacking. If I understand it correctly, the idea is that hackers can place a package with existing or newer Maven coordinates in the same or a different Maven repository, thereby luring users into using a hacked version of your package. Sounds serious, and it probably is.

However, they then went on to create a list of Maven namespaces that are vulnerable. Unfortunately, they do not say what criteria were used to put namespaces on this list. Is it because the associated DNS domain expired? Because the DNS domain moved to a different owner, or only to another DNS registrar? Is it because the PGP key used to sign packages is not on a known server? Or something else entirely? For some reason my namespace ended up on the list, even though I never lost control of the DNS domain and strictly follow all their recommendations.

Even more unfortunately, this is not even the right way to look at the problem. It is not the namespaces that are vulnerable, it is the Maven repositories themselves! It is the Maven repositories that are responsible for checking a namespace against ownership of the associated DNS domain and linking that to a PGP key. Once the key is linked to the namespace, packages signed with a different PGP key should not be accepted. Any exceptions to this rule should be considered very carefully.

Now to my second point: how does this hurt open source? Since my Maven Central account was blocked after MavenGate, I contacted Sonatype, the owners of Maven Central. Luckily, I use Keybase and was therefore easily able to assert that I am still the owner of the DNS domain and the PGP key that has been used to sign packages. But then Sonatype also wrote this:

It is important to note that, even if we are able to verify your publisher authorization, security software may flag components published under this namespace. It may be worth considering registering a separate, new namespace with a clean-slate reputation.

I am just an individual publishing open source packages in my free time. IMHO it is totally unreasonable to ask people to switch to another domain because some random company on the internet suspects you might be vulnerable! Switching to a new DNS domain is a lot of work and in addition, not everyone is willing or able to bear the costs. I suspect that many people, including me, will give up rather than join a race against 'security software'.

To summarize:

  • MavenGate declares Maven namespaces to be vulnerable based on unclear and probably wrong criteria.
  • If this is taken seriously, the bar to publishing open source becomes so high that many will give up instead.

Note: I have tried to contact the MavenGate authors, but unfortunately have not received a reply yet.

Tuesday, May 28, 2024

Java plugins with isolating class loaders

My team's article on how to write Java plugins has been published on the Adevinta Tech Blog. Enjoy!

Friday, April 26, 2024

Making ZIO-Kafka Safer And Faster

My talk "Making ZIO-Kafka Safer And Faster" at Functional Scala 2023 went online!

Explore Erik van Oosten's presentation on improving ZIO-Kafka for better safety and performance. Learn about the modifications introduced in 2023, get insights into the library's internal workings, and uncover useful ZIO techniques and Kafka's lesser-known challenges.

Contents in the video:

2:07 Improvements
9:06 Results
10:29 Rebalances
18:10 The Future

Sunday, April 21, 2024

Tips for running Roundcube for years

I have been running a Roundcube instance for about 8 years now. At the beginning I only used it as a backup email client that can be invoked from anywhere. Nowadays, it is so good that I didn't even bother installing Thunderbird on my work laptop.

Unfortunately, I discovered that the docker backup of Roundcube had become quite large, many GBs. This was quite unexpected for a service that is used by only 2 people. The reason was quickly found: the sqlite database was huge!

What did I know? I thought only PostgreSQL needed scheduled cleanups. Turns out SQLite needs it too! Would this be the reason Android phones tend to fill up over time?

Anyways, the fix was simple: run the vacuum command! So, now I have the following run daily using cron. Problem solved!

sqlite3 /path/to/roundcubemail.sqlite 'VACUUM;'

Wednesday, January 24, 2024

Scheduling tasks and sharing state with streams

Recently we built a system that needs to perform 2 tasks. Task 1 runs every 15 minutes, task 2 runs every 2 minutes. Task 1 kicks off some background jobs (an upload to BigQuery), task 2 checks upon the results of these background jobs and does some cleanup when they are done (delete the uploaded files). The two tasks need to share information back and forth about which jobs are running in the background.

Now think to yourself (ignore the title for now 😉): what would be the most elegant way to implement this? I suspect that most developers will come up with a solution that involves some locking, synchronisation and global state. For example by sharing the information through a transactional database, or by using a semaphore to prevent the two tasks from running at the same time, plus sharing information in a global variable. This is understandable; most programming environments do not provide better techniques for these kinds of problems at all!

However, if your environment supports streams and has some kind of scheduling, here are two tricks you can use: one for the scheduling of the tasks, the second for sharing information without a global variable.

Here is an example for the first written in Scala using the ZIO streams library. Read on for an explanation.

import zio._
import zio.stream._

def performTask1: Task[Unit] = ???
def performTask2: Task[Unit] = ???

// An enumeration (scala 2 style) for our task.
sealed trait BusinessTask
object Task1 extends BusinessTask
object Task2 extends BusinessTask

ZStream.mergeAllUnbounded()(
  ZStream.fromSchedule(Schedule.fixed(15.minutes)).as(Task1),
  ZStream.fromSchedule(Schedule.fixed(2.minutes)).as(Task2)
)
  .mapZIO {
    case Task1 => performTask1
    case Task2 => performTask2
  }
  .runDrain

We create 2 streams; each stream contains sequential numbers, emitted upon a schedule. As you can see, the schedules correspond directly with the requirements. We do not really care for the sequential numbers, so with the as operator we convert each stream's emitted values to a value from the BusinessTask enumeration.

Then we merge the two streams. We now have a stream that emits the two enumeration values at the time the corresponding task should run. This is already a big win! Even when the two schedules produce an item at the same time, the tasks will run sequentially. This is because by default streams are evaluated without parallelism.

We are not there yet though. The tasks need to share information. They could access a shared variable but then we still have tightly coupled components and no guarantees that the shared variable is used correctly.

Also, wouldn't it be great if performTask1 and performTask2 were functions that can be tested in isolation? With streams this is possible.

Here is the second part of the idea. Again, read on for an explanation.

case class State(...)

val initialState = State(...)

def performTask1(state: State): Task[State] = ???
def performTask2(state: State): Task[State] = ???

ZStream.mergeAllUnbounded()(
  ZStream.fromSchedule(Schedule.fixed(15.minutes)).as(Task1),
  ZStream.fromSchedule(Schedule.fixed(2.minutes)).as(Task2)
)
  .scanZIO(initialState) { (state, task) =>
    task match {
      case Task1 => performTask1(state)
      case Task2 => performTask2(state)
    }
  }
  .runDrain

We have changed the signatures of the performTask* methods. Also, the mapZIO operator has been replaced with scanZIO. The stream operator scanZIO works much like foldLeft on collections. Like foldLeft, it accepts an initial state, and a function that combines the accumulated state plus the next stream element (of type BusinessTask) and converts those into the next state.

Stream operator scanZIO also emits the new states. This allows further common processing. For example we can persist the state to disk, or collect custom metrics about the state.
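As a toy illustration of that last point (separate from the task example above), any operator can simply be chained after scanZIO to observe the emitted states:

import zio._
import zio.stream._

// Running sums: scanZIO emits every intermediate state, so follow-up processing
// (logging, persisting, metrics) is just another stream operator.
val runningSums: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3)
    .scanZIO(0)((sum, i) => ZIO.succeed(sum + i))
    .tap(sum => ZIO.logInfo(s"state is now $sum"))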

Conclusion

Using libraries with higher-level constructs like streams, we can express straightforward requirements in a straightforward way. With a few lines of code we have solved the scheduling requirement and shown an elegant way of sharing information between tasks without global variables.

Sunday, November 26, 2023

Discovering scala-cli while fixing my digital photo archive

Over the years I built up a nice digital photo library with my family. It is a messy process. Here are some of the things that can go wrong:

  • Digital cameras that add incompatible exif metadata.
  • Some files have exif tag CreateDate, others DateTimeOriginal.
  • Images shared via Whatsapp or Signal do not have an exif date tag at all.
  • Wrong rotation.
  • Fuzzy, yet memorable jpeg images which take 15 MB because of their resolution and high quality settings.
  • Badly supported ancient movie formats like 3gp and RIFF AVI.
  • Old movie formats that need 3 times more disk space than h.265.
  • Losing almost all your photos because you thought you could copy an iPhoto library using tar and cp (hint: you can’t). (This took a low-level hard disk scan and months of manual de-duplication work to recover the photos.)
  • Another low-level scan of an SD card to find accidentally deleted photos.
  • Date in image file name corresponds to import date, not creation date.
  • Weird file names that order the files differently than from creation date.
  • Images from 2015 are stored in the folder for 2009.
  • etc.

I wrote countless bash scripts to mold the collection into order, unfortunately with varying success. However, now that I am ready to import the library into Immich (please, do sponsor them, they are building a very nice product!), I decided to start cleaning up everything.

So there I was, writing yet another bash script, struggling with parsing a date response from exiftool. And then I remembered the recent articles about scala-cli and decided to try it out.

The experience was amazing! Even without proper IDE support, I was able to crank out scripts that did more, more accurately and faster than I could ever have accomplished in bash.

Here are some of the things that I learned:

  • Take the time to learn os-lib.
  • If the scala code gets harder to write, open a proper IDE and use code completion. Then copy the code over to your .sc file.
  • One well placed .par (using scala-parallel-collections) can more than quadruple the performance of your script!
  • You will still spend a lot of time parsing the output from other programs (like exiftool), see the sketch after this list.
  • Scala-cli scripts run very well from GitHub actions as well.
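For a taste, here is a minimal scala-cli script sketch along those lines. The directory layout, dependency versions and the chosen exif tag are assumptions; adapt them to your own collection.

//> using scala 3
//> using dep "com.lihaoyi::os-lib:0.9.1"
//> using dep "org.scala-lang.modules::scala-parallel-collections:1.0.4"

import scala.collection.parallel.CollectionConverters._

// Collect all jpegs under ./photos (the path is an assumption).
val photos = os.walk(os.pwd / "photos").filter(_.ext.equalsIgnoreCase("jpg"))

// Ask exiftool for the original creation date, in parallel thanks to .par.
val dates = photos.par.map { photo =>
  val date = os.proc("exiftool", "-s3", "-DateTimeOriginal", photo).call().out.text().trim
  photo -> date
}

dates.seq.foreach { case (photo, date) => println(s"$photo: $date") }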

Conclusions

Next time you open your editor to write a bash file, think again. Perhaps you should really write some scala instead.