[TEST][NO-MERGE] Stress test named pipes #365

Draft: wants to merge 6 commits into master
17 changes: 11 additions & 6 deletions bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala
@@ -47,13 +47,16 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
request: Array[Byte]
): Array[Byte] = {
-    Try {
+    // Use try-catch to handle all Throwable including OutOfMemoryError, StackOverflowError, etc.
+    try {
      gen.run(request)
-    }.recover { case throwable =>
-      createCodeGeneratorResponseWithError(
-        throwable.toString + "\n" + getStackTrace(throwable)
-      )
-    }.get
+    } catch {
+      case throwable: Throwable =>
+        System.err.println("createCodeGeneratorResponseWithError...")
+        createCodeGeneratorResponseWithError(
+          throwable.toString + "\n" + getStackTrace(throwable)
+        )
+    }
}

def createCodeGeneratorResponseWithError(error: String): Array[Byte] = {
@@ -117,7 +120,9 @@ object PluginFrontend {
fsin: InputStream,
env: ExtraEnv
): Array[Byte] = {
+    // System.err.println("readInputStreamToByteArrayWithEnv...")
val bytes = readInputStreamToByteArrayWithEnv(fsin, env)
+    // System.err.println("runWithBytes...")
runWithBytes(gen, bytes)
}
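Note: the switch from `Try`/`recover` to a plain `try`/`catch` matters because `scala.util.Try` only catches exceptions matched by `scala.util.control.NonFatal`, so fatal errors such as `OutOfMemoryError` or `StackOverflowError` previously escaped `runWithBytes` without ever being turned into an error response. A minimal standalone sketch of the difference (not part of this patch):

import scala.util.Try

object NonFatalDemo extends App {
  // A NonFatal exception is captured by Try and handled by recover.
  val recovered = Try[String](throw new RuntimeException("boom"))
    .recover { case t => s"recovered: $t" }
  println(recovered) // Success(recovered: java.lang.RuntimeException: boom)

  // A fatal error bypasses Try entirely: Try.apply only catches NonFatal,
  // so recover never runs and only a plain catch sees the error.
  try {
    Try[String](throw new StackOverflowError("fatal"))
      .recover { case t => s"recovered: $t" }
  } catch {
    case t: Throwable => println(s"caught outside Try: $t")
  }
}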

@@ -36,13 +36,42 @@ object PosixPluginFrontend extends PluginFrontend {

Future {
blocking {
-        val fsin = Files.newInputStream(inputPipe)
-        val response = PluginFrontend.runWithInputStream(plugin, fsin, env)
-        fsin.close()
+        try {
+          // System.err.println("Files.newInputStream...")
+          val fsin = Files.newInputStream(inputPipe)
+          // System.err.println("PluginFrontend.runWithInputStream...")
+          val response = PluginFrontend.runWithInputStream(plugin, fsin, env)
+          // System.err.println("fsin.close...")
+          fsin.close()

-        val fsout = Files.newOutputStream(outputPipe)
-        fsout.write(response)
-        fsout.close()
+          // System.err.println("Files.newOutputStream...")
+          val fsout = Files.newOutputStream(outputPipe)
+          // System.err.println("fsout.write...")
+          fsout.write(response)
+          // System.err.println("fsout.close...")
+          fsout.close()

+          // System.err.println("blocking done.")
+        } catch {
+          case e: Throwable =>
+            // Handles rare exceptions not already handled gracefully in `runWithBytes`.
+            // Such exceptions aren't converted to a `CodeGeneratorResponse`,
+            // because `fsin` might not have been fully consumed, in which case
+            // the plugin shell script might hang on `inputPipe`
+            // and never consume the `CodeGeneratorResponse` anyway.
+            System.err.println("Exception occurred in PluginFrontend outside runWithBytes")
+            e.printStackTrace(System.err)
+            // Force an exit of the program: the plugin shell script might hang on
+            // `inputPipe` (because `fsin` was not fully consumed) or on `outputPipe`
+            // (because `fsout` was never closed).
+            // We can't simply close `fsout` here either, because
+            // `Files.newOutputStream(outputPipe)` will hang if `outputPipe` has not yet
+            // been opened by the plugin shell script for reading.
+            // Without the exit, the program might be stuck waiting for protoc,
+            // which in turn is waiting for the plugin shell script.
+            sys.exit(1)
+        }
}
}
(sh, InternalState(inputPipe, outputPipe, tempDirPath, sh))
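Note: the forced `sys.exit(1)` above is tied to FIFO semantics: opening a named pipe blocks until the other end is also opened, so `Files.newOutputStream(outputPipe)` cannot make progress while the plugin shell script is still stuck writing the request into `inputPipe`. A standalone sketch of that open-blocking behaviour, assuming a POSIX system with `mkfifo` on the PATH (not code from this patch):

import java.nio.file.Files
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FifoOpenBlockingDemo extends App {
  val dir  = Files.createTempDirectory("fifo-demo")
  val pipe = dir.resolve("pipe")
  new ProcessBuilder("mkfifo", pipe.toString).start().waitFor()

  // The writer blocks inside newOutputStream itself (the underlying open call),
  // not inside write(), until some process opens the FIFO for reading.
  val writer = Future {
    val out = Files.newOutputStream(pipe)
    out.write("hello".getBytes("UTF-8"))
    out.close()
  }

  Thread.sleep(500) // give the writer time to block on the open

  val in = Files.newInputStream(pipe) // opening the read end unblocks the writer
  val buf = new Array[Byte](5)
  val n = in.read(buf)
  in.close()
  Await.result(writer, 5.seconds)
  println(new String(buf, 0, n, "UTF-8")) // prints "hello"
}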
@@ -0,0 +1,52 @@
package protocbridge.frontend

import org.apache.commons.io.IOUtils
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.io.ByteArrayOutputStream
import scala.sys.process.ProcessIO
import scala.util.Random

class PosixPluginFrontendSpec extends AnyFlatSpec with Matchers {
if (!PluginFrontend.isWindows) {
it must "execute a program that forwards input and output to given stream" in {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val toReceive = Array.fill(456)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
request mustBe (toSend ++ env.toByteArrayAsField)
toReceive
}
}

// Repeat 10,000 times since named pipes on macOS are flaky.
val repeatCount = 10000
for (i <- 1 to repeatCount) {
if (i % 100 == 1) println(s"Running iteration $i of $repeatCount")

val (path, state) = PosixPluginFrontend.prepare(
fakeGenerator,
env
)
val actualOutput = new ByteArrayOutputStream()
val process = sys.process
.Process(path.toAbsolutePath.toString)
.run(new ProcessIO(writeInput => {
writeInput.write(toSend)
writeInput.close()
}, processOutput => {
IOUtils.copy(processOutput, actualOutput)
processOutput.close()
}, _.close()))
process.exitValue()
actualOutput.toByteArray mustBe toReceive
PosixPluginFrontend.cleanup(state)
}
}
}
}
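To reproduce the flakiness locally, the spec above can be run in isolation with sbt's `testOnly` task, for example `sbt "bridge/testOnly protocbridge.frontend.PosixPluginFrontendSpec"` (the `bridge` project id is an assumption based on the source paths). Each of the 10,000 iterations runs a full prepare/run/cleanup cycle against a freshly created pair of named pipes.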
24 changes: 24 additions & 0 deletions dd.py
@@ -0,0 +1,24 @@
import sys
import time

total_bytes = 0
input_file = sys.argv[1] if len(sys.argv) > 1 else "/dev/stdin"
output_file = sys.argv[2] if len(sys.argv) > 2 else "/dev/stdout"

sys.stderr.write(f"[{time.time_ns()}][dd.py] Opening input stream\n")
with open(input_file, "rb") as input_stream:
sys.stderr.write(f"[{time.time_ns()}][dd.py] Opened input stream\n")

sys.stderr.write(f"[{time.time_ns()}][dd.py] Opening output stream\n")
with open(output_file, "wb") as output_stream:
sys.stderr.write(f"[{time.time_ns()}][dd.py] Opened output stream\n")

while True:
chunk = input_stream.read(4096)
if not chunk:
break
output_stream.write(chunk)
total_bytes += len(chunk)
sys.stderr.write(f"[{time.time_ns()}][dd.py] Transferred {len(chunk)} bytes, total {total_bytes}\n")

sys.stderr.write(f"[{time.time_ns()}][dd.py] Transferred total {total_bytes} bytes\n")
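Compared with plain `dd`, this helper writes a timestamped line to stderr when each end of the transfer is opened and after every 4096-byte chunk, which makes it easier to see exactly where a transfer through the FIFO stalls.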
96 changes: 96 additions & 0 deletions pipe_stress_test.sh
@@ -0,0 +1,96 @@
#!/usr/bin/env bash

BYTE_LENGTH="$1"        # number of random bytes to push through the pipe per iteration
READER_MODE="${2:-dd}"  # program that reads the test file and writes it into the pipe (dd, dd.py, or cat)
WRITER_MODE="${3:-dd}"  # program that reads from the pipe and writes the data file (dd, dd.py, or cat)

TEST_FILE="$(mktemp -t protopipe)"
dd if=/dev/urandom of="$TEST_FILE" bs=1 count="$BYTE_LENGTH" 2>/dev/null

test_pipe() {
# Create a unique temporary directory for the pipe
PIPE_DIR=$(mktemp -d -t protopipe)
PIPE_PATH="$PIPE_DIR/output"
PIPE_DATA_PATH="$PIPE_DIR/pipe_data"

# Create the named pipe
mkfifo "$PIPE_PATH"
echo "Created named pipe at $PIPE_PATH"

# Start monitoring the pipe using fs_usage in the background
# sudo fs_usage -w | grep "$PIPE_PATH" &
# MONITOR_PID=$!
# echo "Started monitoring the pipe $PIPE_PATH (PID: $MONITOR_PID)"

# Start dumping random bytes to the pipe in the background
if [[ "$READER_MODE" == "dd" ]]; then
(dd if="$TEST_FILE" of="$PIPE_PATH" 2>/dev/null && echo "Completed dumping random bytes to the pipe") &
elif [[ "$READER_MODE" == "dd.py" ]]; then
(python3 "$(dirname "$0")"/dd.py "$TEST_FILE" "$PIPE_PATH" && echo "Completed dumping random bytes to the pipe") &
elif [[ "$READER_MODE" == "cat" ]]; then
(cat "$TEST_FILE" > "$PIPE_PATH" && echo "Completed dumping random bytes to the pipe") &
else
echo "Invalid reader mode: $READER_MODE"
exit 1
fi
DUMP_PID=$!
echo "Started dumping random bytes to the pipe (PID: $DUMP_PID)"

# Randomize the sleep duration
SLEEP_DURATION=$((RANDOM % 100))
echo "Sleeping for: $SLEEP_DURATION milliseconds"
sleep "$(echo "scale=3; $SLEEP_DURATION/1000" | bc)"

# Start a process to consume the data from the pipe
if [[ "$WRITER_MODE" == "dd" ]]; then
(dd if="$PIPE_PATH" of="$PIPE_DATA_PATH" 2>/dev/null && echo "Completed consuming random bytes from the pipe") &
elif [[ "$WRITER_MODE" == "dd.py" ]]; then
(python3 "$(dirname "$0")"/dd.py "$PIPE_PATH" "$PIPE_DATA_PATH" && echo "Completed consuming random bytes from the pipe") &
elif [[ "$WRITER_MODE" == "cat" ]]; then
(cat "$PIPE_PATH" > "$PIPE_DATA_PATH" && echo "Completed consuming random bytes from the pipe") &
else
echo "Invalid writer mode: $WRITER_MODE"
exit 1
fi
CONSUME_PID=$!
echo "Started consuming data from the pipe (PID: $CONSUME_PID)"

# Wait for the dumping process to finish
wait $DUMP_PID 2>/dev/null
echo "The dumping process has stopped (PID: $DUMP_PID)"

# Wait for the consuming process to finish
wait $CONSUME_PID 2>/dev/null
echo "The consuming process has stopped (PID: $CONSUME_PID)"

# Stop the monitoring
# kill $MONITOR_PID 2>/dev/null
# wait $MONITOR_PID 2>/dev/null
# echo "Stopped monitoring the pipe (PID: $MONITOR_PID)"

# Check the size of the data read from the pipe
DATA_SIZE=$(wc -c < "$PIPE_DATA_PATH")
if [ "$DATA_SIZE" -ne "$BYTE_LENGTH" ]; then
echo "Error: Expected $BYTE_LENGTH bytes, but read $DATA_SIZE bytes"
return 1
else
echo "Successfully read $BYTE_LENGTH bytes from the pipe"
fi

# Remove the pipe
rm "$PIPE_PATH"
rm "$PIPE_DATA_PATH"

# Remove the temporary directory
rmdir "$PIPE_DIR"
}

# Kill existing fs_usage instances
# sudo pkill fs_usage

# Repeat the process
counter=0;
while test_pipe; do
((counter++)); echo "Iterations completed: $counter";
done
echo "Command failed after $counter successful iterations."