Skip to content

OssClient support full Async/await #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
Backup*

bin
obj
obj
/.vs
23 changes: 23 additions & 0 deletions sdk/Commands/OssCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
using Aliyun.OSS.Common.Communication;
using Aliyun.OSS.Util;
using Aliyun.OSS.Transform;
using System.Threading.Tasks;
using System.Threading;
using ExecutionContext = Aliyun.OSS.Common.Communication.ExecutionContext;

namespace Aliyun.OSS.Commands
{
Expand Down Expand Up @@ -96,6 +99,20 @@ public ServiceResponse Execute()
}
}

public async Task<ServiceResponse> ExecuteAsync(CancellationToken cancellationToken=default)
{
var request = BuildRequest();
try
{
return await Client.SendAsync(request, Context, cancellationToken);
}
finally
{
if (!LeaveRequestOpen)
request.Dispose();
}
}

public IAsyncResult AsyncExecute(AsyncCallback callback, Object state)
{
var request = BuildRequest();
Expand Down Expand Up @@ -161,6 +178,12 @@ protected OssCommand(IServiceClient client, Uri endpoint, ExecutionContext conte
return DeserializeResponse(response);
}

public new async Task<T> ExecuteAsync(CancellationToken cancellationToken= default)
{
var response = await base.ExecuteAsync(cancellationToken);
return DeserializeResponse(response);
}

public T DeserializeResponse(ServiceResponse response)
{
try
Expand Down
11 changes: 11 additions & 0 deletions sdk/Common/Communication/IServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Aliyun.OSS.Common.Communication
{
Expand Down Expand Up @@ -37,5 +39,14 @@ internal interface IServiceClient
/// <param name="asyncResult">An instance of <see cref="IAsyncResult"/>.</param>
/// <returns>The response data.</returns>
ServiceResponse EndSend(IAsyncResult asyncResult);

/// <summary>
/// Sends a request to the service.
/// </summary>
/// <param name="request"></param>
/// <param name="context"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<ServiceResponse> SendAsync(ServiceRequest request, ExecutionContext context, CancellationToken cancellationToken=default);
}
}
32 changes: 32 additions & 0 deletions sdk/Common/Communication/RetryableServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading;
using System.IO;
using Aliyun.OSS.Util;
using System.Threading.Tasks;

namespace Aliyun.OSS.Common.Communication
{
Expand Down Expand Up @@ -79,6 +80,32 @@ private ServiceResponse SendImpl(ServiceRequest request, ExecutionContext contex
}
}

private async Task<ServiceResponse> SendImplAsync(ServiceRequest request, ExecutionContext context, int retryTimes, CancellationToken cancellationToken = default)
{
long originalContentPosition = -1;
try
{
if (request.Content != null && request.Content.CanSeek)
originalContentPosition = request.Content.Position;
return await _innerClient.SendAsync(request, context, cancellationToken);
}
catch (Exception ex)
{
if (ShouldRetry(request, ex, retryTimes))
{
if (request.Content != null && (originalContentPosition >= 0 && request.Content.CanSeek))
request.Content.Seek(originalContentPosition, SeekOrigin.Begin);

Pause(retryTimes);

return await SendImplAsync(request, context, ++retryTimes, cancellationToken);
}

// Rethrow
throw;
}
}

public IAsyncResult BeginSend(ServiceRequest request, ExecutionContext context,
AsyncCallback callback, object state)
{
Expand Down Expand Up @@ -166,6 +193,11 @@ private static void Pause(int retryTimes)
Thread.Sleep(delay);
}

public async Task<ServiceResponse> SendAsync(ServiceRequest request, ExecutionContext context, CancellationToken cancellationToken = default)
{
return await SendImplAsync(request, context, 0, cancellationToken);
}

#endregion

}
Expand Down
17 changes: 14 additions & 3 deletions sdk/Common/Communication/ServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
using Aliyun.OSS.Util;
using Aliyun.OSS.Common.Handlers;
using Aliyun.OSS.Properties;
using System.Threading.Tasks;
using System.Threading;

namespace Aliyun.OSS.Common.Communication
{
Expand Down Expand Up @@ -56,7 +58,7 @@ public static ServiceClient Create(ClientConfiguration configuration)

#endregion

#region IServiceClient Members
#region IServiceClient Members

public ServiceResponse Send(ServiceRequest request, ExecutionContext context)
{
Expand Down Expand Up @@ -96,10 +98,11 @@ public ServiceResponse EndSend(IAsyncResult aysncResult)
}
}

#endregion
#endregion

protected abstract ServiceResponse SendCore(ServiceRequest request, ExecutionContext context);

protected abstract Task<ServiceResponse> SendCoreAsync(ServiceRequest request, ExecutionContext context, CancellationToken cancellationToken=default);

protected abstract IAsyncResult BeginSendCore(ServiceRequest request, ExecutionContext context,
AsyncCallback callback, Object state);

