@@ -20,6 +20,7 @@
import java.util.function.Supplier;

import org.eclipse.jetty.http.HttpTokens.EndOfContent;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Index;
import org.eclipse.jetty.util.StringUtil;
@@ -246,7 +247,7 @@ public Result generateRequest(MetaData.Request info, ByteBuffer header, ByteBuff

case COMMITTED:
{
return committed(chunk, content, last);
return committed(info, chunk, content, last);
}

case COMPLETING:
@@ -268,11 +269,14 @@ public Result generateRequest(MetaData.Request info, ByteBuffer header, ByteBuff
}
}

private Result committed(ByteBuffer chunk, ByteBuffer content, boolean last)
private Result committed(MetaData info, ByteBuffer chunk, ByteBuffer content, boolean last)
{
int len = BufferUtil.length(content);
long len = BufferUtil.length(content);
Content.Source source = info.getContentSource();

// handle the content.
// Handle the content.
if (len == 0 && source != null)
len = source.getLength();
if (len > 0)
{
if (isChunking())
@@ -401,15 +405,18 @@ else if (status == HttpStatus.NO_CONTENT_204 || status == HttpStatus.NOT_MODIFIE

generateHeaders(header, content, last);

// handle the content.
int len = BufferUtil.length(content);
// Handle the given content.
long len = BufferUtil.length(content);
Content.Source source = info.getContentSource();
if (len == 0 && source != null)
len = source.getLength();
if (len > 0)
{
_contentPrepared += len;
if (isChunking() && !head)
prepareChunk(header, len);
}
_state = last ? State.COMPLETING : State.COMMITTED;
_state = last && source == null ? State.COMPLETING : State.COMMITTED;
}
catch (BufferOverflowException e)
{
@@ -432,7 +439,7 @@ else if (status == HttpStatus.NO_CONTENT_204 || status == HttpStatus.NOT_MODIFIE

case COMMITTED:
{
return committed(chunk, content, last);
return committed(info, chunk, content, last);
}

case COMPLETING_1XX:
@@ -474,7 +481,7 @@ public void servletUpgrade()
startTunnel();
}

private void prepareChunk(ByteBuffer chunk, int remaining)
private void prepareChunk(ByteBuffer chunk, long remaining)
{
// if we need CRLF add this to header
if (_needCRLF)
@@ -483,7 +490,8 @@ private void prepareChunk(ByteBuffer chunk, int remaining)
// Add the chunk size to the header
if (remaining > 0)
{
BufferUtil.putHexInt(chunk, remaining);
// TODO: we need a long as required by RFC 9110.
BufferUtil.putHexInt(chunk, (int)remaining);
BufferUtil.putCRLF(chunk);
_needCRLF = true;
}
(next file in the diff)
@@ -17,6 +17,7 @@
import java.util.Objects;
import java.util.function.Supplier;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.NanoTime;

/**
@@ -25,7 +26,7 @@
* <p>Specific HTTP response information is captured by {@link Response}.</p>
* <p>HTTP trailers information is captured by {@link MetaData}.</p>
*/
public class MetaData implements Iterable<HttpField>
public class MetaData implements Iterable<HttpField>, Content.Source.Aware
{
/**
* <p>Returns whether the given HTTP request method and HTTP response status code
@@ -44,6 +45,8 @@ public static boolean isTunnel(String method, int status)
private final HttpFields _httpFields;
private final long _contentLength;
private final Supplier<HttpFields> _trailers;
// TODO: final or not?
private Content.Source _source;

public MetaData(HttpVersion version, HttpFields fields)
{
@@ -105,6 +108,18 @@ public Supplier<HttpFields> getTrailersSupplier()
return _trailers;
}

@Override
public Content.Source getContentSource()
{
return _source;
}

@Override
public void setContentSource(Content.Source source)
{
_source = source;
}

/**
* Get the length of the content in bytes.
* @return the length of the content in bytes
(next file in the diff)
@@ -44,6 +44,7 @@
import org.eclipse.jetty.io.internal.ContentSourceConsumer;
import org.eclipse.jetty.io.internal.ContentSourceRetainableByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceString;
import org.eclipse.jetty.io.internal.Transferable;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@@ -106,6 +107,38 @@ public static void copy(Source source, Sink sink, Chunk.Processor chunkProcessor
new ContentCopier(source, sink, chunkProcessor, callback).iterate();
}

public static void copy(Source source, boolean last, Sink sink, Callback callback)
{
new ContentCopier(source, last, sink, null, callback).iterate();
}

private static void copyRange(Source source, long length, Sink sink, Callback callback)
{
// TODO: it would be really difficult to make a source remember the bytes...
// a subsequent call with the same source cannot have stored a chunk that
// it returned previously, so do we really need a range?
// Isn't the length always implicit to be the full length?
// In HTTP/2 a large write is chunked and the write callback is not completed
// until all the chunks are written (we store the BB in a DATA frame, and we
// consume the BB chunk by chunk).
// How can we do the same with a Source?
// We can read a BB, wrap it in a DATA frame, even if larger than maxFrameSize
// or flow control, as the Flusher will remember it.
// But for transferTo(), we need a similar way for a Source to have position
// and limit that a BB has, so perhaps we need a Source.Seekable.
}
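Purely to illustrate the position/limit idea floated in the TODO above (the name Seekable and its methods are assumptions, not part of this PR), such a source might expose:

// Hypothetical Source.Seekable sketch: a source that can be repositioned like a ByteBuffer.
interface Seekable extends Content.Source
{
    // The current read position, in bytes from the start of the content.
    long position();

    // Reposition the source; subsequent reads produce chunks starting at newPosition.
    void position(long newPosition);

    // Restrict reads to at most the given number of bytes from the current position.
    void limit(long length);
}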

public static boolean transfer(Source source, long length, Sink sink, Callback callback)
{
if (source instanceof Transferable.From from)
{
if (from.transferTo(sink, length, callback))
return true;
}
copyRange(source, length, sink, callback);
return false;
}

/**
* <p>A source of content that can be read with a read/demand model.</p>
* <p>To avoid leaking its resources, a source <b>must</b> either:</p>
@@ -175,6 +208,13 @@ interface Factory
Content.Source newContentSource(ByteBufferPool.Sized bufferPool, long offset, long length);
}

interface Aware
{
Source getContentSource();

void setContentSource(Source source);
}
Contributor comment on lines +211 to +216:
See #13656 as an alternate way to tunnel a Source through a ByteBuffer-only API.
Although I'm hating the current iteration of Aware less than I did originally.
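To make the contract concrete, here is a minimal sketch (not code from this PR; the class name and the detection logic are assumptions) of a sink that implements Aware: it records the tunnelled Source and consumes it when the TRANSFER sentinel, added to Sink further down in this diff, is written.

import java.nio.ByteBuffer;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Callback;

// Hypothetical sketch: a sink that accepts a tunnelled Content.Source via Aware.
class TunnellingSink implements Content.Sink, Content.Source.Aware
{
    private final Content.Sink delegate;
    private Content.Source source;

    TunnellingSink(Content.Sink delegate)
    {
        this.delegate = delegate;
    }

    @Override
    public Content.Source getContentSource()
    {
        return source;
    }

    @Override
    public void setContentSource(Content.Source source)
    {
        this.source = source;
    }

    @Override
    public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
    {
        Content.Source tunnelled = source;
        if (tunnelled != null && byteBuffer == Content.Sink.TRANSFER)
        {
            // The sentinel buffer signals that the real content is the stored Source.
            // Going through write() rather than a direct transfer lets a real sink
            // run its usual commit logic first, as noted in the review comments below.
            source = null;
            Content.copy(tunnelled, last, delegate, callback);
        }
        else
        {
            delegate.write(last, byteBuffer, callback);
        }
    }
}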


/**
* Create a {@code Content.Source} from zero or more {@link ByteBuffer}s
* @param byteBuffers The {@link ByteBuffer}s to use as the source.
@@ -657,6 +697,8 @@ default boolean rewind()
*/
public interface Sink
{
ByteBuffer TRANSFER = ByteBuffer.allocate(0);

/**
* <p>Wraps the given {@link OutputStream} as a {@link Sink}.
* @param out The stream to wrap
@@ -866,6 +908,36 @@ static void write(Sink sink, boolean last, String utf8Content, Callback callback
sink.write(last, ByteBuffer.wrap(utf8Content.getBytes(StandardCharsets.UTF_8)), callback);
}

static void write(Sink sink, boolean last, Content.Source source, Callback callback)
{
Content.Source.Aware aware = findContentSourceAware(sink);
if (aware != null)
{
// Optimization to enable zero-copy.
Contributor comment:
Explain in the comment how the optimization still uses the sink.write path, so that commit logic etc. can be triggered. Specifically, we do not do aware.transfer(source, callback) because we might need to commit the response.

aware.setContentSource(source);
sink.write(last, TRANSFER, callback);
Contributor comment:
I feel that we should allow some failures on the callback here to fall through to a normal copy. I.e. if something in the implementation is Aware but cannot do a transferTo (perhaps because it would be non-optimal), then it can fail this write with a TransferToFailed exception, which would then just do the normal copy below.

}
else
{
// Normal source.read() + sink.write() full copy.
Content.copy(source, last, sink, callback);
}
}
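A possible shape for the fallback suggested in the comment above, purely as a sketch: TransferToFailedException and the wrapping callback below are hypothetical, not part of this PR.

// Hypothetical: let an Aware sink decline the transfer and fall back to a plain copy.
static void writeWithFallback(Sink sink, boolean last, Content.Source source, Callback callback)
{
    Content.Source.Aware aware = findContentSourceAware(sink);
    if (aware == null)
    {
        Content.copy(source, last, sink, callback);
        return;
    }
    aware.setContentSource(source);
    sink.write(last, TRANSFER, new Callback()
    {
        @Override
        public void succeeded()
        {
            callback.succeeded();
        }

        @Override
        public void failed(Throwable failure)
        {
            // Assumed exception type: the Aware sink signals it cannot (or should not) transfer.
            if (failure instanceof TransferToFailedException)
            {
                aware.setContentSource(null);
                Content.copy(source, last, sink, callback);
            }
            else
            {
                callback.failed(failure);
            }
        }
    });
}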

private static Content.Source.Aware findContentSourceAware(Sink sink)
{
while (true)
{
if (sink instanceof Content.Source.Aware aware)
return aware;
if (sink instanceof Wrapper wrapper)
Contributor comment:
I think if the sink is wrapped then we should not bypass it with transferTo. Perhaps we need an AwareWrapper that would allow itself to be bypassed?

sink = wrapper.getWrapped();
else
break;
}
return null;
}
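One possible shape for the AwareWrapper idea from the comment above (hypothetical, not part of this PR): only wrappers that explicitly opt in are looked through, so a plain Wrapper is never bypassed. It reuses the Wrapper class added further down in this diff.

// Hypothetical: a wrapper that explicitly allows the Aware lookup to bypass it.
class AwareWrapper extends Wrapper
{
    public AwareWrapper(Sink wrapped)
    {
        super(wrapped);
    }
}

// Variant of the lookup above that only unwraps opt-in wrappers.
private static Content.Source.Aware findAwareOptIn(Sink sink)
{
    while (true)
    {
        if (sink instanceof Content.Source.Aware aware)
            return aware;
        if (sink instanceof AwareWrapper wrapper)
            sink = wrapper.getWrapped();
        else
            return null;
    }
}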

/**
* <p>Writes the given {@link ByteBuffer}, notifying the {@link Callback}
* when the write is complete.</p>
@@ -878,6 +950,27 @@ static void write(Sink sink, boolean last, String utf8Content, Callback callback
* @param callback the callback to notify when the write operation is complete
*/
void write(boolean last, ByteBuffer byteBuffer, Callback callback);

class Wrapper implements Sink
{
private final Sink wrapped;

public Wrapper(Sink wrapped)
{
this.wrapped = wrapped;
}

public Sink getWrapped()
{
return wrapped;
}

@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
getWrapped().write(last, byteBuffer, callback);
}
}
}

/**
Expand Down
Loading
Loading