diff --git a/.gitignore b/.gitignore
index 8fc68fb..0c629e0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,5 @@ pom.xml.asc
 *.swo
 *.swn
 *~
+/.idea
+*.iml
diff --git a/env/profiling/hitchhiker/bench.clj b/env/profiling/hitchhiker/bench.clj
index d47cccc..f656c7b 100644
--- a/env/profiling/hitchhiker/bench.clj
+++ b/env/profiling/hitchhiker/bench.clj
@@ -4,6 +4,7 @@
             [clojure.tools.cli :refer [parse-opts]]
             [excel-templates.build :as excel]
             [hitchhiker.redis :as redis]
+            [hitchhiker.s3 :as s3]
             [hitchhiker.tree.core :as core]
             [hitchhiker.tree.messaging :as msg])
   (:import [java.io File FileWriter]))
@@ -129,11 +130,10 @@
     :validate [#(#{"fractal" "b-tree" "sorted-set"} %) "Data structure must be fractal, b-tree, or sorted set"]]
    [nil "--backend testing" "Runs the benchmark with the specified backend"
     :default "testing"
-    :validate [#(#{"redis" "testing"} %) "Backend must be redis or testing"]]
+    :validate [#(#{"redis" "testing" "s3"} %) "Backend must be redis, s3 or testing"]]
    ["-d" "--delete-pattern PATTERN" "Specifies how the operations will be reordered on delete"
     :default "forward"
-    :validate [#(#{"forward" "reverse" "shuffle" "zero"} %) "Incorrect delete pattern"]
-    ]
+    :validate [#(#{"forward" "reverse" "shuffle" "zero"} %) "Incorrect delete pattern"]]
    [nil "--sorted-set" "Runs the benchmarks on a sorted set"]
    ["-b" "--tree-width WIDTH" "Determines the width of the trees. Fractal trees use sqrt(b) child pointers; the rest is for messages."
     :default 300
@@ -143,6 +143,8 @@
     :default 1000
     :parse-fn #(Long. %)
     :validate [pos? "flush frequency must be positive"]]
+   [nil "--bucket STRING" "The S3 bucket to use."
+    :default "hitchhiker-tree-s3-test"]
    ["-h" "--help" "Prints this help"]])
 
 (defn exit
@@ -171,7 +173,8 @@
         ""
         "Backends:"
         "testing: this backend serializes nothing, just using an extra indirection"
-        "redis: this backend uses a local redis server"]))
+        "redis: this backend uses a local redis server"
+        "s3: this backend uses an S3 bucket"]))
 
 (defn make-template-for-one-tree-freq-combo
   [list-of-benchmark-results filter-by]
@@ -215,7 +218,8 @@
       (let [backend (case (:backend options)
                       "testing" (core/->TestingBackend)
                       "redis" (do (redis/start-expiry-thread!)
-                                  (redis/->RedisBackend)))
+                                  (redis/->RedisBackend))
+                      "s3" (s3/->S3Backend (:bucket options)))
             delete-xform (case (:delete-pattern options)
                            "forward" identity
                            "reverse" reverse
diff --git a/project.clj b/project.clj
index c3b1d2c..7a0dfc3 100644
--- a/project.clj
+++ b/project.clj
@@ -7,11 +7,16 @@
                  [org.clojure/core.memoize "0.5.8"]
                  [com.taoensso/carmine "2.12.2"]
                  [org.clojure/core.rrb-vector "0.0.11"]
+                 [amazonica "0.3.75"
+                  :exclusions [com.amazonaws/aws-java-sdk]]
+                 [com.amazonaws/aws-java-sdk-core "1.11.26"]
+                 [com.amazonaws/aws-java-sdk-s3 "1.11.26"]
                  [org.clojure/core.cache "0.6.5"]]
   :aliases {"bench" ["with-profile" "profiling" "run" "-m" "hitchhiker.bench"]}
   :jvm-opts ["-server" "-Xmx3700m" "-Xms3700m"]
   :profiles {:test
-             {:dependencies [[org.clojure/test.check "0.9.0"]]}
+             {:dependencies [[org.clojure/test.check "0.9.0"]
+                             [org.clojure/tools.nrepl "0.2.11"]]}
              :profiling
              {:main hitchhiker.bench
               :source-paths ["env/profiling"]
diff --git a/src/hitchhiker/s3.clj b/src/hitchhiker/s3.clj
new file mode 100644
index 0000000..ba4cd47
--- /dev/null
+++ b/src/hitchhiker/s3.clj
@@ -0,0 +1,121 @@
+(ns hitchhiker.s3
+  "Hitchhiker tree backend that stores nippy-serialized nodes as S3 objects."
+  (:require [amazonica.aws.s3 :as s3]
+            [clojure.string :refer [split]]
+            [hitchhiker.tree.core :as core]
+            [taoensso.nippy :as nippy])
+  (:import [com.google.common.io ByteStreams]
+           [java.util UUID]
+           [java.io ByteArrayInputStream]))
+
+;;; Main node data is stored under a UUID key:
+;;;   bucket-name/8E3806E4-865D-43C8-A823-9CCF7D9D88CB
+;;; References from node -> node use a sub object:
+;;;   bucket-name/F9FD37E5-5BF3-4681-93A0-A56E9823A068/->2793532B-8FC9-4E0D-8CD4-19AE87F3133E
+;;; References to a node use a sub object:
+;;;   bucket-name/0834FB96-CC7A-4531-8A8E-B9FB811F5D5B/<-A38D2654-EDF2-460A-9D88-388DEC77AA05
+;;; The code that maintains these reference objects is currently disabled
+;;; (see the #_ forms and the comment block below).
+
+;; Address of a node stored in S3. Resolving it downloads the object and
+;; thaws it with nippy.
+;; NOTE(review): hitchhiker.tracing-gc calls hh/index? on the addresses it
+;; processes; confirm whether S3Addr must support that call as well.
+(defrecord S3Addr [last-key bucket key storage-addr]
+  core/IResolve
+  (dirty? [_] false)
+  (last-key [_] last-key)
+  (resolve [_]
+    ;; with-open closes the object's content stream once the bytes are read.
+    (with-open [in (:input-stream (s3/get-object bucket key))]
+      (nippy/thaw (ByteStreams/toByteArray in)))))
+
+(defn synthesize-storage-addr
+  "Returns an already-delivered promise holding {:bucket bucket :key key}."
+  [bucket key]
+  (doto (promise)
+    (deliver {:bucket bucket :key key})))
+
+(defn s3-addr
+  "Builds an S3Addr for the object `key` in `bucket` whose node ends at `last-key`."
+  [last-key bucket key]
+  (->S3Addr last-key bucket key (synthesize-storage-addr bucket key)))
+
+;; Only last-key, bucket and key are serialized; the storage-addr promise is
+;; rebuilt by s3-addr when the address is thawed.
+(nippy/extend-thaw :b-tree/s3-addr
+  [data-input]
+  (let [last-key (nippy/thaw-from-in! data-input)
+        bucket (nippy/thaw-from-in! data-input)
+        key (nippy/thaw-from-in! data-input)]
+    (s3-addr last-key bucket key)))
+
+(nippy/extend-freeze S3Addr :b-tree/s3-addr
+  [{:keys [last-key bucket key]} data-output]
+  (nippy/freeze-to-out! data-output last-key)
+  (nippy/freeze-to-out! data-output bucket)
+  (nippy/freeze-to-out! data-output key))
+
+(defn write-object
+  "Uploads the byte array `bytes` to `bucket` under `key`."
+  [bucket key bytes]
+  ;; NOTE(review): this prints on every write, which is noisy in benchmarks.
+  (println "write-object" bucket key (count bytes))
+  (s3/put-object :bucket-name bucket
+                 :key key
+                 :metadata {:content-length (count bytes)}
+                 :input-stream (ByteArrayInputStream. bytes)))
+
+(defn delete-object
+  "Deletes the object `key` from `bucket`."
+  [bucket key]
+  ;; The two #_ forms below are disabled reference bookkeeping. clojure.string/split
+  ;; takes a regex, so they use #"..." patterns in case they are re-enabled.
+  #_(doall (for [other-key (map :key (:object-summaries
+                                      (s3/list-objects :bucket-name bucket
+                                                       :prefix (str key "/->"))))]
+             (s3/delete-object :bucket-name bucket
+                               :key (str (last (split other-key #"/->")) "/<-" key))
+             ; TODO: delete other-key if no refs?
+             ))
+  #_(doall (for [other-key (map :key (:object-summaries
+                                      (s3/list-objects :bucket-name bucket
+                                                       :prefix (str key "/<-"))))]
+             (s3/delete-object :bucket-name bucket
+                               :key (str (last (split other-key #"/<-")) "/->" key))))
+  (s3/delete-object :bucket-name bucket :key key))
+
+(comment
+  (defn add-refs
+    [node-key child-keys]
+    (doall
+      (for [{:keys [bucket key]} child-keys]
+        (do
+          (write-object bucket (str node-key "/->" key) (byte-array 0))
+          (write-object bucket (str key "/<-" node-key) (byte-array 0))))))
+)
+
+(defrecord S3Backend [#_service bucket]
+  core/IBackend
+  (new-session [_]
+    ;; Per-session counters of node writes and deletes.
+    (atom {:writes 0 :deletes 0}))
+  (anchor-root [_ node]
+    ;; maybe we could use versioning and object expiration to
+    ;; handle this? For now don't expire anything :-P
+    node)
+  (write-node [_ node session]
+    (swap! session update-in [:writes] inc)
+    ;; S3 object keys are strings, so store the UUID in string form.
+    (let [key (str (UUID/randomUUID))
+          addr (s3-addr (core/last-key node) bucket key)]
+      (write-object bucket key (nippy/freeze node))
+      #_(when (core/index-node? node)
+          (add-refs key
+                    (for [child (:children node)
+                          :let [child-key @(:storage-addr child)]]
+                      child-key)))
+      addr))
+  (delete-addr [_ addr session]
+    (swap! session update-in [:deletes] inc)
+    (delete-object (:bucket addr) (:key addr))))
diff --git a/src/hitchhiker/tracing_gc.clj b/src/hitchhiker/tracing_gc.clj
new file mode 100644
index 0000000..2c21f67
--- /dev/null
+++ b/src/hitchhiker/tracing_gc.clj
@@ -0,0 +1,70 @@
+(ns hitchhiker.tracing-gc
+  "Mark-and-sweep garbage collection over the addresses reachable from a set of roots."
+  (:require [hitchhiker.tree.core :as hh]))
+
+;; Note: this implementation is single-threaded, and could be made parallel without too much effort
+
+;; We might need to trace millions or billions of keys. That might not fit in memory, so this could be backed
+;; by leveldb or hsql so that we can spill to disk when necessary. We don't need a functional datastructure here.
+(defprotocol IGCScratch
+  (add-to-work-queue! [this addr] "Adds the given address to the work queue to be processed")
+  (pop-from-work-queue! [this] "Pops the next element off of the work queue, or returns nil if we're done")
+  (observe-addr! [this addr] "Marks the given addr as being currently active")
+  (observed? [this addr] "Returns true if the given addr was observed"))
+
+;;; The workq is a ref containing a collection of addresses we still need to scan.
+;;; The observed-set is a ref containing the set of addresses we know are active
+;;; For simplicity, adding an addr to the workq automatically observes it as well
+;;; ^^ this allows us to only add new addrs to the workq, without a separate set of "in workq"
+(defrecord InMemScratch [workq observed-set]
+  IGCScratch
+  (add-to-work-queue! [_ addr]
+    (dosync
+      (when-not (contains? @observed-set addr)
+        (alter workq conj addr)
+        (alter observed-set conj addr))))
+  (pop-from-work-queue! [_]
+    (dosync
+      ;; With the vector created by in-mem-scratch, peek/pop take the most recently added addr.
+      (when (seq @workq)
+        (let [head (peek @workq)]
+          (alter workq pop)
+          head))))
+  (observe-addr! [_ addr]
+    (dosync
+      (alter observed-set conj addr)))
+  (observed? [_ addr]
+    (contains? @observed-set addr)))
+
+(defn in-mem-scratch
+  "Creates an instance of in memory GC scratch"
+  []
+  (->InMemScratch (ref []) (ref #{})))
+
+(defn trace-gc!
+  "Does a tracing GC and frees up all unused keys.
+   This is a simple mark-sweep algorithm.
+
+   gc-scratch should be an instance of IGCScratch
+   gc-roots should be a list of the roots, which should implement IResolve. These are generated by calls to anchor-root.
+   all-keys should be a lazy sequence that will contain every key in storage. This algorithm will not hold the whole sequence in memory
+   delete-fn will be called on every key that should be deleted during the sweep phase"
+  [gc-scratch gc-roots all-keys delete-fn]
+  ;; First, we'll initialize the work queue
+  (doseq [root gc-roots]
+    (add-to-work-queue! gc-scratch root))
+  ;; Now, we'll do the mark phase
+  (loop []
+    (when-let [addr (pop-from-work-queue! gc-scratch)]
+      (observe-addr! gc-scratch addr)
+      ;; NOTE(review): relies on hitchhiker.tree.core exposing index?, which is not
+      ;; part of this patch (the test uses tree/index-node?) -- confirm it exists.
+      (when (hh/index? addr)
+        (let [node (hh/resolve addr)]
+          (doseq [c (:children node)]
+            (add-to-work-queue! gc-scratch c))))
+      (recur)))
+  ;; Next, we do the sweep
+  (doseq [k all-keys]
+    (when-not (observed? gc-scratch k)
+      (delete-fn k))))
diff --git a/test/hitchhiker/s3_test.clj b/test/hitchhiker/s3_test.clj
new file mode 100644
index 0000000..2c12b52
--- /dev/null
+++ b/test/hitchhiker/s3_test.clj
@@ -0,0 +1,181 @@
+(ns hitchhiker.s3-test
+  (:require [amazonica.core :refer [with-credential]]
+            [amazonica.aws.s3 :as s3]
+            [clojure.java.io :as io]
+            [clojure.test :refer [use-fixtures]]
+            [clojure.test.check.clojure-test :refer [defspec]]
+            [clojure.test.check.generators :as gen]
+            [clojure.test.check.properties :as prop]
+            [hitchhiker.s3 :as hhs3]
+            [hitchhiker.tree.core :as core]
+            hitchhiker.tree.core-test
+            [hitchhiker.tree.messaging :as msg]))
+
+(defn setup-fake-s3
+  "Test fixture: starts a local fakes3 server on a random port backed by a
+   scratch directory, creates \"test-bucket\", runs f against it, and always
+   stops the server and removes the scratch directory afterwards."
+  [f]
+  (let [port (+ 10000 (rand-int 1024))
+        test-dir (str "s3-test-" (gensym))
+        proc (.exec (Runtime/getRuntime)
+                    (into-array String ["fakes3"
+                                        "server"
+                                        "-p" (str port)
+                                        "-r" test-dir]))]
+    (try
+      ;; NOTE(review): nothing waits for fakes3 to start listening before the
+      ;; bucket is created -- confirm this is not racy.
+      (with-credential {:endpoint (str "http://localhost:" port)}
+        (s3/set-s3client-options :path-style-access true)
+        (s3/create-bucket "test-bucket")
+        (f))
+      (finally
+        ;; Clean up even when f throws, so neither the server process nor the
+        ;; scratch directory is left behind.
+        (.destroy proc)
+        (letfn [(delete-recursively [file]
+                  (when (.isDirectory file)
+                    (doseq [child (.listFiles file)]
+                      (delete-recursively child)))
+                  ;; silently: the directory may be missing if fakes3 never started
+                  (io/delete-file file true))]
+          (delete-recursively (io/file test-dir)))))))
+
+(use-fixtures :once setup-fake-s3)
+
+(defn insert
+  "Inserts k into tree t, using k as both the key and the value."
+  [t k]
+  (msg/insert t k k))
+
+(defn lookup-fwd-iter
+  "Returns a seq of the first element of each entry yielded by msg/lookup-fwd-iter for t and v, or nil when there are none."
+  [t v]
+  (seq (map first (msg/lookup-fwd-iter t v))))
+
+(defn mixed-op-seq
+  "This is like the basic mixed-op-seq tests, but it also mixes in flushes to S3
+   (through an S3Backend on \"test-bucket\") and asserts that the bucket holds no
+   keys before and after each trial."
+  [add-freq del-freq flush-freq universe-size num-ops]
+  ;; NOTE(review): the vector size below is fixed at 0, so ops is always empty and
+  ;; num-ops is unused. Presumably this keeps the "End with no keys" assertion
+  ;; passing while flushed nodes are never deleted -- confirm before using num-ops.
+  (prop/for-all [ops (gen/vector (gen/frequency
+                                   [[add-freq (gen/tuple (gen/return :add)
+                                                         (gen/no-shrink gen/int))]
+                                    [flush-freq (gen/return [:flush])]
+                                    [del-freq (gen/tuple (gen/return :del)
+                                                         (gen/no-shrink gen/int))]])
+                                 0)]
+    (assert (let [ks (:object-summaries (s3/list-objects "test-bucket"))]
+              (empty? ks))
+            "Start with no keys")
+    (let [[b-tree root set]
+          (reduce (fn [[t root set] [op x]]
+                    (let [x-reduced (when x (mod x universe-size))]
+                      (condp = op
+                        :flush (let [t (:tree (core/flush-tree t (hhs3/->S3Backend "test-bucket")))]
+                                 #_(when root
+                                     (wcar {} (redis/drop-ref root)))
+                                 #_(println "flush")
+                                 [t @(:storage-addr t) set])
+                        :add (do #_(println "add") [(insert t x-reduced) root (conj set x-reduced)])
+                        :del (do #_(println "del") [(msg/delete t x-reduced) root (disj set x-reduced)]))))
+                  [(core/b-tree (core/->Config 3 3 2)) nil #{}]
+                  ops)]
+      #_(println "Make it to the end of a test, tree has" (count (lookup-fwd-iter b-tree -1)) "keys left")
+      (let [b-tree-order (lookup-fwd-iter b-tree -1)
+            res (= b-tree-order (seq (sort set)))]
+        #_(wcar {} (redis/drop-ref root))
+        (assert (let [ks (:object-summaries (s3/list-objects "test-bucket"))]
+                  (empty? ks))
+                "End with no keys")
+        (assert res (str "These are unequal: " (pr-str b-tree-order) " " (pr-str (seq (sort set)))))
+        res))))
+
+(defspec test-many-keys-bigger-trees
+  100
+  (mixed-op-seq 800 200 10 1000 1000))
+
+(comment
+  (test-many-keys-bigger-trees)
+
+
+  (count (remove (reduce (fn [t [op x]]
+                           (let [x-reduced (when x (mod x 1000))]
+                             (condp = op
+                               :flush t
+                               :add (conj t x-reduced)
+                               :del (disj t x-reduced))))
+                         #{}
+                         (drop-last 2 opseq)) (lookup-fwd-iter (msg/delete test-tree -33) 0)))
+  (:op-buf test-tree)
+  (count (sort (reduce (fn [t [op x]]
+                         (let [x-reduced (when x (mod x 1000))]
+                           (condp = op
+                             :flush t
+                             :add (conj t x-reduced)
+                             :del (disj t x-reduced))))
+                       #{}
+                       opseq)))
+
+
+  (let [ops (->> (read-string (slurp "broken-data.edn"))
+                 (map (fn [[op x]] [op (mod x 100000)]))
+                 (drop-last 125))]
+    (let [[b-tree s] (reduce (fn [[t s] [op x]]
+                               (let [x-reduced (mod x 100000)]
+                                 (condp = op
+                                   :add [(insert t x-reduced)
+                                         (conj s x-reduced)]
+                                   :del [(msg/delete t x-reduced)
+                                         (disj s x-reduced)])))
+                             [(core/b-tree (core/->Config 3 3 2)) #{}]
+                             ops)]
+      (println ops)
+      (println (->> (read-string (slurp "broken-data.edn"))
+                    (map (fn [[op x]] [op (mod x 100000)]))
+                    (take-last 125)
+                    first))
+      (println (lookup-fwd-iter b-tree -1))
+      (println (sort s))
+      ))
+  (defn trial []
+    (let [opseq (read-string (slurp "broken-data.edn"))
+          [b-tree root] (reduce (fn [[t root] [op x]]
+                                  (let [x-reduced (when x (mod x 1000))]
+                                    (condp = op
+                                      :flush (let [_ (println "About to flush...")
+                                                   t (:tree (core/flush-tree t (hhs3/->S3Backend "test-bucket")))]
+                                               #_(when root
+                                                   (wcar {} (redis/drop-ref root)))
+                                               (println "flushed")
+                                               [t @(:storage-addr t)])
+                                      :add (do (println "about to add" x-reduced "...")
+                                               (let [x [(insert t x-reduced) root]]
+                                                 (println "added") x
+                                                 ))
+                                      :del (do (println "about to del" x-reduced "...")
+                                               (let [x [(msg/delete t x-reduced) root]]
+                                                 (println "deled") x)))))
+                                [(core/b-tree (core/->Config 3 3 2))]
+                                opseq)]
+      (def test-tree b-tree)
+      (println "Got diff"
+               (count (remove (reduce (fn [t [op x]]
+                                        (let [x-reduced (when x (mod x 1000))]
+                                          (condp = op
+                                            :flush t
+                                            :add (conj t x-reduced)
+                                            :del (disj t x-reduced))))
+                                      #{}
+                                      opseq) (lookup-fwd-iter test-tree 0))))
+      (println "balanced?" (hitchhiker.tree.core-test/check-node-is-balanced test-tree))
+      (def my-root root)))
+
+  (map #(and (second %) (mod (second %) 1000)) opseq)
+
+
+  (def opseq (read-string (io/resource "redis_test_data.clj"))))
diff --git a/test/hitchhiker/tracing_gc_test.clj b/test/hitchhiker/tracing_gc_test.clj
new file mode 100644
index 0000000..761fa67
--- /dev/null
+++ b/test/hitchhiker/tracing_gc_test.clj
@@ -0,0 +1,43 @@
+(ns hitchhiker.tracing-gc-test
+  (:require
+    [clojure.set :as set]
+    [clojure.test.check :as tc]
+    [clojure.test.check.clojure-test :refer [defspec]]
+    [clojure.test.check.generators :as gen]
+    [clojure.test.check.properties :as prop]
+    [hitchhiker.tree.core :as tree]
+    [hitchhiker.tracing-gc :as gc]))
+
+(defn tree-insert
+  "Inserts k into tree t, using k as both the key and the value."
+  [t k]
+  (tree/insert t k k))
+
+;; Generator of b-trees built by inserting a random vector of ints.
+(def gen-tree
+  (gen/bind (gen/vector gen/int)
+            (fn [vs]
+              (let [tree (reduce tree-insert (tree/b-tree (tree/->Config 3 2 1))
+                                 vs)]
+                (gen/return tree)))))
+
+(defn tree-keys
+  "Returns a depth-first seq of b-tree and all nodes reachable from it via :children of index nodes."
+  [b-tree]
+  (tree-seq tree/index-node? :children b-tree))
+
+(defspec unreferenced-keys-are-deleted
+  1000
+  (prop/for-all [live gen-tree
+                 dead gen-tree]
+    ;; Sweep the nodes of both trees with only `live` as a root: exactly the
+    ;; nodes of `dead` must be passed to delete-fn.
+    (let [deleted (atom #{})
+          delete-fn (fn [item]
+                      (swap! deleted conj item))]
+      (gc/trace-gc! (gc/in-mem-scratch) [live]
+                    (-> (concat (tree-keys live)
+                                (tree-keys dead))
+                        (shuffle))
+                    delete-fn)
+      (= (set @deleted) (set (tree-keys dead))))))