, Content.Source.Aware
{
/**
* Returns whether the given HTTP request method and HTTP response status code
@@ -44,6 +45,8 @@ public static boolean isTunnel(String method, int status)
private final HttpFields _httpFields;
private final long _contentLength;
private final Supplier _trailers;
+ // TODO: final or not?
+ private Content.Source _source;
public MetaData(HttpVersion version, HttpFields fields)
{
@@ -105,6 +108,18 @@ public Supplier getTrailersSupplier()
return _trailers;
}
+ @Override
+ public Content.Source getContentSource()
+ {
+ return _source;
+ }
+
+ @Override
+ public void setContentSource(Content.Source source)
+ {
+ _source = source;
+ }
+
/**
* Get the length of the content in bytes.
* @return the length of the content in bytes
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
index db43a898ea37..d07a341908e2 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
@@ -44,6 +44,7 @@
import org.eclipse.jetty.io.internal.ContentSourceConsumer;
import org.eclipse.jetty.io.internal.ContentSourceRetainableByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceString;
+import org.eclipse.jetty.io.internal.Transferable;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@@ -106,6 +107,38 @@ public static void copy(Source source, Sink sink, Chunk.Processor chunkProcessor
new ContentCopier(source, sink, chunkProcessor, callback).iterate();
}
+ public static void copy(Source source, boolean last, Sink sink, Callback callback)
+ {
+ new ContentCopier(source, last, sink, null, callback).iterate();
+ }
+
+ private static void copyRange(Source source, long length, Sink sink, Callback callback)
+ {
+ // TODO: it would be really difficult to make a source remember the bytes...
+ // a subsequent call with the same source cannot have stored a chunk that
+ // it returned previously, so do we really need a range?
+ // Isn't the length always implicit to be the full length?
+ // In HTTP/2 a large write is chunked and the write callback is not completed
+ // until all the chunks are written (we store the BB in a DATA frame, and we
+ // consume the BB chunk by chunk).
+ // How can we do the same with a Source?
+ // We can read a BB, wrap it in a DATA frame, even if larger than maxFrameSize
+ // or flow control, as the Flusher will remember it.
+ // But for transferTo(), we need a similar way for a Source to have position
+ // and limit that a BB has, so perhaps we need a Source.Seekable.
+ }
+
+ public static boolean transfer(Source source, long length, Sink sink, Callback callback)
+ {
+ if (source instanceof Transferable.From from)
+ {
+ if (from.transferTo(sink, length, callback))
+ return true;
+ }
+ copyRange(source, length, sink, callback);
+ return false;
+ }
+
/**
* A source of content that can be read with a read/demand model.
* To avoid leaking its resources, a source must either:
@@ -175,6 +208,13 @@ interface Factory
Content.Source newContentSource(ByteBufferPool.Sized bufferPool, long offset, long length);
}
+ interface Aware
+ {
+ Source getContentSource();
+
+ void setContentSource(Source source);
+ }
+
/**
* Create a {@code Content.Source} from zero or more {@link ByteBuffer}s
* @param byteBuffers The {@link ByteBuffer}s to use as the source.
@@ -657,6 +697,8 @@ default boolean rewind()
*/
public interface Sink
{
+ ByteBuffer TRANSFER = ByteBuffer.allocate(0);
+
/**
* Wraps the given {@link OutputStream} as a {@link Sink}.
* @param out The stream to wrap
@@ -866,6 +908,36 @@ static void write(Sink sink, boolean last, String utf8Content, Callback callback
sink.write(last, ByteBuffer.wrap(utf8Content.getBytes(StandardCharsets.UTF_8)), callback);
}
+ static void write(Sink sink, boolean last, Content.Source source, Callback callback)
+ {
+ Content.Source.Aware aware = findContentSourceAware(sink);
+ if (aware != null)
+ {
+ // Optimization to enable zero-copy.
+ aware.setContentSource(source);
+ sink.write(last, TRANSFER, callback);
+ }
+ else
+ {
+ // Normal source.read() + sink.write() full copy.
+ Content.copy(source, last, sink, callback);
+ }
+ }
+
+ private static Content.Source.Aware findContentSourceAware(Sink sink)
+ {
+ while (true)
+ {
+ if (sink instanceof Content.Source.Aware aware)
+ return aware;
+ if (sink instanceof Wrapper wrapper)
+ sink = wrapper.getWrapped();
+ else
+ break;
+ }
+ return null;
+ }
+
/**
*
Writes the given {@link ByteBuffer}, notifying the {@link Callback}
* when the write is complete.
@@ -878,6 +950,27 @@ static void write(Sink sink, boolean last, String utf8Content, Callback callback
* @param callback the callback to notify when the write operation is complete
*/
void write(boolean last, ByteBuffer byteBuffer, Callback callback);
+
+ class Wrapper implements Sink
+ {
+ private final Sink wrapped;
+
+ public Wrapper(Sink wrapped)
+ {
+ this.wrapped = wrapped;
+ }
+
+ public Sink getWrapped()
+ {
+ return wrapped;
+ }
+
+ @Override
+ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
+ {
+ getWrapped().write(last, byteBuffer, callback);
+ }
+ }
}
/**
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java
index 7cf055aef8c3..08e4d271e1e9 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java
@@ -24,8 +24,6 @@
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.IO;
-import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.resource.MemoryResource;
import org.eclipse.jetty.util.resource.Resource;
@@ -233,7 +231,7 @@ public static void copy(Resource resource, Content.Sink sink, ByteBufferPool.Siz
if (resource instanceof Content.Source.Factory factory)
{
Content.Source source = factory.newContentSource(bufferPool, offset, length);
- Content.copy(source, sink, callback);
+ Content.Sink.write(sink, true, source, callback);
return;
}
@@ -241,7 +239,8 @@ public static void copy(Resource resource, Content.Sink sink, ByteBufferPool.Siz
Path path = resource.getPath();
if (path != null)
{
- new PathToSinkCopier(path, sink, bufferPool, offset, length, callback).iterate();
+ Content.Source source = Content.Source.from(bufferPool, path, offset, length);
+ Content.Sink.write(sink, true, source, callback);
return;
}
@@ -258,111 +257,11 @@ public static void copy(Resource resource, Content.Sink sink, ByteBufferPool.Siz
if (inputStream == null)
throw new IllegalArgumentException("Resource does not support InputStream: " + resource);
Content.Source source = Content.Source.from(bufferPool, inputStream, offset, length);
- Content.copy(source, sink, callback);
+ Content.Sink.write(sink, true, source, callback);
}
catch (Throwable x)
{
callback.failed(x);
}
}
-
- private static class PathToSinkCopier extends IteratingNestedCallback
- {
- private final SeekableByteChannel channel;
- private final Content.Sink sink;
- private final ByteBufferPool.Sized pool;
- private long remainingLength;
- private RetainableByteBuffer retainableByteBuffer;
- private boolean terminated;
-
- public PathToSinkCopier(Path path, Content.Sink sink, ByteBufferPool.Sized pool, long offset, long length, Callback callback) throws IOException
- {
- super(callback);
- this.sink = sink;
- this.pool = pool == null ? ByteBufferPool.SIZED_NON_POOLING : pool;
- this.remainingLength = length;
- this.channel = Files.newByteChannel(path);
- skipToOffset(channel, offset, length, this.pool);
- }
-
- private static void skipToOffset(SeekableByteChannel channel, long offset, long length, ByteBufferPool.Sized pool)
- {
- if (offset > 0L && length != 0L)
- {
- RetainableByteBuffer.Mutable byteBuffer = pool.acquire(1);
- try
- {
- channel.position(offset - 1);
- if (channel.read(byteBuffer.getByteBuffer().limit(1)) == -1)
- throw new IllegalArgumentException("Offset out of range");
- }
- catch (IOException e)
- {
- throw new UncheckedIOException(e);
- }
- finally
- {
- byteBuffer.release();
- }
- }
- }
-
- @Override
- public InvocationType getInvocationType()
- {
- return InvocationType.NON_BLOCKING;
- }
-
- @Override
- protected Action process() throws Throwable
- {
- if (terminated)
- return Action.SUCCEEDED;
-
- if (retainableByteBuffer == null)
- retainableByteBuffer = pool.acquire();
-
- ByteBuffer byteBuffer = retainableByteBuffer.getByteBuffer();
- BufferUtil.clearToFill(byteBuffer);
- if (remainingLength >= 0 && remainingLength < Integer.MAX_VALUE)
- byteBuffer.limit((int)Math.min(byteBuffer.capacity(), remainingLength));
- boolean eof = false;
- while (byteBuffer.hasRemaining() && !eof)
- {
- int read = channel.read(byteBuffer);
- if (read == -1)
- eof = true;
- else if (remainingLength >= 0)
- remainingLength -= read;
- }
- BufferUtil.flipToFlush(byteBuffer, 0);
- terminated = eof || remainingLength == 0;
- sink.write(terminated, byteBuffer, this);
- return Action.SCHEDULED;
- }
-
- @Override
- protected void onCompleteSuccess()
- {
- if (retainableByteBuffer != null)
- retainableByteBuffer.release();
- IO.close(channel);
- super.onCompleteSuccess();
- }
-
- @Override
- protected void onFailure(Throwable x)
- {
- IO.close(channel);
- super.onFailure(x);
- }
-
- @Override
- protected void onCompleteFailure(Throwable cause)
- {
- if (retainableByteBuffer != null)
- retainableByteBuffer.release();
- super.onCompleteFailure(cause);
- }
- }
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableChannelEndPoint.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableChannelEndPoint.java
index 1eb0a151d40d..d43b73fec7f0 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableChannelEndPoint.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableChannelEndPoint.java
@@ -253,20 +253,22 @@ public Runnable onSelected()
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this);
- // return task to complete the job
- Runnable task = fillable
- ? (flushable
- ? _runCompleteWriteFillable
- : _runFillable)
- : (flushable
- ? _runCompleteWrite
- : null);
+ Runnable task = taskForSelected(fillable, flushable);
if (LOG.isDebugEnabled())
LOG.debug("task {}", task);
return task;
}
+ protected Runnable taskForSelected(boolean fillable, boolean flushable)
+ {
+ if (fillable)
+ return flushable ? _runCompleteWriteFillable : _runFillable;
+ if (flushable)
+ return _runCompleteWrite;
+ return null;
+ }
+
private void updateKeyAction(Selector selector)
{
updateKey();
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java
index 0218bcb25fcf..89d78c5b9cb9 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java
@@ -16,10 +16,17 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.eclipse.jetty.io.internal.Transferable;
import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,10 +34,12 @@
/**
* An {@link EndPoint} implementation based on {@link SocketChannel}.
*/
-public class SocketChannelEndPoint extends SelectableChannelEndPoint
+public class SocketChannelEndPoint extends SelectableChannelEndPoint implements Transferable.To
{
private static final Logger LOG = LoggerFactory.getLogger(SocketChannelEndPoint.class);
+ private final AtomicReference transferCallback = new AtomicReference<>();
+
public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(scheduler, channel, selector, key);
@@ -42,6 +51,52 @@ public SocketChannel getChannel()
return (SocketChannel)super.getChannel();
}
+ @Override
+ public boolean transferFrom(FileChannel fileChannel, long offset, long length, Callback callback)
+ {
+ Transferable.transfer(fileChannel, offset, length, this, callback);
+ return true;
+ }
+
+ public void onIncompleteTransfer(Callback callback)
+ {
+ if (transferCallback.compareAndSet(null, callback))
+ onIncompleteFlush();
+ else
+ throw new IllegalStateException("Transfer callback already present");
+ }
+
+ @Override
+ protected Runnable taskForSelected(boolean fillable, boolean flushable)
+ {
+ Callback callback = transferCallback.getAndSet(null);
+ if (callback == null)
+ return super.taskForSelected(fillable, flushable);
+
+ // For the transfer case, only flushable must be true.
+ assert !fillable && flushable;
+
+ return new Invocable.ReadyTask(callback.getInvocationType(), callback::succeeded);
+ }
+
+ @Override
+ public void onClose(Throwable cause)
+ {
+ Callback callback = transferCallback.getAndSet(null);
+ if (callback != null)
+ callback.failed(cause == null ? new ClosedChannelException() : cause);
+ super.onClose(cause);
+ }
+
+ @Override
+ protected void onIdleExpired(TimeoutException timeout)
+ {
+ Callback callback = transferCallback.getAndSet(null);
+ if (callback != null)
+ callback.failed(timeout);
+ super.onIdleExpired(timeout);
+ }
+
@Override
public SocketAddress getRemoteSocketAddress()
{
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java
index 244d4d525ea7..2963c23ec0c0 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -27,6 +28,7 @@
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
@@ -36,7 +38,7 @@
* A {@link Content.Source} backed by a {@link ByteChannel}.
* Any calls to {@link #demand(Runnable)} are immediately satisfied.
*/
-public class ByteChannelContentSource implements Content.Source
+public class ByteChannelContentSource implements Content.Source, Transferable.From
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker _invoker = new SerializedInvoker(ByteChannelContentSource.class);
@@ -199,6 +201,22 @@ else if (_buffer.isRetained())
return _terminal;
}
+ @Override
+ public boolean transferTo(Content.Sink sink, long length, Callback callback)
+ {
+ try (AutoLock ignored = lock.lock())
+ {
+ lockedEnsureOpenOrTerminal();
+ if (Content.Chunk.isFailure(_terminal))
+ return false;
+ if (!(_byteChannel instanceof FileChannel fileChannel))
+ return false;
+ if (!(sink instanceof Transferable.To to))
+ return false;
+ return to.transferFrom(fileChannel, _offset, length, callback);
+ }
+ }
+
@Override
public void fail(Throwable failure)
{
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java
index 43c04d056c3e..05b61842d929 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java
@@ -26,15 +26,22 @@ public class ContentCopier extends IteratingNestedCallback
private static final Logger LOG = LoggerFactory.getLogger(ContentCopier.class);
private final Content.Source source;
+ private final boolean last;
private final Content.Sink sink;
private final Content.Chunk.Processor chunkProcessor;
private Content.Chunk chunk;
private boolean terminated;
public ContentCopier(Content.Source source, Content.Sink sink, Content.Chunk.Processor chunkProcessor, Callback callback)
+ {
+ this(source, true, sink, chunkProcessor, callback);
+ }
+
+ public ContentCopier(Content.Source source, boolean last, Content.Sink sink, Content.Chunk.Processor chunkProcessor, Callback callback)
{
super(callback);
this.source = source;
+ this.last = last;
this.sink = sink;
this.chunkProcessor = chunkProcessor;
}
@@ -64,7 +71,7 @@ protected Action process() throws Throwable
return Action.SCHEDULED;
}
- sink.write(chunk.isLast(), chunk.getByteBuffer(), this);
+ sink.write(chunk.isLast() && last, chunk.getByteBuffer(), this);
return Action.SCHEDULED;
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/Transferable.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/Transferable.java
new file mode 100644
index 000000000000..ade8489474ca
--- /dev/null
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/Transferable.java
@@ -0,0 +1,83 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.io.internal;
+
+import java.nio.channels.FileChannel;
+
+import org.eclipse.jetty.io.Content;
+import org.eclipse.jetty.io.SocketChannelEndPoint;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IteratingNestedCallback;
+
+public class Transferable
+{
+ private Transferable()
+ {
+ }
+
+ public static void transfer(FileChannel sourceChannel, long offset, long length, SocketChannelEndPoint endPoint, Callback callback)
+ {
+ Transferrer transferrer = new Transferrer(sourceChannel, offset, length, endPoint, callback);
+ transferrer.iterate();
+ }
+
+ public interface From
+ {
+ boolean transferTo(Content.Sink sink, long length, Callback callback);
+ }
+
+ public interface To
+ {
+ boolean transferFrom(FileChannel fileChannel, long offset, long length, Callback callback);
+ }
+
+ private static class Transferrer extends IteratingNestedCallback
+ {
+ private final FileChannel fileChannel;
+ private final long offset;
+ private final long length;
+ private final SocketChannelEndPoint endPoint;
+ private long transferred;
+
+ private Transferrer(FileChannel fileChannel, long offset, long length, SocketChannelEndPoint endPoint, Callback callback)
+ {
+ super(callback);
+ this.fileChannel = fileChannel;
+ this.offset = offset;
+ this.length = length;
+ this.endPoint = endPoint;
+ }
+
+ @Override
+ protected Action process() throws Throwable
+ {
+ long count = length - transferred;
+ if (count == 0)
+ return Action.SUCCEEDED;
+
+ long transfer = fileChannel.transferTo(offset + transferred, count, endPoint.getChannel());
+ transferred += transfer;
+
+ if (transfer > 0)
+ {
+ endPoint.notIdle();
+ succeeded();
+ return Action.SCHEDULED;
+ }
+
+ endPoint.onIncompleteTransfer(this);
+ return Action.SCHEDULED;
+ }
+ }
+}
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java
index 6e1380623b47..e4ec95c4065a 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java
@@ -748,20 +748,20 @@ static Content.Sink asBufferedSink(Request request, Response response)
return Content.Sink.asBuffered(response, bufferPool, useOutputDirectByteBuffers, outputAggregationSize, bufferSize);
}
- class Wrapper implements Response
+ class Wrapper extends Content.Sink.Wrapper implements Response
{
private final Request _request;
- private final Response _wrapped;
public Wrapper(Request request, Response wrapped)
{
+ super(wrapped);
_request = request;
- _wrapped = wrapped;
}
+ @Override
public Response getWrapped()
{
- return _wrapped;
+ return (Response)super.getWrapped();
}
@Override
@@ -829,11 +829,5 @@ public CompletableFuture writeInterim(int status, HttpFields headers)
{
return getWrapped().writeInterim(status, headers);
}
-
- @Override
- public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
- {
- getWrapped().write(last, byteBuffer, callback);
- }
}
}
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java
index 82f15a8f0b01..9ceba5644668 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java
@@ -1155,11 +1155,13 @@ public String toString()
* method when calling
* {@link HttpStream#send(MetaData.Request, MetaData.Response, boolean, ByteBuffer, Callback)}
*/
- public static class ChannelResponse implements Response, Callback
+ public static class ChannelResponse implements Response, Content.Source.Aware, Callback
{
private final ChannelRequest _request;
private final ResponseHttpFields _httpFields;
- protected int _status;
+ private MetaData.Response _responseMetaData;
+ private int _status;
+ private Content.Source _source;
private long _contentBytesWritten;
private Supplier _trailers;
private Callback _writeCallback;
@@ -1255,11 +1257,12 @@ public void setTrailersSupplier(Supplier trailers)
public void write(boolean last, ByteBuffer content, Callback callback)
{
long length = BufferUtil.length(content);
+ if (length == 0 && _source != null)
+ length = _source.getLength();
HttpChannelState httpChannelState;
HttpStream stream;
Throwable writeFailure;
- MetaData.Response responseMetaData = null;
try (AutoLock ignored = _request._lock.lock())
{
httpChannelState = _request.lockedGetHttpChannelState();
@@ -1320,17 +1323,19 @@ else if (last && !(totalWritten == 0 && HttpMethod.HEAD.is(_request.getMethod())
return;
}
+ // TODO: check that we don't have both a ByteBuffer and a Content.Source.
+
// No failure, do the actual stream send using the ChannelResponse as the callback.
_writeCallback = callback;
_contentBytesWritten = totalWritten;
stream = httpChannelState._stream;
if (_httpFields.commit())
- responseMetaData = lockedPrepareResponse(httpChannelState, last);
+ _responseMetaData = lockedPrepareResponse(httpChannelState, last);
}
if (LOG.isDebugEnabled())
LOG.debug("writing last={} {} {}", last, BufferUtil.toDetailString(content), this);
- stream.send(_request._metaData, responseMetaData, last, content, this);
+ stream.send(_request._metaData, _responseMetaData, last, content, this);
}
/**
@@ -1434,6 +1439,19 @@ public void reset()
_request.getHttpChannelState().resetResponse();
}
+ @Override
+ public Content.Source getContentSource()
+ {
+ return _source;
+ }
+
+ @Override
+ public void setContentSource(Content.Source source)
+ {
+ // TODO: need to store the source into the MetaData.Response if it already exists.
+ _source = source;
+ }
+
@Override
public CompletableFuture writeInterim(int status, HttpFields headers)
{
@@ -1490,12 +1508,14 @@ MetaData.Response lockedPrepareResponse(HttpChannelState httpChannel, boolean la
httpChannel._stream.prepareResponse(mutableHeaders);
- return new MetaData.Response(
+ MetaData.Response response = new MetaData.Response(
_status, null, httpChannel.getConnectionMetaData().getHttpVersion(),
_httpFields,
httpChannel._committedContentLength,
getTrailersSupplier()
);
+ response.setContentSource(getContentSource());
+ return response;
}
@Override
@@ -1696,7 +1716,7 @@ private static class ErrorResponse extends ChannelResponse
public ErrorResponse(ChannelRequest request)
{
super(request);
- _status = HttpStatus.INTERNAL_SERVER_ERROR_500;
+ setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
}
@Override
@@ -1720,7 +1740,7 @@ MetaData.Response lockedPrepareResponse(HttpChannelState httpChannelState, boole
{
assert httpChannelState._request._lock.isHeldByCurrentThread();
MetaData.Response httpFields = super.lockedPrepareResponse(httpChannelState, last);
- httpChannelState._response._status = _status;
+ httpChannelState._response.setStatus(getStatus());
HttpFields.Mutable originalResponseFields = httpChannelState._responseHeaders.getMutableHttpFields();
originalResponseFields.clear();
originalResponseFields.add(getResponseHttpFields());
@@ -1802,7 +1822,7 @@ public void failed(Throwable x)
{
failure = _failure;
httpChannelState = _request.lockedGetHttpChannelState();
- httpChannelState._response._status = _errorResponse._status;
+ httpChannelState._response.setStatus(_errorResponse.getStatus());
}
ExceptionUtil.addSuppressedIfNotAssociated(failure, x);
HttpChannelState.failed(httpChannelState._handlerInvoker, failure);
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
index 38256094f193..55ef47f12a83 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
@@ -943,7 +943,11 @@ public Action process() throws Exception
getEndPoint().write(this, _content);
break;
default:
- succeeded();
+ Content.Source source = _info.getContentSource();
+ if (source != null)
+ Content.copy(source, source.getLength(), _lastContent, getEndPoint(), this);
+ else
+ succeeded();
}
return Action.SCHEDULED;
@@ -1533,6 +1537,7 @@ public void prepareResponse(HttpFields.Mutable headers)
@Override
public void send(MetaData.Request request, MetaData.Response response, boolean last, ByteBuffer content, Callback callback)
{
+ // TODO: do not rely on response==null.
if (response == null)
{
if (!last && BufferUtil.isEmpty(content))
diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ResponseContentSourceTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ResponseContentSourceTest.java
new file mode 100644
index 000000000000..e78c6e6b7c13
--- /dev/null
+++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ResponseContentSourceTest.java
@@ -0,0 +1,104 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.test.client.transport;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.client.CompletableResponseListener;
+import org.eclipse.jetty.client.ContentResponse;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.io.Content;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Response;
+import org.eclipse.jetty.toolchain.test.MavenPaths;
+import org.eclipse.jetty.util.Blocker;
+import org.eclipse.jetty.util.Callback;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ResponseContentSourceTest extends AbstractTest
+{
+ @ParameterizedTest
+ @MethodSource("transportsNoFCGI")
+ public void testResponseContentSource(TransportType transportType) throws Exception
+ {
+ // Prepare a "small" file. TODO: also use a >2GiB file.
+ int contentLength = 1024 * 1024;
+ Path dir = Files.createDirectories(MavenPaths.targetTestDir(getClass().getSimpleName()));
+ Path file = Files.createTempFile(dir, "file-", ".bin");
+ try (var channel = Files.newByteChannel(file, StandardOpenOption.WRITE))
+ {
+ channel.write(ByteBuffer.allocateDirect(contentLength));
+ }
+
+ start(transportType, new Handler.Abstract()
+ {
+ @Override
+ public boolean handle(Request request, Response response, Callback callback)
+ {
+ Content.Source source = Content.Source.from(file);
+ // TODO: possible alternative?
+// source.writeTo(response, true, callback);
+ Content.Sink.write(response, true, source, callback);
+ return true;
+ }
+ });
+
+ ContentResponse response = new CompletableResponseListener(client.newRequest(newURI(transportType)), contentLength)
+ .send()
+ .get(5, TimeUnit.SECONDS);
+
+ assertEquals(HttpStatus.OK_200, response.getStatus());
+ assertEquals(contentLength, response.getContent().length);
+ }
+
+ @ParameterizedTest
+ @MethodSource("transportsNoFCGI")
+ public void testResponseContentSourceInChunks(TransportType transportType) throws Exception
+ {
+ start(transportType, new Handler.Abstract()
+ {
+ @Override
+ public boolean handle(Request request, Response response, Callback callback) throws Exception
+ {
+ int contentLength = 1024 * 1024;
+ Path dir = Files.createDirectories(MavenPaths.targetTestDir(getClass().getSimpleName()));
+ Path file = Files.createTempFile(dir, "file-", ".bin");
+ try (var channel = Files.newByteChannel(file, StandardOpenOption.WRITE))
+ {
+ channel.write(ByteBuffer.allocateDirect(contentLength));
+ }
+
+ // Write first chunk.
+ int length1 = contentLength / 2;
+ try (Blocker.Callback blocker = Blocker.callback())
+ {
+ Content.Sink.write(response, false, Content.Source.from(file, 0, length1), blocker);
+ blocker.block();
+ }
+
+ // Write last chunk.
+ Content.Sink.write(response, true, Content.Source.from(file, length1, contentLength - length1), callback);
+ return true;
+ }
+ });
+ }
+}