An S3 backend. #23

Closed · wants to merge 10 commits
2 changes: 2 additions & 0 deletions .gitignore
@@ -13,3 +13,5 @@ pom.xml.asc
*.swo
*.swn
*~
/.idea
*.iml
14 changes: 9 additions & 5 deletions env/profiling/hitchhiker/bench.clj
@@ -4,6 +4,7 @@
[clojure.tools.cli :refer [parse-opts]]
[excel-templates.build :as excel]
[hitchhiker.redis :as redis]
[hitchhiker.s3 :as s3]
[hitchhiker.tree.core :as core]
[hitchhiker.tree.messaging :as msg])
(:import [java.io File FileWriter]))
@@ -129,11 +130,10 @@
:validate [#(#{"fractal" "b-tree" "sorted-set"} %) "Data structure must be fractal, b-tree, or sorted set"]]
[nil "--backend testing" "Runs the benchmark with the specified backend"
:default "testing"
:validate [#(#{"redis" "testing"} %) "Backend must be redis or testing"]]
:validate [#(#{"redis" "testing" "s3"} %) "Backend must be redis, s3 or testing"]]
["-d" "--delete-pattern PATTERN" "Specifies how the operations will be reordered on delete"
:default "forward"
:validate [#(#{"forward" "reverse" "shuffle" "zero"} %) "Incorrect delete pattern"]
]
:validate [#(#{"forward" "reverse" "shuffle" "zero"} %) "Incorrect delete pattern"]]
[nil "--sorted-set" "Runs the benchmarks on a sorted set"]
["-b" "--tree-width WIDTH" "Determines the width of the trees. Fractal trees use sqrt(b) child pointers; the rest is for messages."
:default 300
@@ -143,6 +143,8 @@
:default 1000
:parse-fn #(Long. %)
:validate [pos? "flush frequency must be positive"]]
[nil "--bucket STRING" "The S3 bucket to use."
:default "hitchhiker-tree-s3-test"]
["-h" "--help" "Prints this help"]])

(defn exit
@@ -171,7 +173,8 @@
""
"Backends:"
"testing: this backend serializes nothing, just using an extra indirection"
"redis: this backend uses a local redis server"]))
"redis: this backend uses a local redis server"
"s3: this backend uses an S3 bucket"]))

(defn make-template-for-one-tree-freq-combo
[list-of-benchmark-results filter-by]
@@ -215,7 +218,8 @@
(let [backend (case (:backend options)
"testing" (core/->TestingBackend)
"redis" (do (redis/start-expiry-thread!)
(redis/->RedisBackend)))
(redis/->RedisBackend))
"s3" (s3/->S3Backend (:bucket options)))
delete-xform (case (:delete-pattern options)
"forward" identity
"reverse" reverse
8 changes: 7 additions & 1 deletion project.clj
@@ -7,11 +7,17 @@
[org.clojure/core.memoize "0.5.8"]
[com.taoensso/carmine "2.12.2"]
[org.clojure/core.rrb-vector "0.0.11"]
[amazonica "0.3.75"
:exclusions [com.amazonaws/aws-java-sdk]]
[com.amazonaws/aws-java-sdk-core "1.11.26"]
[com.amazonaws/aws-java-sdk-s3 "1.11.26"]
[org.clojure/core.rrb-vector "0.0.11"]
[org.clojure/core.cache "0.6.5"]]
:aliases {"bench" ["with-profile" "profiling" "run" "-m" "hitchhiker.bench"]}
:jvm-opts ["-server" "-Xmx3700m" "-Xms3700m"]
:profiles {:test
{:dependencies [[org.clojure/test.check "0.9.0"]]}
{:dependencies [[org.clojure/test.check "0.9.0"]
[org.clojure/tools.nrepl "0.2.11"]]}
:profiling
{:main hitchhiker.bench
:source-paths ["env/profiling"]
102 changes: 102 additions & 0 deletions src/hitchhiker/s3.clj
@@ -0,0 +1,102 @@
(ns hitchhiker.s3
(:require [amazonica.aws.s3 :as s3]
[clojure.string :refer [split]]
[hitchhiker.tree.core :as core]
[taoensso.nippy :as nippy])
(:import [com.google.common.io ByteStreams]
[java.util UUID]
[java.io ByteArrayInputStream]))

;;; Main node data stored in uuid key
;;; bucket-name/8E3806E4-865D-43C8-A823-9CCF7D9D88CB
;;; References from node -> node uses sub object:
;;; bucket-name/F9FD37E5-5BF3-4681-93A0-A56E9823A068/->2793532B-8FC9-4E0D-8CD4-19AE87F3133E
;;; References to node uses sub object:
;;; bucket-name/0834FB96-CC7A-4531-8A8E-B9FB811F5D5B/<-A38D2654-EDF2-460A-9D88-388DEC77AA05

(defrecord S3Addr [last-key bucket key storage-addr]
core/IResolve
(dirty? [_] false)
(last-key [_] last-key)
(resolve [_]
(with-open [in (:input-stream (s3/get-object bucket key))]
(nippy/thaw (ByteStreams/toByteArray in)))))

(defn synthesize-storage-addr
[bucket key]
(doto (promise)
(deliver {:bucket bucket :key key})))

(defn s3-addr
[last-key bucket key]
(->S3Addr last-key bucket key (synthesize-storage-addr bucket key)))

(nippy/extend-thaw :b-tree/s3-addr
[data-input]
(let [last-key (nippy/thaw-from-in! data-input)
bucket (nippy/thaw-from-in! data-input)
key (nippy/thaw-from-in! data-input)]
(s3-addr last-key bucket key)))

(nippy/extend-freeze S3Addr :b-tree/s3-addr
[{:keys [last-key bucket key]} data-output]
(nippy/freeze-to-out! data-output last-key)
(nippy/freeze-to-out! data-output bucket)
(nippy/freeze-to-out! data-output key))
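
;; A quick round-trip sketch (not part of the file): with the extensions
;; above, a frozen S3Addr thaws back into an equivalent addr.
(comment
  (let [addr  (s3-addr "zz" "my-bucket" "some-key")
        addr' (nippy/thaw (nippy/freeze addr))]
    [(:bucket addr') (:key addr')]))
;; => ["my-bucket" "some-key"]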

(defn write-object
[bucket key bytes]
(println "write-object" bucket key (count bytes))
(s3/put-object :bucket-name bucket
:key key
:metadata {:content-length (count bytes)}
:input-stream (ByteArrayInputStream. bytes)))

(defn delete-object
[bucket key]
#_(doall (for [other-key (map :key (:object-summaries
(s3/list-objects :bucket-name bucket
:prefix (str key "/->"))))]
(s3/delete-object :bucket-name bucket
:key (str (last (split other-key "/->")) "/<-" key))
; TODO: delete other-key if no refs?
))
#_(doall (for [other-key (map :key (:object-summaries
(s3/list-objects :bucket-name bucket
:prefix (str key "/<-"))))]
(s3/delete-object :bucket-name bucket
:key (str (last (split other-key #"/<-")) "/->" key))))
(s3/delete-object :bucket-name bucket :key key))

(comment
(defn add-refs
[node-key child-keys]
(doall
(for [{:keys [bucket key]} child-keys]
(do
(write-object bucket (str node-key "/->" key) (byte-array 0))
(write-object bucket (str key "/<-" node-key) (byte-array 0))))))
)

(defrecord S3Backend [#_service bucket]
core/IBackend
(new-session [_]
(atom {:writes 0 :deletes 0}))
Collaborator: Does the s3 API tell you about how much money you're spending on I/O? This could be really interesting if we could track the # of bytes uploaded, for example :)

Author: The API doesn't track that; it's something you need to track manually.

However, all data transfer into S3 is free, and there's a $0.005 fee per 1000 PUTs. It's also neat if you're fetching data from an EC2 instance in the same region: since the data transfer is also free, you just pay for requests.
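
A minimal sketch of that manual accounting, counting payload bytes next to the existing :writes counter in the session atom (write-object-metered and the :bytes-written key are hypothetical, not part of this PR):

(defn write-object-metered
  "Like write-object, but also accumulates the payload size in the session."
  [session bucket key bytes]
  (swap! session update-in [:bytes-written] (fnil + 0) (count bytes))
  (s3/put-object :bucket-name bucket
                 :key key
                 :metadata {:content-length (count bytes)}
                 :input-stream (ByteArrayInputStream. bytes)))

;; At the quoted $0.005 per 1000 PUTs, 1,000,000 writes cost about $5:
(defn estimated-put-cost-usd
  [session]
  (* (:writes @session 0) 0.005 1/1000))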

Collaborator: Interesting -- does Amazonica batch writes? If not, the tracing GC algo would allow you to avoid needing any auxiliary objects, to reduce the number of puts.

Author: It shouldn't batch writes -- it's a thin shim on the Java SDK, and that doesn't do any batching. And yeah, doing something more sensible for GC would eliminate additional requests.
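
On the GC route, the sweep input could come straight from a bucket listing rather than from the auxiliary /-> and /<- objects. A rough sketch, not part of this PR (first page of the listing only; real code would follow the truncated-listing markers):

(defn all-bucket-keys
  "Keys currently stored in the bucket (first page of the listing only)."
  [bucket]
  (map :key (:object-summaries (s3/list-objects :bucket-name bucket))))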

I think confetti does some diffing for you though; code here: https://github.com/confetti-clj/s3-deploy/blob/master/src/confetti/s3_deploy.clj. This is a great idea, keep it up!

(anchor-root [_ node]
;; maybe we could use versioning and object expiration to
;; handle this? For now don't expire anything :-P
node)
(write-node [_ node session]
(swap! session update-in [:writes] inc)
(let [key (UUID/randomUUID)
addr (s3-addr (core/last-key node) bucket key)]
(write-object bucket key (nippy/freeze node))
#_(when (core/index-node? node)
(add-refs key
(for [child (:children node)
:let [child-key @(:storage-addr child)]]
child-key)))
addr))
(delete-addr [_ addr session]
(swap! session update-in [:deletes] inc)
(delete-object (:bucket addr) (:key addr))))
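
A hedged usage sketch of the protocol methods above, mirroring how bench.clj wires the backend in (some-leaf-node stands in for a real, nippy-freezable tree node and is hypothetical):

(comment
  (let [backend (->S3Backend "hitchhiker-tree-s3-test")
        session (core/new-session backend)
        addr    (core/write-node backend some-leaf-node session)]
    (core/delete-addr backend addr session)
    @session))
;; => {:writes 1, :deletes 1}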
69 changes: 69 additions & 0 deletions src/hitchhiker/tracing_gc.clj
@@ -0,0 +1,69 @@
(ns hitchhiker.tracing-gc
(:require [hitchhiker.tree.core :as hh]))

;; Note: this implementation is single-threaded, and could be made parallel without too much effort

;; We might need to trace millions or billions of keys. That might not fit in memory, so this could be backed
;; by leveldb or hsql so that we can spill to disk when necessary. We don't need a functional datastructure here.
(defprotocol IGCScratch
(add-to-work-queue! [this addr] "Adds the given address to the work queue to be processed")
(pop-from-work-queue! [this] "Pops the next element off of the work queue, or returns nil if we're done")
(observe-addr! [this addr] "Marks the given addr as being currently active")
(observed? [this addr] "Returns true if the given addr was observed"))
;;; The workq is a ref containing a collection of addresses we still need to scan.
;;; The observed-set is a ref containing the set of addresses we know are active
;;; For simplicity, adding an addr to the workq automatically observes it as well
;;; ^^ this allows us to only add new addrs to the workq, without a separate set of "in workq"
(defrecord InMemScratch [workq observed-set]
IGCScratch
(add-to-work-queue! [_ addr]
(dosync
(when-not (contains? @observed-set addr)
(alter workq conj addr)
(alter observed-set conj addr))))
(pop-from-work-queue! [_]
(dosync
(when (seq @workq)
(let [head (peek @workq)]
(alter workq pop)
head))))
(observe-addr! [_ addr]
(dosync
(alter observed-set conj addr)))
(observed? [_ addr]
(contains? @observed-set addr)))

(defn in-mem-scratch
"Creates an instance of in memory GC scratch"
[]
(->InMemScratch (ref []) (ref #{})))

(defn trace-gc!
"Does a tracing GC and frees up all unused keys.
This is a simple mark-sweep algorithm.

gc-scratch should be an instance of IGCScratch
gc-roots should be a list of the roots, which should implement IResolve. These are generated by calls to anchor-root.
all-keys should be a lazy sequence that will contain every key in storage. This algorithm will not hold the whole sequence in memory
delete-fn will be called on every key that should be deleted during the sweep phase"
[gc-scratch gc-roots all-keys delete-fn]
;; First, we'll initialize the work queue
(doseq [root gc-roots]
(add-to-work-queue! gc-scratch root))
;; Now, we'll do the mark phase
(loop []
(when-let [addr (pop-from-work-queue! gc-scratch)]
(observe-addr! gc-scratch addr)
(when (hh/index? addr)
(let [node (hh/resolve addr)]
(doseq [c (:children node)]
(add-to-work-queue! gc-scratch c))))
(recur)))
;; Next, we do the sweep
(loop [ks all-keys]
(when (seq ks)
(let [head (first ks)]
(when-not (observed? gc-scratch head)
(delete-fn head)))
(recur (next ks)))))