Tuesday, July 7, 2020

Avoiding Scala's Option.fold

Scala's Option.fold is a bit nasty to use sometimes because type inference is not always great. Here is a stupid toy example:

val someValue: Option[String] = ???
val processed = someValue.fold[Either[Error, UserName]](
  Left(Error("No user name given"))
)(
  value => Right(UserName(value))
)

The [Either[Error, UserName]] on fold is necessary; without it the Scala compiler infers the result type from the first argument alone (a Left) and then rejects the Right in the second.

Here is a really small trick to avoid Option.fold when you need to convert an Option to an Either:

val someValue: Option[String] = ???
val processed = someValue
  .toRight(left = Error("No user name given"))
  .map(value => UserName(value))

Much nicer!
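
To check that both forms really give the same result, here is a small self-contained sketch. The case classes Error and UserName are hypothetical stand-ins for the types used above; the post does not show their definitions.

```scala
object ToRightExample {
  // Hypothetical stand-ins for the types used in the post.
  final case class Error(message: String)
  final case class UserName(value: String)

  // The Option.fold variant: needs the explicit type parameter.
  def viaFold(someValue: Option[String]): Either[Error, UserName] =
    someValue.fold[Either[Error, UserName]](
      Left(Error("No user name given"))
    )(
      value => Right(UserName(value))
    )

  // The toRight variant: None becomes a Left, Some(x) becomes Right(x),
  // and map then transforms only the Right side (Scala 2.12 or newer).
  def viaToRight(someValue: Option[String]): Either[Error, UserName] =
    someValue
      .toRight(left = Error("No user name given"))
      .map(value => UserName(value))
}
```

Note that map works on the value inside the Right, so the function passed to it returns a plain UserName and not a Right(UserName(...)).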

Wednesday, April 15, 2020

Traefik v2 enable HSTS, Docker and nextcloud

It took me days to figure out how to configure Traefik v2. Here it is for posterity.

This is a docker-compose.yaml fragment to append to a service section:

labels:
  - "traefik.enable=true"
  - "traefik.http.routers.service.rule=Host(`www.example.com`)"
  - "traefik.http.routers.service.entrypoints=websecure"
  - "traefik.http.routers.service.tls.certresolver=myresolver"
  - "traefik.http.middlewares.servicests.headers.stsincludesubdomains=false"
  - "traefik.http.middlewares.servicests.headers.stspreload=true"
  - "traefik.http.middlewares.servicests.headers.stsseconds=31536000"
  - "traefik.http.middlewares.servicests.headers.isdevelopment=false"
  - "traefik.http.routers.service.middlewares=servicests"

It will:

  • tell Traefik to direct traffic for www.example.com to this container,
  • on the websecure entrypoint (this is configured statically),
  • using myresolver as the certificate resolver (for ACME; the resolver is also configured statically),
  • configure middleware to add HSTS headers,
  • enable the middleware.


Here is a slightly more complex example for a Nextcloud deployment, which includes the recommended redirects.

labels:
  - "traefik.enable=true"
  - "traefik.http.routers.nextcloud.rule=Host(`nextcloud.example.com`)"
  - "traefik.http.routers.nextcloud.entrypoints=websecure"
  - "traefik.http.routers.nextcloud.tls.certresolver=myresolver"
  - "traefik.http.middlewares.nextcloudredir.redirectregex.permanent=true"
  - "traefik.http.middlewares.nextcloudredir.redirectregex.regex=https://(.*)/.well-known/(card|cal)dav"
  - "traefik.http.middlewares.nextcloudredir.redirectregex.replacement=https://$$1/remote.php/dav/"
  - "traefik.http.middlewares.nextcloudsts.headers.stsincludesubdomains=false"
  - "traefik.http.middlewares.nextcloudsts.headers.stspreload=true"
  - "traefik.http.middlewares.nextcloudsts.headers.stsseconds=31536000"
  - "traefik.http.middlewares.nextcloudsts.headers.isdevelopment=false"
  - "traefik.http.routers.nextcloud.middlewares=nextcloudredir,nextcloudsts"
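
To see what the redirect middleware does to a URL, the regex and replacement can be tried out in isolation. The sketch below uses the JVM's regex engine and not the Go one that Traefik uses; I assume both behave the same for this simple pattern. In the compose file the `$$1` is just an escaped `$1`, because docker-compose treats a single `$` as the start of a variable. The object and method names are made up for the illustration.

```scala
import java.util.regex.Pattern

object RedirectRegexSketch {
  // Same regex and replacement as in the labels above ($$1 unescaped to $1).
  private val pattern = Pattern.compile("https://(.*)/.well-known/(card|cal)dav")
  private val replacement = "https://$1/remote.php/dav/"

  // Returns the redirect target when the URL matches, None otherwise.
  def redirectTarget(url: String): Option[String] = {
    val matcher = pattern.matcher(url)
    if (matcher.find()) Some(matcher.replaceAll(replacement)) else None
  }
}
```

With this, https://nextcloud.example.com/.well-known/carddav and the caldav variant both map to https://nextcloud.example.com/remote.php/dav/, while any other URL is left alone.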

Friday, April 10, 2020

Akka-http graceful shutdown


By default, when you restart a service, the old instance is simply killed. This means that all current requests are aborted; the caller will be left with a read timeout. We can do better!


A graceful shutdown looks as follows:

  1. The scheduler (Kubernetes, Nomad, etc.) sends a signal (usually SIGTERM or SIGINT) to the service.
  2. The service gets the signal and closes all server ports; it can no longer receive new requests. The load balancer picks this up very quickly and stops sending new requests.
  3. All requests-in-progress complete one by one.
  4. When all requests are completed, or on a timeout, the service terminates.
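
The four steps above can be sketched without any dependencies. Note that this sketch is not Akka-http: it swaps in the small HTTP server that ships with the JDK (com.sun.net.httpserver), purely because its stop method happens to do steps 2 to 4 in one call. The object name, the handler and the deadline parameter are made up for the illustration.

```scala
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.InetSocketAddress

object GracefulShutdownSketch {
  // Starts a tiny HTTP server that answers every request with "ok".
  // Passing port 0 lets the operating system pick a free port.
  def start(port: Int): HttpServer = {
    val server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0)
    server.createContext("/", new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        val body = "ok".getBytes("UTF-8")
        exchange.sendResponseHeaders(200, body.length.toLong)
        exchange.getResponseBody.write(body)
        exchange.close()
      }
    })
    server.start()
    server
  }

  // Step 1: the JVM runs shutdown hooks when it receives SIGTERM or SIGINT.
  // Steps 2 to 4: stop() closes the listening socket, then blocks until the
  // requests in progress have completed or the deadline has passed.
  def installShutdownHook(server: HttpServer, deadlineSeconds: Int): Unit = {
    sys.addShutdownHook {
      server.stop(deadlineSeconds)
    }
    ()
  }
}
```

The JVM waits for shutdown hooks to finish before it exits, so the blocking call inside the hook is what keeps the process alive while the last requests complete.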

Getting the signal to your service is unfortunately not always trivial. I have seen the following problems:

  • The Nomad scheduler by default does not send a SIGINT signal to the service. You will have to configure this.
  • When the service runs in a Docker container, by default the init process (with PID 1) will ignore the signal. Back when every Unix installation had control over the entire computer this made lots of sense. In a container though, not so much. This may be fixed in newer Docker versions. Otherwise you will have to use a special init process such as tini.

Akka-http has excellent support for graceful shutdown. Unfortunately, the documentation is not very clear about it. Here follows an example which can be used as a template:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server._
import scala.concurrent.duration._
import scala.util.Failure

implicit val system: ActorSystem = ???
import system.dispatcher // ExecutionContext for map and onComplete

val logger = ???
val route: Route = ???
val interface: String = ""
val port: Int = 80
val shutdownDeadline: FiniteDuration = 30.seconds

Http()
  .bindAndHandle(route, interface, port)
  .map { binding =>
    logger.info(
      "HTTP service listening on: " +
        s"http://${binding.localAddress.getHostName}:${binding.localAddress.getPort}/"
    )

    // Runs when the JVM receives the termination signal.
    sys.addShutdownHook {
      binding
        .terminate(hardDeadline = shutdownDeadline)
        .onComplete { _ =>
          system.terminate()
          logger.info("Termination completed")
        }
      logger.info("Received termination signal")
    }
  }
  .onComplete {
    case Failure(ex) =>
      logger.error("server binding error:", ex)
      system.terminate()
      sys.exit(1)
    case _ =>
  }

Tuesday, March 3, 2020

Push Gauges

A colleague was complaining to me that Micrometer gauges didn't work the way he expected. This led to some interesting work.

What is a gauge?

In science a gauge is a device for making measurements. In computer systems a gauge is very similar: a 'metric' which tracks something in your system over time. For example, you could track the number of items in a job queue. Libraries like Micrometer and Dropwizard metrics make it easy to define gauges. Since the measurement in itself is not useful, those libraries also make it easy to send the measurements to a metric system such as Graphite or Prometheus. These systems are used for visualization and generating alerts.

Gauges are typically defined with a callback function that does the measurement. For example, using metrics-scala, the Scala API for Dropwizard metrics, it looks like this:

class JobQueue extends DefaultInstrumented {
  private val waitingJobsQueue = ???

  // Defines a gauge
  metrics.gauge("queue.size") {
    // This code block is the callback which does a 'measurement'.
    waitingJobsQueue.size
  }
}

Please note that the metric library determines when the callback function is invoked. For example, once every minute.

What is a push gauge?

My colleague had something else in mind. He didn't have access to the value all the time, but only when something was being processed. More like this:

class ExternalCacheUpdater extends DefaultInstrumented {
  def updateExternalCache(): Unit = {
    val items = fetchItemsFromDatabase()
    pushItemsToExternalCache(items)
    gauge.push(items.size) // Pushes a new measurement to the gauge.
  }
}

In the example the application becomes responsible for pushing new measurements. The push gauge simply keeps track of the last value and reports that whenever the metrics library needs it. So under the covers the push gauge behaves like a normal gauge.
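
A minimal sketch of that idea, using only the standard library. This is not the code from the pull request; the class SimplePushGauge and its method names are made up to show the principle.

```scala
import java.util.concurrent.atomic.AtomicReference

object PushGaugeSketch {
  // Hypothetical minimal push gauge; not the metrics-scala implementation.
  final class SimplePushGauge[A](initial: A) {
    // Holds the most recently pushed measurement.
    private val last = new AtomicReference[A](initial)

    // Called by the application whenever it has a new measurement.
    def push(value: A): Unit = last.set(value)

    // Called by the metrics library whenever it wants to report. This plays
    // the role of the callback of the ordinary gauge underneath.
    def value: A = last.get()
  }
}
```

The AtomicReference is there because the application thread that pushes and the reporter thread that reads are usually not the same thread.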

Push gauges like in this example are now made possible in this pull-request for metrics-scala. The only thing missing from the example above is the definition of the push gauge:

class ExternalCacheUpdater extends DefaultInstrumented {
  // Defines a push gauge
  private val gauge = metrics.pushGauge[Int]("cached.items", 0)

  def updateExternalCache(): Unit = ??? // as above
}

Push gauge with timeout

In some situations it may be misleading to report a very old measurement as the 'current' value. If the external cache in our example evicts items after 10 minutes, then the push gauge should not report measurements from more than 10 minutes ago. This is solved with a push gauge with timeout:

class ExternalCacheUpdater extends DefaultInstrumented {
  // Defines a push gauge with timeout
  private val gauge = metrics.pushGaugeWithTimeout[Int]("cached.items", 0, 10.minutes)

  def updateExternalCache(): Unit = ??? // as above
}

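
How such a timeout could work is sketched below with only the standard library. Again this is not the code from the pull request. I assume here that the second argument is the value that is reported when no fresh measurement is available. The class name and the injectable clock (nowNanos) are made up, the latter so the expiry can be tried out without actually waiting.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._

object PushGaugeWithTimeoutSketch {
  // Hypothetical sketch; not the metrics-scala implementation.
  final class SimplePushGaugeWithTimeout[A](
      default: A,
      timeout: FiniteDuration,
      nowNanos: () => Long = () => System.nanoTime()
  ) {
    // The last pushed value together with the time at which it was pushed.
    private val last = new AtomicReference[Option[(A, Long)]](None)

    // Called by the application whenever it has a new measurement.
    def push(value: A): Unit = last.set(Some((value, nowNanos())))

    // Reports the last pushed value, or the default when nothing was pushed
    // yet or the last push is older than the timeout.
    def value: A = last.get() match {
      case Some((v, pushedAt)) if nowNanos() - pushedAt <= timeout.toNanos => v
      case _ => default
    }
  }
}
```

Storing the value and its timestamp together in one reference means a reader never sees a new value paired with an old timestamp.
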
Feedback wanted!

I have not seen this concept before in any metric library in the JVM ecosystem. Therefore I would like to collect as much feedback as possible before shipping this as a new feature of metrics-scala. If you have any ideas, comments or whatever, please leave a comment on the push-gauges pull-request or drop me an email!

Update 2020-03-05: The code examples have been updated to reflect changes in the pull request.