Expand All @@ -114,5 +117,13 @@ protected static void HandleResponse(ServiceResponse response, IEnumerable<IResp
foreach (var handler in handlers)
handler.Handle(response);
}

public async Task<ServiceResponse> SendAsync(ServiceRequest request, ExecutionContext context, System.Threading.CancellationToken cancellationToken = default)
{
SignRequest(request, context);
var response = await SendCoreAsync(request, context);
HandleResponse(response, context.ResponseHandlers);
return response;
}
}
}
64 changes: 64 additions & 0 deletions sdk/Common/Communication/ServiceClientImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using System.Net.Security;

using Aliyun.OSS.Util;
using System.Threading.Tasks;

namespace Aliyun.OSS.Common.Communication
{
Expand Down Expand Up @@ -363,6 +364,69 @@ private static ServiceResponse HandleException(WebException ex)
return new ResponseImpl(ex);
}

protected override async Task<ServiceResponse> SendCoreAsync(ServiceRequest serviceRequest, ExecutionContext context, System.Threading.CancellationToken cancellationToken = default)
{
var request = HttpFactory.CreateWebRequest(serviceRequest, Configuration);
await SetRequestContentAsync(request, serviceRequest, Configuration);
try
{
var response = (await request.GetResponseAsync()) as HttpWebResponse;
return new ResponseImpl(response);
}
catch (WebException ex)
{
return HandleException(ex);
}
}


private static async Task SetRequestContentAsync(HttpWebRequest webRequest,
ServiceRequest serviceRequest,
ClientConfiguration clientConfiguration)
{
var data = serviceRequest.BuildRequestContent();

if (data == null ||
(serviceRequest.Method != HttpMethod.Put &&
serviceRequest.Method != HttpMethod.Post))
{
return;
}

// Write data to the request stream.
long userSetContentLength = -1;
if (serviceRequest.Headers.ContainsKey(HttpHeaders.ContentLength))
userSetContentLength = long.Parse(serviceRequest.Headers[HttpHeaders.ContentLength]);

if (serviceRequest.UseChunkedEncoding || !data.CanSeek) // when data cannot seek, we have to use chunked encoding as there's no way to set the length
{
webRequest.SendChunked = true;
webRequest.AllowWriteStreamBuffering = false; // when using chunked encoding, the data is likely big and thus not use write buffer;
}
else
{
long streamLength = data.Length - data.Position;
webRequest.ContentLength = (userSetContentLength >= 0 &&
userSetContentLength <= streamLength) ? userSetContentLength : streamLength;
if (webRequest.ContentLength > clientConfiguration.DirectWriteStreamThreshold)
{
webRequest.AllowWriteStreamBuffering = false;
}
}

using (var requestStream = webRequest.GetRequestStream())
{
if (!webRequest.SendChunked)
{
await IoUtils.WriteToAsync(data, requestStream, webRequest.ContentLength);
}
else
{
await IoUtils.WriteToAsync(data, requestStream);
}
}
}

#endregion

}
Expand Down
14 changes: 12 additions & 2 deletions sdk/Common/Communication/netcore/ServiceClientNewImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,18 @@ private bool CanReuseHttpClient(ClientConfiguration dst, ClientConfiguration src
return true;
}
return false;
}

}

protected override async Task<ServiceResponse> SendCoreAsync(ServiceRequest request, ExecutionContext context, System.Threading.CancellationToken cancellationToken = default)
{
var req = new HttpRequestMessage(Convert(request.Method), request.BuildRequestUri());
this.SetRequestContent(req, request);
this.SetHeaders(req, request);
HttpClient client = GetClient();
HttpResponseMessage resp = await client.SendAsync(req, HttpCompletionOption.ResponseHeadersRead);
return new ResponseImpl(resp);
}

private static HttpClient _httpClient;
private static HttpClient _httpClientNoProxy;
private static object _clientLock = new object();
Expand Down
16 changes: 16 additions & 0 deletions sdk/Common/ResumableDownloadManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Aliyun.OSS.Util;
using Aliyun.OSS.Common;
using Aliyun.OSS.Common.Internal;
using System.Threading.Tasks;

namespace Aliyun.OSS
{
Expand Down Expand Up @@ -158,6 +159,21 @@ internal static long WriteTo(Stream src, Stream dest)
return totalBytes;
}

internal static async Task<long> WriteToAsync(Stream src, Stream dest, CancellationToken cancellation)
{
var buffer = new byte[32 * 1024];
var bytesRead = 0;
var totalBytes = 0;
while ((bytesRead = await src.ReadAsync(buffer, 0, buffer.Length, cancellation)) > 0)
{
await dest.WriteAsync(buffer, 0, bytesRead, cancellation);
totalBytes += bytesRead;
}
await dest.FlushAsync();

return totalBytes;
}

internal class DownloadTaskParam
{
public DownloadObjectRequest Request
Expand Down
Loading