Skip to content

Deprecate single-segment top-level namespace #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject org.clj-commons/byte-streams (or (System/getenv "PROJECT_VERSION") "0.2.10")
(defproject org.clj-commons/byte-streams (or (System/getenv "PROJECT_VERSION") "0.2.11")
:description "A simple way to handle the menagerie of Java byte representations."
:license {:name "MIT License"
:url "http://opensource.org/licenses/MIT"}
@@ -7,7 +7,7 @@
:username :env/clojars_username
:password :env/clojars_password
:sign-releases false}]]
:dependencies [[primitive-math "0.1.6"]
:dependencies [[com.github.piotr-yuxuan/primitive-math "0.1.7"]
[manifold "0.1.9"]]
:profiles {:dev {:dependencies [[org.clojure/clojure "1.10.3"]
[org.clojure/test.check "1.1.0"]
@@ -27,7 +27,7 @@
:cljfmt {:indents {#".*" [[:inner 0]]}}
:codox {:source-uri "https://github.com/clj-commons/byte-streams/blob/master/{filepath}#L{line}"
:metadata {:doc/format :markdown}
:namespaces [byte-streams]}
:namespaces [clj-commons.byte-streams]}
:global-vars {*warn-on-reflection* true}
:java-source-paths ["src"]
:jvm-opts ^:replace ["-server" "-Xmx4g"])
4 changes: 3 additions & 1 deletion src/byte_streams.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
(ns byte-streams
(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams
:deprecated true}
byte-streams
(:refer-clojure :exclude [byte-array vector-of])
(:require
[manifold
4 changes: 4 additions & 0 deletions src/byte_streams/ByteBufferInputStream.java
Original file line number Diff line number Diff line change
@@ -4,6 +4,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Deprecated, use clj_commons.byte_streams.ByteBufferInputStream.
*/
@Deprecated()
public class ByteBufferInputStream extends InputStream {

private ByteBuffer _buf;
4 changes: 4 additions & 0 deletions src/byte_streams/InputStream.java
Original file line number Diff line number Diff line change
@@ -2,6 +2,10 @@

import java.io.IOException;

/**
* Deprecated, use clj_commons.byte_streams.InputStream.
*/
@Deprecated()
public class InputStream extends java.io.InputStream {

public interface Streamable {
4 changes: 4 additions & 0 deletions src/byte_streams/Utils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package byte_streams;

/**
* Deprecated, use clj_commons.byte_streams.Utils.
*/
@Deprecated()
public class Utils {
public static byte[] byteArray(int length) {
return new byte[length];
4 changes: 3 additions & 1 deletion src/byte_streams/char_sequence.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
(ns byte-streams.char-sequence
(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.char-sequence
:deprecated true}
byte-streams.char-sequence
(:refer-clojure :exclude [flush])
(:import
[java.util.concurrent.locks
4 changes: 3 additions & 1 deletion src/byte_streams/graph.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
(ns byte-streams.graph
(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.graph
:deprecated true}
byte-streams.graph
(:refer-clojure :exclude [type])
(:require
[manifold.stream :as s]
4 changes: 3 additions & 1 deletion src/byte_streams/protocols.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
(ns byte-streams.protocols
(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.protocols
:deprecated true}
byte-streams.protocols
(:require
[byte-streams.utils :refer [defprotocol+]])
(:import
4 changes: 3 additions & 1 deletion src/byte_streams/pushback_stream.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
(ns byte-streams.pushback-stream
(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.byte-streams.pushback-stream
:deprecated true}
byte-streams.pushback-stream
(:refer-clojure :exclude [take])
(:require
[primitive-math :as p]
4 changes: 3 additions & 1 deletion src/byte_streams/utils.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
(ns byte-streams.utils)
(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.utils
:deprecated true}
byte-streams.utils)

(defmacro defprotocol+ [name & body]
(when-not (resolve name)
1,019 changes: 1,019 additions & 0 deletions src/clj_commons/byte_streams.clj

Large diffs are not rendered by default.

57 changes: 57 additions & 0 deletions src/clj_commons/byte_streams/ByteBufferInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package clj_commons.byte_streams;

import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class ByteBufferInputStream extends InputStream {

private ByteBuffer _buf;

public ByteBufferInputStream(ByteBuffer buf) {
_buf = buf;
}

public void close() {
}

public int available() {
return _buf.remaining();
}

public boolean markSupported() {
return true;
}

public void mark(int readlimit) {
_buf.mark();
}

public void reset() {
_buf.reset();
}

public long skip(long n) {
int nP = Math.min((int)n, _buf.remaining());
_buf.position(_buf.position() + nP);
return (long)nP;
}

public int read() throws IOException {
if (!_buf.hasRemaining()) {
return -1;
} else {
return (int) _buf.get() & 0xFF;
}
}

public int read(byte[] bytes, int offset, int length) throws IOException {
length = Math.min(length, _buf.remaining());
if (length == 0) {
return -1;
} else {
_buf.get(bytes, offset, length);
return length;
}
}
}
52 changes: 52 additions & 0 deletions src/clj_commons/byte_streams/InputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package clj_commons.byte_streams;

import java.io.IOException;

public class InputStream extends java.io.InputStream {

public interface Streamable {
int available();
void close();
long skip(long n);
int read() throws IOException;
int read(byte[] bytes, int offset, int length) throws IOException;
}

private Streamable _s;

public InputStream(Streamable s) {
_s = s;
}

public void close() {
_s.close();
}

public int available() {
return _s.available();
}

public boolean markSupported() {
return false;
}

public void mark(int readlimit) {
throw new UnsupportedOperationException();
}

public void reset() {
throw new UnsupportedOperationException();
}

public long skip(long n) {
return _s.skip(n);
}

public int read() throws IOException {
return _s.read();
}

public int read(byte[] bytes, int offset, int length) throws IOException {
return _s.read(bytes, offset, length);
}
}
7 changes: 7 additions & 0 deletions src/clj_commons/byte_streams/Utils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package clj_commons.byte_streams;

public class Utils {
public static byte[] byteArray(int length) {
return new byte[length];
}
}
122 changes: 122 additions & 0 deletions src/clj_commons/byte_streams/char_sequence.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
(ns clj-commons.byte-streams.char-sequence
(:refer-clojure :exclude [flush])
(:import
[java.util.concurrent.locks
ReentrantLock]
[java.io
ByteArrayOutputStream]
[java.nio
ByteBuffer
CharBuffer]
[java.nio.charset
Charset
CharsetDecoder
CoderResult
CodingErrorAction]))

(set! *unchecked-math* true)

(defn coding-error-action [action]
(case
:report CodingErrorAction/REPORT
:ignore CodingErrorAction/IGNORE
:replace CodingErrorAction/REPLACE))

(defn parse-result [^CoderResult result]
(cond
(.isUnderflow result) :underflow
(.isOverflow result) :overflow
:else (throw (IllegalArgumentException. "Malformed byte-stream input to CharsetDecoder"))))

(defn decode
[^CharsetDecoder decoder ^ByteBuffer in ^CharBuffer out]
(parse-result (.decode decoder in out false)))

(defn flush
[^CharsetDecoder decoder ^ByteBuffer in ^CharBuffer out]
(parse-result (.decode decoder (or in (ByteBuffer/allocate 0)) out true))
(parse-result (.flush decoder out)))

(defn concat-bytes [^ByteBuffer a ^ByteBuffer b]
(let [buf (ByteBuffer/allocate (+ (.remaining a) (.remaining b)))]
(.put buf a)
(.put buf b)
(.flip buf)))

(defn lazy-char-buffer-sequence
[^CharsetDecoder decoder
chunk-size
^ByteBuffer extra-bytes
close-fn
byte-source]
(lazy-seq
(let [num-bytes (+ (long
(if extra-bytes
(.remaining extra-bytes)
0))
(long chunk-size))
len (long
(Math/ceil
(/ num-bytes
(.averageCharsPerByte decoder))))
out (CharBuffer/allocate len)]

(if (and extra-bytes (= :overflow (decode decoder extra-bytes out)))

;; we didn't even exhaust the overflow bytes, try again
(cons
out
(lazy-char-buffer-sequence decoder chunk-size extra-bytes close-fn byte-source))

(if-let [in (byte-source chunk-size)]
(let [in (if (and extra-bytes (.hasRemaining extra-bytes))
(concat-bytes extra-bytes in)
in)
result (decode decoder in out)]
(cons
(.flip out)
(lazy-char-buffer-sequence
decoder
chunk-size
(when (.hasRemaining ^ByteBuffer in) in)
close-fn
byte-source)))
(do
(flush decoder extra-bytes out)
(when close-fn (close-fn))
(.flip out)))))))

(defn decode-byte-source
[byte-source
close-fn
{:keys [chunk-size encoding on-encoding-error]
:or {chunk-size 1024
on-encoding-error :replace
encoding "UTF-8"}}]
(let [action (coding-error-action on-encoding-error)
decoder (doto (.newDecoder (Charset/forName encoding))
(.onMalformedInput action)
(.onUnmappableCharacter action))
s (lazy-char-buffer-sequence decoder chunk-size nil close-fn byte-source)]
(reify
java.io.Closeable
(close [_] (when close-fn (close-fn)))

CharSequence
(charAt [_ idx]
(loop [remaining idx, s s]
(if (empty? s)
(throw (IndexOutOfBoundsException. (str idx)))
(let [^CharBuffer buf (first s)]
(if (< (.remaining buf) remaining)
(.charAt buf remaining)
(recur (- remaining (.remaining buf)) (rest s)))))))
(length [_]
(reduce + (map #(.remaining ^CharBuffer %) s)))
#_(subSequence [_ start end]
)
(toString [_]
(let [buf (StringBuffer.)]
(doseq [b s]
(.append buf b))
(.toString buf))))))
312 changes: 312 additions & 0 deletions src/clj_commons/byte_streams/graph.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
(ns clj-commons.byte-streams.graph
(:refer-clojure :exclude [type])
(:require
[manifold.stream :as s]
[clj-commons.byte-streams
[utils :refer [defprotocol+ defrecord+ deftype+]]
[protocols :as p]])
(:import
[java.util
LinkedList
PriorityQueue]))

(declare pprint-type)

(deftype+ Conversion [f ^double cost]
Object
(equals [_ x]
(and
(instance? Conversion x)
(identical? f (.f ^Conversion x))
(== cost (.cost ^Conversion x))))
(hashCode [_]
(bit-xor (System/identityHashCode f) (unchecked-int cost))))

(deftype+ Type [wrapper type]
Object
(equals [_ x]
(and
(instance? Type x)
(= wrapper (.wrapper ^Type x))
(= type (.type ^Type x))))
(hashCode [_]
(bit-xor
(hash wrapper)
(hash type)))
(toString [this]
(pr-str (pprint-type this))))

(defn pprint-type [^Type x]
(if-let [wrapper (.wrapper x)]
(list (symbol (str wrapper "-of")) (.type x))
(.type x)))

(defn type
([t]
(if (instance? Type t)
t
(type nil t)))
([wrapper t]
(Type. wrapper
(if (var? t)
@t
t))))

(defn- protocol? [x]
(and (map? x) (contains? x :on-interface)))

(defn canonicalize [x]
(if (protocol? x)
@(:var x)
x))

(defn- class-satisfies? [protocol ^Class c]
(boolean
(or
(.isAssignableFrom ^Class (:on-interface protocol) c)
(some
#(.isAssignableFrom ^Class % c)
(keys (:impls protocol))))))

(defn assignable? [^Type a ^Type b]
(and
(= (.wrapper a) (.wrapper b))
(let [a (canonicalize (.type a))
b (canonicalize (.type b))]
(cond
(and (class? a) (class? b))
(.isAssignableFrom ^Class b a)

(and (protocol? b) (class? a))
(class-satisfies? b a)

:else
(= a b)))))

(defprotocol+ IConversionGraph
(assoc-conversion [_ src dst f cost])
(equivalent-targets [_ dst])
(possible-sources [_])
(possible-targets [_])
(possible-conversions [_ src])
(conversion [_ src dst]))

(defn implicit-conversions [^Type src]
(cond

;; vector -> seq
(= 'vector (.wrapper src))
[[[src (Type. 'seq (.type src))] (Conversion. (fn [x _] (seq x)) 1)]]

;; seq -> stream
(= 'seq (.wrapper src))
[[[src (Type. 'stream (.type src))] (Conversion. (fn [x _] (s/->source x)) 1)]]

;; stream -> seq
(= 'stream (.wrapper src))
[[[src (Type. 'seq (.type src))] (Conversion. (fn [x _] (s/stream->seq x)) 1)]]

:else
nil))

(deftype+ ConversionGraph [m]
IConversionGraph
(assoc-conversion [_ src dst f cost]
(let [m' (assoc-in m [src dst] (Conversion. f cost))
m' (if (and
(nil? (.wrapper ^Type src))
(nil? (.wrapper ^Type dst)))
(let [src (.type ^Type src)
dst (.type ^Type dst)]
(-> m'
(assoc-in [(Type. 'seq src) (Type. 'seq dst)]
(Conversion. (fn [x options] (map #(f % options) x)) cost))
(assoc-in [(Type. 'stream src) (Type. 'stream dst)]
(Conversion. (fn [x options] (s/map #(f % options) x)) (+ cost 0.1)))))
m')]
(ConversionGraph. m')))
(possible-sources [_]
(keys m))
(possible-targets [_]
(->> m vals (mapcat keys)))
(equivalent-targets [_ dst]
(->> m
vals
(mapcat keys)
(filter #(assignable? % dst))))
(possible-conversions [_ src]
(->> m
keys
(filter (partial assignable? src))
(mapcat (fn [src]
(map
(fn [[k v]]
[[src k] v])
(get m src))))
(concat (implicit-conversions src))
(into {}))))

(defn conversion-graph []
(ConversionGraph. {}))

;;;

(defrecord+ ConversionPath [path fns visited? cost]
Comparable
(compareTo [_ x]
(let [cmp (compare cost (.cost ^ConversionPath x))]
(if (zero? cmp)
(compare (count path) (count (.path ^ConversionPath x)))
cmp))))

(defn- conj-path [^ConversionPath p src dst ^Conversion c]
(ConversionPath.
(conj (.path p) [src dst])
(conj (.fns p) (.f c))
(conj (.visited? p) dst)
(+ (.cost p) (.cost c))))

(def conversion-path
(memoize
(fn [g src dst]
(let [path (ConversionPath. [] [] #{src} 0)]
(if (assignable? src dst)
path
(let [q (doto (PriorityQueue.) (.add path))
dsts (equivalent-targets g dst)]
(loop []
(when-let [^ConversionPath p (.poll q)]
(let [curr (or (-> p .path last second) src)]
(if (some #(assignable? curr %) dsts)
p
(do
(doseq [[[src dst] c] (->> curr
(possible-conversions g)
(remove (fn [[[src dst] c]] ((.visited? p) dst))))]
(.add q (conj-path p src dst c)))
(recur))))))))))))

;;;

(defn closeable-seq [s exhaustible? close-fn]
(if (empty? s)
(when exhaustible?
(close-fn)
nil)
(reify

clojure.lang.IPending
(isRealized [_]
(or
(not (instance? clojure.lang.IPending s))
(realized? s)))

Object
(finalize [_]
(close-fn))

java.io.Closeable
(close [_]
(close-fn))

clojure.lang.Sequential
clojure.lang.ISeq
clojure.lang.Seqable
(seq [this] this)
(cons [_ a]
(closeable-seq (cons a s) exhaustible? close-fn))
(next [this]
(closeable-seq (next s) exhaustible? close-fn))
(more [this]
(let [rst (next this)]
(if (empty? rst)
'()
rst)))
(first [_]
(first s))
(equiv [a b]
(= s b)))))

(defn conversion-fn [g src dst]
(when-let [path (conversion-path g src dst)]
(condp = (count (:path path))
0 (fn [x _] x)

1 (let [f (->> path :fns first)]
(if (p/closeable? src)
(fn [x options]
(let [x' (f x options)]
(when-not (p/closeable? x')
(p/close x))
x'))
f))

;; multiple stages
(let [fns (->> path :fns (apply vector))]
(fn [x options]
(let [close-fns (LinkedList.)
result (reduce
(fn [x f]

;; keep track of everything that needs to be closed once the bytes are exhausted
(when (p/closeable? x)
(.add close-fns #(p/close x)))
(f x options))
x
fns)]
(if-let [close-fn (when-not (or (p/closeable? result)
(.isEmpty close-fns))
#(loop []
(when-let [f (.poll close-fns)]
(f)
(recur))))]
(cond

(seq? result)
(closeable-seq result true close-fn)

(s/source? result)
(do
(s/on-drained result close-fn)
result)

:else
(do
;; we assume that if the end-result is closeable, it will take care of all the intermediate
;; objects beneath it. I think this is true as long as we're not doing multiple streaming
;; reads, but this might need to be revisited.
(when-not (p/closeable? result)
(close-fn))
result))
result)))))))

(defn seq-conversion-fn [g convert wrapper dst]
(let [path (->> g
possible-sources
(remove #(nil? (.wrapper ^Type %)))
(remove #(#{String CharSequence} (.type ^Type %)))
(map #(conversion-path g % dst))
(remove nil?)
(sort-by :cost)
first)
^Type src (-> path :path first first)]

(when src
(let [wrapper' (.wrapper src)
type' (.type src)]
(fn [x options]
(->> x

((condp = [wrapper wrapper']
'[seq vector] vec
'[stream vector] (comp vec s/stream->seq)
'[seq stream] s/->source
'[stream seq] s/stream->seq
identity))

((condp = wrapper'
'vector (partial mapv #(convert % type' options))
'seq (partial map #(convert % type' options))
'stream (partial s/map #(convert % type' options))))

(#((conversion-fn g src (-> path :path last last)) % options))))))))
34 changes: 34 additions & 0 deletions src/clj_commons/byte_streams/protocols.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
(ns clj-commons.byte-streams.protocols
(:require
[clj-commons.byte-streams.utils :refer [defprotocol+]])
(:import
[java.util.concurrent
ConcurrentHashMap]))

(defprotocol+ Closeable
(close [_] "A protocol that is a superset of `java.io.Closeable`."))

(defprotocol+ ByteSource
(take-bytes! [_ n options] "Takes `n` bytes from the byte source."))

(defprotocol+ ByteSink
(send-bytes! [_ bytes options] "Puts `bytes` in the byte sink."))

(extend-protocol Closeable

java.io.Closeable
(close [this] (.close this))

)

(let [m (ConcurrentHashMap.)]
(defn closeable? [x]
(if (nil? x)
false
(let [c (class x)
v (.get m c)]
(if (nil? v)
(let [v (satisfies? Closeable x)]
(.put m c v)
v)
v)))))
306 changes: 306 additions & 0 deletions src/clj_commons/byte_streams/pushback_stream.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
(ns clj-commons.byte-streams.pushback-stream
(:refer-clojure :exclude [take])
(:require
[piotr-yuxuan.primitive-math :as p]
[clj-commons.byte-streams.utils :refer [doit definterface+ deftype+]]
[manifold
[utils :as u]
[stream :as s]
[deferred :as d]]
[clojure.walk :as walk])
(:import
[java.nio
ByteBuffer]
[clj_commons.byte_streams
InputStream
InputStream$Streamable]
[java.util
LinkedList
ArrayDeque]))

(set! *unchecked-math* true)

(definterface+ PushbackStream
(put [^bytes x ^int offset ^int length])
(put [^java.nio.ByteBuffer buf])
(pushback [^bytes ary ^int offset ^int length])
(pushback [^java.nio.ByteBuffer buf])
(take [^bytes ary ^int offset ^int length ^boolean eager?])
(^void close []))

(deftype Consumption
[^ByteBuffer buf
deferred
^boolean eager?])

(defn trigger [^Consumption c]
(let [^ByteBuffer buf (.buf c)]
(d/success! (.deferred c) (.position buf))))

(defn put [^ByteBuffer src ^ByteBuffer dst]
(let [l (.limit src)]
(.limit src (p/+ (.position src) (p/min (.remaining src) (.remaining dst))))
(.put dst src)
(.limit src l)))

(defn- expand-either [first? form]
(let [form' (->> form
(map
#(if (and (seq? %) (= 'either (first %)))
(nth % (if first? 1 2))
[%]))
(apply concat))]
(with-meta
(if (seq? form)
form'
(into (empty form) form'))
(meta form))))

(defn walk
[inner outer form]
(let [form' (cond
(list? form) (outer (apply list (map inner form)))
(seq? form) (outer (doall (map inner form)))
(coll? form) (outer (into (empty form) (map inner form)))
:else (outer form))]
(if (instance? clojure.lang.IMeta form')
(with-meta form' (meta form))
form')))

(defn prewalk
[f form]
(walk (partial prewalk f) identity (f form)))

(defmacro ^:private both [body]
`(do
~(prewalk
(fn [x]
(if (sequential? x)
(expand-either true x)
x))
body)
~(prewalk
(fn [x]
(if (sequential? x)
(expand-either false x)
x))
body)))

(both
(deftype+ (either [PushbackByteStream] [SynchronizedPushbackByteStream])
[lock
^LinkedList consumers
^long buffer-capacity
^:unsynchronized-mutable ^int buffer-size
^:unsynchronized-mutable deferred
^:unsynchronized-mutable closed?
^LinkedList buffer]

InputStream$Streamable

(available [_]
buffer-size)

(read [this]
(let [ary (byte-array 1)
len (long @(.take this ary 0 1 true))]
(if (zero? len)
-1
(p/bit-and 0xFF (get ary 0)))))

(read [this ary offset length]
(let [n (long @(.take this ary offset length true))]
(if (zero? n)
-1
n)))

(skip [this n]
@(.take this (byte-array n) 0 n true))

PushbackStream

(put [_ buf]

(let [[consumers d]
((either
[do]
[u/with-lock* lock])

(if closed?
[nil
(d/success-deferred false)]

[(loop [acc []]
(if-let [^Consumption c (.peek consumers)]
(let [^ByteBuffer out (.buf c)]
(put buf out)
(when (or (.eager? c) (not (.hasRemaining out)))
(.remove consumers)
(recur (conj acc c))))
acc))

(do
(when (.hasRemaining buf)
(.add buffer buf)
(set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf)))))

(cond

deferred
deferred

(p/<= buffer-size buffer-capacity)
(d/success-deferred true)

:else
(set! deferred (d/deferred))))]))]

(when consumers
(doit [c consumers]
(trigger c)))

d))

(put [this ary offset length]
(.put this
(-> (ByteBuffer/wrap ary)
(.position offset)
(.limit (+ offset length)))))

(pushback [_ buf]
(let [consumers
((either
[do]
[u/with-lock* lock])
(let [consumers
(loop [acc []]
(if-let [^Consumption c (.peek consumers)]
(let [^ByteBuffer out (.buf c)]
(put buf out)
(when (or (.eager? c) (not (.hasRemaining out)))
(.remove consumers)
(recur (conj acc c))))
acc))]

(when (.hasRemaining buf)
(.addLast buffer buf)
(set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf)))))

consumers))]

(doit [c consumers]
(trigger c))))

(pushback [this ary offset length]
(.pushback this
(-> (ByteBuffer/wrap ary)
(.position offset)
(.limit (+ offset length)))))

(take [_ ary offset length eager?]

(let [out (-> (ByteBuffer/wrap ary)
(.position offset)
^ByteBuffer (.limit (+ offset length))
.slice)

[put take]

((either
[do]
[u/with-lock* lock])

(loop []
(when-let [^ByteBuffer in (.peek buffer)]
(put in out)
(when-not (.hasRemaining in)
(.remove buffer))
(when (.hasRemaining out)
(recur))))

(set! buffer-size (unchecked-int (p/- buffer-size (.position out))))

[(when (and (p/<= buffer-size buffer-capacity) deferred)
(let [d deferred]
(set! deferred nil)
d))

(if (or closed?
(and (pos? (.position out))
(or eager? (not (.hasRemaining out)))))
(d/success-deferred (.position out))
(let [d (d/deferred)]
(.add consumers (Consumption. out d eager?))
d))])]

(when put
(d/success! put true))

take))

(close [_]
(when ((either
[do]
[u/with-lock* lock])
(when-not closed?
(set! closed? true)
true))
(loop []
(when-let [^Consumption c (.poll consumers)]
(let [^ByteBuffer buf (.buf c)]
(d/success! (.deferred c) (.position buf)))
(recur))))

true)))

(defn pushback-stream [capacity]
(SynchronizedPushbackByteStream.
(u/mutex)
(LinkedList.)
capacity
0
nil
false
(LinkedList.)))

(defn unsafe-pushback-stream [capacity]
(PushbackByteStream.
(u/mutex)
(LinkedList.)
capacity
0
nil
false
(LinkedList.)))

(def classname "clj_commons.byte_streams.pushback_stream.PushbackStream")

(definline put-array
[p ary offset length]
`(.put ~(with-meta p {:tag classname}) ~ary ~offset ~length))

(definline put-buffer
[p buf]
`(.put ~(with-meta p {:tag classname}) ~buf))

(definline close [p]
`(.close ~(with-meta p {:tag classname})))

(definline eager-take
[p ary offset length]
`(.take ~(with-meta p {:tag classname}) ~ary ~offset ~length true))

(definline take
[p ary offset length]
`(.take ~(with-meta p {:tag classname}) ~ary ~offset ~length false))

(definline pushback-array
[p ary offset length]
`(.pushback ~(with-meta p {:tag classname}) ~ary ~offset ~length))

(definline pushback-buffer
[p buf]
`(.pushback ~(with-meta p {:tag classname}) ~buf))

(defn ->input-stream [pushback-stream]
(InputStream. pushback-stream))
30 changes: 30 additions & 0 deletions src/clj_commons/byte_streams/utils.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
(ns clj-commons.byte-streams.utils)

(defmacro defprotocol+ [name & body]
(when-not (resolve name)
`(defprotocol ~name ~@body)))

(defmacro deftype+ [name & body]
(when-not (resolve name)
`(deftype ~name ~@body)))

(defmacro defrecord+ [name & body]
(when-not (resolve name)
`(defrecord ~name ~@body)))

(defmacro definterface+ [name & body]
(when-not (resolve name)
`(definterface ~name ~@body)))

(defmacro doit
"A version of doseq that doesn't emit all that inline-destroying chunked-seq code."
[[x it] & body]
(let [it-sym (gensym "iterable")]
`(let [~it-sym ~it
it# (.iterator ~(with-meta it-sym {:tag 'java.lang.Iterable}))]
(loop []
(when (.hasNext it#)
(let [~x (.next it#)]
~@body)
(recur))))))

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(ns byte-streams-reload-test
(ns clj-commons.byte-streams-reload-test
(:require
[clojure.test :refer :all]))

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns byte-streams-simple-check
(ns clj-commons.byte-streams-simple-check
(:require
[clojure.test :refer :all]
[byte-streams :as bs]
[clj-commons.byte-streams :as bs]
[clojure.test.check.generators :as gen]
[clojure.test.check.properties :as prop]
[clojure.test.check.clojure-test :as ct :refer (defspec)]))
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
(ns byte-streams-test
(ns clj-commons.byte-streams-test
(:require
[byte-streams :refer [bytes= compare-bytes conversion-path convert dev-null possible-conversions seq-of stream-of to-byte-array to-byte-buffer to-byte-buffers to-input-stream to-string transfer vector-of]]
[clj-commons.byte-streams :refer [bytes= compare-bytes conversion-path convert dev-null possible-conversions seq-of stream-of to-byte-array to-byte-buffer to-byte-buffers to-input-stream to-string transfer vector-of]]
[clojure.test :refer :all]
[byte-streams.char-sequence :as cs])
[clj-commons.byte-streams.char-sequence :as cs])
(:refer-clojure
:exclude [vector-of])
(:import
2 changes: 1 addition & 1 deletion test/pushback_stream_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns pushback-stream-test
(:require
[clojure.test :refer :all]
[byte-streams.pushback-stream :as p]))
[clj-commons.byte-streams.pushback-stream :as p]))

(def in (byte-array (range 100)))

2 changes: 1 addition & 1 deletion version.edn
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"0.2"
"0.2.11"