Skip to content

Commit 68a0c5e

Browse files
committed
review fix
1 parent 9d1fad8 commit 68a0c5e

File tree

3 files changed

+137
-72
lines changed

3 files changed

+137
-72
lines changed
Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,22 @@
1-
import { Readable } from "node:stream";
2-
import { ReadableStream } from "node:stream/web";
31
import type { NextApiRequest, NextApiResponse } from "next";
42

5-
function iteratorToStream(iterator: AsyncIterator<Uint8Array>) {
6-
return new ReadableStream({
7-
async pull(controller) {
8-
const { value, done } = await iterator.next();
9-
10-
if (done) {
11-
controller.close();
12-
} else {
13-
controller.enqueue(value);
14-
}
15-
},
16-
});
17-
}
18-
19-
function sleep(time: number) {
3+
const SADE_SMOOTH_OPERATOR_LYRIC = `Diamond life, lover boy
4+
He move in space with minimum waste and maximum joy
5+
City lights and business nights
6+
When you require streetcar desire for higher heights
7+
No place for beginners or sensitive hearts
8+
When sentiment is left to chance
9+
No place to be ending but somewhere to start
10+
No need to ask, he's a smooth operator
11+
Smooth operator, smooth operator
12+
Smooth operator`;
13+
14+
function sleep(ms: number) {
2015
return new Promise((resolve) => {
21-
setTimeout(resolve, time);
16+
setTimeout(resolve, ms);
2217
});
2318
}
2419

25-
const encoder = new TextEncoder();
26-
27-
async function* makeIterator() {
28-
for (let i = 1; i <= 10; i++) {
29-
yield encoder.encode(`<p data-testid="iteratorCount">${i}</p>`);
30-
await sleep(1000);
31-
}
32-
}
33-
3420
export default async function handler(
3521
req: NextApiRequest,
3622
res: NextApiResponse,
@@ -39,14 +25,23 @@ export default async function handler(
3925
return res.status(405).json({ message: "Method not allowed" });
4026
}
4127

42-
res.setHeader("Content-Type", "text/html; charset=utf-8");
28+
res.setHeader("Content-Type", "text/event-stream");
4329
res.setHeader("Connection", "keep-alive");
4430
res.setHeader("Cache-Control", "no-cache, no-transform");
31+
res.setHeader("Transfer-Encoding", "chunked");
32+
33+
res.write(
34+
`data: ${JSON.stringify({ type: "start", model: "ai-lyric-model" })}\n\n`,
35+
);
36+
await sleep(1000);
37+
38+
const lines = SADE_SMOOTH_OPERATOR_LYRIC.split("\n");
39+
for (const line of lines) {
40+
res.write(`data: ${JSON.stringify({ type: "content", body: line })}\n\n`);
41+
await sleep(1000);
42+
}
4543

46-
// create and pipe the stream
47-
const iterator = makeIterator();
48-
const stream = iteratorToStream(iterator);
44+
res.write(`data: ${JSON.stringify({ type: "complete" })}\n\n`);
4945

50-
// we need to import ReadableStream from `node:stream/web` to make TypeScript happy
51-
return Readable.fromWeb(stream).pipe(res);
46+
res.end();
5247
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"use client";
2+
3+
import { useEffect, useState } from "react";
4+
5+
type Event = {
6+
type: "start" | "content" | "complete";
7+
model?: string;
8+
body?: string;
9+
};
10+
11+
export default function SSE() {
12+
const [events, setEvents] = useState<Event[]>([]);
13+
const [finished, setFinished] = useState(false);
14+
15+
useEffect(() => {
16+
const e = new EventSource("/api/streaming");
17+
18+
e.onmessage = (msg) => {
19+
console.log(msg);
20+
try {
21+
const data = JSON.parse(msg.data) as Event;
22+
if (data.type === "complete") {
23+
e.close();
24+
setFinished(true);
25+
}
26+
if (data.type === "content") {
27+
setEvents((prev) => prev.concat(data));
28+
}
29+
} catch (err) {
30+
console.error(err, msg);
31+
}
32+
};
33+
}, []);
34+
35+
return (
36+
<div
37+
style={{
38+
padding: "20px",
39+
marginBottom: "20px",
40+
display: "flex",
41+
flexDirection: "column",
42+
gap: "40px",
43+
}}
44+
>
45+
<h1
46+
style={{
47+
fontSize: "2rem",
48+
marginBottom: "20px",
49+
}}
50+
>
51+
Sade - Smooth Operator
52+
</h1>
53+
<div>
54+
{events.map((e, i) => (
55+
<p data-testid="line" key={i}>
56+
{e.body}
57+
</p>
58+
))}
59+
</div>
60+
{finished && (
61+
<iframe
62+
data-testid="video"
63+
width="560"
64+
height="315"
65+
src="https://www.youtube.com/embed/4TYv2PhG89A?si=e1fmpiXZZ1PBKPE5"
66+
title="YouTube video player"
67+
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
68+
referrerPolicy="strict-origin-when-cross-origin"
69+
allowFullScreen
70+
></iframe>
71+
)}
72+
</div>
73+
);
74+
}
Lines changed: 35 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,50 @@
11
import { expect, test } from "@playwright/test";
22

3-
test("streaming should work in api route", async ({ page }) => {
4-
const ITERATOR_LENGTH = 10;
5-
6-
const res = await page.goto("/api/streaming", {
7-
// we set waitUntil: "commit" to ensure that the response is streamed
8-
// without this option, the response would be buffered and sent all at once
9-
// we could also drop the `await` aswell, but then we can't see the headers first.
10-
waitUntil: "commit",
11-
});
3+
const SADE_SMOOTH_OPERATOR_LYRIC = `Diamond life, lover boy
4+
He move in space with minimum waste and maximum joy
5+
City lights and business nights
6+
When you require streetcar desire for higher heights
7+
No place for beginners or sensitive hearts
8+
When sentiment is left to chance
9+
No place to be ending but somewhere to start
10+
No need to ask, he's a smooth operator
11+
Smooth operator, smooth operator
12+
Smooth operator`;
1213

13-
expect(res?.headers()["content-type"]).toBe("text/html; charset=utf-8");
14-
expect(res?.headers()["cache-control"]).toBe("no-cache, no-transform");
15-
// AWS API Gateway remaps the connection header to `x-amzn-remapped-connection`
16-
expect(res?.headers()["x-amzn-remapped-connection"]).toBe("keep-alive");
14+
test("streaming should work in api route", async ({ page }) => {
15+
await page.goto("/sse");
1716

18-
// wait for first number to be present
19-
await page.getByTestId("iteratorCount").first().waitFor();
17+
// wait for first line to be present
18+
await page.getByTestId("line").first().waitFor();
19+
const initialLines = await page.getByTestId("line").count();
20+
// fail if all lines appear at once
21+
// this is a safeguard to ensure that the response is streamed and not buffered all at once
22+
expect(initialLines).toBe(1);
2023

21-
const seenNumbers: Array<{ number: string; time: number }> = [];
24+
const seenLines: Array<{ line: string; time: number }> = [];
2225
const startTime = Date.now();
2326

24-
const initialParagraphs = await page.getByTestId("iteratorCount").count();
25-
// fail if all paragraphs appear at once
26-
// this is a safeguard to ensure that the response is streamed and not buffered all at once
27-
expect(initialParagraphs).toBe(1);
28-
29-
while (
30-
seenNumbers.length < ITERATOR_LENGTH &&
31-
Date.now() - startTime < 11000
32-
) {
33-
const elements = await page.getByTestId("iteratorCount").all();
34-
if (elements.length > seenNumbers.length) {
35-
expect(elements.length).toBe(seenNumbers.length + 1);
36-
const newElement = elements[elements.length - 1];
37-
seenNumbers.push({
38-
number: await newElement.innerText(),
27+
// we loop until we see all lines
28+
while (seenLines.length < SADE_SMOOTH_OPERATOR_LYRIC.split("\n").length) {
29+
const lines = await page.getByTestId("line").all();
30+
if (lines.length > seenLines.length) {
31+
expect(lines.length).toBe(seenLines.length + 1);
32+
const newLine = lines[lines.length - 1];
33+
seenLines.push({
34+
line: await newLine.innerText(),
3935
time: Date.now() - startTime,
4036
});
4137
}
42-
await page.waitForTimeout(100);
38+
// wait for a bit before checking again
39+
await page.waitForTimeout(200);
4340
}
4441

45-
expect(seenNumbers.map((n) => n.number)).toEqual(
46-
[...Array(ITERATOR_LENGTH)].map((_, i) => String(i + 1)),
42+
expect(seenLines.map((n) => n.line)).toEqual(
43+
SADE_SMOOTH_OPERATOR_LYRIC.split("\n"),
4744
);
48-
49-
// verify streaming timing
50-
for (let i = 1; i < seenNumbers.length; i++) {
51-
const timeDiff = seenNumbers[i].time - seenNumbers[i - 1].time;
52-
expect(timeDiff).toBeGreaterThanOrEqual(100);
45+
for (let i = 1; i < seenLines.length; i++) {
46+
expect(seenLines[i].time - seenLines[i - 1].time).toBeGreaterThan(500);
5347
}
48+
49+
await expect(page.getByTestId("video")).toBeVisible();
5450
});

0 commit comments

Comments
 (0)