Thursday, 27 November 2014

Swarm - I

I've read a lot of interesting material on swarm intelligence, and a couple of weeks ago I got the idea to implement something along those lines in Clojure. The full source code is on GitHub (the application can be run from app.clj); here I'd like to show how the story evolved, step by step. The first part of the task was to implement a simple "vector algebra" to model the movement of the entities.


(ns
  ^{:author "mate.magyari"
    :doc "Simple vector algebra"}
  clojure-study.ideas.swarm.vector-algebra
  (:require [clojure.test :as test]))

(defn square
  "Square of"
  [num]
  (* num num))

(defn v*
  "Multiplication for vectors"
  [scalar a-vector]
  {:x (* scalar (:x a-vector))
   :y (* scalar (:y a-vector))})

(defn v+
  "Sum vector of vectors"
  [& vectors]
  (apply merge-with + vectors))

(defn v-
  "Diff vector of vectors"
  [v1 v2]
  (merge-with - v1 v2))

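;; the docstring has to come before the parameter vector, otherwise it is just a discarded expression
(defn magnitude
  "Length of a vector"
  [{x :x y :y}]
  (Math/sqrt
    (+ (square x) (square y))))
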
(defn polar->cartesian
  "Transforming a polar vector representation to a cartesian one"
  [{angle :angle magnitude :magnitude}]
  {:x (* magnitude (Math/cos angle))
   :y (* magnitude (Math/sin angle))})

(defn cartesian->polar
  "Transforming a cartesian vector representation to a polar one"
  [v]
  {:angle (Math/atan2 (:y v) (:x v))
   :magnitude (magnitude v)})

(defn rotate-cartesian
  "Rotating a vector"
  [v angle]
  (-> v
      cartesian->polar
      (update-in [:angle] + angle)
      polar->cartesian))

(defn distance
  "Distance between 2 points"
  [point-from point-to]
  (magnitude
    (merge-with - point-to point-from)))

(defn null-vector?
  "Checks whether the vector is a null vector"
  [v]
  (or (= v {:x 0 :y 0})
      (= v {:x 0.0 :y 0.0})))

(defn normalize
  "Normalize the vector"
  [a-vector]
  (let [len (magnitude a-vector)]
    (if (= 0.0 len)
      a-vector ; null vector simply returned
      (let [div-len #(/ % len)]
        (-> a-vector
            (update-in [:x] div-len)
            (update-in [:y] div-len))))))

(defn direction-vector
  "(Normalized) direction vector from point A to point B"
  [point-from point-to]
  (normalize (merge-with - point-to point-from)))

(defn weight-point
  "Calculates the centroid (centre of mass) of the points"
  [& points]
  (let [n (count points)
        sums (apply v+ points)]
    (-> sums
        (update-in [:x] #(/ % n))
        (update-in [:y] #(/ % n)))))

;;============== TESTS ==================
(defn is= [a b]
  (test/is (= a b)))

(defn is-close-enough [vec-1 vec-2]
  (test/is (> 0.001
              (magnitude
                (merge-with - vec-1 vec-2)))))

(test/deftest some-tests
  (is= 5.0 (magnitude {:x 4 :y 3}))
  (is= 5.0 (distance {:x 1 :y 3} {:x 4 :y 7}))
  (is= {:x 1.0 :y 0.0} (normalize {:x 4 :y 0}))
  (is= {:x 0.0 :y 1.0} (normalize {:x 0 :y 6}))
  (is-close-enough {:x (Math/sqrt 0.5) :y (Math/sqrt 0.5)} (normalize {:x 8 :y 8}))
  (is= {:x 0.0 :y 1.0} (direction-vector {:x 2 :y 6} {:x 2 :y 8}))
  (is= {:x -1.0 :y 0.0} (direction-vector {:x 8 :y 6} {:x 5 :y 6}))
  (is= {:x 15 :y 20} (v+ {:x 4 :y 6} {:x 6 :y 10} {:x 5 :y 4})))

(test/deftest cartesian->polar-test
  (is= {:angle 0.0, :magnitude 1.0} (cartesian->polar {:x 1 :y 0}))
  (is= {:angle Math/PI, :magnitude 1.0} (cartesian->polar {:x -1 :y 0}))
  (is-close-enough {:x 3 :y 7} (-> {:x 3 :y 7} cartesian->polar polar->cartesian)))

(test/deftest rotate-test
  (is-close-enough {:x 0 :y 1} (rotate-cartesian {:x 1 :y 0} (/ Math/PI 2)))
  (is-close-enough {:x -1 :y 0} (rotate-cartesian {:x 1 :y 0} Math/PI))
  (is-close-enough {:x 0 :y -1} (rotate-cartesian {:x -1 :y 0} (/ Math/PI 2)))
  (is-close-enough {:x 1 :y 0} (rotate-cartesian {:x -1 :y 0} Math/PI)))

(test/deftest weight-point-test
  (is-close-enough {:x 4 :y 6} (weight-point {:x 1 :y 4} {:x 7 :y 8})))

(test/run-tests 'clojure-study.ideas.swarm.vector-algebra)

Monday, 24 November 2014

Producer-consumer example in Java and Clojure

I want to implement a very simple producer-consumer app: one consumer consuming from a large number of producers. I would like to see how the achievable throughput changes with the number of producers.

The first implementation is in Java:
package examples;

import java.util.Date;
import java.util.concurrent.*;

public class ProducersConsumer {
    private static final int PRODUCERS = 10;

    public static void main(String... args) throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingDeque<String>();
        final ConcurrentHashMap<String, Integer> container = new ConcurrentHashMap<String, Integer>();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(createConsumer(queue, container));
        for (int i = 0; i < PRODUCERS; i++) {
            executorService.execute(createProducer(queue, "a" + i));
        }
        System.out.println(new Date() + " All started up\n");
        Thread.sleep(1000);
        System.out.println(new Date() + " Shutting down");
        // shutdown() alone would leave the endless loops running; shutdownNow() interrupts them
        executorService.shutdownNow();
        System.out.println(new Date() + " " + container + "\nEnd");
        analyseResults(container);
    }

    private static Runnable createProducer(final BlockingQueue<String> queue, final String msg) {
        return new Runnable() {
            @Override
            public void run() {
                // keep producing until the executor interrupts this thread
                while (!Thread.currentThread().isInterrupted()) {
                    queue.add(msg);
                }
            }
        };
    }

    private static Runnable createConsumer(final BlockingQueue<String> queue, final ConcurrentHashMap<String, Integer> container) {
        return new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        synchronized (container) {
                            String msg = queue.take();
                            Integer val = container.get(msg);
                            // the first message of a producer counts as 1, not 0
                            int newVal = val == null ? 1 : val + 1;
                            container.put(msg, newVal);
                        }
                    } catch (InterruptedException e) {
                        // interrupted by shutdownNow(): restore the flag and stop consuming
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        };
    }

    private static void analyseResults(final ConcurrentHashMap<String, Integer> container) {
        int min = Integer.MAX_VALUE;
        int max = 0;
        int sum = 0;
        for (Integer val : container.values()) {
            sum += val;
            min = Math.min(val, min);
            max = Math.max(val, max);
        }
        System.out.println("Number of active producers: " + container.keySet().size());
        System.out.println("Number of messages: " + sum);
        System.out.println("Laziest producer: " + min);
        System.out.println("Busiest producer: " + max);
    }
}
//Results for 10 producers
//Number of active producers: 10
//Number of messages: 97521
//Laziest producer: 6951
//Busiest producer: 12150
//Results for 100 producers
//Number of active producers: 100
//Number of messages: 4183
//Laziest producer: 10
//Busiest producer: 143
The Clojure version
(ns clojure-study.other.producer-consumer
  (:require [clojure.core.async :as a]))

(defn consume!
  "Increments the counter of the message in the container"
  [msg container-atom]
  ;; a single atomic swap! instead of a separate read followed by a write
  (swap! container-atom update-in [msg] (fnil inc 0)))

(defn produce!
  "Puts messages in the channel until the channel is closed"
  [msg a-channel]
  (a/go-loop []
    ;; >! returns false once the channel is closed, which ends the loop
    (when (a/>! a-channel msg)
      (recur))))

(defn start-consumer!
  "Consumes messages from the channel and puts them in a container grouped by content"
  [a-channel container-atom]
  (a/go-loop []
    (when-let [msg (a/<! a-channel)]
      (consume! msg container-atom)
      (recur))))

(defn start-producers!
  "Start a given number of producers on the given channel"
  [num-of-producers a-channel]
  (doseq [x (range num-of-producers)]
    (produce! (str "a" x) a-channel)))

(defn analyse [result]
  (let [num-of-actives (-> result keys count)
        sum (reduce + (vals result))
        min-val (->> result vals (apply min))
        max-val (->> result vals (apply max))]
    (println (str
               "Number of active producers:" num-of-actives
               "\nNumber of messages:" sum
               "\nLaziest producer:" min-val
               "\nBusiest producer:" max-val))))

(def container (atom {}))
(def QUEUE (a/chan))
(def PRODUCER-NUM 10)

(start-consumer! QUEUE container)
(start-producers! PRODUCER-NUM QUEUE)

;; wait a second
(a/<!! (a/timeout 1000))

;; closing the shared channel stops the producers and the consumer;
;; closing the channels returned by the go-loops would not stop anything
(a/close! QUEUE)

(analyse @container)
;Results for 10 producers
; Number of active producers:10
; Number of messages:187828
; Laziest producer:18667
; Busiest producer:18961
;Results for 100 producers
; Number of active producers:100
; Number of messages:127944
; Laziest producer:1277
; Busiest producer:1282

With 100 producers the Java version really struggles: it gets through about 4,000 messages in a second, against roughly 128,000 for the Clojure version. I haven't done any tuning on the ExecutorService, but in the Clojure version I didn't need any. The Clojure version also distributes the load extremely evenly (1,277 to 1,282 messages per producer, against 10 to 143 in Java).
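
As a rough idea of what such tuning might look like on the Java side, here is a minimal sketch. It is not the code that was measured above, and I make no claim about the numbers it would produce. The assumption is that a small, bounded, fair queue makes the producers block in put() until the consumer catches up, which is closer to how the producers park on the unbuffered core.async channel. The class name BoundedProducersConsumer, the run method and its queueCapacity parameter are made up for this illustration.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of the example above, for illustration only.
public class BoundedProducersConsumer {

    /** Runs the given number of producers against one consumer and returns the per-message counts. */
    public static Map<String, Integer> run(int producers, int queueCapacity, long millis)
            throws InterruptedException {
        // Assumption: a bounded queue, so put() blocks while the queue is full.
        // fair = true serves the waiting producers in FIFO order.
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity, true);
        final Map<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newCachedThreadPool();

        // The single consumer is the only thread writing to counts.
        pool.execute(() -> {
            try {
                while (true) {
                    String msg = queue.take();
                    Integer old = counts.get(msg);
                    counts.put(msg, old == null ? 1 : old + 1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // asked to stop
            }
        });

        for (int i = 0; i < producers; i++) {
            final String msg = "a" + i;
            pool.execute(() -> {
                try {
                    while (true) {
                        queue.put(msg); // blocks instead of growing the queue without limit
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // asked to stop
                }
            });
        }

        Thread.sleep(millis);
        pool.shutdownNow();                          // interrupts the blocked put()/take() calls
        pool.awaitTermination(1, TimeUnit.SECONDS);  // wait until the workers have finished
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> counts = run(100, 1, 1000);
        int sum = 0;
        for (int val : counts.values()) {
            sum += val;
        }
        System.out.println("Number of active producers: " + counts.size());
        System.out.println("Number of messages: " + sum);
    }
}
```

Because put() and take() both respond to interruption, shutdownNow() is enough to stop every worker, so the program exits on its own after the measurement.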

Thursday, 6 November 2014

Dynamic typing - Timeout

Another piece of useful functionality made easy by dynamic typing and core.async: adding a timeout to a function.


(ns clojure-study.ideas.timeouter
  (:require [clojure.core.async :as async]))

(defn add-timeout
  "f - the original function
  timeout-ms - timeout in milliseconds
  timeout-value - the value the function should return in case of timeout"
  [f timeout-ms timeout-value]
  (fn [& args]
    (let [timeout-channel (async/timeout timeout-ms)
          result-channel (async/thread (apply f args))
          [r c] (async/alts!! [timeout-channel result-channel])]
      (if (= c timeout-channel)
        timeout-value ;; if timeout
        r)))) ;; if successful

;; ----- THAT'S IT. LET'S SEE A DEMONSTRATION ------------

;; a slow function
(defn slow-fn []
  (Thread/sleep 100)
  :slow)

;; a fast function
(defn fast-fn []
  (Thread/sleep 10)
  :fast)

;; add 50 ms timeouts to both, returning the :timeout fall-back value
(def slow-with-timeout (add-timeout slow-fn 50 :timeout))
(def fast-with-timeout (add-timeout fast-fn 50 :timeout))

;; see the results
(println "Here comes the slow:" (slow-with-timeout))
;; Here comes the slow: :timeout
(println "Here comes the fast:" (fast-with-timeout))
;; Here comes the fast: :fast