Thursday, 1 June 2017

Playing with thread pools

In the project I've been working on, I recently came across a situation where the service was hammered by updates and started struggling under the load of simultaneous client (predominantly read) requests. The solution was to maintain separate thread pools for updates and client requests: the service needs to be available, but we can accept delays in updates. This inspired me to play around with thread pools - which, for some reason, I'd never done before - in a small demonstration.

If you run CommonThreadPoolExperiment, you'll see that the number of completed read processes converges to that of the write processes; on my machine this takes around 10 seconds.

With SeparateThreadPoolsExperiment the numbers are starkly different: the reads are unperturbed by the much slower writes.


package org.home
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.scalalogging.StrictLogging
import scala.collection.immutable.Seq
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import ExecutionContextFactory.instantiate
/** Experiment where reads and writes each get their own fixed pool of 2 threads,
  * so the slower write jobs cannot occupy the threads the read jobs need.
  */
object SeparateThreadPoolsExperiment extends App with StrictLogging {
  val readContext: ExecutionContext = instantiate(fixedThreadPoolSize = 2)
  val writeContext: ExecutionContext = instantiate(fixedThreadPoolSize = 2)
  ExperimentRunner.run(readContext = readContext, writeContext = writeContext)
}
/** Experiment where reads and writes share one fixed pool of 4 threads,
  * so both kinds of job compete for the same threads.
  */
object CommonThreadPoolExperiment extends App with StrictLogging {
  val commonContext: ExecutionContext = instantiate(fixedThreadPoolSize = 4)
  ExperimentRunner.run(commonContext, commonContext)
}
/** Drives the read/write throughput experiment and periodically logs its progress. */
object ExperimentRunner extends StrictLogging {

  /** Starts 10000 writer jobs and 10000 reader jobs, then logs their progress 100 times,
    * sleeping one second between log lines.
    *
    * @param writeContext context the write jobs (and the loop submitting them) run on
    * @param readContext  context the read jobs (and the loop submitting them) run on
    */
  def run(writeContext: ExecutionContext, readContext: ExecutionContext): Unit = {
    // CPU-bound stand-in for real work; returns its own wall-clock duration in ms.
    def longJob(n: Int): Long = {
      def now() = System.currentTimeMillis()
      val start = now()
      // ThreadLocalRandom rather than the shared scala.util.Random: every worker thread drawing
      // from one shared java.util.Random contends on its seed, which would skew the comparison
      // between thread-pool setups that this experiment is meant to measure.
      val rnd = java.util.concurrent.ThreadLocalRandom.current()
      (1 to n).map(_ => rnd.nextInt()).sorted
      now() - start
    }
    def doRead(): Long = longJob(5000)
    def doWrite(): Long = longJob(25000)

    val writers = new Processes(n = 10000, operation = doWrite, writeContext)
    val readers = new Processes(n = 10000, operation = doRead, readContext)
    // start writers and readers asynchronously
    Future(writers.run())(writeContext)
    Future(readers.run())(readContext)

    // Mean of the sampled response times; 0.0 (instead of NaN) until a job has completed.
    def avg(ns: Seq[Long]): Double = if (ns.isEmpty) 0.0 else ns.sum.toDouble / ns.size

    // Log the throughput once every second
    for (iteration <- 1 to 100) {
      logger.info(
        s"""
           |iteration: $iteration
           |readers: [${readers.jobsDone()}] [${avg(readers.responseTimes())}]
           |writers: [${writers.jobsDone()}][${avg(writers.responseTimes())}]""".stripMargin)
      Thread.sleep(1000)
    }
  }
}
/** Asynchronously runs `n` copies of `operation` and keeps track of their state.
  *
  * @param n         number of jobs to start
  * @param operation job body; its result is recorded as the job's response time
  * @param ec        execution context every job runs on
  */
class Processes(n: Int, operation: () => Long, ec: ExecutionContext) extends StrictLogging {
  // Only the most recent `SampleSize` response times are ever reported, so older ones are dropped.
  private val SampleSize = 100
  // Written by many pool threads and read by the logging thread. ListBuffer is not thread-safe,
  // so every access goes through `responseTimesBuffer.synchronized`.
  private val responseTimesBuffer: ListBuffer[Long] = ListBuffer.empty
  private val jobs = new AtomicInteger(0)

