Skip to content

Improve AsyncMemoize tests #16612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ let waitFor (mre: ManualResetEvent) =
if not <| mre.WaitOne timeout then
failwith "waitFor timed out"

let waitUntil condition value =
task {
let sw = Stopwatch.StartNew()
while not <| condition value do
if sw.Elapsed > timeout then
failwith "waitUntil timed out"
do! Task.Delay 10
}

let rec internal spinFor (duration: TimeSpan) =
node {
Expand All @@ -29,6 +37,24 @@ let rec internal spinFor (duration: TimeSpan) =
}


type internal EventRecorder<'a, 'b, 'c when 'a : equality and 'b : equality>(memoize: AsyncMemoize<'a,'b,'c>) as self =

let events = ConcurrentQueue()

do memoize.OnEvent self.Add

member _.Add (e, (_label, k, _version)) = events.Enqueue (e, k)

member _.Received value = events |> Seq.exists (fst >> (=) value)

member _.CountOf value count = events |> Seq.filter (fst >> (=) value) |> Seq.length |> (=) count

member _.ShouldBe (expected) =
let expected = expected |> Seq.toArray
let actual = events |> Seq.toArray
Assert.Equal<_ array>(expected, actual)


[<Fact>]
let ``Basics``() =

Expand Down Expand Up @@ -69,21 +95,14 @@ let ``We can cancel a job`` () =

let jobStarted = new ManualResetEvent(false)

let jobCanceled = new ManualResetEvent(false)

let computation action = node {
action() |> ignore
do! spinFor timeout
failwith "Should be canceled before it gets here"
}

let eventLog = ConcurrentQueue()
let memoize = AsyncMemoize<int, int, _>()
memoize.OnEvent(fun (e, (_label, k, _version)) ->
eventLog.Enqueue (e, k)
if e = Canceled then
jobCanceled.Set() |> ignore
)
let memoize = AsyncMemoize<_, int, _>()
let events = EventRecorder(memoize)

use cts1 = new CancellationTokenSource()
use cts2 = new CancellationTokenSource()
Expand All @@ -96,13 +115,10 @@ let ``We can cancel a job`` () =
waitFor jobStarted
jobStarted.Reset() |> ignore

let jobRequested = new ManualResetEvent(false)
memoize.OnEvent(fun (e, _) -> if e = Requested then jobRequested.Set() |> ignore)

let _task2 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation ignore), ct = cts2.Token)
let _task3 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation ignore), ct = cts3.Token)

waitFor jobRequested
do! waitUntil (events.CountOf Requested) 3

cts1.Cancel()
cts2.Cancel()
Expand All @@ -111,16 +127,16 @@ let ``We can cancel a job`` () =

cts3.Cancel()

waitFor jobCanceled
do! waitUntil events.Received Canceled

Assert.Equal<(JobEvent * int) array>([|
events.ShouldBe [
Requested, key
Started, key
Requested, key
Requested, key
Restarted, key
Canceled, key
|], eventLog |> Seq.toArray )
]
}

[<Fact>]
Expand All @@ -136,9 +152,9 @@ let ``Job is restarted if first requestor cancels`` () =
return key * 2
}

let eventLog = ConcurrentStack()
let memoize = AsyncMemoize<int, int, _>()
memoize.OnEvent(fun (e, (_, k, _version)) -> eventLog.Push (e, k))
let memoize = AsyncMemoize<_, int, _>()
let events = EventRecorder(memoize)


use cts1 = new CancellationTokenSource()
use cts2 = new CancellationTokenSource()
Expand All @@ -151,13 +167,10 @@ let ``Job is restarted if first requestor cancels`` () =
waitFor jobStarted
jobStarted.Reset() |> ignore

let jobRequested = new ManualResetEvent(false)
memoize.OnEvent(fun (e, _) -> if e = Requested then jobRequested.Set() |> ignore)

let _task2 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts2.Token)
let _task3 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts3.Token)

waitFor jobRequested
do! waitUntil (events.CountOf Requested) 3

cts1.Cancel()

Expand All @@ -168,16 +181,13 @@ let ``Job is restarted if first requestor cancels`` () =
let! result = _task2
Assert.Equal(2, result)

let orderedLog = eventLog |> Seq.rev |> Seq.toList
let expected = [
events.ShouldBe [
Requested, key
Started, key
Requested, key
Requested, key
Restarted, key
Finished, key ]

Assert.Equal<_ list>(expected, orderedLog)
}

[<Fact>]
Expand All @@ -192,10 +202,10 @@ let ``Job is restarted if first requestor cancels but keeps running if second re
waitFor jobCanComplete
return key * 2
}

let memoize = AsyncMemoize<_, int, _>()
let events = EventRecorder(memoize)

let eventLog = ConcurrentStack()
let memoize = AsyncMemoize<int, int, _>()
memoize.OnEvent(fun (e, (_label, k, _version)) -> eventLog.Push (e, k))

use cts1 = new CancellationTokenSource()
use cts2 = new CancellationTokenSource()
Expand All @@ -208,13 +218,10 @@ let ``Job is restarted if first requestor cancels but keeps running if second re
waitFor jobStarted
jobStarted.Reset() |> ignore

let jobRequested = new ManualResetEvent(false)
memoize.OnEvent(fun (e, _) -> if e = Requested then jobRequested.Set() |> ignore)

let _task2 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts2.Token)
let _task3 = NodeCode.StartAsTask_ForTesting( memoize.Get'(key, computation key), ct = cts3.Token)

waitFor jobRequested
do! waitUntil (events.CountOf Requested) 3

cts1.Cancel()

Expand All @@ -227,16 +234,13 @@ let ``Job is restarted if first requestor cancels but keeps running if second re
let! result = _task3
Assert.Equal(2, result)

let orderedLog = eventLog |> Seq.rev |> Seq.toList
let expected = [
events.ShouldBe [
Requested, key
Started, key
Requested, key
Requested, key
Restarted, key
Finished, key ]

Assert.Equal<_ list>(expected, orderedLog)
}


Expand Down