From 2e0383ac48522c487af3ce885f53948e86a0cac5 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 27 Jun 2024 13:39:17 +0200 Subject: [PATCH 01/20] Add a test on segments leakage --- core/jvm/test/PoolingTest.kt | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 core/jvm/test/PoolingTest.kt diff --git a/core/jvm/test/PoolingTest.kt b/core/jvm/test/PoolingTest.kt new file mode 100644 index 000000000..f07caf96a --- /dev/null +++ b/core/jvm/test/PoolingTest.kt @@ -0,0 +1,36 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io + +import kotlin.test.Test +import kotlin.test.assertTrue + +class PoolingTest { + @Test + fun segmentSharing() { + val buffer = Buffer() + + buffer.writeByte(1) + var poolSize = SegmentPool.byteCount + buffer.clear() + // clear should return a segment to a pool, so the pool size should grow + assertTrue(poolSize < SegmentPool.byteCount) + + buffer.writeByte(1) + poolSize = SegmentPool.byteCount + val copy = buffer.copy() + buffer.clear() + copy.clear() + assertTrue(poolSize < SegmentPool.byteCount) + + buffer.writeByte(1) + poolSize = SegmentPool.byteCount + val peek = buffer.peek().buffered() + peek.readByte() + buffer.clear() + assertTrue(poolSize < SegmentPool.byteCount) + } +} From b9dd6d14a412c8b5db333a1bf1a0a8f843e11f20 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 27 Jun 2024 13:40:20 +0200 Subject: [PATCH 02/20] Replace the shared flag with a copy-tracker --- core/api/kotlinx-io-core.api | 3 +- core/common/src/Segment.kt | 77 ++++++++++++++++--- .../src/unsafe/UnsafeBufferOperations.kt | 3 +- core/common/test/SimpleCopyTrackerTest.kt | 27 +++++++ core/js/src/SegmentPool.kt | 2 +- core/jvm/src/SegmentPool.kt | 49 +++++++++++- core/jvm/test/RefCounteringCopyTrackerTest.kt | 35 +++++++++ 
core/native/src/SegmentPool.kt | 2 +- core/wasm/src/SegmentPool.kt | 2 +- 9 files changed, 180 insertions(+), 20 deletions(-) create mode 100644 core/common/test/SimpleCopyTrackerTest.kt create mode 100644 core/jvm/test/RefCounteringCopyTrackerTest.kt diff --git a/core/api/kotlinx-io-core.api b/core/api/kotlinx-io-core.api index 2fb84fbd6..802d2f8bd 100644 --- a/core/api/kotlinx-io-core.api +++ b/core/api/kotlinx-io-core.api @@ -101,7 +101,8 @@ public abstract interface class kotlinx/io/RawSource : java/lang/AutoCloseable { } public final class kotlinx/io/Segment { - public synthetic fun ([BIIZZLkotlin/jvm/internal/DefaultConstructorMarker;)V + public synthetic fun (Lkotlinx/io/SegmentCopyTracker;Lkotlin/jvm/internal/DefaultConstructorMarker;)V + public synthetic fun ([BIILkotlinx/io/SegmentCopyTracker;ZLkotlin/jvm/internal/DefaultConstructorMarker;)V public final synthetic fun dataAsByteArray (Z)[B public final synthetic fun getLimit ()I public final synthetic fun getNext ()Lkotlinx/io/Segment; diff --git a/core/common/src/Segment.kt b/core/common/src/Segment.kt index f72e4deb1..55f40c061 100644 --- a/core/common/src/Segment.kt +++ b/core/common/src/Segment.kt @@ -20,9 +20,60 @@ */ package kotlinx.io +import kotlin.concurrent.Volatile import kotlin.jvm.JvmField import kotlin.jvm.JvmSynthetic +/** + * Tracks shared segment copies. + * + * A new [SegmentCopyTracker] instance should not be shared by default (i.e. `shared == false`). + * Any further [addCopy] calls should move the tracker to a shared state (i.e. `shared == true`). + * Once a shared segment copy is recycled, [removeCopyIfShared] should be called. + * Depending on implementation, calling [removeCopyIfShared] the same number of times as [addCopy] may + * or may not transition the tracker back to the unshared state. + * + * The class is not intended for public use and currently designed to fit the only use case - within JVM SegmentPool + * implementation. 
+ */ +internal abstract class SegmentCopyTracker { + /** + * Track a new copy created by sharing an associated segment. + */ + abstract fun addCopy() + + /** + * Record reclamation of a shared segment copy associated with this tracker. + * If a tracker was in the unshared state, this call should not affect its internal state. + * + * @return `true` if the segment was not shared *before* this call. + */ + abstract fun removeCopyIfShared(): Boolean + + /** + * `true` if the tracker is shared by multiple segment copies. + */ + abstract val shared: Boolean +} + +/** + * Simple [SegmentCopyTracker] transitioning from unshared to shared state only. + * [removeCopyIfShared] calls do not affect [shared] value. + */ +internal class SimpleCopyTracker : SegmentCopyTracker() { + @Volatile + private var shared_: Boolean = false + + override fun addCopy() { + shared_ = true + } + + override fun removeCopyIfShared(): Boolean = shared + + override val shared: Boolean + get() = shared_ +} + /** * A segment of a buffer. * @@ -59,8 +110,13 @@ public class Segment { internal var limit: Int = 0 /** True if other segments or byte strings use the same byte array. */ - @JvmField - internal var shared: Boolean = false + internal val shared: Boolean + get() { + return copyTracker.shared + } + + /** Tracks the number of shared copies */ + internal val copyTracker: SegmentCopyTracker /** True if this segment owns the byte array and can append to it, extending `limit`. */ @JvmField @@ -76,17 +132,17 @@ @JvmField internal var prev: Segment? 
= null - private constructor() { + private constructor(shareToken: SegmentCopyTracker) { this.data = ByteArray(SIZE) this.owner = true - this.shared = false + this.copyTracker = shareToken } - private constructor(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean) { + private constructor(data: ByteArray, pos: Int, limit: Int, shareToken: SegmentCopyTracker, owner: Boolean) { this.data = data this.pos = pos this.limit = limit - this.shared = shared + this.copyTracker = shareToken this.owner = owner } @@ -96,8 +152,7 @@ public class Segment { * prevents it from being pooled. */ internal fun sharedCopy(): Segment { - shared = true - return Segment(data, pos, limit, true, false) + return Segment(data, pos, limit, copyTracker.also { it.addCopy() }, false) } /** @@ -247,11 +302,11 @@ public class Segment { internal const val SHARE_MINIMUM = 1024 @JvmSynthetic - internal fun new(): Segment = Segment() + internal fun new(copyTracker: SegmentCopyTracker): Segment = Segment(copyTracker) @JvmSynthetic - internal fun new(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean): Segment - = Segment(data, pos, limit, shared, owner) + internal fun new(data: ByteArray, pos: Int, limit: Int, copyTracker: SegmentCopyTracker, owner: Boolean): Segment + = Segment(data, pos, limit, copyTracker, owner) } } diff --git a/core/common/src/unsafe/UnsafeBufferOperations.kt b/core/common/src/unsafe/UnsafeBufferOperations.kt index 545e62ade..6bf183edc 100644 --- a/core/common/src/unsafe/UnsafeBufferOperations.kt +++ b/core/common/src/unsafe/UnsafeBufferOperations.kt @@ -37,7 +37,8 @@ public object UnsafeBufferOperations { public fun moveToTail(buffer: Buffer, bytes: ByteArray, startIndex: Int = 0, endIndex: Int = bytes.size) { checkBounds(bytes.size, startIndex, endIndex) val segment = Segment.new( - bytes, startIndex, endIndex, shared = true /* to prevent recycling */, + bytes, startIndex, endIndex, + SimpleCopyTracker().also { it.addCopy() } /* to prevent 
recycling */, owner = false /* can't append to it */ ) val tail = buffer.tail diff --git a/core/common/test/SimpleCopyTrackerTest.kt b/core/common/test/SimpleCopyTrackerTest.kt new file mode 100644 index 000000000..1dd331b80 --- /dev/null +++ b/core/common/test/SimpleCopyTrackerTest.kt @@ -0,0 +1,27 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io + +import kotlin.test.Test +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class SimpleCopyTrackerTest { + @Test + fun stateTransition() { + val tracker = SimpleCopyTracker() + assertFalse(tracker.shared) + + assertFalse(tracker.removeCopyIfShared()) + assertFalse(tracker.shared) + + tracker.addCopy() + assertTrue(tracker.shared) + + assertTrue(tracker.removeCopyIfShared()) + assertTrue(tracker.shared) + } +} diff --git a/core/js/src/SegmentPool.kt b/core/js/src/SegmentPool.kt index 615a14977..143a5fd94 100644 --- a/core/js/src/SegmentPool.kt +++ b/core/js/src/SegmentPool.kt @@ -10,7 +10,7 @@ internal actual object SegmentPool { actual val byteCount: Int = 0 - actual fun take(): Segment = Segment.new() + actual fun take(): Segment = Segment.new(SimpleCopyTracker()) actual fun recycle(segment: Segment) { } diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index f70d22dae..fa5008fec 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -25,8 +25,46 @@ import kotlinx.io.SegmentPool.LOCK import kotlinx.io.SegmentPool.MAX_SIZE import kotlinx.io.SegmentPool.recycle import kotlinx.io.SegmentPool.take +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater import java.util.concurrent.atomic.AtomicReference +/** + * Precise [SegmentCopyTracker] implementation tracking a number of shared segment copies. 
+ * Every [addCopy] call increments the counter, every [removeCopyIfShared] decrements it. + * + * After calling [removeCopyIfShared] the same number of times [addCopy] was called, the tracker returns to the unshared state. + * + * The class is internal for testing only. + */ +internal class RefCountingCopyTracker : SegmentCopyTracker() { + companion object { + @JvmStatic + private val fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(RefCountingCopyTracker::class.java, "refs") + } + + @Volatile + private var refs: Int = 0 + + override fun addCopy() { + fieldUpdater.incrementAndGet(this) + } + + override fun removeCopyIfShared(): Boolean { + while (true) { + val value = refs + check(value >= 0) { "Shared copies count is negative: $value" } + if (fieldUpdater.compareAndSet(this, value, (value - 1).coerceAtLeast(0))) { + return value > 0 + } + } + } + + override val shared: Boolean + get() { + return refs > 0 + } +} + /** * This class pools segments in a lock-free singly-linked stack. Though this code is lock-free it * does use a sentinel [LOCK] value to defend against races. Conflicted operations are not retried, * so there is no chance of blocking despite the term "lock". @@ -52,7 +90,8 @@ internal actual object SegmentPool { actual val MAX_SIZE = 64 * 1024 // 64 KiB. /** A sentinel segment to indicate that the linked list is currently being modified. */ - private val LOCK = Segment.new(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false) + private val LOCK = + Segment.new(ByteArray(0), pos = 0, limit = 0, copyTracker = RefCountingCopyTracker(), owner = false) /** * The number of hash buckets. This number needs to balance keeping the pool small and contention @@ -87,13 +126,13 @@ internal actual object SegmentPool { when { first === LOCK -> { // We didn't acquire the lock. Don't take a pooled segment. - return Segment.new() + return Segment.new(RefCountingCopyTracker()) } first == null -> { // We acquired the lock but the pool was empty. Unlock and return a new segment. 
firstRef.set(null) - return Segment.new() + return Segment.new(RefCountingCopyTracker()) } else -> { @@ -101,6 +140,7 @@ internal actual object SegmentPool { firstRef.set(first.next) first.next = null first.limit = 0 + check(!first.shared) return first } } @@ -109,7 +149,7 @@ internal actual object SegmentPool { @JvmStatic actual fun recycle(segment: Segment) { require(segment.next == null && segment.prev == null) - if (segment.shared) return // This segment cannot be recycled. + if (segment.copyTracker.removeCopyIfShared()) return // This segment cannot be recycled. val firstRef = firstRef() @@ -120,6 +160,7 @@ internal actual object SegmentPool { segment.next = first segment.pos = 0 + segment.owner = true segment.limit = firstLimit + Segment.SIZE // If we lost a race with another operation, don't recycle this segment. diff --git a/core/jvm/test/RefCounteringCopyTrackerTest.kt b/core/jvm/test/RefCounteringCopyTrackerTest.kt new file mode 100644 index 000000000..e516e9c3f --- /dev/null +++ b/core/jvm/test/RefCounteringCopyTrackerTest.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. 
+ */ + +package kotlinx.io + +import kotlin.test.Test +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class RefCounteringCopyTrackerTest { + @Test + fun stateTransition() { + val tracker = RefCountingCopyTracker() + assertFalse(tracker.shared) + + assertFalse(tracker.removeCopyIfShared()) + assertFalse(tracker.shared) + + tracker.addCopy() + assertTrue(tracker.shared) + assertTrue(tracker.removeCopyIfShared()) + assertFalse(tracker.shared) + + tracker.addCopy() + assertTrue(tracker.shared) + tracker.addCopy() + assertTrue(tracker.shared) + assertTrue(tracker.removeCopyIfShared()) + assertTrue(tracker.shared) + assertTrue(tracker.removeCopyIfShared()) + assertFalse(tracker.shared) + } +} diff --git a/core/native/src/SegmentPool.kt b/core/native/src/SegmentPool.kt index 709e6f894..99c996b70 100644 --- a/core/native/src/SegmentPool.kt +++ b/core/native/src/SegmentPool.kt @@ -25,7 +25,7 @@ internal actual object SegmentPool { actual val byteCount: Int = 0 - actual fun take(): Segment = Segment.new() + actual fun take(): Segment = Segment.new(SimpleCopyTracker()) actual fun recycle(segment: Segment) { } diff --git a/core/wasm/src/SegmentPool.kt b/core/wasm/src/SegmentPool.kt index 615a14977..143a5fd94 100644 --- a/core/wasm/src/SegmentPool.kt +++ b/core/wasm/src/SegmentPool.kt @@ -10,7 +10,7 @@ internal actual object SegmentPool { actual val byteCount: Int = 0 - actual fun take(): Segment = Segment.new() + actual fun take(): Segment = Segment.new(SimpleCopyTracker()) actual fun recycle(segment: Segment) { } From 0d8d33f719468eadb594ff7c81cfeb6959193ceb Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Tue, 2 Jul 2024 16:54:16 +0200 Subject: [PATCH 03/20] Make share token nullable --- core/api/kotlinx-io-core.api | 1 - core/common/src/Segment.kt | 35 ++++++++++++++++++++++++---------- core/common/src/SegmentPool.kt | 2 ++ core/js/src/SegmentPool.kt | 4 +++- core/jvm/src/SegmentPool.kt | 9 ++++++--- core/native/src/SegmentPool.kt | 4 +++- 
core/wasm/src/SegmentPool.kt | 4 +++- 7 files changed, 42 insertions(+), 17 deletions(-) diff --git a/core/api/kotlinx-io-core.api b/core/api/kotlinx-io-core.api index 802d2f8bd..533ec5e67 100644 --- a/core/api/kotlinx-io-core.api +++ b/core/api/kotlinx-io-core.api @@ -101,7 +101,6 @@ public abstract interface class kotlinx/io/RawSource : java/lang/AutoCloseable { } public final class kotlinx/io/Segment { - public synthetic fun (Lkotlinx/io/SegmentCopyTracker;Lkotlin/jvm/internal/DefaultConstructorMarker;)V public synthetic fun ([BIILkotlinx/io/SegmentCopyTracker;ZLkotlin/jvm/internal/DefaultConstructorMarker;)V public final synthetic fun dataAsByteArray (Z)[B public final synthetic fun getLimit ()I diff --git a/core/common/src/Segment.kt b/core/common/src/Segment.kt index 55f40c061..55c80cd9d 100644 --- a/core/common/src/Segment.kt +++ b/core/common/src/Segment.kt @@ -112,11 +112,18 @@ public class Segment { /** True if other segments or byte strings use the same byte array. */ internal val shared: Boolean get() { - return copyTracker.shared + val t = copyTracker + return t != null && t.shared } - /** Tracks number shared copies */ - internal val copyTracker: SegmentCopyTracker + /** + * Tracks number shared copies + * + * Note that this reference is not `@Volatile` as segments are not thread-safe and it's an error + * to modify the same segment concurrently. + * At the same time, an object [copyTracker] refers to could be modified concurrently. + */ + internal var copyTracker: SegmentCopyTracker? = null /** True if this segment owns the byte array and can append to it, extending `limit`. */ @JvmField @@ -132,13 +139,13 @@ public class Segment { @JvmField internal var prev: Segment? 
= null - private constructor(shareToken: SegmentCopyTracker) { + private constructor() { this.data = ByteArray(SIZE) this.owner = true - this.copyTracker = shareToken + this.copyTracker = null } - private constructor(data: ByteArray, pos: Int, limit: Int, shareToken: SegmentCopyTracker, owner: Boolean) { + private constructor(data: ByteArray, pos: Int, limit: Int, shareToken: SegmentCopyTracker?, owner: Boolean) { this.data = data this.pos = pos this.limit = limit @@ -152,7 +159,10 @@ public class Segment { * prevents it from being pooled. */ internal fun sharedCopy(): Segment { - return Segment(data, pos, limit, copyTracker.also { it.addCopy() }, false) + val t = copyTracker ?: SegmentPool.tracker().also { + copyTracker = it + } + return Segment(data, pos, limit, t.also { it.addCopy() }, false) } /** @@ -302,11 +312,16 @@ public class Segment { internal const val SHARE_MINIMUM = 1024 @JvmSynthetic - internal fun new(copyTracker: SegmentCopyTracker): Segment = Segment(copyTracker) + internal fun new(): Segment = Segment() @JvmSynthetic - internal fun new(data: ByteArray, pos: Int, limit: Int, copyTracker: SegmentCopyTracker, owner: Boolean): Segment - = Segment(data, pos, limit, copyTracker, owner) + internal fun new( + data: ByteArray, + pos: Int, + limit: Int, + copyTracker: SegmentCopyTracker?, + owner: Boolean + ): Segment = Segment(data, pos, limit, copyTracker, owner) } } diff --git a/core/common/src/SegmentPool.kt b/core/common/src/SegmentPool.kt index 070cafbdc..046371bb8 100644 --- a/core/common/src/SegmentPool.kt +++ b/core/common/src/SegmentPool.kt @@ -38,4 +38,6 @@ internal expect object SegmentPool { /** Recycle a segment that the caller no longer needs. 
*/ fun recycle(segment: Segment) + + fun tracker(): SegmentCopyTracker } diff --git a/core/js/src/SegmentPool.kt b/core/js/src/SegmentPool.kt index 143a5fd94..7c7e819e2 100644 --- a/core/js/src/SegmentPool.kt +++ b/core/js/src/SegmentPool.kt @@ -10,8 +10,10 @@ internal actual object SegmentPool { actual val byteCount: Int = 0 - actual fun take(): Segment = Segment.new(SimpleCopyTracker()) + actual fun take(): Segment = Segment.new() actual fun recycle(segment: Segment) { } + + actual fun tracker(): SegmentCopyTracker = SimpleCopyTracker() } diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index fa5008fec..a75b1efdb 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -126,13 +126,13 @@ internal actual object SegmentPool { when { first === LOCK -> { // We didn't acquire the lock. Don't take a pooled segment. - return Segment.new(RefCountingCopyTracker()) + return Segment.new() } first == null -> { // We acquired the lock but the pool was empty. Unlock and return a new segment. firstRef.set(null) - return Segment.new(RefCountingCopyTracker()) + return Segment.new() } else -> { @@ -149,7 +149,7 @@ internal actual object SegmentPool { @JvmStatic actual fun recycle(segment: Segment) { require(segment.next == null && segment.prev == null) - if (segment.copyTracker.removeCopyIfShared()) return // This segment cannot be recycled. + if (segment.copyTracker?.removeCopyIfShared() == true) return // This segment cannot be recycled. val firstRef = firstRef() @@ -169,6 +169,9 @@ internal actual object SegmentPool { } } + @JvmStatic + actual fun tracker(): SegmentCopyTracker = RefCountingCopyTracker() + private fun firstRef(): AtomicReference { // Get a value in [0..HASH_BUCKET_COUNT) based on the current thread. 
@Suppress("DEPRECATION") // TODO: switch to threadId after JDK19 diff --git a/core/native/src/SegmentPool.kt b/core/native/src/SegmentPool.kt index 99c996b70..4f532095d 100644 --- a/core/native/src/SegmentPool.kt +++ b/core/native/src/SegmentPool.kt @@ -25,8 +25,10 @@ internal actual object SegmentPool { actual val byteCount: Int = 0 - actual fun take(): Segment = Segment.new(SimpleCopyTracker()) + actual fun take(): Segment = Segment.new() actual fun recycle(segment: Segment) { } + + actual fun tracker(): SegmentCopyTracker = SimpleCopyTracker() } diff --git a/core/wasm/src/SegmentPool.kt b/core/wasm/src/SegmentPool.kt index 143a5fd94..7c7e819e2 100644 --- a/core/wasm/src/SegmentPool.kt +++ b/core/wasm/src/SegmentPool.kt @@ -10,8 +10,10 @@ internal actual object SegmentPool { actual val byteCount: Int = 0 - actual fun take(): Segment = Segment.new(SimpleCopyTracker()) + actual fun take(): Segment = Segment.new() actual fun recycle(segment: Segment) { } + + actual fun tracker(): SegmentCopyTracker = SimpleCopyTracker() } From f4de95b70751e3b02a1bc8b0b66928c6c2e4b773 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Tue, 2 Jul 2024 17:10:07 +0200 Subject: [PATCH 04/20] Support two-level segment pool --- core/build.gradle.kts | 4 +++ core/jvm/src/SegmentPool.kt | 72 +++++++++++++++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 39ea02e77..bfef87b7b 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -54,6 +54,10 @@ kotlin { } } +tasks.jvmTest { + systemProperty("kotlinx.io.l2.pool.size.bytes", "8388608") // 8Mb +} + tasks.named("wasmWasiNodeTest") { // TODO: remove once https://youtrack.jetbrains.com/issue/KT-65179 solved doFirst { diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index a75b1efdb..244071306 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -83,15 +83,20 @@ internal class RefCountingCopyTracker : 
SegmentCopyTracker() { * This tracks the number of bytes in each linked list in its [Segment.limit] property. Each element * has a limit that's one segment size greater than its successor element. The maximum size of the * pool is a product of [MAX_SIZE] and [HASH_BUCKET_COUNT]. + * + * TODO: update kdoc with info about L2 pool */ internal actual object SegmentPool { /** The maximum number of bytes to pool per hash bucket. */ // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments? actual val MAX_SIZE = 64 * 1024 // 64 KiB. + private val SECOND_LEVEL_POOL_SIZE = + System.getProperty("kotlinx.io.l2.pool.size.bytes", "0").toInt().coerceAtLeast(0) + /** A sentinel segment to indicate that the linked list is currently being modified. */ private val LOCK = - Segment.new(ByteArray(0), pos = 0, limit = 0, copyTracker = RefCountingCopyTracker(), owner = false) + Segment.new(ByteArray(0), pos = 0, limit = 0, copyTracker = SimpleCopyTracker(), owner = false) /** * The number of hash buckets. This number needs to balance keeping the pool small and contention @@ -112,6 +117,8 @@ internal actual object SegmentPool { AtomicReference() // null value implies an empty bucket } + private val secondLevelPoolRoot: AtomicReference = AtomicReference() + actual val byteCount: Int get() { val first = firstRef().get() ?: return 0 @@ -132,6 +139,11 @@ internal actual object SegmentPool { first == null -> { // We acquired the lock but the pool was empty. Unlock and return a new segment. firstRef.set(null) + + if (SECOND_LEVEL_POOL_SIZE > 0) { + return takeL2() + } + return Segment.new() } @@ -146,6 +158,33 @@ internal actual object SegmentPool { } } + @JvmStatic + private fun takeL2(): Segment { + while (true) { + when (val first = secondLevelPoolRoot.getAndSet(LOCK)) { + LOCK -> { + // We didn't acquire the lock, retry + continue + } + + null -> { + // We acquired the lock but the pool was empty. Unlock and return a new segment. 
+ secondLevelPoolRoot.set(null) + return Segment.new() + } + + else -> { + // We acquired the lock and the pool was not empty. Pop the first element and return it. + secondLevelPoolRoot.set(first.next) + first.next = null + first.limit = 0 + check(!first.shared) + return first + } + } + } + } + @JvmStatic actual fun recycle(segment: Segment) { require(segment.next == null && segment.prev == null) @@ -156,7 +195,13 @@ internal actual object SegmentPool { val first = firstRef.get() if (first === LOCK) return // A take() is currently in progress. val firstLimit = first?.limit ?: 0 - if (firstLimit >= MAX_SIZE) return // Pool is full. + if (firstLimit >= MAX_SIZE) { + // L1 pool is full. + if (SECOND_LEVEL_POOL_SIZE > 0) { + recycleL2(segment) + } + return + } segment.next = first segment.pos = 0 @@ -169,6 +214,29 @@ internal actual object SegmentPool { } } + @JvmStatic + private fun recycleL2(segment: Segment) { + segment.pos = 0 + segment.owner = true + + while (true) { + val first = secondLevelPoolRoot.get() + if (first === LOCK) continue // A take() is currently in progress. + val firstLimit = first?.limit ?: 0 + if (firstLimit >= SECOND_LEVEL_POOL_SIZE) { + // L2 pool is full. 
+ return + } + + segment.next = first + segment.limit = firstLimit + Segment.SIZE + + if (secondLevelPoolRoot.compareAndSet(first, segment)) { + return + } + } + } + @JvmStatic actual fun tracker(): SegmentCopyTracker = RefCountingCopyTracker() From 238f909ca5ee51be41909bdcfa0b379c588806bc Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Wed, 3 Jul 2024 16:35:42 +0200 Subject: [PATCH 05/20] Retry take/recycle until successful CAS --- core/jvm/src/SegmentPool.kt | 79 +++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index 244071306..028060c74 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -129,31 +129,32 @@ internal actual object SegmentPool { actual fun take(): Segment { val firstRef = firstRef() - val first = firstRef.getAndSet(LOCK) - when { - first === LOCK -> { - // We didn't acquire the lock. Don't take a pooled segment. - return Segment.new() - } + while (true) { + when (val first = firstRef.getAndSet(LOCK)) { + LOCK -> { + // We didn't acquire the lock. Let's try again + continue + } - first == null -> { - // We acquired the lock but the pool was empty. Unlock and return a new segment. - firstRef.set(null) + null -> { + // We acquired the lock but the pool was empty. Unlock and return a new segment. + firstRef.set(null) - if (SECOND_LEVEL_POOL_SIZE > 0) { - return takeL2() - } + if (SECOND_LEVEL_POOL_SIZE > 0) { + return takeL2() + } - return Segment.new() - } + return Segment.new() + } - else -> { - // We acquired the lock and the pool was not empty. Pop the first element and return it. - firstRef.set(first.next) - first.next = null - first.limit = 0 - check(!first.shared) - return first + else -> { + // We acquired the lock and the pool was not empty. Pop the first element and return it. 
+ firstRef.set(first.next) + first.next = null + first.limit = 0 + // check(!first.shared) + return first + } } } } @@ -190,27 +191,29 @@ internal actual object SegmentPool { require(segment.next == null && segment.prev == null) if (segment.copyTracker?.removeCopyIfShared() == true) return // This segment cannot be recycled. - val firstRef = firstRef() - val first = firstRef.get() - if (first === LOCK) return // A take() is currently in progress. - val firstLimit = first?.limit ?: 0 - if (firstLimit >= MAX_SIZE) { - // L1 pool is full. - if (SECOND_LEVEL_POOL_SIZE > 0) { - recycleL2(segment) - } - return - } - - segment.next = first segment.pos = 0 segment.owner = true - segment.limit = firstLimit + Segment.SIZE + while (true) { + val firstRef = firstRef() - // If we lost a race with another operation, don't recycle this segment. - if (!firstRef.compareAndSet(first, segment)) { - segment.next = null // Don't leak a reference in the pool either! + val first = firstRef.get() + if (first === LOCK) continue // A take() is currently in progress. + val firstLimit = first?.limit ?: 0 + if (firstLimit >= MAX_SIZE) { + // L1 pool is full. + if (SECOND_LEVEL_POOL_SIZE > 0) { + recycleL2(segment) + } + return + } + + segment.next = first + segment.limit = firstLimit + Segment.SIZE + + if (firstRef.compareAndSet(first, segment)) { + return + } } } From 7f101706ae372eb58721a48d3bf2c986d8c653c2 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Wed, 3 Jul 2024 16:38:01 +0200 Subject: [PATCH 06/20] Renamed pool size property --- core/jvm/src/SegmentPool.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index 028060c74..69e39fe64 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -92,7 +92,7 @@ internal actual object SegmentPool { actual val MAX_SIZE = 64 * 1024 // 64 KiB. 
private val SECOND_LEVEL_POOL_SIZE = - System.getProperty("kotlinx.io.l2.pool.size.bytes", "0").toInt().coerceAtLeast(0) + System.getProperty("kotlinx.io.pool.size.bytes", "0").toInt().coerceAtLeast(0) /** A sentinel segment to indicate that the linked list is currently being modified. */ private val LOCK = From 8f7a282c3710192296c6cf7bf84f40c13ea1574d Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 4 Jul 2024 10:55:06 +0200 Subject: [PATCH 07/20] Update KDoc --- core/jvm/src/SegmentPool.kt | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index 69e39fe64..ca2d2c934 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -67,24 +67,28 @@ internal class RefCountingCopyTracker : SegmentCopyTracker() { /** * This class pools segments in a lock-free singly-linked stack. Though this code is lock-free it - * does use a sentinel [LOCK] value to defend against races. Conflicted operations are not retried, - * so there is no chance of blocking despite the term "lock". + * does use a sentinel [LOCK] value to defend against races. To reduce the contention, the pool consists + * of several buckets (see [HASH_BUCKET_COUNT]), each holding a reference to its own segments stack. + * Every [take] or [recycle] choose one of the buckets depending on a [Thread.currentThread]'s [Thread.getId]. * * On [take], a caller swaps the stack's next pointer with the [LOCK] sentinel. If the stack was * not already locked, the caller replaces the head node with its successor. * * On [recycle], a caller swaps the head with a new node whose successor is the replaced head. * - * On conflict, operations succeed, but segments are not pushed into the stack. For example, a - * [take] that loses a race allocates a new segment regardless of the pool size. A [recycle] call - * that loses a race will not increase the size of the pool. 
Under significant contention, this pool - * will have fewer hits and the VM will do more GC and zero filling of arrays. + * On conflict, operations are retried until they succeed. * * This tracks the number of bytes in each linked list in its [Segment.limit] property. Each element * has a limit that's one segment size greater than its successor element. The maximum size of the * pool is a product of [MAX_SIZE] and [HASH_BUCKET_COUNT]. * - * TODO: update kdoc with info about L2 pool + * [MAX_SIZE] is kept relatively small to avoid excessive memory consumption in case of a large [HASH_BUCKET_COUNT]. + * For better handling of scenarios with high segments demand, an optional second-level pool could be enabled + * by setting up a value of `kotlinx.io.pool.size.bytes` system property. + * + * The second-level pool, unlike the pool described above, is not sharded and has a single entry point shared + * across all application threads. That pool is used as a backup in case when [take] or [recycle] failed due to + * an empty or exhausted segments chain in a corresponding bucket (one of [HASH_BUCKET_COUNT] buckets). */ internal actual object SegmentPool { /** The maximum number of bytes to pool per hash bucket. */ @@ -117,6 +121,9 @@ internal actual object SegmentPool { AtomicReference() // null value implies an empty bucket } + /** + * Entry point for a second-level segments pool. 
+ */ private val secondLevelPoolRoot: AtomicReference = AtomicReference() actual val byteCount: Int From 28da07e145dfe3c4e08ccb3559d6018e046788fa Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 4 Jul 2024 11:58:51 +0200 Subject: [PATCH 08/20] Use sharded L2-pool --- core/jvm/src/SegmentPool.kt | 67 ++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index ca2d2c934..f4ba6e20d 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -86,8 +86,10 @@ internal class RefCountingCopyTracker : SegmentCopyTracker() { * For better handling of scenarios with high segments demand, an optional second-level pool could be enabled * by setting up a value of `kotlinx.io.pool.size.bytes` system property. * - * The second-level pool, unlike the pool described above, is not sharded and has a single entry point shared - * across all application threads. That pool is used as a backup in case when [take] or [recycle] failed due to + * The second-level pool use half of the [HASH_BUCKET_COUNT] and if an initially selected bucket if empty on [take] or + * full or [recycle], all other buckets will be inspected before finally giving up (which means allocating a new segment + * on [take], or loosing a reference to a segment on [recycle]). + * That pool is used as a backup in case when [take] or [recycle] failed due to * an empty or exhausted segments chain in a corresponding bucket (one of [HASH_BUCKET_COUNT] buckets). */ internal actual object SegmentPool { @@ -95,9 +97,6 @@ internal actual object SegmentPool { // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments? actual val MAX_SIZE = 64 * 1024 // 64 KiB. - private val SECOND_LEVEL_POOL_SIZE = - System.getProperty("kotlinx.io.pool.size.bytes", "0").toInt().coerceAtLeast(0) - /** A sentinel segment to indicate that the linked list is currently being modified. 
*/ private val LOCK = Segment.new(ByteArray(0), pos = 0, limit = 0, copyTracker = SimpleCopyTracker(), owner = false) @@ -110,6 +109,14 @@ internal actual object SegmentPool { private val HASH_BUCKET_COUNT = Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 2 - 1) + private val HASH_BUCKET_COUNT_L2 = + Integer.highestOneBit(Runtime.getRuntime().availableProcessors()) + + private val SECOND_LEVEL_POOL_SIZE = + System.getProperty("kotlinx.io.pool.size.bytes", "0").toInt().coerceAtLeast(0) + + private val SECOND_LEVEL_POOL_SIZE_NORM = (SECOND_LEVEL_POOL_SIZE / HASH_BUCKET_COUNT).coerceAtLeast(Segment.SIZE) + /** * Hash buckets each contain a singly-linked list of segments. The index/key is a hash function of * thread ID because it may reduce contention or increase locality. @@ -121,10 +128,9 @@ internal actual object SegmentPool { AtomicReference() // null value implies an empty bucket } - /** - * Entry point for a second-level segments pool. - */ - private val secondLevelPoolRoot: AtomicReference = AtomicReference() + private val hashBucketsL2: Array> = Array(HASH_BUCKET_COUNT) { + AtomicReference() // null value implies an empty bucket + } actual val byteCount: Int get() { @@ -159,7 +165,6 @@ internal actual object SegmentPool { firstRef.set(first.next) first.next = null first.limit = 0 - // check(!first.shared) return first } } @@ -168,8 +173,11 @@ internal actual object SegmentPool { @JvmStatic private fun takeL2(): Segment { + var bucket = bucketId() + var attempts = 0 + var firstRef = hashBucketsL2[bucket] while (true) { - when (val first = secondLevelPoolRoot.getAndSet(LOCK)) { + when (val first = firstRef.getAndSet(LOCK)) { LOCK -> { // We didn't acquire the lock, retry continue @@ -177,16 +185,23 @@ internal actual object SegmentPool { null -> { // We acquired the lock but the pool was empty. Unlock and return a new segment. 
- secondLevelPoolRoot.set(null) + firstRef.set(null) + + if (attempts < HASH_BUCKET_COUNT_L2) { + bucket = (bucket + 1) and (HASH_BUCKET_COUNT_L2 - 1) + attempts++ + firstRef = hashBucketsL2[bucket] + continue + } + return Segment.new() } else -> { // We acquired the lock and the pool was not empty. Pop the first element and return it. - secondLevelPoolRoot.set(first.next) + firstRef.set(first.next) first.next = null first.limit = 0 - check(!first.shared) return first } } @@ -198,7 +213,6 @@ internal actual object SegmentPool { require(segment.next == null && segment.prev == null) if (segment.copyTracker?.removeCopyIfShared() == true) return // This segment cannot be recycled. - segment.pos = 0 segment.owner = true while (true) { @@ -229,11 +243,21 @@ internal actual object SegmentPool { segment.pos = 0 segment.owner = true + var bucket = bucketId() + var attempts = 0 + var firstRef = hashBucketsL2[bucket] + while (true) { - val first = secondLevelPoolRoot.get() + val first = firstRef.get() if (first === LOCK) continue // A take() is currently in progress. val firstLimit = first?.limit ?: 0 - if (firstLimit >= SECOND_LEVEL_POOL_SIZE) { + if (firstLimit >= SECOND_LEVEL_POOL_SIZE_NORM) { + if (attempts < HASH_BUCKET_COUNT_L2) { + attempts++ + bucket = (bucket + 1) and (HASH_BUCKET_COUNT_L2 - 1) + firstRef = hashBucketsL2[bucket] + continue + } // L2 pool is full. return } @@ -241,7 +265,7 @@ internal actual object SegmentPool { segment.next = first segment.limit = firstLimit + Segment.SIZE - if (secondLevelPoolRoot.compareAndSet(first, segment)) { + if (firstRef.compareAndSet(first, segment)) { return } } @@ -251,9 +275,12 @@ internal actual object SegmentPool { actual fun tracker(): SegmentCopyTracker = RefCountingCopyTracker() private fun firstRef(): AtomicReference { + return hashBuckets[bucketId()] + } + + private fun bucketId(): Int { // Get a value in [0..HASH_BUCKET_COUNT) based on the current thread. 
@Suppress("DEPRECATION") // TODO: switch to threadId after JDK19 - val hashBucket = (Thread.currentThread().id and (HASH_BUCKET_COUNT - 1L)).toInt() - return hashBuckets[hashBucket] + return (Thread.currentThread().id and (HASH_BUCKET_COUNT - 1L)).toInt() } } From d809904bae6ca7caccd973a21001c97bdd7b73f6 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 4 Jul 2024 15:20:33 +0200 Subject: [PATCH 09/20] Cleanup --- core/jvm/src/SegmentPool.kt | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index f4ba6e20d..2da2d2a66 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -109,13 +109,13 @@ internal actual object SegmentPool { private val HASH_BUCKET_COUNT = Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 2 - 1) - private val HASH_BUCKET_COUNT_L2 = - Integer.highestOneBit(Runtime.getRuntime().availableProcessors()) + private val HASH_BUCKET_COUNT_L2 = (HASH_BUCKET_COUNT / 2).coerceAtLeast(1) - private val SECOND_LEVEL_POOL_SIZE = + private val SECOND_LEVEL_POOL_TOTAL_SIZE = System.getProperty("kotlinx.io.pool.size.bytes", "0").toInt().coerceAtLeast(0) - private val SECOND_LEVEL_POOL_SIZE_NORM = (SECOND_LEVEL_POOL_SIZE / HASH_BUCKET_COUNT).coerceAtLeast(Segment.SIZE) + private val SECOND_LEVEL_POOL_BUCKET_SIZE = + (SECOND_LEVEL_POOL_TOTAL_SIZE / HASH_BUCKET_COUNT).coerceAtLeast(Segment.SIZE) /** * Hash buckets each contain a singly-linked list of segments. The index/key is a hash function of @@ -153,7 +153,7 @@ internal actual object SegmentPool { // We acquired the lock but the pool was empty. Unlock and return a new segment. 
firstRef.set(null) - if (SECOND_LEVEL_POOL_SIZE > 0) { + if (SECOND_LEVEL_POOL_TOTAL_SIZE > 0) { return takeL2() } @@ -173,7 +173,7 @@ internal actual object SegmentPool { @JvmStatic private fun takeL2(): Segment { - var bucket = bucketId() + var bucket = l2BucketId() var attempts = 0 var firstRef = hashBucketsL2[bucket] while (true) { @@ -223,7 +223,7 @@ internal actual object SegmentPool { val firstLimit = first?.limit ?: 0 if (firstLimit >= MAX_SIZE) { // L1 pool is full. - if (SECOND_LEVEL_POOL_SIZE > 0) { + if (SECOND_LEVEL_POOL_TOTAL_SIZE > 0) { recycleL2(segment) } return @@ -243,7 +243,7 @@ internal actual object SegmentPool { segment.pos = 0 segment.owner = true - var bucket = bucketId() + var bucket = l2BucketId() var attempts = 0 var firstRef = hashBucketsL2[bucket] @@ -251,7 +251,7 @@ internal actual object SegmentPool { val first = firstRef.get() if (first === LOCK) continue // A take() is currently in progress. val firstLimit = first?.limit ?: 0 - if (firstLimit >= SECOND_LEVEL_POOL_SIZE_NORM) { + if (firstLimit >= SECOND_LEVEL_POOL_BUCKET_SIZE) { if (attempts < HASH_BUCKET_COUNT_L2) { attempts++ bucket = (bucket + 1) and (HASH_BUCKET_COUNT_L2 - 1) @@ -275,12 +275,15 @@ internal actual object SegmentPool { actual fun tracker(): SegmentCopyTracker = RefCountingCopyTracker() private fun firstRef(): AtomicReference { - return hashBuckets[bucketId()] + // Get a value in [0..HASH_BUCKET_COUNT) based on the current thread. + @Suppress("DEPRECATION") // TODO: switch to threadId after JDK19 + val bucket = (Thread.currentThread().id and (HASH_BUCKET_COUNT - 1L)).toInt() + return hashBuckets[bucket] } - private fun bucketId(): Int { - // Get a value in [0..HASH_BUCKET_COUNT) based on the current thread. + private fun l2BucketId(): Int { + // Get a value in [0..HASH_BUCKET_COUNT_L2) based on the current thread. 
@Suppress("DEPRECATION") // TODO: switch to threadId after JDK19 - return (Thread.currentThread().id and (HASH_BUCKET_COUNT - 1L)).toInt() + return (Thread.currentThread().id and (HASH_BUCKET_COUNT_L2 - 1L)).toInt() } } From 76995c2422b2a129db822adc597be0e4d5f1e3a7 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 4 Jul 2024 15:56:11 +0200 Subject: [PATCH 10/20] Cleanup --- core/build.gradle.kts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/build.gradle.kts b/core/build.gradle.kts index bfef87b7b..39ea02e77 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -54,10 +54,6 @@ kotlin { } } -tasks.jvmTest { - systemProperty("kotlinx.io.l2.pool.size.bytes", "8388608") // 8Mb -} - tasks.named("wasmWasiNodeTest") { // TODO: remove once https://youtrack.jetbrains.com/issue/KT-65179 solved doFirst { From 18e0aa31c47bb716dd19f711f0660d0e16c00b8e Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Wed, 10 Jul 2024 15:24:08 +0200 Subject: [PATCH 11/20] Bump up second level cache size to 4 megs on JVM --- core/jvm/src/SegmentPool.kt | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index 2da2d2a66..2d8d6a41f 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -111,8 +111,17 @@ internal actual object SegmentPool { private val HASH_BUCKET_COUNT_L2 = (HASH_BUCKET_COUNT / 2).coerceAtLeast(1) + // For now, keep things on Android as they were before, but on JVM - use second level cache. + // See https://developer.android.com/reference/java/lang/System#getProperties() for property name. 
+ private val DEFAULT_SECOND_LEVEL_POOL_TOTAL_SIZE = when (System.getProperty("java.vm.name")) { + "Dalvik" -> "0" + else -> "4194304" // 4MB + } + private val SECOND_LEVEL_POOL_TOTAL_SIZE = - System.getProperty("kotlinx.io.pool.size.bytes", "0").toInt().coerceAtLeast(0) + System.getProperty("kotlinx.io.pool.size.bytes", DEFAULT_SECOND_LEVEL_POOL_TOTAL_SIZE) + .toInt() + .coerceAtLeast(0) private val SECOND_LEVEL_POOL_BUCKET_SIZE = (SECOND_LEVEL_POOL_TOTAL_SIZE / HASH_BUCKET_COUNT).coerceAtLeast(Segment.SIZE) From 40d2f8bf23d19f31101077698cdff373d8be5f02 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Wed, 10 Jul 2024 15:28:46 +0200 Subject: [PATCH 12/20] Cleanup --- core/common/src/Segment.kt | 27 ++++++++++++--------------- core/common/src/SegmentPool.kt | 5 +++++ core/jvm/src/SegmentPool.kt | 12 ++++++------ 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/core/common/src/Segment.kt b/core/common/src/Segment.kt index 55c80cd9d..418b6cfc4 100644 --- a/core/common/src/Segment.kt +++ b/core/common/src/Segment.kt @@ -37,23 +37,23 @@ import kotlin.jvm.JvmSynthetic * implementation. */ internal abstract class SegmentCopyTracker { + /** + * `true` if a tracker shared by multiple segment copies. + */ + abstract val shared: Boolean + /** * Track a new copy created by sharing an associated segment. */ abstract fun addCopy() /** - * Record reclamation of a shared segment copy associated with this tracker. + * Records reclamation of a shared segment copy associated with this tracker. * If a tracker was in unshared state, this call should not affect an internal state. * * @return `true` if the segment was not shared *before* this called. */ abstract fun removeCopyIfShared(): Boolean - - /** - * `true` if a tracker shared by multiple segment copies. 
- */ - abstract val shared: Boolean } /** @@ -62,16 +62,16 @@ internal abstract class SegmentCopyTracker { */ internal class SimpleCopyTracker : SegmentCopyTracker() { @Volatile - private var shared_: Boolean = false + private var _shared: Boolean = false + + override val shared: Boolean + get() = _shared override fun addCopy() { - shared_ = true + _shared = true } override fun removeCopyIfShared(): Boolean = shared - - override val shared: Boolean - get() = shared_ } /** @@ -111,10 +111,7 @@ public class Segment { /** True if other segments or byte strings use the same byte array. */ internal val shared: Boolean - get() { - val t = copyTracker - return t != null && t.shared - } + get() = copyTracker?.shared ?: false /** * Tracks number shared copies diff --git a/core/common/src/SegmentPool.kt b/core/common/src/SegmentPool.kt index 046371bb8..717a0b1b5 100644 --- a/core/common/src/SegmentPool.kt +++ b/core/common/src/SegmentPool.kt @@ -39,5 +39,10 @@ internal expect object SegmentPool { /** Recycle a segment that the caller no longer needs. */ fun recycle(segment: Segment) + /** + * Allocates a new copy tracker that'll be associated with a segment. + * For performance reasons, there's no tracker attached to a segment initially. + * Instead, it's allocated lazily on the first sharing attempt. 
+ */ fun tracker(): SegmentCopyTracker } diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index 2d8d6a41f..580a640ba 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -45,6 +45,11 @@ internal class RefCountingCopyTracker : SegmentCopyTracker() { @Volatile private var refs: Int = 0 + override val shared: Boolean + get() { + return refs > 0 + } + override fun addCopy() { fieldUpdater.incrementAndGet(this) } @@ -58,11 +63,6 @@ internal class RefCountingCopyTracker : SegmentCopyTracker() { } } } - - override val shared: Boolean - get() { - return refs > 0 - } } /** @@ -99,7 +99,7 @@ internal actual object SegmentPool { /** A sentinel segment to indicate that the linked list is currently being modified. */ private val LOCK = - Segment.new(ByteArray(0), pos = 0, limit = 0, copyTracker = SimpleCopyTracker(), owner = false) + Segment.new(ByteArray(0), pos = 0, limit = 0, copyTracker = null, owner = false) /** * The number of hash buckets. This number needs to balance keeping the pool small and contention From 3dac356acad23347f04948bc3cde5746b949819d Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Wed, 10 Jul 2024 15:42:38 +0200 Subject: [PATCH 13/20] Replace arrays of AtomicReferences with AtomicReferenceArrays --- core/jvm/src/SegmentPool.kt | 61 ++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index 580a640ba..de5d58982 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -26,7 +26,7 @@ import kotlinx.io.SegmentPool.MAX_SIZE import kotlinx.io.SegmentPool.recycle import kotlinx.io.SegmentPool.take import java.util.concurrent.atomic.AtomicIntegerFieldUpdater -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.AtomicReferenceArray /** * Precise [SegmentCopyTracker] implementation tracking a number of shared segment copies. 
@@ -133,26 +133,22 @@ internal actual object SegmentPool { * We don't use [ThreadLocal] because we don't know how many threads the host process has and we * don't want to leak memory for the duration of a thread's life. */ - private val hashBuckets: Array> = Array(HASH_BUCKET_COUNT) { - AtomicReference() // null value implies an empty bucket - } - - private val hashBucketsL2: Array> = Array(HASH_BUCKET_COUNT) { - AtomicReference() // null value implies an empty bucket - } + private val hashBuckets: AtomicReferenceArray = AtomicReferenceArray(HASH_BUCKET_COUNT) + private val hashBucketsL2: AtomicReferenceArray = AtomicReferenceArray(HASH_BUCKET_COUNT_L2) actual val byteCount: Int get() { - val first = firstRef().get() ?: return 0 + val first = hashBuckets[l1BucketId()] ?: return 0 return first.limit } @JvmStatic actual fun take(): Segment { - val firstRef = firstRef() + val buckets = hashBuckets + val bucketId = l1BucketId() while (true) { - when (val first = firstRef.getAndSet(LOCK)) { + when (val first = buckets.getAndSet(bucketId, LOCK)) { LOCK -> { // We didn't acquire the lock. Let's try again continue @@ -160,7 +156,7 @@ internal actual object SegmentPool { null -> { // We acquired the lock but the pool was empty. Unlock and return a new segment. - firstRef.set(null) + buckets.set(bucketId, null) if (SECOND_LEVEL_POOL_TOTAL_SIZE > 0) { return takeL2() @@ -171,7 +167,7 @@ internal actual object SegmentPool { else -> { // We acquired the lock and the pool was not empty. Pop the first element and return it. 
- firstRef.set(first.next) + buckets.set(bucketId, first.next) first.next = null first.limit = 0 return first @@ -182,11 +178,11 @@ internal actual object SegmentPool { @JvmStatic private fun takeL2(): Segment { + val buckets = hashBuckets var bucket = l2BucketId() var attempts = 0 - var firstRef = hashBucketsL2[bucket] while (true) { - when (val first = firstRef.getAndSet(LOCK)) { + when (val first = buckets.getAndSet(bucket, LOCK)) { LOCK -> { // We didn't acquire the lock, retry continue @@ -194,12 +190,11 @@ internal actual object SegmentPool { null -> { // We acquired the lock but the pool was empty. Unlock and return a new segment. - firstRef.set(null) + buckets.set(bucket, null) if (attempts < HASH_BUCKET_COUNT_L2) { bucket = (bucket + 1) and (HASH_BUCKET_COUNT_L2 - 1) attempts++ - firstRef = hashBucketsL2[bucket] continue } @@ -208,7 +203,7 @@ internal actual object SegmentPool { else -> { // We acquired the lock and the pool was not empty. Pop the first element and return it. - firstRef.set(first.next) + buckets.set(bucket, first.next) first.next = null first.limit = 0 return first @@ -222,12 +217,14 @@ internal actual object SegmentPool { require(segment.next == null && segment.prev == null) if (segment.copyTracker?.removeCopyIfShared() == true) return // This segment cannot be recycled. + val buckets = hashBuckets + val bucketId = l1BucketId() + segment.pos = 0 segment.owner = true - while (true) { - val firstRef = firstRef() - val first = firstRef.get() + while (true) { + val first = buckets[bucketId] if (first === LOCK) continue // A take() is currently in progress. 
val firstLimit = first?.limit ?: 0 if (firstLimit >= MAX_SIZE) { @@ -241,7 +238,7 @@ internal actual object SegmentPool { segment.next = first segment.limit = firstLimit + Segment.SIZE - if (firstRef.compareAndSet(first, segment)) { + if (buckets.compareAndSet(bucketId, first, segment)) { return } } @@ -253,18 +250,17 @@ internal actual object SegmentPool { segment.owner = true var bucket = l2BucketId() + val buckets = hashBucketsL2 var attempts = 0 - var firstRef = hashBucketsL2[bucket] while (true) { - val first = firstRef.get() + val first = buckets[bucket] if (first === LOCK) continue // A take() is currently in progress. val firstLimit = first?.limit ?: 0 if (firstLimit >= SECOND_LEVEL_POOL_BUCKET_SIZE) { if (attempts < HASH_BUCKET_COUNT_L2) { attempts++ bucket = (bucket + 1) and (HASH_BUCKET_COUNT_L2 - 1) - firstRef = hashBucketsL2[bucket] continue } // L2 pool is full. @@ -274,7 +270,7 @@ internal actual object SegmentPool { segment.next = first segment.limit = firstLimit + Segment.SIZE - if (firstRef.compareAndSet(first, segment)) { + if (buckets.compareAndSet(bucket, first, segment)) { return } } @@ -283,16 +279,13 @@ internal actual object SegmentPool { @JvmStatic actual fun tracker(): SegmentCopyTracker = RefCountingCopyTracker() - private fun firstRef(): AtomicReference { - // Get a value in [0..HASH_BUCKET_COUNT) based on the current thread. - @Suppress("DEPRECATION") // TODO: switch to threadId after JDK19 - val bucket = (Thread.currentThread().id and (HASH_BUCKET_COUNT - 1L)).toInt() - return hashBuckets[bucket] - } + private fun l1BucketId() = bucketId (HASH_BUCKET_COUNT - 1L) + + private fun l2BucketId() = bucketId (HASH_BUCKET_COUNT_L2 - 1L) - private fun l2BucketId(): Int { + private fun bucketId(mask: Long): Int { // Get a value in [0..HASH_BUCKET_COUNT_L2) based on the current thread. 
@Suppress("DEPRECATION") // TODO: switch to threadId after JDK19 - return (Thread.currentThread().id and (HASH_BUCKET_COUNT_L2 - 1L)).toInt() + return (Thread.currentThread().id and mask).toInt() } } From 58b98cce48fcef4d609547d97e854f7b93057763 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Wed, 10 Jul 2024 15:46:59 +0200 Subject: [PATCH 14/20] Fixed description of ops on buckets --- core/jvm/src/SegmentPool.kt | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index de5d58982..b796a9198 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -155,7 +155,9 @@ internal actual object SegmentPool { } null -> { - // We acquired the lock but the pool was empty. Unlock and return a new segment. + // We acquired the lock but the pool was empty. + // Unlock the bucket and either try to acquire a segment from the second level cache, + // or, if the second level cache is disabled, allocate a brand-new segment. buckets.set(bucketId, null) if (SECOND_LEVEL_POOL_TOTAL_SIZE > 0) { @@ -189,7 +191,9 @@ internal actual object SegmentPool { } null -> { - // We acquired the lock but the pool was empty. Unlock and return a new segment. + // We acquired the lock but the pool was empty. + // Unlock the current bucket and select a new one. + // If all buckets were already scanned, allocate a new segment. buckets.set(bucket, null) if (attempts < HASH_BUCKET_COUNT_L2) { @@ -258,6 +262,7 @@ internal actual object SegmentPool { if (first === LOCK) continue // A take() is currently in progress. val firstLimit = first?.limit ?: 0 if (firstLimit >= SECOND_LEVEL_POOL_BUCKET_SIZE) { + // The current bucket is full, try to find another one and return the segment there. 
if (attempts < HASH_BUCKET_COUNT_L2) { attempts++ bucket = (bucket + 1) and (HASH_BUCKET_COUNT_L2 - 1) From 3f6e7c0b68cdda35a1694c305c84605e2ac85ff3 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Wed, 10 Jul 2024 16:22:36 +0200 Subject: [PATCH 15/20] Improve KDoc Co-authored-by: Vsevolod Tolstopyatov --- core/common/src/SegmentPool.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/common/src/SegmentPool.kt b/core/common/src/SegmentPool.kt index 717a0b1b5..3bc511d64 100644 --- a/core/common/src/SegmentPool.kt +++ b/core/common/src/SegmentPool.kt @@ -40,7 +40,7 @@ internal expect object SegmentPool { fun recycle(segment: Segment) /** - * Allocates a new copy tracker that'll be associated with a segment. + * Allocates a new copy tracker that'll be associated with a segment from this pool. * For performance reasons, there's no tracker attached to a segment initially. * Instead, it's allocated lazily on the first sharing attempt. */ From 8834cf736ce9a604c059485a69449fb812bfb838 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 11 Jul 2024 14:28:26 +0200 Subject: [PATCH 16/20] Rename and reimplement tracker's removeCopy --- core/common/src/Segment.kt | 10 +++--- core/common/test/SimpleCopyTrackerTest.kt | 4 +-- core/jvm/src/SegmentPool.kt | 34 +++++++++++-------- core/jvm/test/RefCounteringCopyTrackerTest.kt | 8 ++--- 4 files changed, 31 insertions(+), 25 deletions(-) diff --git a/core/common/src/Segment.kt b/core/common/src/Segment.kt index 418b6cfc4..50dd1ba9a 100644 --- a/core/common/src/Segment.kt +++ b/core/common/src/Segment.kt @@ -29,8 +29,8 @@ import kotlin.jvm.JvmSynthetic * * A new [SegmentCopyTracker] instance should be not shared by default (i.e. `shared == false`). * Any further [addCopy] calls should move the tracker to a shared state (i.e. `shared == true`). - * Once a shared segment copy is recycled, [removeCopyIfShared] should be called. 
- * Depending on implementation, calling [removeCopyIfShared] the same number of times as [addCopy] may + * Once a shared segment copy is recycled, [removeCopy] should be called. + * Depending on implementation, calling [removeCopy] the same number of times as [addCopy] may * or may not transition the tracked back to unshared stated. * * The class is not intended for public use and currently designed to fit the only use case - within JVM SegmentPool @@ -53,12 +53,12 @@ internal abstract class SegmentCopyTracker { * * @return `true` if the segment was not shared *before* this called. */ - abstract fun removeCopyIfShared(): Boolean + abstract fun removeCopy(): Boolean } /** * Simple [SegmentCopyTracker] transitioning from unshared to shared state only. - * [removeCopyIfShared] calls do not affect [shared] value. + * [removeCopy] calls do not affect [shared] value. */ internal class SimpleCopyTracker : SegmentCopyTracker() { @Volatile @@ -71,7 +71,7 @@ internal class SimpleCopyTracker : SegmentCopyTracker() { _shared = true } - override fun removeCopyIfShared(): Boolean = shared + override fun removeCopy(): Boolean = shared } /** diff --git a/core/common/test/SimpleCopyTrackerTest.kt b/core/common/test/SimpleCopyTrackerTest.kt index 1dd331b80..24b0e9434 100644 --- a/core/common/test/SimpleCopyTrackerTest.kt +++ b/core/common/test/SimpleCopyTrackerTest.kt @@ -15,13 +15,13 @@ class SimpleCopyTrackerTest { val tracker = SimpleCopyTracker() assertFalse(tracker.shared) - assertFalse(tracker.removeCopyIfShared()) + assertFalse(tracker.removeCopy()) assertFalse(tracker.shared) tracker.addCopy() assertTrue(tracker.shared) - assertTrue(tracker.removeCopyIfShared()) + assertTrue(tracker.removeCopy()) assertTrue(tracker.shared) } } diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index b796a9198..a48f0d049 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -30,38 +30,44 @@ import java.util.concurrent.atomic.AtomicReferenceArray 
/** * Precise [SegmentCopyTracker] implementation tracking a number of shared segment copies. - * Every [addCopy] call increments the counter, every [removeCopyIfShared] decrements it. + * Every [addCopy] call increments the counter, every [removeCopy] decrements it. * - * After calling [removeCopyIfShared] the same number of time [addCopy] was called, tracker returns to the unshared state. + * After calling [removeCopy] the same number of time [addCopy] was called, tracker returns to the unshared state. * * The class is internal for testing only. */ internal class RefCountingCopyTracker : SegmentCopyTracker() { companion object { @JvmStatic - private val fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(RefCountingCopyTracker::class.java, "refs") + private val fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(RefCountingCopyTracker::class.java, "copyCount") } @Volatile - private var refs: Int = 0 + private var copyCount: Int = 0 override val shared: Boolean get() { - return refs > 0 + return copyCount > 0 } override fun addCopy() { fieldUpdater.incrementAndGet(this) } - override fun removeCopyIfShared(): Boolean { - while (true) { - val value = refs - check(value >= 0) { "Shared copies count is negative: $value" } - if (fieldUpdater.compareAndSet(this, value, (value - 1).coerceAtLeast(0))) { - return value > 0 - } - } + override fun removeCopy(): Boolean { + // The value could not be incremented from `0` under the race, + // so once it zero, it remains zero in the scope of this call. + if (copyCount == 0) return false + + val updatedValue = fieldUpdater.decrementAndGet(this) + // If there are several copies, the last decrement will update copyCount from 0 to -1. + // That would be the last standing copy, and we can recycle it. + // If, however, the decremented value falls below -1, it's an error as there were more + // `removeCopy` than `addCopy` calls. 
+ if (updatedValue >= 0) return true + check(updatedValue == -1) { "Shared copies count is negative: ${updatedValue + 1}" } + copyCount = 0 + return false } } @@ -219,7 +225,7 @@ internal actual object SegmentPool { @JvmStatic actual fun recycle(segment: Segment) { require(segment.next == null && segment.prev == null) - if (segment.copyTracker?.removeCopyIfShared() == true) return // This segment cannot be recycled. + if (segment.copyTracker?.removeCopy() == true) return // This segment cannot be recycled. val buckets = hashBuckets val bucketId = l1BucketId() diff --git a/core/jvm/test/RefCounteringCopyTrackerTest.kt b/core/jvm/test/RefCounteringCopyTrackerTest.kt index e516e9c3f..49192ea3c 100644 --- a/core/jvm/test/RefCounteringCopyTrackerTest.kt +++ b/core/jvm/test/RefCounteringCopyTrackerTest.kt @@ -15,21 +15,21 @@ class RefCounteringCopyTrackerTest { val tracker = RefCountingCopyTracker() assertFalse(tracker.shared) - assertFalse(tracker.removeCopyIfShared()) + assertFalse(tracker.removeCopy()) assertFalse(tracker.shared) tracker.addCopy() assertTrue(tracker.shared) - assertTrue(tracker.removeCopyIfShared()) + assertTrue(tracker.removeCopy()) assertFalse(tracker.shared) tracker.addCopy() assertTrue(tracker.shared) tracker.addCopy() assertTrue(tracker.shared) - assertTrue(tracker.removeCopyIfShared()) + assertTrue(tracker.removeCopy()) assertTrue(tracker.shared) - assertTrue(tracker.removeCopyIfShared()) + assertTrue(tracker.removeCopy()) assertFalse(tracker.shared) } } From ac846465d19c5a0d13a485107cade01abc75bf77 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 11 Jul 2024 14:36:41 +0200 Subject: [PATCH 17/20] Be more lenient when parsing system property --- core/jvm/src/SegmentPool.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index a48f0d049..cd9af2d51 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -126,8 +126,7 @@ internal actual object 
SegmentPool { private val SECOND_LEVEL_POOL_TOTAL_SIZE = System.getProperty("kotlinx.io.pool.size.bytes", DEFAULT_SECOND_LEVEL_POOL_TOTAL_SIZE) - .toInt() - .coerceAtLeast(0) + .toIntOrNull()?.coerceAtLeast(0) ?: 0 private val SECOND_LEVEL_POOL_BUCKET_SIZE = (SECOND_LEVEL_POOL_TOTAL_SIZE / HASH_BUCKET_COUNT).coerceAtLeast(Segment.SIZE) From 7714e083f1b5c3f6b7372a81523a16b9f10fb2d4 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 11 Jul 2024 14:37:01 +0200 Subject: [PATCH 18/20] Make simple copy tacker an object --- core/common/src/Segment.kt | 20 +++++-------------- .../src/unsafe/UnsafeBufferOperations.kt | 2 +- ...Test.kt => AlwaysSharedCopyTrackerTest.kt} | 11 +++++----- core/js/src/SegmentPool.kt | 2 +- core/native/src/SegmentPool.kt | 2 +- core/wasm/src/SegmentPool.kt | 2 +- 6 files changed, 14 insertions(+), 25 deletions(-) rename core/common/test/{SimpleCopyTrackerTest.kt => AlwaysSharedCopyTrackerTest.kt} (68%) diff --git a/core/common/src/Segment.kt b/core/common/src/Segment.kt index 50dd1ba9a..9de338986 100644 --- a/core/common/src/Segment.kt +++ b/core/common/src/Segment.kt @@ -20,7 +20,6 @@ */ package kotlinx.io -import kotlin.concurrent.Volatile import kotlin.jvm.JvmField import kotlin.jvm.JvmSynthetic @@ -57,21 +56,12 @@ internal abstract class SegmentCopyTracker { } /** - * Simple [SegmentCopyTracker] transitioning from unshared to shared state only. - * [removeCopy] calls do not affect [shared] value. + * Simple [SegmentCopyTracker] that always reports shared state. 
*/ -internal class SimpleCopyTracker : SegmentCopyTracker() { - @Volatile - private var _shared: Boolean = false - - override val shared: Boolean - get() = _shared - - override fun addCopy() { - _shared = true - } - - override fun removeCopy(): Boolean = shared +internal object AlwaysSharedCopyTracker : SegmentCopyTracker() { + override val shared: Boolean = true + override fun addCopy() = Unit + override fun removeCopy(): Boolean = true } /** diff --git a/core/common/src/unsafe/UnsafeBufferOperations.kt b/core/common/src/unsafe/UnsafeBufferOperations.kt index 6bf183edc..0281b06f6 100644 --- a/core/common/src/unsafe/UnsafeBufferOperations.kt +++ b/core/common/src/unsafe/UnsafeBufferOperations.kt @@ -38,7 +38,7 @@ public object UnsafeBufferOperations { checkBounds(bytes.size, startIndex, endIndex) val segment = Segment.new( bytes, startIndex, endIndex, - SimpleCopyTracker().also { it.addCopy() } /* to prevent recycling */, + AlwaysSharedCopyTracker, /* to prevent recycling */ owner = false /* can't append to it */ ) val tail = buffer.tail diff --git a/core/common/test/SimpleCopyTrackerTest.kt b/core/common/test/AlwaysSharedCopyTrackerTest.kt similarity index 68% rename from core/common/test/SimpleCopyTrackerTest.kt rename to core/common/test/AlwaysSharedCopyTrackerTest.kt index 24b0e9434..62418c8c0 100644 --- a/core/common/test/SimpleCopyTrackerTest.kt +++ b/core/common/test/AlwaysSharedCopyTrackerTest.kt @@ -6,17 +6,16 @@ package kotlinx.io import kotlin.test.Test -import kotlin.test.assertFalse import kotlin.test.assertTrue -class SimpleCopyTrackerTest { +class AlwaysSharedCopyTrackerTest { @Test fun stateTransition() { - val tracker = SimpleCopyTracker() - assertFalse(tracker.shared) + val tracker = AlwaysSharedCopyTracker + assertTrue(tracker.shared) - assertFalse(tracker.removeCopy()) - assertFalse(tracker.shared) + assertTrue(tracker.removeCopy()) + assertTrue(tracker.shared) tracker.addCopy() assertTrue(tracker.shared) diff --git a/core/js/src/SegmentPool.kt 
b/core/js/src/SegmentPool.kt index 7c7e819e2..c61fbeffa 100644 --- a/core/js/src/SegmentPool.kt +++ b/core/js/src/SegmentPool.kt @@ -15,5 +15,5 @@ internal actual object SegmentPool { actual fun recycle(segment: Segment) { } - actual fun tracker(): SegmentCopyTracker = SimpleCopyTracker() + actual fun tracker(): SegmentCopyTracker = AlwaysSharedCopyTracker } diff --git a/core/native/src/SegmentPool.kt b/core/native/src/SegmentPool.kt index 4f532095d..e27321533 100644 --- a/core/native/src/SegmentPool.kt +++ b/core/native/src/SegmentPool.kt @@ -30,5 +30,5 @@ internal actual object SegmentPool { actual fun recycle(segment: Segment) { } - actual fun tracker(): SegmentCopyTracker = SimpleCopyTracker() + actual fun tracker(): SegmentCopyTracker = AlwaysSharedCopyTracker } diff --git a/core/wasm/src/SegmentPool.kt b/core/wasm/src/SegmentPool.kt index 7c7e819e2..c61fbeffa 100644 --- a/core/wasm/src/SegmentPool.kt +++ b/core/wasm/src/SegmentPool.kt @@ -15,5 +15,5 @@ internal actual object SegmentPool { actual fun recycle(segment: Segment) { } - actual fun tracker(): SegmentCopyTracker = SimpleCopyTracker() + actual fun tracker(): SegmentCopyTracker = AlwaysSharedCopyTracker } From 564289dd26a4b326fae9fd616cd9d85093e12ca6 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 11 Jul 2024 14:39:34 +0200 Subject: [PATCH 19/20] Improve bucket exhaustion check --- core/jvm/src/SegmentPool.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index cd9af2d51..4a0614e60 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -266,7 +266,7 @@ internal actual object SegmentPool { val first = buckets[bucket] if (first === LOCK) continue // A take() is currently in progress. 
val firstLimit = first?.limit ?: 0 - if (firstLimit >= SECOND_LEVEL_POOL_BUCKET_SIZE) { + if (firstLimit + Segment.SIZE > SECOND_LEVEL_POOL_BUCKET_SIZE) { // The current bucket is full, try to find another one and return the segment there. if (attempts < HASH_BUCKET_COUNT_L2) { attempts++ From cb5c499a4ba523a5959f49e92c319ae8970083b0 Mon Sep 17 00:00:00 2001 From: Filipp Zhinkin Date: Thu, 11 Jul 2024 14:44:01 +0200 Subject: [PATCH 20/20] Added an extra test case --- core/jvm/test/PoolingTest.kt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/jvm/test/PoolingTest.kt b/core/jvm/test/PoolingTest.kt index f07caf96a..95c5c36c7 100644 --- a/core/jvm/test/PoolingTest.kt +++ b/core/jvm/test/PoolingTest.kt @@ -32,5 +32,12 @@ class PoolingTest { peek.readByte() buffer.clear() assertTrue(poolSize < SegmentPool.byteCount) + + buffer.writeByte(1) + poolSize = SegmentPool.byteCount + val otherBuffer = Buffer() + otherBuffer.write(buffer, buffer.size) + otherBuffer.clear() + assertTrue(poolSize < SegmentPool.byteCount) } }