Skip to content

Commit d05f6a3

Browse files
committed
stream -> InputStream conversion
1 parent 18f6044 commit d05f6a3

File tree

6 files changed

+400
-92
lines changed

6 files changed

+400
-92
lines changed

src/byte_streams.clj

Lines changed: 73 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
(:require
55
[manifold.stream :as s]
66
[manifold.deferred :as d]
7+
[byte-streams.pushback-stream :as ps]
78
[byte-streams.char-sequence :as cs]
89
[byte-streams.utils :refer (fast-memoize)]
910
[clojure.java.io :as io]
@@ -27,6 +28,7 @@
2728
FileOutputStream
2829
FileInputStream
2930
ByteArrayInputStream
31+
ByteArrayOutputStream
3032
PipedOutputStream
3133
PipedInputStream
3234
DataInputStream
@@ -68,11 +70,7 @@
6870
;; True when `x` is a map that contains the `:on-interface` key.
;; NOTE(review): presumably this is how the map held by a protocol var is
;; recognized -- confirm against the callers.
(defn- protocol? [x]
6971
(and (map? x) (contains? x :on-interface)))
7072

71-
(defn seq-of [x]
72-
(list 'seq-of x))
73-
74-
(defn stream-of [x]
75-
(list 'stream-of x))
73+
(declare seq-of stream-of)
7674

7775
(defn seq-of? [x]
7876
(and (seq? x)
@@ -90,6 +88,13 @@
9088

9189
(defn- abstract-type-descriptor [x]
9290
(cond
91+
92+
(and (sequential? x) (= 'var (first x)))
93+
(abstract-type-descriptor (second x))
94+
95+
(var? x)
96+
x
97+
9398
(seq-of? x)
9499
(list 'list '(quote seq-of) (abstract-type-descriptor (second x)))
95100

@@ -105,6 +110,12 @@
105110
(:var x)
106111
x))))
107112

113+
;; Builds a descriptor of the form `(seq-of x)`: a two-element list holding
;; the symbol `seq-of` followed by `x`.
(defn seq-of [x]
114+
(list 'seq-of x))
115+
116+
;; Builds a descriptor of the form `(stream-of x)`: a two-element list holding
;; the symbol `stream-of` followed by `x`.
(defn stream-of [x]
117+
(list 'stream-of x))
118+
108119
(defmacro def-conversion
109120
"Defines a conversion from one type to another."
110121
[[src dst :as conversion] params & body]
@@ -397,9 +408,12 @@
397408
"Returns a descriptor that can be used with `conversion-path`."
398409
[x]
399410
(cond
400-
(or (class? x) (protocol? x))
411+
(class? x)
401412
x
402413

414+
(protocol? x)
415+
(:var x)
416+
403417
(contains? @src->dst->conversion (class x))
404418
(class x)
405419

@@ -411,7 +425,7 @@
411425

412426
(defn convert
413427
"Converts `x`, if possible, into type `dst`, which can be either a class or protocol. If no such conversion
414-
is possible, an IllegalArgumentException is thrown.
428+
is possible, an IllegalArgumentException is thrown. If `x` is a stream, then its source type must be explicitly specified via the `:source-type` key in `options`.
415429
416430
`options` is a map, whose available settings depend on what sort of transform is being performed:
417431
@@ -429,45 +443,17 @@
429443
(cond
430444

431445
(s/source? x)
432-
(if (stream-of? dst)
433-
434-
;; -> (stream-of a)
435-
(let [s' (s/stream)
436-
dst (if (protocol? (second dst))
437-
(:var (second dst))
438-
(second dst))]
439-
(-> x
440-
(s/take! ::none)
441-
(d/chain
442-
(fn [msg]
443-
(if (identical? ::none msg)
444-
(s/close! s')
445-
(let [src (type-descriptor msg)]
446-
(if-let [f (converter src dst)]
447-
(do
448-
(s/put! s' (f msg options))
449-
(s/connect-via x #(s/put! s' (f % options)) s'))
450-
(do
451-
(s/close! x)
452-
(s/close! s'))))))))
453-
s')
454-
455-
456-
(let [msg @(s/take! x ::none)
457-
src (type-descriptor msg)
458-
s' (s/stream)]
459-
(when-not (identical? ::none msg)
460-
(s/put! s' msg)
461-
(s/connect x s')
462-
(if-let [f (converter (stream-of src) dst)]
463-
(f s' options)
464-
(throw (IllegalArgumentException. (str "Don't know how to convert a stream of " src " into " dst)))))))
446+
(let [src (get options :source-type)]
447+
(assert src "must specify `:source-type` when converting streams")
448+
(if-let [f (converter src dst)]
449+
(f x options)
450+
(throw (IllegalArgumentException. (str "don't know how to convert " src " into " dst)))))
465451

466452
(not (or (nil? x) (and (sequential? x) (empty? x))))
467-
(let [src (type-descriptor x)
468-
dst (if (protocol? dst)
469-
(:var dst)
470-
dst)]
453+
(let [src (or
454+
(when (sequential? x)
455+
(get options :source-type))
456+
(type-descriptor x))]
471457
(if (or
472458
(= src dst)
473459
(and (class? src) (class? dst) (.isAssignableFrom ^Class dst src)))
@@ -559,11 +545,11 @@
559545
(f source' sink' options))))
560546

561547
(and
562-
(conversion-path src ByteSource)
563-
(conversion-path dst ByteSink))
548+
(conversion-path src #'ByteSource)
549+
(conversion-path dst #'ByteSink))
564550
(fn [source sink {:keys [close?] :or {close? true} :as options}]
565-
(let [source' (convert source ByteSource options)
566-
sink' (convert sink ByteSink options)]
551+
(let [source' (convert source #'ByteSource options)
552+
sink' (convert sink #'ByteSink options)]
567553
(default-transfer source' sink' options)
568554
(when close?
569555
(doseq [x [source sink source' sink']]
@@ -593,6 +579,8 @@
593579
([source sink]
594580
(transfer source sink nil))
595581
([source sink options]
582+
(transfer source nil sink options))
583+
([source source-type sink options]
596584
(if (s/source? source)
597585

598586
(let [msg @(s/take! source ::none)
@@ -626,6 +614,26 @@
626614

627615
;;; conversion definitions
628616

617+
;; stream of byte-arrays => input-stream
;; Every array consumed from the stream `s` is handed in full (offset 0,
;; `alength` bytes) to a pushback-stream constructed from the `:buffer-size`
;; option (default 65536). The pushback-stream is closed once `s` is drained,
;; and its input-stream view is returned.
(def-conversion ^{:cost 0} [(stream-of byte-array) InputStream]
618+
[s options]
619+
(let [ps (ps/pushback-stream (get options :buffer-size 65536))]
620+
(s/consume
621+
(fn [^bytes ary]
622+
(ps/put-array ps ary 0 (alength ary)))
623+
s)
624+
(s/on-drained s #(ps/close ps))
625+
(ps/->input-stream ps)))
626+
627+
;; stream of byte-buffers => input-stream
;; Every buffer consumed from the stream `s` is handed to a pushback-stream
;; constructed from the `:buffer-size` option (default 65536). The
;; pushback-stream is closed once `s` is drained, and its input-stream view
;; is returned.
(def-conversion ^{:cost 0} [(stream-of ByteBuffer) InputStream]
628+
[s options]
629+
(let [ps (ps/pushback-stream (get options :buffer-size 65536))]
630+
(s/consume
631+
(fn [buf]
632+
(ps/put-buffer ps buf))
633+
s)
634+
(s/on-drained s #(ps/close ps))
635+
(ps/->input-stream ps)))
636+
629637
;; byte-array => byte-buffer
630638
(def-conversion ^{:cost 0} [byte-array ByteBuffer]
631639
[ary {:keys [direct?] :or {direct? false}}]
@@ -747,7 +755,7 @@
747755
(.close sink))))
748756
source))
749757

750-
(def-conversion ^{:cost 1.5} [(seq-of ByteSource) InputStream]
758+
(def-conversion ^{:cost 1.5} [(seq-of #'ByteSource) InputStream]
751759
[srcs options]
752760
(let [chunk-size (get options :chunk-size 65536)
753761
out (PipedOutputStream.)
@@ -762,8 +770,14 @@
762770
(.close out))))
763771
in))
764772

773+
;; generic byte-source => byte-array
;; Transfers `src` into a fresh in-memory ByteArrayOutputStream and returns
;; the bytes it accumulated.
;; NOTE(review): `options` is accepted but not forwarded to `transfer`, so the
;; two-argument arity (nil options) is used -- confirm this is intended.
(def-conversion ^{:cost 2} [#'ByteSource byte-array]
774+
[src options]
775+
(let [os (ByteArrayOutputStream.)]
776+
(transfer src os)
777+
(.toByteArray os)))
778+
765779
;; generic byte-source => lazy char-sequence
766-
(def-conversion ^{:cost 2} [ByteSource CharSequence]
780+
(def-conversion ^{:cost 2} [#'ByteSource CharSequence]
767781
[source options]
768782
(cs/decode-byte-source
769783
#(let [bytes (take-bytes! source % options)]
@@ -1102,14 +1116,14 @@
11021116
([x]
11031117
(to-byte-source x nil))
11041118
([x options]
1105-
(convert x ByteSource options)))
1119+
(convert x #'ByteSource options)))
11061120

11071121
(defn to-byte-sink
11081122
"Converts the object to something that satisfies `ByteSink`."
11091123
([x]
11101124
(to-byte-sink x nil))
11111125
([x options]
1112-
(convert x ByteSink options)))
1126+
(convert x #'ByteSink options)))
11131127

11141128
;;;
11151129

@@ -1119,13 +1133,15 @@
11191133
sign (long (if (pos? diff) -1 1))
11201134
a (if (pos? diff) b' a')
11211135
b (if (pos? diff) a' b')
1122-
limit (p/>> (.remaining a) 2)]
1136+
limit (p/>> (.remaining a) 2)
1137+
a-offset (.position a)
1138+
b-offset (.position b)]
11231139
(let [cmp (loop [idx 0]
11241140
(if (p/>= idx limit)
11251141
0
11261142
(let [cmp (p/-
1127-
(p/int->uint (.getInt a idx))
1128-
(p/int->uint (.getInt b idx)))]
1143+
(p/int->uint (.getInt a (p/+ idx a-offset)))
1144+
(p/int->uint (.getInt b (p/+ idx b-offset))))]
11291145
(if (p/== 0 cmp)
11301146
(recur (p/+ idx 4))
11311147
(p/* sign cmp)))))]
@@ -1135,8 +1151,8 @@
11351151
(if (p/>= idx limit')
11361152
diff
11371153
(let [cmp (p/-
1138-
(p/byte->ubyte (.get a idx))
1139-
(p/byte->ubyte (.get b idx)))]
1154+
(p/byte->ubyte (.get a (p/+ idx a-offset)))
1155+
(p/byte->ubyte (.get b (p/+ idx b-offset))))]
11401156
(if (p/== 0 cmp)
11411157
(recur (p/inc idx))
11421158
(p/* sign cmp))))))

src/byte_streams/InputStream.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package byte_streams;
2+
3+
import java.io.IOException;
4+
5+
/**
 * Adapter that exposes a {@link Streamable} as a {@code java.io.InputStream}.
 * Every supported operation is forwarded unchanged to the wrapped
 * {@code Streamable}; mark/reset are not supported.
 */
public class InputStream extends java.io.InputStream {
6+
7+
/**
 * The set of operations the adapter delegates to. Only the two
 * {@code read} variants are declared to throw {@code IOException}.
 */
public interface Streamable {
8+
int available();
9+
void close();
10+
long skip(long n);
11+
int read() throws IOException;
12+
int read(byte[] bytes, int offset, int length) throws IOException;
13+
}
14+
15+
// The delegate that every call below is forwarded to.
private Streamable _s;
16+
17+
/** Wraps {@code s}; the reference is stored as-is, without a null check. */
public InputStream(Streamable s) {
18+
_s = s;
19+
}
20+
21+
/** Forwards to {@code Streamable.close()}. */
public void close() {
22+
_s.close();
23+
}
24+
25+
/** Forwards to {@code Streamable.available()}. */
public int available() {
26+
return _s.available();
27+
}
28+
29+
/** Always {@code false}: this stream does not support mark/reset. */
public boolean markSupported() {
30+
return false;
31+
}
32+
33+
// NOTE(review): java.io.InputStream documents mark() as a no-op when marking
// is unsupported; this override throws instead -- confirm callers only call
// it after checking markSupported().
public void mark(int readlimit) {
34+
throw new UnsupportedOperationException();
35+
}
36+
37+
// NOTE(review): the java.io.InputStream default reset() throws IOException;
// this override throws the unchecked UnsupportedOperationException instead.
public void reset() {
38+
throw new UnsupportedOperationException();
39+
}
40+
41+
/** Forwards to {@code Streamable.skip(n)} and returns its result. */
public long skip(long n) {
42+
return _s.skip(n);
43+
}
44+
45+
/** Forwards to {@code Streamable.read()} and returns its result. */
public int read() throws IOException {
46+
return _s.read();
47+
}
48+
49+
/** Forwards to {@code Streamable.read(bytes, offset, length)} and returns its result. */
public int read(byte[] bytes, int offset, int length) throws IOException {
50+
return _s.read(bytes, offset, length);
51+
}
52+
}

src/byte_streams/char_sequence.clj

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
CharsetDecoder
1212
CoderResult]))
1313

14+
(set! *unchecked-math* true)
15+
1416
(defn parse-result [^CoderResult result]
1517
(cond
1618
(.isUnderflow result) :underflow
@@ -33,38 +35,39 @@
3335
^ByteBuffer extra-bytes
3436
close-fn
3537
byte-source]
36-
(lazy-seq
37-
(let [num-bytes (+ (if extra-bytes
38-
(.remaining extra-bytes)
39-
0)
40-
(long chunk-size))
41-
len (long
42-
(Math/ceil
43-
(/ num-bytes
44-
(.averageCharsPerByte decoder))))
45-
out (CharBuffer/allocate len)]
38+
(let [chunk-size (long chunk-size)]
39+
(lazy-seq
40+
(let [num-bytes (+ (if extra-bytes
41+
(.remaining extra-bytes)
42+
0)
43+
(long chunk-size))
44+
len (long
45+
(Math/ceil
46+
(/ num-bytes
47+
(.averageCharsPerByte decoder))))
48+
out (CharBuffer/allocate len)]
4649

47-
(if (and extra-bytes (= :overflow (decode decoder extra-bytes out)))
50+
(if (and extra-bytes (= :overflow (decode decoder extra-bytes out)))
4851

49-
;; we didn't even exhaust the overflow bytes, try again
50-
(cons
51-
out
52-
(lazy-char-buffer-sequence decoder chunk-size extra-bytes close-fn byte-source))
52+
;; we didn't even exhaust the overflow bytes, try again
53+
(cons
54+
out
55+
(lazy-char-buffer-sequence decoder chunk-size extra-bytes close-fn byte-source))
5356

54-
(if-let [in (byte-source chunk-size)]
55-
(let [result (decode decoder in out)]
56-
(cons
57-
(.flip out)
58-
(lazy-char-buffer-sequence
59-
decoder
60-
chunk-size
61-
(when (= :overflow result) in)
62-
close-fn
63-
byte-source)))
64-
(do
65-
(flush decoder out)
66-
(when close-fn (close-fn))
67-
(.flip out)))))))
57+
(if-let [in (byte-source chunk-size)]
58+
(let [result (decode decoder in out)]
59+
(cons
60+
(.flip out)
61+
(lazy-char-buffer-sequence
62+
decoder
63+
chunk-size
64+
(when (= :overflow result) in)
65+
close-fn
66+
byte-source)))
67+
(do
68+
(flush decoder out)
69+
(when close-fn (close-fn))
70+
(.flip out))))))))
6871

6972
(defn decode-byte-source
7073
[byte-source

0 commit comments

Comments
 (0)