Monday, 24 November 2014

Producer-consumer example in Java and Clojure

I want to implement a very simple producer-consumer app: one consumer consuming from a large number of producers. I would like to see how the achievable throughput changes with the number of producers.

The first implementation is in Java
package examples;

import java.util.Date;
import java.util.concurrent.*;

public class ProducersConsumer {
    private static final int PRODUCERS = 10;

    public static void main(String... args) throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingDeque<String>();
        final ConcurrentHashMap<String, Integer> container = new ConcurrentHashMap<String, Integer>();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(createConsumer(queue, container));
        for (int i = 0; i < PRODUCERS; i++) {
            executorService.execute(createProducer(queue, "a" + i));
        }
        System.out.println(new Date() + " All started up\n");
        Thread.sleep(1000);
        System.out.println(new Date() + " Shutting down");
        // shutdown() only rejects new tasks; shutdownNow() interrupts the running loops
        executorService.shutdownNow();
        // Wait for the consumer to stop before reading the results
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(new Date() + " " + container + "\nEnd");
        analyseResults(container);
    }

    private static Runnable createProducer(final BlockingQueue<String> queue, final String msg) {
        return new Runnable() {
            @Override
            public void run() {
                // Keep producing until shutdownNow() interrupts this thread
                while (!Thread.currentThread().isInterrupted()) {
                    queue.add(msg);
                }
            }
        };
    }

    private static Runnable createConsumer(final BlockingQueue<String> queue, final ConcurrentHashMap<String, Integer> container) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    // There is a single consumer, so no extra locking is needed here
                    while (true) {
                        String msg = queue.take();
                        Integer val = container.get(msg);
                        // The first message from a producer counts as 1, not 0
                        int newVal = val == null ? 1 : val + 1;
                        container.put(msg, newVal);
                    }
                } catch (InterruptedException e) {
                    // Interrupted by shutdownNow(): restore the flag and let the thread end
                    Thread.currentThread().interrupt();
                }
            }
        };
    }

    private static void analyseResults(final ConcurrentHashMap<String, Integer> container) {
        int min = Integer.MAX_VALUE;
        int max = 0;
        int sum = 0;
        for (Integer val : container.values()) {
            sum += val;
            min = Math.min(val, min);
            max = Math.max(val, max);
        }
        System.out.println("Number of active producers: " + container.size());
        System.out.println("Number of messages: " + sum);
        System.out.println("Laziest producer: " + min);
        System.out.println("Busiest producer: " + max);
    }
}
//Results for 10 producers
//Number of active producers: 10
//Number of messages: 97521
//Laziest producer: 6951
//Busiest producer: 12150
//Results for 100 producers
//Number of active producers: 100
//Number of messages: 4183
//Laziest producer: 10
//Busiest producer: 143
The Clojure version
(ns clojure_study.other.producer-consumer
  (:require [clojure.core.async :as a]))

(defn consume!
  "Increments the message count for msg in the container"
  [msg container-atom]
  ;; Read and increment inside a single swap! so the update is atomic
  (swap! container-atom update-in [msg] (fnil inc 0)))

(defn produce!
  "Puts messages in the channel until the channel is closed"
  [msg a-channel]
  (a/go-loop []
    ;; >! returns false once the channel is closed, which ends the loop
    (when (a/>! a-channel msg)
      (recur))))

(defn start-consumer!
  "Consumes messages from the channel and puts them in a container grouped by content"
  [a-channel container-atom]
  (a/go-loop []
    (when-let [msg (a/<! a-channel)]
      (consume! msg container-atom)
      (recur))))

(defn start-producers!
  "Start a given number of producers on the given channel"
  [num-of-producers a-channel]
  ;; mapv is eager, so every producer is started before this returns
  (mapv #(produce! (str "a" %) a-channel) (range num-of-producers)))

(defn analyse [result]
  (let [num-of-actives (-> result keys count)
        sum (reduce + (vals result))
        min-val (->> result vals (apply min))
        max-val (->> result vals (apply max))]
    (println (str
               "Number of active producers:" num-of-actives
               "\nNumber of messages:" sum
               "\nLaziest producer:" min-val
               "\nBusiest producer:" max-val))))

(def container (atom {}))
(def QUEUE (a/chan))
(def PRODUCER-NUM 10)
(def consumer (start-consumer! QUEUE container))
(def producers (start-producers! PRODUCER-NUM QUEUE))
;; wait a second
(a/<!! (a/timeout 1000))
;; Closing the channels returned by go-loop does not stop the loops;
;; closing the queue makes >! return false and <! return nil, which ends them
(a/close! QUEUE)
;; wait until the consumer has finished
(a/<!! consumer)
(analyse @container)
;Results for 10 producers
; Number of active producers:10
; Number of messages:187828
; Laziest producer:18667
; Busiest producer:18961
;Results for 100 producers
; Number of active producers:100
; Number of messages:127944
; Laziest producer:1277
; Busiest producer:1282

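The Java version really struggles with 100 producers: it gets through only 4183 messages in the one-second run, against 97521 with 10 producers, and the gap between the laziest and the busiest producer is wide (10 vs 143 messages). I haven't done any tuning on the ExecutorService, but in the Clojure version I didn't need to. It distributes the load very evenly (1277 vs 1282 messages per producer with 100 producers) and reaches a much higher throughput (127944 messages) than the Java version. Part of the gap probably comes from the threading model: the cached thread pool runs every Java producer on its own thread, while the go blocks share core.async's small fixed thread pool.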
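As a rough idea of what tuning the Java side could look like, here is a minimal sketch that I have not measured. It assumes two changes: a bounded ArrayBlockingQueue filled with put(), so producers block while the queue is full, and a fixed thread pool sized for the producers plus the consumer. The class name BoundedProducersConsumer and the run(producers, capacity, millis) method are made up for this illustration and are not part of the program above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of ProducersConsumer: bounded queue, blocking put()
public class BoundedProducersConsumer {

    /** Runs producers for the given time and returns the per-producer message counts. */
    public static Map<String, Integer> run(int producers, int capacity, long millis) {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(capacity);
        // One thread per producer plus one for the consumer
        ExecutorService pool = Executors.newFixedThreadPool(producers + 1);

        // The single consumer counts into a local map and returns it when interrupted
        Future<Map<String, Integer>> consumer = pool.submit(new Callable<Map<String, Integer>>() {
            @Override
            public Map<String, Integer> call() {
                Map<String, Integer> counts = new HashMap<String, Integer>();
                try {
                    while (true) {
                        String msg = queue.take();
                        Integer val = counts.get(msg);
                        counts.put(msg, val == null ? 1 : val + 1);
                    }
                } catch (InterruptedException e) {
                    // shutdownNow() ends the run; fall through and return the counts
                }
                return counts;
            }
        });

        for (int i = 0; i < producers; i++) {
            final String msg = "a" + i;
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            queue.put(msg); // blocks while the queue is full
                        }
                    } catch (InterruptedException e) {
                        // interrupted by shutdownNow(): stop producing
                    }
                }
            });
        }

        try {
            Thread.sleep(millis);
            pool.shutdownNow(); // interrupts the consumer and all producers
            return consumer.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            pool.shutdownNow();
            throw new IllegalStateException("run did not finish cleanly", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = run(100, 1, 1000);
        int sum = 0;
        for (int c : counts.values()) {
            sum += c;
        }
        System.out.println("Number of active producers: " + counts.size());
        System.out.println("Number of messages: " + sum);
    }
}
```

Calling run(100, 1, 1000) mirrors the 100-producer run above. A capacity of 1 keeps at most one message waiting, which is close to, though not the same as, the unbuffered channel in the Clojure version. Whether this narrows the gap would have to be measured.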
