Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
package aws.sdk.kotlin.crt.auth.signing

import aws.sdk.kotlin.crt.*
import aws.sdk.kotlin.crt.Allocator
import aws.sdk.kotlin.crt.auth.credentials.Credentials
import aws.sdk.kotlin.crt.awsAssertOpSuccess
import aws.sdk.kotlin.crt.http.*
import aws.sdk.kotlin.crt.util.asAwsByteCursor
import aws.sdk.kotlin.crt.util.initFromCursor
Expand All @@ -23,7 +21,7 @@ import platform.posix.UINT64_MAX
/**
* Static class for a variety of AWS signing APIs.
*/
public actual object AwsSigner {
public actual object AwsSigner : WithCrt() {
public actual suspend fun signRequest(
request: HttpRequest,
config: AwsSigningConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
package aws.sdk.kotlin.crt.http

import aws.sdk.kotlin.crt.*
import aws.sdk.kotlin.crt.Allocator
import aws.sdk.kotlin.crt.awsAssertOpSuccess
import aws.sdk.kotlin.crt.io.SocketDomain
import aws.sdk.kotlin.crt.io.SocketOptions
import aws.sdk.kotlin.crt.io.SocketType
import aws.sdk.kotlin.crt.io.requiresTls
import aws.sdk.kotlin.crt.util.*
import cnames.structs.aws_http_connection_manager
import kotlinx.atomicfu.atomic
import kotlinx.cinterop.*
import libcrt.*
import kotlin.coroutines.Continuation
Expand All @@ -23,8 +22,12 @@ import kotlin.coroutines.suspendCoroutine

public actual class HttpClientConnectionManager actual constructor(
public actual val options: HttpClientConnectionManagerOptions,
) : Closeable,
) : WithCrt(),
Closeable,
AsyncShutdown {

private val closed = atomic(false)

public actual val managerMetrics: HttpManagerMetrics
get() = memScoped {
val metrics = alloc<aws_http_manager_metrics>()
Expand Down Expand Up @@ -152,7 +155,9 @@ public actual class HttpClientConnectionManager actual constructor(
}

actual override fun close() {
aws_http_connection_manager_release(manager)
if (closed.compareAndSet(false, true)) {
aws_http_connection_manager_release(manager)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,31 @@
package aws.sdk.kotlin.crt.http

import aws.sdk.kotlin.crt.*
import aws.sdk.kotlin.crt.Allocator
import aws.sdk.kotlin.crt.NativeHandle
import aws.sdk.kotlin.crt.awsAssertOpSuccess
import aws.sdk.kotlin.crt.io.Buffer
import aws.sdk.kotlin.crt.io.ByteCursorBuffer
import aws.sdk.kotlin.crt.util.asAwsByteCursor
import aws.sdk.kotlin.crt.util.initFromCursor
import aws.sdk.kotlin.crt.util.toKString
import aws.sdk.kotlin.crt.util.withAwsByteCursor
import kotlinx.atomicfu.atomic
import kotlinx.cinterop.*
import libcrt.*
import platform.posix.size_t

internal class HttpClientConnectionNative(
private val manager: HttpClientConnectionManager,
override val ptr: CPointer<cnames.structs.aws_http_connection>,
) : Closeable,
) : WithCrt(),
Closeable,
HttpClientConnection,
NativeHandle<cnames.structs.aws_http_connection> {

private val closed = atomic(false)

override val id: String = ptr.rawValue.toString()
override fun makeRequest(httpReq: HttpRequest, handler: HttpStreamResponseHandler): HttpStream {
val nativeReq = httpReq.toNativeRequest()
val cbData = HttpStreamContext(handler, nativeReq)
val cbData = HttpStreamContext(null, handler, nativeReq)
val stableRef = StableRef.create(cbData)
val reqOptions = cValue<aws_http_make_request_options> {
self_size = sizeOf<aws_http_make_request_options>().convert()
Expand All @@ -50,22 +51,30 @@ internal class HttpClientConnectionNative(
throw CrtRuntimeException("aws_http_connection_make_request()")
}

return HttpStreamNative(stream)
return HttpStreamNative(stream).also { cbData.stream = it }
}

override fun shutdown() {
aws_http_connection_close(ptr)
}

override fun close() {
manager.releaseConnection(this)
if (closed.compareAndSet(false, true)) {
manager.releaseConnection(this)
}
}
}

/**
* Userdata passed through the native callbacks for HTTP responses
*/
private class HttpStreamContext(
/**
* The Kotlin stream object. This starts as null because the context is created before the stream itself. We need
* the stream in callbacks so we set it lazily.
*/
var stream: HttpStreamNative? = null,

/**
* The actual Kotlin handler for each callback
*/
Expand All @@ -85,7 +94,7 @@ private fun onResponseHeaders(
userdata: COpaquePointer?,
): Int {
val ctx = userdata?.asStableRef<HttpStreamContext>()?.get() ?: return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE.toInt())
val stream = nativeStream?.let { HttpStreamNative(it) } ?: return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE.toInt())
val stream = ctx.stream ?: return AWS_OP_ERR

val hdrCnt = numHeaders.toInt()
val headers: List<HttpHeader>? = if (hdrCnt > 0 && headerArray != null) {
Expand All @@ -106,6 +115,7 @@ private fun onResponseHeaders(
log(LogLevel.Error, "onResponseHeaders: $ex")
return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE.toInt())
}

return AWS_OP_SUCCESS
}

Expand All @@ -115,7 +125,8 @@ private fun onResponseHeaderBlockDone(
userdata: COpaquePointer?,
): Int {
val ctx = userdata?.asStableRef<HttpStreamContext>()?.get() ?: return AWS_OP_ERR
val stream = nativeStream?.let { HttpStreamNative(it) } ?: return AWS_OP_ERR
val stream = ctx.stream ?: return AWS_OP_ERR

try {
ctx.handler.onResponseHeadersDone(stream, blockType.value.toInt())
} catch (ex: Exception) {
Expand All @@ -132,7 +143,7 @@ private fun onIncomingBody(
userdata: COpaquePointer?,
): Int {
val ctx = userdata?.asStableRef<HttpStreamContext>()?.get() ?: return AWS_OP_ERR
val stream = nativeStream?.let { HttpStreamNative(it) } ?: return AWS_OP_ERR
val stream = ctx.stream ?: return AWS_OP_ERR

try {
val body = if (data != null) ByteCursorBuffer(data) else Buffer.Empty
Expand All @@ -159,7 +170,8 @@ private fun onStreamComplete(
) {
val stableRef = userdata?.asStableRef<HttpStreamContext>() ?: return
val ctx = stableRef.get()
val stream = nativeStream?.let { HttpStreamNative(it) } ?: return
val stream = ctx.stream ?: return

try {
ctx.handler.onResponseComplete(stream, errorCode)
} catch (ex: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import aws.sdk.kotlin.crt.CrtRuntimeException
import aws.sdk.kotlin.crt.NativeHandle
import aws.sdk.kotlin.crt.awsAssertOpSuccess
import aws.sdk.kotlin.crt.util.asAwsByteCursor
import kotlinx.atomicfu.atomic
import kotlinx.cinterop.*
import libcrt.*
import kotlin.coroutines.Continuation
Expand All @@ -21,6 +22,8 @@ internal class HttpStreamNative(
) : HttpStream,
NativeHandle<cnames.structs.aws_http_stream> {

private val closed = atomic(false)

override val responseStatusCode: Int
get() {
return memScoped {
Expand Down Expand Up @@ -90,7 +93,9 @@ internal class HttpStreamNative(
}

override fun close() {
aws_http_stream_release(ptr)
if (closed.compareAndSet(false, true)) {
aws_http_stream_release(ptr)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@

package aws.sdk.kotlin.crt.io

import aws.sdk.kotlin.crt.Allocator
import aws.sdk.kotlin.crt.AsyncShutdown
import aws.sdk.kotlin.crt.Closeable
import aws.sdk.kotlin.crt.NativeHandle
import aws.sdk.kotlin.crt.*
import aws.sdk.kotlin.crt.util.ShutdownChannel
import aws.sdk.kotlin.crt.util.shutdownChannel
import kotlinx.cinterop.*
Expand All @@ -19,7 +16,8 @@ public actual class HostResolver private constructor(
private val elg: EventLoopGroup,
private val manageElg: Boolean,
private val maxEntries: Int,
) : NativeHandle<aws_host_resolver>,
) : WithCrt(),
NativeHandle<aws_host_resolver>,
Closeable,
AsyncShutdown {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
*/
package aws.sdk.kotlin.crt.util

import aws.sdk.kotlin.crt.WithCrt
import aws.sdk.kotlin.crt.util.hashing.Sha256

/**
* Utility object for various hash functions
*/
public actual object Digest {
public actual object Digest : WithCrt() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/**
* Calculate the SHA-256 hash of the input [buffer]
*/
Expand Down