From d147cce9d2195e65a56143d0d8e4d45280c0994e Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Wed, 13 Aug 2025 18:44:16 +0000 Subject: [PATCH] chore: fix more instances of flaky native reference handling --- .../crt/auth/signing/AwsSignerNative.kt | 4 +-- .../http/HttpClientConnectionManagerNative.kt | 13 ++++--- .../crt/http/HttpClientConnectionNative.kt | 34 +++++++++++++------ .../sdk/kotlin/crt/http/HttpStreamNative.kt | 7 +++- .../sdk/kotlin/crt/io/HostResolverNative.kt | 8 ++--- .../aws/sdk/kotlin/crt/util/DigestNative.kt | 3 +- 6 files changed, 44 insertions(+), 25 deletions(-) diff --git a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/auth/signing/AwsSignerNative.kt b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/auth/signing/AwsSignerNative.kt index 03040c4b..ab32e5c7 100644 --- a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/auth/signing/AwsSignerNative.kt +++ b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/auth/signing/AwsSignerNative.kt @@ -6,9 +6,7 @@ package aws.sdk.kotlin.crt.auth.signing import aws.sdk.kotlin.crt.* -import aws.sdk.kotlin.crt.Allocator import aws.sdk.kotlin.crt.auth.credentials.Credentials -import aws.sdk.kotlin.crt.awsAssertOpSuccess import aws.sdk.kotlin.crt.http.* import aws.sdk.kotlin.crt.util.asAwsByteCursor import aws.sdk.kotlin.crt.util.initFromCursor @@ -23,7 +21,7 @@ import platform.posix.UINT64_MAX /** * Static class for a variety of AWS signing APIs. */ -public actual object AwsSigner { +public actual object AwsSigner : WithCrt() { public actual suspend fun signRequest( request: HttpRequest, config: AwsSigningConfig, diff --git a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpClientConnectionManagerNative.kt b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpClientConnectionManagerNative.kt index b4b84daa..3ee8da6f 100644 --- a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpClientConnectionManagerNative.kt +++ b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpClientConnectionManagerNative.kt @@ -6,14 +6,13 @@ package aws.sdk.kotlin.crt.http import aws.sdk.kotlin.crt.* -import aws.sdk.kotlin.crt.Allocator -import aws.sdk.kotlin.crt.awsAssertOpSuccess import aws.sdk.kotlin.crt.io.SocketDomain import aws.sdk.kotlin.crt.io.SocketOptions import aws.sdk.kotlin.crt.io.SocketType import aws.sdk.kotlin.crt.io.requiresTls import aws.sdk.kotlin.crt.util.* import cnames.structs.aws_http_connection_manager +import kotlinx.atomicfu.atomic import kotlinx.cinterop.* import libcrt.* import kotlin.coroutines.Continuation @@ -23,8 +22,12 @@ import kotlin.coroutines.suspendCoroutine public actual class HttpClientConnectionManager actual constructor( public actual val options: HttpClientConnectionManagerOptions, -) : Closeable, +) : WithCrt(), + Closeable, AsyncShutdown { + + private val closed = atomic(false) + public actual val managerMetrics: HttpManagerMetrics get() = memScoped { val metrics = alloc() @@ -152,7 +155,9 @@ public actual class HttpClientConnectionManager actual constructor( } actual override fun close() { - aws_http_connection_manager_release(manager) + if (closed.compareAndSet(false, true)) { + aws_http_connection_manager_release(manager) + } } } diff --git a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpClientConnectionNative.kt b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpClientConnectionNative.kt index e36615e9..3db41519 100644 --- a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpClientConnectionNative.kt +++ b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpClientConnectionNative.kt @@ -5,15 +5,13 @@ package aws.sdk.kotlin.crt.http import aws.sdk.kotlin.crt.* -import aws.sdk.kotlin.crt.Allocator -import aws.sdk.kotlin.crt.NativeHandle -import aws.sdk.kotlin.crt.awsAssertOpSuccess import aws.sdk.kotlin.crt.io.Buffer import aws.sdk.kotlin.crt.io.ByteCursorBuffer import aws.sdk.kotlin.crt.util.asAwsByteCursor import aws.sdk.kotlin.crt.util.initFromCursor import aws.sdk.kotlin.crt.util.toKString import aws.sdk.kotlin.crt.util.withAwsByteCursor +import kotlinx.atomicfu.atomic import kotlinx.cinterop.* import libcrt.* import platform.posix.size_t @@ -21,14 +19,17 @@ import platform.posix.size_t internal class HttpClientConnectionNative( private val manager: HttpClientConnectionManager, override val ptr: CPointer, -) : Closeable, +) : WithCrt(), + Closeable, HttpClientConnection, NativeHandle { + private val closed = atomic(false) + override val id: String = ptr.rawValue.toString() override fun makeRequest(httpReq: HttpRequest, handler: HttpStreamResponseHandler): HttpStream { val nativeReq = httpReq.toNativeRequest() - val cbData = HttpStreamContext(handler, nativeReq) + val cbData = HttpStreamContext(null, handler, nativeReq) val stableRef = StableRef.create(cbData) val reqOptions = cValue { self_size = sizeOf().convert() @@ -50,7 +51,7 @@ internal class HttpClientConnectionNative( throw CrtRuntimeException("aws_http_connection_make_request()") } - return HttpStreamNative(stream) + return HttpStreamNative(stream).also { cbData.stream = it } } override fun shutdown() { @@ -58,7 +59,9 @@ internal class HttpClientConnectionNative( } override fun close() { - manager.releaseConnection(this) + if (closed.compareAndSet(false, true)) { + manager.releaseConnection(this) + } } } @@ -66,6 +69,12 @@ internal class HttpClientConnectionNative( * Userdata passed through the native callbacks for HTTP responses */ private class HttpStreamContext( + /** + * The Kotlin stream object. This starts as null because the context is created before the stream itself. We need + * the stream in callbacks so we set it lazily. + */ + var stream: HttpStreamNative? = null, + /** * The actual Kotlin handler for each callback */ @@ -85,7 +94,7 @@ private fun onResponseHeaders( userdata: COpaquePointer?, ): Int { val ctx = userdata?.asStableRef()?.get() ?: return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE.toInt()) - val stream = nativeStream?.let { HttpStreamNative(it) } ?: return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE.toInt()) + val stream = ctx.stream ?: return AWS_OP_ERR val hdrCnt = numHeaders.toInt() val headers: List? = if (hdrCnt > 0 && headerArray != null) { @@ -106,6 +115,7 @@ private fun onResponseHeaders( log(LogLevel.Error, "onResponseHeaders: $ex") return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE.toInt()) } + return AWS_OP_SUCCESS } @@ -115,7 +125,8 @@ private fun onResponseHeaderBlockDone( userdata: COpaquePointer?, ): Int { val ctx = userdata?.asStableRef()?.get() ?: return AWS_OP_ERR - val stream = nativeStream?.let { HttpStreamNative(it) } ?: return AWS_OP_ERR + val stream = ctx.stream ?: return AWS_OP_ERR + try { ctx.handler.onResponseHeadersDone(stream, blockType.value.toInt()) } catch (ex: Exception) { @@ -132,7 +143,7 @@ private fun onIncomingBody( userdata: COpaquePointer?, ): Int { val ctx = userdata?.asStableRef()?.get() ?: return AWS_OP_ERR - val stream = nativeStream?.let { HttpStreamNative(it) } ?: return AWS_OP_ERR + val stream = ctx.stream ?: return AWS_OP_ERR try { val body = if (data != null) ByteCursorBuffer(data) else Buffer.Empty @@ -159,7 +170,8 @@ private fun onStreamComplete( ) { val stableRef = userdata?.asStableRef() ?: return val ctx = stableRef.get() - val stream = nativeStream?.let { HttpStreamNative(it) } ?: return + val stream = ctx.stream ?: return + try { ctx.handler.onResponseComplete(stream, errorCode) } catch (ex: Exception) { diff --git a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpStreamNative.kt b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpStreamNative.kt index 897da883..7e93da39 100644 --- a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpStreamNative.kt +++ b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/http/HttpStreamNative.kt @@ -9,6 +9,7 @@ import aws.sdk.kotlin.crt.CrtRuntimeException import aws.sdk.kotlin.crt.NativeHandle import aws.sdk.kotlin.crt.awsAssertOpSuccess import aws.sdk.kotlin.crt.util.asAwsByteCursor +import kotlinx.atomicfu.atomic import kotlinx.cinterop.* import libcrt.* import kotlin.coroutines.Continuation @@ -21,6 +22,8 @@ internal class HttpStreamNative( ) : HttpStream, NativeHandle { + private val closed = atomic(false) + override val responseStatusCode: Int get() { return memScoped { @@ -90,7 +93,9 @@ internal class HttpStreamNative( } override fun close() { - aws_http_stream_release(ptr) + if (closed.compareAndSet(false, true)) { + aws_http_stream_release(ptr) + } } } diff --git a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/io/HostResolverNative.kt b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/io/HostResolverNative.kt index 7fe33e80..4fa044c2 100644 --- a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/io/HostResolverNative.kt +++ b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/io/HostResolverNative.kt @@ -5,10 +5,7 @@ package aws.sdk.kotlin.crt.io -import aws.sdk.kotlin.crt.Allocator -import aws.sdk.kotlin.crt.AsyncShutdown -import aws.sdk.kotlin.crt.Closeable -import aws.sdk.kotlin.crt.NativeHandle +import aws.sdk.kotlin.crt.* import aws.sdk.kotlin.crt.util.ShutdownChannel import aws.sdk.kotlin.crt.util.shutdownChannel import kotlinx.cinterop.* @@ -19,7 +16,8 @@ public actual class HostResolver private constructor( private val elg: EventLoopGroup, private val manageElg: Boolean, private val maxEntries: Int, -) : NativeHandle, +) : WithCrt(), + NativeHandle, Closeable, AsyncShutdown { diff --git a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/util/DigestNative.kt b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/util/DigestNative.kt index 021ec8e6..7afb8a38 100644 --- a/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/util/DigestNative.kt +++ b/aws-crt-kotlin/native/src/aws/sdk/kotlin/crt/util/DigestNative.kt @@ -4,12 +4,13 @@ */ package aws.sdk.kotlin.crt.util +import aws.sdk.kotlin.crt.WithCrt import aws.sdk.kotlin.crt.util.hashing.Sha256 /** * Utility object for various hash functions */ -public actual object Digest { +public actual object Digest : WithCrt() { /** * Calculate the SHA-256 hash of the input [buffer] */