The first implementation is in Java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package examples; | |
import java.util.Date; | |
import java.util.concurrent.*; | |
public class ProducersConsumer { | |
private static int PRODUCERS = 10; | |
public static void main(String... args) throws InterruptedException { | |
final BlockingQueue<String> queue = new LinkedBlockingDeque<String>(); | |
final ConcurrentHashMap<String, Integer> container = new ConcurrentHashMap<String, Integer>(); | |
ExecutorService executorService = Executors.newCachedThreadPool(); | |
executorService.execute(createConsumer(queue, container)); | |
for(int i = 0; i< PRODUCERS; i++) { | |
executorService.execute(createProducer(queue, "a" + i)); | |
} | |
System.out.println(new Date() +" All started up\n"); | |
Thread.sleep(1000); | |
System.out.println(new Date() + " Shutting down"); | |
executorService.shutdown(); | |
System.out.println(new Date() + " " + container + "\nEnd"); | |
analyseResults(container); | |
} | |
private static Runnable createProducer(final BlockingQueue<String> queue, final String msg) { | |
return new Runnable() { | |
@Override | |
public void run() { | |
while (true) { | |
queue.add(msg); | |
} | |
} | |
}; | |
} | |
private static Runnable createConsumer(final BlockingQueue<String> queue, final ConcurrentHashMap<String, Integer> container) { | |
return new Runnable() { | |
@Override | |
public void run() { | |
while (true) { | |
try { | |
synchronized (container) { | |
String msg = queue.take(); | |
Integer val = container.get(msg); | |
int newVal = val == null ? 0 : val + 1; | |
container.put(msg, newVal); | |
} | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
}; | |
} | |
private static void analyseResults(final ConcurrentHashMap<String, Integer> container) { | |
int min = Integer.MAX_VALUE; | |
int max = 0; | |
int sum = 0; | |
for (Integer val : container.values()) { | |
sum += val; | |
min = Math.min(val, min); | |
max = Math.max(val,max); | |
} | |
System.out.println("Number of active producers: " + container.keySet().size()); | |
System.out.println("Number of messages: " + sum); | |
System.out.println("Laziest producer: " + min); | |
System.out.println("Busiest producer: " + max); | |
} | |
} | |
//Results for 10 producers | |
//Number of active producers: 10 | |
//Number of messages: 97521 | |
//Laziest producer: 6951 | |
//Busiest producer: 12150 | |
//Results for 100 producers | |
//Number of active producers: 100 | |
//Number of messages: 4183 | |
//Laziest producer: 10 | |
//Busiest producer: 143 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns clojure_study.other.producer-consumer | |
(:require [clojure.core.async :as a])) | |
(defn consume! [msg container-atom] | |
(let [current (get @container-atom msg 0)] | |
(swap! container-atom assoc msg (inc current)))) | |
(defn produce! | |
"Puts messages in the channel" | |
[msg a-channel] | |
(a/go-loop [] | |
(a/>! a-channel msg) | |
(recur))) | |
(defn start-consumer! | |
"Consumes messages from the channel and puts them in a container grouped by content" | |
[a-channel container-atom] | |
(a/go-loop [] | |
(when-let [msg (a/<! a-channel)] | |
(consume! msg container-atom) | |
(recur)))) | |
(defn start-producers! | |
"Start a given number of producers on the given channel" | |
[num-of-producers a-channel] | |
(let [producers (atom [])] | |
(doseq [x (range num-of-producers) | |
:let [msg (str "a" x)]] | |
(let [producer (produce! msg a-channel)] | |
(swap! producers conj producer))) | |
producers)) | |
(defn analyse [result] | |
(let [num-of-actives (-> result keys count) | |
sum (reduce + (vals result)) | |
min-val (->> result vals (apply min)) | |
max-val (->> result vals (apply max))] | |
(println (str | |
"Number of active producers:" num-of-actives | |
"\nNumber of messages:" sum | |
"\nLaziest producer:" min-val | |
"\nBusiest producer:" max-val)))) | |
(def container (atom {})) | |
(def QUEUE (a/chan)) | |
(def PRODUCER-NUM 10) | |
(start-consumer! QUEUE container) | |
(def producers (start-producers! PRODUCER-NUM QUEUE)) | |
;;wait a second | |
(a/<!! (a/timeout 1000)) | |
(doseq [producer @producers] | |
(a/close! producer)) | |
(analyse @container) | |
;Results for 10 producers | |
; Number of active producers:10 | |
; Number of messages:187828 | |
; Laziest producer:18667 | |
; Busiest producer:18961 | |
;Results for 100 producers | |
; Number of active producers:100 | |
; Number of messages:127944 | |
; Laziest producer:1277 | |
; Busiest producer:1282 |
The Java one is really struggling under 100 producers. I haven't done any tuning on the ExecutorService, though, but in the Clojure version I didn't even need to. The latter is distributing the load extremely evenly and reaches a much higher throughput than the Java version.
No comments :
Post a Comment