  /** Submits all `n` jobs to `ec` and returns without waiting for them to finish. */
  def run(): Unit =
    (1 to n).foreach { _ =>
      Future {
        val responseTime = operation() // run the job itself outside the lock
        responseTimesBuffer.synchronized {
          responseTimesBuffer.append(responseTime)
          if (responseTimesBuffer.size > SampleSize) responseTimesBuffer.remove(0)
        }
        jobs.incrementAndGet()
      }(ec)
    }

  /** The last (up to) 100 recorded response times, oldest first. */
  def responseTimes(): Seq[Long] = responseTimesBuffer.synchronized(responseTimesBuffer.toList)

  /** Number of jobs that have completed so far. */
  def jobsDone(): Int = jobs.get()
}
/** Builds execution contexts backed by fixed-size thread pools. */
object ExecutionContextFactory extends StrictLogging {

  /** Creates an ExecutionContext that runs tasks on a new fixed thread pool.
    * The pool is never shut down by this factory.
    *
    * @param fixedThreadPoolSize number of threads in the pool
    */
  def instantiate(fixedThreadPoolSize: Int): ExecutionContext =
    // fromExecutorService hands each task to the pool's `execute`. The previous hand-rolled
    // context used `submit`, which wrapped the task in a discarded java.util.concurrent.Future
    // and silently swallowed anything it threw.
    ExecutionContext.fromExecutorService(
      Executors.newFixedThreadPool(fixedThreadPoolSize),
      (cause: Throwable) => logger.error("Some error", cause))
}

Friday, 17 March 2017

Imitating Scala Futures with Go channels (in Go and Clojure)

Scala's Future abstraction is one of my favourite features of the language. It's simple, elegant and powerful. Below is my beginner's attempt to mimic it in Go, in a simplistic domain.
Let's assume we have two functions which will take some time (or http calls that naturally return Futures) and we need to aggregate their results.

The Scala way

//assume these take time
def longComp1(): Int = 5
def longComp2(): Int = 6
def aggregate(i: Int, j: Int): Unit = {
  println(s"i is $i, j is $j")
}
// Both futures are created (and start running) before the for comprehension,
// so the two computations run concurrently.
val longComp1Result = Future { longComp1() }
val longComp2Result = Future { longComp2() }
// Waits for both results and passes them to aggregate.
val aggregatedFuture: Future[Unit] = for {
  r1 <- longComp1Result
  r2 <- longComp2Result
} yield aggregate(r1, r2)
view raw Futures.scala hosted with ❤ by GitHub

The Go way

//define a type for brevity below
type LongComp func() int
longComp1 := func() int {
	return 5
}
longComp2 := func() int {
	return 6
}
aggregate := func(is []int) {
	fmt.Println(is)
}
// f runs every function in its own goroutine and applies agg to the results,
// in the same order as fs. It returns a channel that is closed once agg has run,
// so the caller can wait for the aggregation instead of returning before it happens.
f := func(fs []LongComp, agg func([]int)) <-chan struct{} {
	channels := make([]chan int, 0, len(fs))
	//loop through the functions
	for _, fun := range fs {
		//create a channel for each and load its result in;
		//buffered so the producer never blocks waiting for the reader
		c := make(chan int, 1)
		channels = append(channels, c)
		go func(f LongComp) {
			c <- f()
		}(fun)
	}
	//get all the elements off the channels and apply the aggregate function
	done := make(chan struct{})
	go func() {
		defer close(done)
		results := make([]int, 0, len(channels))
		for _, c := range channels {
			results = append(results, <-c)
		}
		agg(results)
	}()
	return done
}
//wait for the aggregation, otherwise the enclosing function may return first
<-f([]LongComp{longComp1, longComp2}, aggregate)
view raw futures.go hosted with ❤ by GitHub

It's rather clunky compared to Scala's solution. But the main problem is that it's not polymorphic on the types. The same lump of code would need to be redefined every time the function signatures change.

