Skip to content

Commit a298c60

Browse files
committed
Make rum injector stream/writer more resilient to errors
1 parent 9de0da8 commit a298c60

File tree

4 files changed

+231
-53
lines changed

4 files changed

+231
-53
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55

66
/**
77
* An OutputStream containing a circular buffer with a lookbehind buffer of n bytes. The first time
8-
* that the latest n bytes matches the marker, a content is injected before.
8+
* that the latest n bytes matches the marker, a content is injected before. In case of IOException
9+
* thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
10+
* this case the draining will be resumed.
911
*/
1012
public class InjectingPipeOutputStream extends OutputStream {
1113
private final byte[] lookbehind;
1214
private int pos;
13-
private boolean bufferFilled;
15+
private int count;
1416
private final byte[] marker;
1517
private final byte[] contentToInject;
16-
private boolean found = false;
17-
private int matchingPos = 0;
18+
private boolean filter;
19+
private boolean wasDraining;
20+
private int matchingPos;
1821
private final Runnable onContentInjected;
1922
private final int bulkWriteThreshold;
2023
private final OutputStream downstream;
@@ -34,32 +37,39 @@ public InjectingPipeOutputStream(
3437
this.marker = marker;
3538
this.lookbehind = new byte[marker.length];
3639
this.pos = 0;
40+
this.count = 0;
41+
this.matchingPos = 0;
42+
this.wasDraining = false;
43+
// should filter the stream to potentially inject into it.
44+
this.filter = true;
3745
this.contentToInject = contentToInject;
3846
this.onContentInjected = onContentInjected;
3947
this.bulkWriteThreshold = marker.length * 2 - 2;
4048
}
4149

4250
@Override
4351
public void write(int b) throws IOException {
44-
if (found) {
52+
if (!filter) {
53+
if (wasDraining) {
54+
// continue draining
55+
drain();
56+
}
4557
downstream.write(b);
4658
return;
4759
}
4860

49-
if (bufferFilled) {
61+
if (count == lookbehind.length) {
5062
downstream.write(lookbehind[pos]);
63+
} else {
64+
count++;
5165
}
5266

5367
lookbehind[pos] = (byte) b;
5468
pos = (pos + 1) % lookbehind.length;
5569

56-
if (!bufferFilled) {
57-
bufferFilled = pos == 0;
58-
}
59-
6070
if (marker[matchingPos++] == b) {
6171
if (matchingPos == marker.length) {
62-
found = true;
72+
filter = false;
6373
downstream.write(contentToInject);
6474
if (onContentInjected != null) {
6575
onContentInjected.run();
@@ -73,18 +83,23 @@ public void write(int b) throws IOException {
7383

7484
@Override
7585
public void write(byte[] array, int off, int len) throws IOException {
76-
if (found) {
86+
if (!filter) {
87+
if (wasDraining) {
88+
// needs drain
89+
drain();
90+
}
7791
downstream.write(array, off, len);
7892
return;
7993
}
94+
8095
if (len > bulkWriteThreshold) {
8196
// if the content is large enough, we can bulk write everything but the N trail and tail.
8297
// This because the buffer can already contain some byte from a previous single write.
8398
// Also we need to fill the buffer with the tail since we don't know about the next write.
8499
int idx = arrayContains(array, off, len, marker);
85100
if (idx >= 0) {
86101
// we have a full match. just write everything
87-
found = true;
102+
filter = false;
88103
drain();
89104
downstream.write(array, off, idx);
90105
downstream.write(contentToInject);
@@ -99,7 +114,13 @@ public void write(byte[] array, int off, int len) throws IOException {
99114
write(array[i]);
100115
}
101116
drain();
117+
boolean tmpFilter = filter;
118+
119+
// will be reset if no errors after the following write
120+
filter = false;
102121
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
122+
filter = tmpFilter;
123+
103124
for (int i = len - marker.length + 1; i < len; i++) {
104125
write(array[i]);
105126
}
@@ -133,16 +154,19 @@ private int arrayContains(byte[] array, int off, int len, byte[] search) {
133154
}
134155

135156
private void drain() throws IOException {
136-
if (bufferFilled) {
137-
for (int i = 0; i < lookbehind.length; i++) {
138-
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
157+
if (count > 0) {
158+
boolean tmpFilter = filter;
159+
filter = false;
160+
wasDraining = true;
161+
int start = (pos - count + lookbehind.length) % lookbehind.length;
162+
int cnt = count;
163+
for (int i = 0; i < cnt; i++) {
164+
downstream.write(lookbehind[(start + i) % lookbehind.length]);
165+
count--;
139166
}
140-
} else {
141-
downstream.write(this.lookbehind, 0, pos);
167+
filter = tmpFilter;
168+
wasDraining = false;
142169
}
143-
pos = 0;
144-
matchingPos = 0;
145-
bufferFilled = false;
146170
}
147171

148172
@Override
@@ -152,9 +176,12 @@ public void flush() throws IOException {
152176

153177
@Override
154178
public void close() throws IOException {
155-
if (!found) {
156-
drain();
179+
try {
180+
if (filter || wasDraining) {
181+
drain();
182+
}
183+
} finally {
184+
downstream.close();
157185
}
158-
downstream.close();
159186
}
160187
}

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55

66
/**
77
* A Writer containing a circular buffer with a lookbehind buffer of n bytes. The first time that
8-
* the latest n bytes matches the marker, a content is injected before.
8+
* the latest n bytes matches the marker, a content is injected before. In case of IOException
9+
* thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
10+
* this case the draining will be resumed.
911
*/
1012
public class InjectingPipeWriter extends Writer {
1113
private final char[] lookbehind;
1214
private int pos;
13-
private boolean bufferFilled;
15+
private int count;
1416
private final char[] marker;
1517
private final char[] contentToInject;
16-
private boolean found = false;
17-
private int matchingPos = 0;
18+
private boolean filter;
19+
private boolean wasDraining;
20+
private int matchingPos;
1821
private final Runnable onContentInjected;
1922
private final int bulkWriteThreshold;
2023
private final Writer downstream;
@@ -34,32 +37,39 @@ public InjectingPipeWriter(
3437
this.marker = marker;
3538
this.lookbehind = new char[marker.length];
3639
this.pos = 0;
40+
this.count = 0;
41+
this.matchingPos = 0;
42+
this.wasDraining = false;
43+
// should filter the stream to potentially inject into it.
44+
this.filter = true;
3745
this.contentToInject = contentToInject;
3846
this.onContentInjected = onContentInjected;
3947
this.bulkWriteThreshold = marker.length * 2 - 2;
4048
}
4149

4250
@Override
4351
public void write(int c) throws IOException {
44-
if (found) {
52+
if (!filter) {
53+
if (wasDraining) {
54+
// continue draining
55+
drain();
56+
}
4557
downstream.write(c);
4658
return;
4759
}
4860

49-
if (bufferFilled) {
61+
if (count == lookbehind.length) {
5062
downstream.write(lookbehind[pos]);
63+
} else {
64+
count++;
5165
}
5266

5367
lookbehind[pos] = (char) c;
5468
pos = (pos + 1) % lookbehind.length;
5569

56-
if (!bufferFilled) {
57-
bufferFilled = pos == 0;
58-
}
59-
6070
if (marker[matchingPos++] == c) {
6171
if (matchingPos == marker.length) {
62-
found = true;
72+
filter = false;
6373
downstream.write(contentToInject);
6474
if (onContentInjected != null) {
6575
onContentInjected.run();
@@ -71,25 +81,25 @@ public void write(int c) throws IOException {
7181
}
7282
}
7383

74-
@Override
75-
public void flush() throws IOException {
76-
downstream.flush();
77-
}
78-
7984
@Override
8085
public void write(char[] array, int off, int len) throws IOException {
81-
if (found) {
86+
if (!filter) {
87+
if (wasDraining) {
88+
// needs drain
89+
drain();
90+
}
8291
downstream.write(array, off, len);
8392
return;
8493
}
94+
8595
if (len > bulkWriteThreshold) {
8696
// if the content is large enough, we can bulk write everything but the N trail and tail.
8797
// This because the buffer can already contain some byte from a previous single write.
8898
// Also we need to fill the buffer with the tail since we don't know about the next write.
8999
int idx = arrayContains(array, off, len, marker);
90100
if (idx >= 0) {
91101
// we have a full match. just write everything
92-
found = true;
102+
filter = false;
93103
drain();
94104
downstream.write(array, off, idx);
95105
downstream.write(contentToInject);
@@ -104,7 +114,13 @@ public void write(char[] array, int off, int len) throws IOException {
104114
write(array[i]);
105115
}
106116
drain();
117+
boolean tmpFilter = filter;
118+
119+
// will be reset if no errors after the following write
120+
filter = false;
107121
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
122+
filter = tmpFilter;
123+
108124
for (int i = len - marker.length + 1; i < len; i++) {
109125
write(array[i]);
110126
}
@@ -138,23 +154,34 @@ private int arrayContains(char[] array, int off, int len, char[] search) {
138154
}
139155

140156
private void drain() throws IOException {
141-
if (bufferFilled) {
142-
for (int i = 0; i < lookbehind.length; i++) {
143-
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
157+
if (count > 0) {
158+
boolean tmpFilter = filter;
159+
filter = false;
160+
wasDraining = true;
161+
int start = (pos - count + lookbehind.length) % lookbehind.length;
162+
int cnt = count;
163+
for (int i = 0; i < cnt; i++) {
164+
downstream.write(lookbehind[(start + i) % lookbehind.length]);
165+
count--;
144166
}
145-
} else {
146-
downstream.write(this.lookbehind, 0, pos);
167+
filter = tmpFilter;
168+
wasDraining = false;
147169
}
148-
pos = 0;
149-
matchingPos = 0;
150-
bufferFilled = false;
170+
}
171+
172+
@Override
173+
public void flush() throws IOException {
174+
downstream.flush();
151175
}
152176

153177
@Override
154178
public void close() throws IOException {
155-
if (!found) {
156-
drain();
179+
try {
180+
if (filter || wasDraining) {
181+
drain();
182+
}
183+
} finally {
184+
downstream.close();
157185
}
158-
downstream.close();
159186
}
160187
}

dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,37 @@
11
package datadog.trace.bootstrap.instrumentation.buffer
22

33
import datadog.trace.test.util.DDSpecification
4+
import org.apache.commons.io.IOUtils
45

56
class InjectingPipeOutputStreamTest extends DDSpecification {
7+
static class GlitchedOutputStream extends FilterOutputStream {
8+
int glitchesPos
9+
int count
10+
11+
GlitchedOutputStream(OutputStream out, int glitchesPos) {
12+
super(out)
13+
this.glitchesPos = glitchesPos
14+
}
15+
16+
@Override
17+
void write(byte[] b, int off, int len) throws IOException {
18+
count += len
19+
if (count >= glitchesPos) {
20+
glitchesPos = Integer.MAX_VALUE
21+
throw new IOException("Glitched after $count bytes")
22+
}
23+
out.write(b, off, len)
24+
}
25+
26+
@Override
27+
void write(int b) throws IOException {
28+
if (++count == glitchesPos) {
29+
throw new IOException("Glitched after $glitchesPos bytes")
30+
}
31+
out.write(b)
32+
}
33+
}
34+
635
def 'should filter a buffer and inject if found #found'() {
736
setup:
837
def downstream = new ByteArrayOutputStream()
@@ -20,4 +49,37 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
2049
"<html><body/></html>" | "</head>" | "<something/>" | false | "<html><body/></html>"
2150
"<foo/>" | "<longerThanFoo>" | "<nothing>" | false | "<foo/>"
2251
}
52+
53+
def 'should be resilient to exceptions when writing #body'() {
54+
setup:
55+
def baos = new ByteArrayOutputStream()
56+
def downstream = new GlitchedOutputStream(baos, glichesAt)
57+
def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null)
58+
when:
59+
try {
60+
for (String line : body) {
61+
final bytes = line.getBytes("UTF-8")
62+
try {
63+
piped.write(bytes)
64+
} catch (IOException ioe) {
65+
ioe.printStackTrace()
66+
piped.write(bytes)
67+
}
68+
}
69+
} finally {
70+
IOUtils.closeQuietly(piped) // it can throw when draining at close
71+
}
72+
then:
73+
assert baos.toByteArray() == expected.getBytes("UTF-8")
74+
where:
75+
body | marker | contentToInject | glichesAt | expected
76+
// write fails after the content has been injected
77+
["<html>", "<head>", "<foo/>", "</head>", "<body/>", "</html>"] | "</head>" | "<script>true</script>" | 60 | "<html><head><foo/><script>true</script></head><body/></html>"
78+
// write fails before the content has been injected
79+
["<html>", "<head>", "<foo/>", "</head>", "<body/>", "</html>"] | "</head>" | "<script>true</script>" | 20 | "<html><head><foo/></head><body/></html>"
80+
// write fails after having filled the buffer. The last line is written twice
81+
["<html>", "<body/>", "</html>"] | "</head>" | "<something/>" | 10 | "<html><body/></h</html>"
82+
// expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content
83+
["<foo/>"] | "<longerThanFoo>" | "<nothing>" | 3 | "<f"
84+
}
2385
}

0 commit comments

Comments
 (0)