1+ (ns stress-test
2+ (:require [aux :as a]
3+ [clojure.core.async :as async]
4+ [clojure.data.fressian :as fr]
5+ [clojure.edn :as edn]
6+ [metrics :as m])
7+ (:import [io.prometheus.metrics.core.datapoints TimerApi]
8+ [org.fressian.handlers ConvertList]))
9+
10+ (def read-handlers
11+ {" fressian/list"
12+ (reify ConvertList
13+ (convertList [_ items]
14+ (vec items)))})
15+
16+ (defn gen-vec [size]
17+ (into [] (for [_ (range size)]
18+ (random-uuid ))))
19+
20+ (defn gen-map [size]
21+ (into {} (for [_ (range size)]
22+ {(random-uuid ) (random-uuid )})))
23+
24+ (defmacro with-duration [metric & body]
25+ `(let [t# (.startTimer ~metric)
26+ st# (System/currentTimeMillis )]
27+ (try
28+ ~@body
29+ (- (System/currentTimeMillis ) st#)
30+ (finally
31+ (.close t#)))))
32+
33+ (defn stress-test [{:keys [struct-type struct-size convert-list?] :as argmap}]
34+ (let [gen-struct-fn (case struct-type
35+ 'map gen-map
36+ 'vec gen-vec)
37+ metric ^TimerApi (-> (:test-metric (m/initialize-and-return-metrics ))
38+ (.labelValues (into-array String (map str [struct-type struct-size
39+ (if convert-list?
40+ " enabled" " disabled" )]))))
41+ ; pass custom ConvertList or fallback to defualt read handlers in data.fressian
42+ fr-handlers (if convert-list?
43+ (-> fr/clojure-read-handlers (merge read-handlers) (fr/associative-lookup ))
44+ (fr/associative-lookup fr/clojure-read-handlers))]
45+ ; We need to wait for Prometheus to pick up monitoring
46+ (Thread/sleep ^long (* 1000 60 1 ))
47+ (a/info (merge {:event :starting-stress-test } argmap))
48+ (let [work-chan (async/chan 1 )
49+ timeout-chan (async/timeout (* 1000 60 15 ))]
50+ (loop [runs 0
51+ proc-time 0 ]
52+ (let [structs (repeatedly 1000 #(gen-struct-fn struct-size))
53+ ; put generated structs fressianed into the work channel as a single coll
54+ _ (async/put! work-chan (for [st structs] (fr/write st)))
55+ [val port] (async/alts!! [work-chan timeout-chan])]
56+ (cond
57+ (= port timeout-chan)
58+ (a/info (merge {:event :stress-test-complete :runs runs :processing-time proc-time} argmap))
59+
60+ (= port work-chan)
61+ ; measure time to read all structs in work chan
62+ (let [dur (with-duration metric (doseq [fs val]
63+ (fr/read fs :handlers fr-handlers)))]
64+ (recur (inc runs) (+ proc-time dur)))))))))
65+
66+ (defn -main [& args]
67+ (let [parsed-args (map edn/read-string args)
68+ argmap (zipmap [:struct-type :struct-size :convert-list? ] parsed-args)]
69+ (stress-test argmap)
70+ (System/exit 0 )))
0 commit comments