Below is a more generic version that works with interface{} values and explicit type assertions (Go's closest equivalent to casting).


longComp1 := func() int {
	return 5
}
longComp2 := func() int {
	return 6
}
aggregate := func(is []int) {
	fmt.Println(is)
}
type LongComp2 func() interface{}
//the "generic" function that works regardless of the underlying type.
//It runs every function in its own goroutine, applies agg to the results in the
//same order as fs, and returns a channel that is closed once agg has run.
f2 := func(fs []LongComp2, agg func([]interface{})) <-chan struct{} {
	channels := make([]chan interface{}, 0, len(fs))
	for _, fun := range fs {
		//buffered so each producer can finish without waiting for the reader
		c := make(chan interface{}, 1)
		channels = append(channels, c)
		go func(f LongComp2) {
			c <- f()
		}(fun)
	}
	done := make(chan struct{})
	go func() {
		defer close(done)
		results := make([]interface{}, 0, len(channels))
		for _, c := range channels {
			results = append(results, <-c)
		}
		agg(results)
	}()
	return done
}
//functions need to be transformed to accept and return interface{}:
//wrap turns a ()->int function into a ()->interface{} function
//without changing behaviour
wrap := func(f func() int) func() interface{} {
	return func() interface{} {
		return f()
	}
}
//wait for the aggregation, otherwise the enclosing function may return first
<-f2([]LongComp2{
	wrap(longComp1),
	wrap(longComp2)},
	//transform the aggregate function to accept []interface{} instead of []int,
	//using a type assertion on each element
	func(xs []interface{}) {
		is := make([]int, 0, len(xs))
		for _, x := range xs {
			is = append(is, x.(int))
		}
		aggregate(is)
	})
view raw futures2.go hosted with ❤ by GitHub

Not the nicest code I've ever written. If only Go had generics... Any Golang expert out there who could suggest improvements?

The Clojure Way

(ns home.async-play
  (:require [clojure.core.async :as async]))

(defn long-comp1 [] 5)
(defn long-comp2 [] 6)

(defn aggregate [xs]
  (println xs))

;;simple version, working with 2 functions
(let [c1 (async/go (long-comp1))
      c2 (async/go (long-comp2))]
  (async/go
    (aggregate [(async/<! c1) (async/<! c2)])))

;;generalized version, working with a sequence of functions
(defn fs->chans
  "Returns a vector of channels, each holding the result of one function call.
  mapv is eager, so every go block is started here, before any result is read."
  [fs]
  (mapv (fn [f] (async/go (f))) fs))

(let [channels (fs->chans [long-comp1 long-comp2])]
  (async/go
    ;; The go macro does not reach inside a fn passed to map, so <! can't be used
    ;; there, and the blocking <!! would tie up a go-pool thread. Collect the
    ;; results in order with a loop instead, parking on each channel.
    (aggregate (loop [[c & more] channels
                      acc []]
                 (if c
                   (recur more (conj acc (async/<! c)))
                   acc)))))
view raw futures.clj hosted with ❤ by GitHub

Clojure borrowed its go-routine and channel ideas from Golang. Yet the difference between the Clojure and Go versions in terms of brevity is striking. It's the result of 3 independent factors. One, Clojure is dynamically typed, hence there is no need to hassle with either function signatures or generics, or rather the lack of them. Two, it's a functional language: maps and list comprehensions are way more terse than Go's for-loops. Three, async/go returns a channel that receives the result of its body. Go needs to create a slice of channels first, loop through the slice of functions, and create anonymous functions in go blocks where each function is executed and its result is put on the channel... lots of hassle.

With generics many of Go's problems would go away. Rob Pike explicitly said, no generics, but hopefully someone will just force him or do it himself instead.

Monday, 13 March 2017

Solving problems in Scala, Clojure and Go - Template Method Design Pattern

I've been planning for ages to write a series of posts comparing OO and FP solutions to common problems, and, in a similar vein, comparing Scala and Clojure. Usually my enthusiasm doesn't survive until I have time to sit down in front of the computer outside working hours. But recently I've started to read about Go, and it rekindled my interest in such mental exercises. It also gives me a nice opportunity to learn Go. These 3 languages are also quite different, which makes these little problems even more educational.

Without further ado, here is the first one.

In OO, polymorphism is achieved by inheritance. Inheritance also enables sharing data and logic, sparing the developer some code duplication. It is a powerful tool, but nowadays it is generally regarded as overused. The Template Method Design Pattern is a nice example, and I've implemented it dozens (if not hundreds) of times at work. Let's see how to solve the problems it solves in languages that don't have inheritance.

The idea is to devise a toy problem, solve it in Scala (it would be the same in Java; I just want to keep the boilerplate down) to demonstrate the OO way, then come up with Clojure and Go solutions.

The Problem

Our domain has employees who can fall into 2 broad categories, Office Workers and Field Workers. Their base salary and the calculation of how their years at the company contribute to their salary are different. I let the code speak for itself.

Scala/OO solution

/** An employee whose salary is a base salary plus a type-specific amount for years served.
  * `salary()` is the template method; `yearsContribution()` is the hook subclasses fill in.
  */
sealed abstract class Employee(val name: String, years: Int, baseSalary: BigDecimal) {

  /** The part of the salary earned through years at the company; defined by each subclass. */
  def yearsContribution(): BigDecimal

  /** Same formula for everyone, different data and sub-logic per subclass. */
  def salary(): BigDecimal = {
    val contribution = yearsContribution()
    baseSalary + contribution
  }

  override def toString: String =
    s"My name is $name and I've been working here for $years years."
}

/** Office worker: base salary 1000, plus 30 per year. */
class OfficeWorker(name: String, years: Int)
    extends Employee(name, years, baseSalary = BigDecimal(1000)) {
  override def yearsContribution(): BigDecimal = BigDecimal(30 * years)
}

/** Field worker (has an extra field): base salary 800, plus 40 per year for physical work, 30 otherwise. */
class FieldWorker(name: String, years: Int, physicalWork: Boolean)
    extends Employee(name, years, baseSalary = BigDecimal(800)) {
  override def yearsContribution(): BigDecimal = {
    val ratePerYear = if (physicalWork) 40 else 30
    BigDecimal(ratePerYear * years)
  }
}

/** Prints a short introduction and the salary of one employee of each kind. */
object EmloyeeTestRunner extends App {
  val o = new OfficeWorker("joe", 12)
  val f = new FieldWorker("jack", 8, physicalWork = true)
  for (employee <- Seq[Employee](o, f))
    println(s"$employee. I earn ${employee.salary()}")
}
Plain and simple for an OO developer.

Clojure solution

(ns example
  ;; clojure.spec was renamed to clojure.spec.alpha before Clojure 1.9 was released
  (:require [clojure.spec.alpha :as s]))

;; Spec section. It's not needed for the solution, but provides
;; documentation and can generate tests.
(def employee-types #{:office-worker :field-agent})
(s/def ::type employee-types)
(s/def ::years (s/int-in 0 100))
(s/def ::name string?)
;; s/int-in excludes its upper bound, so it must lie above the largest
;; base salary used below (1000); (s/int-in 0 1000) rejected it.
(s/def ::base-salary (s/int-in 0 10001))
(s/def ::physical-work? boolean?)
(s/def ::employee (s/keys :req [::years ::name ::base-salary ::type]
                          :opt [::physical-work?]))
(s/fdef salary
  :args (s/cat :in ::employee)
  :ret number?)

;; ============================================================
;; Per-type part of the salary, dispatched on the ::type key of the employee map.
(defmulti years-contribution ::type)

(defmethod years-contribution :office-worker [x]
  (* 30 (::years x)))

(defmethod years-contribution :field-agent [x]
  (let [multiplier (if (::physical-work? x) 40 30)]
    (* multiplier (::years x))))

(defn to-string [employee]
  (str "My name is " (::name employee) " and I've been working here for " (::years employee) " years."))

;; Same base salaries as the Scala version: office workers 1000, field workers 800.
(defn office-worker [name years]
  {::name name ::years years ::base-salary 1000 ::type :office-worker})

(defn field-agent [name years physical-work?]
  {::name name ::years years ::base-salary 800
   ::physical-work? physical-work? ::type :field-agent})

;; The shared "template": base salary plus the type-specific contribution.
(defn salary [employee]
  (+ (::base-salary employee) (years-contribution employee)))

;;=============== demonstrate =======================
(let [joe (field-agent "joe" 17 false)
      jil (field-agent "jil" 17 true)
      jack (office-worker "jack" 20)]
  (doseq [x [joe jil jack]]
    (println (to-string x))
    (println (salary x))))

No inheritance, obviously, and no types either. Therefore the to-string and salary functions are not defined on types; instead, the entity maps are passed to them as arguments. Another solution could have been using protocols and defrecords, which would have yielded a more OO-like solution. However, for a single function it seemed overkill.

Go solution

package main
import "fmt"
type (
employee interface {
yearContribution() float32
baseSalary() int
toString() string
}
baseEmployee struct {
name string
years int
}
officeWorker struct {
baseEmployee
}
fieldWorker struct {
baseEmployee
physicalWork bool
}
)
func (_ officeWorker) baseSalary() int {
return 500
}
func (_ fieldWorker) baseSalary() int {
return 1000
}
func (o officeWorker) yearContribution() float32 {
return 30 * float32(o.years)
}
func (f fieldWorker) yearContribution() float32 {
if f.physicalWork {
return 40 * float32(f.years)
} else {
return 30 * float32(f.years)
}
}
func salary(e employee) float32 {
return float32(e.baseSalary()) + e.yearContribution()
}
func (e baseEmployee) toString() string {
return fmt.Sprintf("My name is %s and I've been working here for %d years.", e.name, e.years)
}
func main() {
ow := officeWorker{baseEmployee{name:"joe", years:20}}
fw := fieldWorker{baseEmployee: baseEmployee{name:"jack", years:20}, physicalWork: true}
for _, e := range []employee{ow, fw} {
fmt.Println(e.toString())
fmt.Println(salary(e))
}
}

It took me some time to come up with a solution. In Go there are no abstract classes or virtual methods. This required me to define a separate employee interface and a baseEmployee struct. And I couldn't attach the salary method to the interface either, even though the interface has both the yearContribution() and the baseSalary() methods on it, so salary had to become a plain function that takes an employee. It is not necessarily a problem; this could be idiomatic in Go.

Tuesday, 3 January 2017

Solution for "Docker is eating up my disk!"

I've started to play with Docker recently and quickly bumped into a problem. Although I've never experienced it on my Mac, on AWS EC2 the space on /dev/xvda1 got eaten up, which eventually killed my Docker process. The solution proved to be running the following commands. Note that the first two stop and remove every container, not just unused ones:

    docker stop $(docker ps -a -q) || true
    docker rm $(docker ps -a -q) || true
    docker rmi $(docker images -f "dangling=true" -q) || true
    docker volume ls -qf dangling=true | xargs -r docker volume rm

Saturday, 7 May 2016

New version of Principle

After a year-long hiatus, I've touched Principle again. A new version, 0.34, is out; it moves all configuration from the clunky pom.xml to a neat yaml file, like the one below.


# NOTE(review): this nesting was reconstructed from a paste that lost all indentation
# (flattened, it repeats root-level keys such as violation_threshold). Confirm it against
# the Principle documentation, especially where structure_analysis_enabled belongs.
root_package: org.tindalos.principle
checks:
  layering:
    layers: [infrastructure, app, domain]
    violation_threshold: 0
  third_party_restrictions:
    allowed_libraries:
      - layer: infrastructure
        libraries: [org.apache.maven, org.json, org.yaml, com.google.common.collect, jdepend]
      - layer: domain
        libraries: [org.apache.commons]
    violation_threshold: 0
  package_coupling:
    cyclic_dependencies_threshold: 0
    acd_threshold: 0.35
  modules:
    # Map modules to packages
    module-definitions:
      EXPECTATIONS: [domain.expectations]
      CORE: [domain.core]
      AGENTSCORE: [domain.agentscore]
      AGENTS: [domain.agents, infrastructure.reporters]
      CHECKER: [domain.checker]
    # Define dependencies between modules
    module-dependencies:
      EXPECTATIONS: []
      CORE: [EXPECTATIONS]
      AGENTSCORE: [CORE, EXPECTATIONS]
      AGENTS: [CORE, AGENTSCORE, EXPECTATIONS]
      CHECKER: [CORE, AGENTSCORE]
    violation_threshold: 0
  structure_analysis_enabled: true
view raw principle.yml hosted with ❤ by GitHub

The next big thing will be to make it usable as an SBT plugin, while also retaining its Maven plugin nature. Judging by the documentation of the "Simple" Build Tool, this looks challenging.

Saturday, 9 April 2016

The Reader Monad

Yet another exploration in Monadland. Like its sibling, the State Monad, the Reader Monad had managed to elude me until I came across an enlightening example in Debasish Ghosh's excellent book, Functional and Reactive Domain Modeling. In the following example I'll describe a simple scenario where I'd usually use dependency injection, and then refactor it into a variant that uses the Reader monad.

Version 1. Dependency Injection

case class User(id: Int)

/** Sends notifications to users. */
trait NotificationService {
  def notifyUserAboutSth(user: User): Unit
}

/** Looks users up by id. */
trait UserRepository {
  def find(id: Int): Option[User]
}

/** Notifies users; its collaborators are injected through the constructor. */
class UserService(
    repository: UserRepository,
    notificationService: NotificationService) {

  /** Notifies the user with `userId`, or prints "No user found!" when the repository has none. */
  def notifyUser(userId: Int): Unit =
    repository
      .find(userId)
      .fold(println("No user found!"))(notificationService.notifyUserAboutSth)
}
view raw DIExample.scala hosted with ❤ by GitHub
When one tries to follow FP principles, she strives to build her application up like an onion. The core should contain pure functions, and all interaction with external services - DB, web service calls, user input, ... - , i.e. side-effects, should be confined to the outer layer. In the code above the domain logic and side-effects are undisentanglable. The next version shows an alternative.

Version 2. Higher order function

/** Everything the user-notification logic needs from the outside world. */
case class Context(userRepository: UserRepository, notificationService: NotificationService)

object UserService {
  /** Returns a function that, given a Context, notifies the user with `userId`,
    * or prints "No user found!" when the repository has no such user.
    * Nothing happens until the returned function is applied.
    */
  def notifyUser(userId: Int): Context => Unit = context =>
    context.userRepository.find(userId) match {
      case Some(foundUser) => context.notificationService.notifyUserAboutSth(foundUser)
      case None => println("No user found!")
    }
}

//build up the computation
// notifyUser takes the user's id; `joe` is assumed to be a User, as in the Reader version.
// Named `notification` because a member called `notify` clashes with the final
// java.lang.Object#notify and does not compile inside a class or object.
val notification: Context => Unit = UserService.notifyUser(joe.id)
//fire the side-effects
notification(context)
This is better. `notifyUser` is now a referentially transparent function. The actual execution of the effects is deferred to a later point, when the resulting function is called with the context. The Reader monad is nothing more than a convenient wrapper around such a function.

Version 3. Reader Monad

/** A computation that needs an environment of type `R` to produce an `A`. */
case class Reader[R, A](run: R ⇒ A) {

  /** Transforms the result, keeping the same environment. */
  def map[B](f: A ⇒ B): Reader[R, B] = Reader(run andThen f)

  /** Chains a Reader chosen from this one's result; both are run with the same environment. */
  def flatMap[B](f: A ⇒ Reader[R, B]): Reader[R, B] = Reader { env ⇒
    val a = run(env)
    f(a).run(env)
  }
}
object UserService {
  /** Describes notifying `user` (looked up by id in the context's repository), or printing
    * "No user found!" when the repository has no such user. Nothing happens until the
    * Reader is run with a Context.
    */
  def notifyUser(user: User): Reader[Context, Unit] = Reader { context ⇒
    context.userRepository.find(user.id) match {
      case Some(foundUser) ⇒ context.notificationService.notifyUserAboutSth(foundUser)
      case None ⇒ println("No user found!")
    }
  }
}

//build up the computation
// Named `notification` because a member called `notify` clashes with the final
// java.lang.Object#notify and does not compile inside a class or object.
val notification: Reader[Context, Unit] = UserService.notifyUser(joe)
//fire the side-effects
notification.run(context)
The benefit the Reader monad offers over the simple HOF solution is monadic composability, as in the example below.

// Account operations return Readers, so (presumably, if implementations do their
// work inside the Reader) nothing happens until the result is run with a Context.
trait AccountService {
def credit(userId: Int, amount: Int): Reader[Context, Unit]
def debit(userId: Int, amount: Int): Reader[Context, Unit]
}
// Placeholder for the sketch: `???` throws scala.NotImplementedError as soon as
// this line is evaluated, so supply a real AccountService to actually run it.
val service: AccountService = ???
// Composes "credit 10 to user 1, then debit 10 from user 2" into a single Reader.
// flatMap hands the same Context to both steps, so it never has to be mentioned here.
val transfer: Reader[Context, Unit] = for {
_ ← service.credit(1, 10)
_ ← service.debit(2, 10)
} yield ()
//fire the side-effects
transfer.run(context)

Note that inside the for comprehension the context doesn't even appear.

Saturday, 12 March 2016

Struggling with the State Monad

After spending hours trying to find articles explaining the damn thing without resorting to "let's take this contrived example..." or "let's take a real life example, generating pseudo-random numbers" (seriously?!), I'm still left frustrated. Eventually I reached out to Kaloz, who pointed me to one of his earlier explorations of the topic. His example is here, but I'm still in the dark as to why it is any better than a simple foldLeft. In the code below I slightly refactored Kaloz's code to keep the generic part apart from the specific solutions. A second foldLeft-based function is provided to return the same result format as the function using the State monad, although I don't see any additional value in that.

package org.bluecollar.scalaz
import scalaz.Scalaz._
import scalaz._
// Compares a candy-machine simulation written with scalaz's State monad against
// two plain foldLeft versions; all three print the same final machine.
object StateMonadExamples extends App {
//Domain and test values
sealed trait Input
case object Coin extends Input
case object Turn extends Input
case class Machine(locked: Boolean, candies: Int, coins: Int)
// Transition function for a single input:
// - a machine with no candies ignores every input;
// - a Coin into an unlocked machine, or a Turn on a locked one, is ignored;
// - a Coin into a locked machine unlocks it and keeps the coin;
// - a Turn on an unlocked machine dispenses one candy and locks it again.
private def applyInput(i: Input): (Machine) ⇒ Machine =
(m: Machine) => (i, m) match {
case (_, Machine(_, 0, _)) => m
case (Coin, Machine(false, _, _)) => m
case (Turn, Machine(true, _, _)) => m
case (Coin, Machine(true, candy, coin)) => Machine(false, candy, coin + 1)
case (Turn, Machine(false, candy, coin)) => Machine(true, candy - 1, coin)
}
val inputs = List(Coin, Turn, Coin, Turn, Coin, Turn, Coin, Turn)
val machine = Machine(true, 5, 10)
//Implementation with STATE MONAD starts =============
// Monad instance for State[Machine, _]; used below to sequence a List of State actions.
val state = scalaz.StateT.stateMonad[Machine]
// One step: apply input `i` to the state, then yield (coins, candies) of the new machine.
def rules(i: Input): State[Machine, (Int, Int)] = for {
_ <- modify(applyInput(i))
m <- get
} yield (m.coins, m.candies)
// Runs every input in order (the per-step results are discarded), then reads the
// final machine and yields its (coins, candies).
def simulateMachine(inputs: List[Input]): State[Machine, (Int, Int)] = for {
_ <- state.sequence(inputs.map(rules))
m <- get[Machine]
} yield (m.coins, m.candies)
// Runs the simulation from `machine`; yields (final machine, (coins, candies)) per the Test output below.
def simulationWithStateMonad(inputs: List[Input], machine: Machine) = simulateMachine(inputs)(machine)
//Implementation with STATE MONAD ends =============
//Implementation with foldLeft. A 2-(rather short)liner; returns only the final machine
def simulationWithFoldLeft(inputs: List[Input], machine: Machine) = inputs.foldLeft(machine) { (m, input) ⇒
applyInput(input)(m)
}
//Implementation with foldLeft, where the result is the same format as the State monad version:
//(final machine, (coins, candies))
def simulationWithFoldLeft2(inputs: List[Input], machine: Machine) = {
def convert(m: Machine) = (m, (m.coins, m.candies))
inputs.foldLeft(convert(machine)) { (m, input) ⇒
convert(applyInput(input)(m._1))
}
}
//Test
println(simulationWithStateMonad(inputs, machine))
//(Machine(true,1,14),(14,1))
println(simulationWithFoldLeft(inputs, machine))
//Machine(true,1,14)
println(simulationWithFoldLeft2(inputs, machine))
//(Machine(true,1,14),(14,1))
}
Kaloz? What am I missing?