Thursday, 1 June 2017

Playing with thread pools

In the project I've been working on, I recently came across a situation where the service was hammered by updates and started struggling under the load of simultaneous client (predominantly read) requests. The solution was to maintain separate thread pools for updates and client requests: the service needs to stay available, and we can accept delays in updates. This inspired me to play around with thread pools - which for some reason I've never done before - in a small demonstration.
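
The gist of the fix is small enough to show up front. Here's a minimal sketch using the standard library's ExecutionContext.fromExecutorService rather than the hand-rolled factory in the full listing below; ReadWriteSplit, handleRead and handleWrite are hypothetical stand-ins for the real service operations. Slow writes queue up on their own pool and can never starve the reads:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object ReadWriteSplit {
  //Two independent fixed-size pools: a backlog on one cannot block the other
  val readContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
  val writeContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  //Reads and writes never compete for threads
  def handleRead(): Future[String] = Future("a read result")(readContext)
  def handleWrite(): Future[Unit] = Future(())(writeContext)
}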

If you run CommonThreadPoolExperiment you'll see that the number of completed read processes converges to that of the write processes - on my machine, within around 10 seconds.

With SeparateThreadPoolsExperiment the numbers are starkly different: the reads are unperturbed by the much slower writes.


package org.home

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

import com.typesafe.scalalogging.StrictLogging

import scala.collection.immutable.Seq
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

import ExecutionContextFactory.instantiate

object SeparateThreadPoolsExperiment extends App with StrictLogging {
  val readContext: ExecutionContext = instantiate(fixedThreadPoolSize = 2)
  val writeContext: ExecutionContext = instantiate(fixedThreadPoolSize = 2)
  ExperimentRunner.run(writeContext, readContext)
}
object CommonThreadPoolExperiment extends App with StrictLogging {
  val commonContext: ExecutionContext = instantiate(fixedThreadPoolSize = 4)
  ExperimentRunner.run(writeContext = commonContext, readContext = commonContext)
}
object ExperimentRunner extends StrictLogging {
  //Runs reader and writer processes on the given execution contexts and logs their throughput
  def run(writeContext: ExecutionContext, readContext: ExecutionContext): Unit = {
    //do something CPU intensive: sort 'n' random integers, returning the elapsed time in millis
    def longJob(n: Int): Long = {
      def now() = System.currentTimeMillis()
      val start = now()
      (1 to n).map(_ ⇒ Random.nextInt()).sorted
      now() - start
    }
    def doRead(): Long = longJob(5000)
    def doWrite(): Long = longJob(25000)

    val writers = new Processes(n = 10000, operation = doWrite, ec = writeContext)
    val readers = new Processes(n = 10000, operation = doRead, ec = readContext)

    //start writers and readers asynchronously
    Future(writers.run())(writeContext)
    Future(readers.run())(readContext)

    //Log the throughput once every second
    def avg(ns: Seq[Long]): Double = if (ns.isEmpty) 0.0 else ns.sum.toDouble / ns.size
    val i = new AtomicInteger()
    while (i.getAndIncrement() < 100) {
      logger.info(
        s"""
           |iteration: ${i.get()}
           |readers: [${readers.jobsDone()}] [${avg(readers.responseTimes())}]
           |writers: [${writers.jobsDone()}] [${avg(writers.responseTimes())}]"""
          .stripMargin)
      Thread.sleep(1000)
    }
  }
}
//Asynchronously runs 'n' processes and keeps track of their state
class Processes(n: Int, operation: () ⇒ Long, ec: ExecutionContext) extends StrictLogging {
  //ListBuffer is not thread-safe, so every access to it is synchronized
  private val responseTimesBuffer: ListBuffer[Long] = ListBuffer.empty
  private val jobs = new AtomicInteger(0)

  def run(): Unit =
    (1 to n).foreach { _ ⇒
      Future {
        val elapsed = operation()
        responseTimesBuffer.synchronized(responseTimesBuffer.append(elapsed))
        jobs.incrementAndGet()
      }(ec)
    }

  //only take the last 100 response times
  def responseTimes(): Seq[Long] =
    responseTimesBuffer.synchronized(responseTimesBuffer.takeRight(100).toList)

  def jobsDone(): Int = jobs.get()
}
object ExecutionContextFactory extends StrictLogging {
  //Creates an ExecutionContext backed by a fixed-size thread pool
  def instantiate(fixedThreadPoolSize: Int): ExecutionContext = new ExecutionContext {
    val threadPool = Executors.newFixedThreadPool(fixedThreadPoolSize)

    override def reportFailure(cause: Throwable): Unit =
      logger.error("Task failed in thread pool", cause)

    override def execute(runnable: Runnable): Unit =
      threadPool.submit(runnable)
  }
}
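
A final caveat with the factory above: Executors.newFixedThreadPool creates non-daemon threads, so the JVM keeps running even after the experiment loop finishes. A sketch of one way around it, assuming daemon threads are acceptable for a throwaway demo - swap the pool construction inside instantiate for one that uses a daemon ThreadFactory:

import java.util.concurrent.{Executors, ThreadFactory}

//A ThreadFactory whose threads don't keep the JVM alive after the main thread exits
val daemonFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r)
    t.setDaemon(true)
    t
  }
}
val threadPool = Executors.newFixedThreadPool(fixedThreadPoolSize, daemonFactory)

Alternatively, keep the non-daemon threads and call threadPool.shutdown() once no more work will be submitted.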