Skip to content

Conversation

fm3
Copy link
Member

@fm3 fm3 commented Feb 19, 2025

Before this PR, the tick functions returned Unit, meaning that if they started asynchronous work, the IntervalScheduler couldn’t wait for it to be completed.

Now, tick returns Fox[Unit]. The IntervalScheduler runs with an internal interval of 100ms and checks if the time since the last tick’s completion is long enough to start a new one. That means the real interval will be a little higher than the specified one, because it includes the time to compute the tick (plus the time until the next internal 100ms-spaced tick).

Steps to test:

  • To test this, I added a 10s sleep to one of the tick’s asynchronous bodies, and turned down the interval to 2 seconds, and added some logging. The output was like this, which matches what I’d expect:
13:21:05,022 Tick! Sleeping 10s...
13:21:15,022 Sleeping done.
13:21:17,133 Tick! Sleeping 10s...
13:21:27,133 Sleeping done.
13:21:29,253 Tick! Sleeping 10s...
13:21:39,253 Sleeping done.
13:21:41,373 Tick! Sleeping 10s...
13:21:51,373 Sleeping done.
13:21:53,492 Tick! Sleeping 10s...
13:22:03,492 Sleeping done.
13:22:05,612 Tick! Sleeping 10s...
13:22:15,612 Sleeping done.
13:22:17,732 Tick! Sleeping 10s...

Issues:

  • Hopefully fixes overloaded postgres when receiving datasource upgrades from the datastore. (My theory is that the next one came, after 1min, before the first one was completely ingested).

  • Updated changelog
  • Removed dev-only changes like prints and application.conf edits
  • Considered common edge cases
  • Needs datastore update after deployment

@fm3 fm3 self-assigned this Feb 19, 2025
Copy link
Contributor

coderabbitai bot commented Feb 19, 2025

📝 Walkthrough

Walkthrough

The changes update several controller and service classes to improve type safety, clarity, and logging. Method signatures for tick-like operations and updateAll have been modified to return Fox types instead of Unit or JsValue. Constructor parameters formerly named “system” have been renamed to “actorSystem” throughout the code. In addition, enhanced logging using timestamps and refined scheduling mechanisms have been introduced, while some concurrency control elements were simplified or removed.

Changes

File(s) Change Summary
app/controllers/WKRemoteDataStoreController.scala updateAll now returns Action[List[InboxDataSource]] using validateJson[List[InboxDataSource]] instead of parse.json, with added timing via Instant.nowFox and logging if the operation exceeds 30 seconds.
app/mail/MailchimpTicker.scala, app/models/annotation/AnnotationMutexService.scala, app/models/job/Worker.scala,
webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSRemoteWebknossosClient.scala,
webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala,
webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DatasetErrorLoggingService.scala
Constructor parameter renamed from system to actorSystem; tick methods updated to return Fox[Unit] (or Fox[_]), with for-comprehension logic in some cases and, in DataSourceService, a fixed ticker interval of 2 seconds.
app/models/storage/UsedStorageService.scala Renamed parameter to actorSystem; removed the private atomic isRunning and eliminated the tickAsync method; tick now directly returns Fox[Unit] without concurrency state management.
webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/IntervalScheduler.scala Renamed system to actorSystem and enabled to tickerEnabled; tick’s signature updated to Fox[_]; introduced new scheduling variables (innerTickerInterval, lastCompletionTimeMillis, and isRunning), and modified the scheduled task variable to an Option[Cancellable] with refined timing checks.
webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/TracingStoreModule.scala, webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/TemporaryTracingStore.scala,
webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/TracingDataStore.scala
Constructor parameter renamed from system to actorSystem; no changes to functionality or logic.

Possibly related PRs

  • Log slow RPC requests #8349: Enhances time tracking and logging using the Instant class for RPC requests, directly relating to the modifications in the updateAll method.

Suggested labels

refactoring

Suggested reviewers

  • frcroth
  • MichaelBuessemeyer

Poem

I'm a little rabbit on the code trail,
Hopping through changes without fail,
Logging time with a precise little hop,
Fox types making errors stop,
With carrots of refactoring—let's celebrate and never drop!
🥕🌟


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (5)
webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/IntervalScheduler.scala (2)

20-20: New tickerEnabled flag.
The toggle is helpful for conditionally disabling the scheduler. Consider making it configurable if runtime control is needed.


45-46: Clock-based logic may be affected by system time changes.
Relying on wall-clock time could behave unexpectedly if the system time is altered. If needed, consider using a monotonic clock or verifying time correctness.

app/models/storage/UsedStorageService.scala (1)

44-53: Consider adding overlap protection.

The removal of the isRunning check could potentially lead to overlapping scans if the Fox chain takes longer than the ticker interval. While the Fox monad handles asynchronous operations, it doesn't prevent a new tick from starting while the previous one is still running.

Consider implementing overlap protection using a semaphore or mutex to ensure only one scan runs at a time:

private val scanLock = new java.util.concurrent.Semaphore(1)

override protected def tick(): Fox[Unit] =
  Fox.guard(scanLock.tryAcquire()) {
    try {
      for {
        organizations <- organizationDAO.findNotRecentlyScanned(...)
        dataStores <- dataStoreDAO.findAllWithStorageReporting
        _ <- Fox.serialCombined(organizations)(...)
      } yield ()
    } finally {
      scanLock.release()
    }
  }
webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSRemoteWebknossosClient.scala (1)

63-63: Consider using Fox.void for cleaner code.

The current implementation can be simplified using Fox.void instead of map(_ => ()).

-  def tick(): Fox[Unit] = reportStatus().map(_ => ())
+  def tick(): Fox[Unit] = Fox.void(reportStatus())
webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala (1)

51-56: Simplify the tick implementation.

While the change to Fox[Unit] is good for making ticks overlap-free, the implementation can be simplified.

Consider this simpler implementation:

-def tick(): Fox[Unit] =
-  for {
-    _ <- checkInbox(verbose = inboxCheckVerboseCounter == 0)
-    _ = inboxCheckVerboseCounter += 1
-    _ = if (inboxCheckVerboseCounter >= 10) inboxCheckVerboseCounter = 0
-  } yield ()
+def tick(): Fox[Unit] = {
+  val verbose = inboxCheckVerboseCounter == 0
+  inboxCheckVerboseCounter = (inboxCheckVerboseCounter + 1) % 10
+  checkInbox(verbose)
+}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 84d3b9a and 1edb450.

📒 Files selected for processing (9)
  • app/controllers/WKRemoteDataStoreController.scala (3 hunks)
  • app/mail/MailchimpTicker.scala (2 hunks)
  • app/models/annotation/AnnotationMutexService.scala (2 hunks)
  • app/models/job/Worker.scala (2 hunks)
  • app/models/storage/UsedStorageService.scala (2 hunks)
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/IntervalScheduler.scala (1 hunks)
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSRemoteWebknossosClient.scala (2 hunks)
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala (2 hunks)
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DatasetErrorLoggingService.scala (2 hunks)
🔇 Additional comments (20)
webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/IntervalScheduler.scala (10)

3-3: Importing Fox is consistent with the new return type.
No concerns here; this aligns nicely with the updated tick() signature.


7-7: Importing AtomicLong for concurrency tracking.
This is a reasonable approach to managing shared mutable state for time tracking.


10-10: Importing Instant for timestamp handling.
This is a clear way to handle time references and epoch milliseconds.


18-18: Renaming system to actorSystem.
This improves clarity; no issues.


26-26: Changing tick() from Unit to Fox[_].
This is a good step toward asynchronous/functional handling. However, using Fox[Unit] instead of Fox[_] might improve type clarity if no generic variance is required.


28-30: Introducing concurrency fields.
innerTickerInterval, lastCompletionTimeMillis, and isRunning collectively manage timing and concurrency. This setup is logically consistent.


49-51: Conditional cancellation of scheduled.
Cleaning up existing tasks is important and this approach looks sound.


55-55: scheduled now an Option[Cancellable].
Using Option is a neat way to handle the absence of a scheduled task. Ensure concurrency safety if manipulated from multiple threads.


58-58: Lifecycle’s addStopHook.
Tidy integration to ensure the scheduler stops cleanly.


60-67: Initializing the ticker only if tickerEnabled.
Logic is straightforward: checking intervals, throwing an exception if too short, and scheduling. Looks good.

app/mail/MailchimpTicker.scala (2)

19-19: Renaming constructor parameter to actorSystem.
This improves consistency with the rest of the updated codebase.


31-31: Changing tick() return type to Fox[Unit].
Adopting a Fox return type for asynchronous handling is consistent with the new scheduling pattern. Looks good.

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DatasetErrorLoggingService.scala (2)

21-21: Renaming to actorSystem.
No concerns; aligns naming with other services.


50-50: Converting tick() from Unit to Fox[Unit].
Returning Fox[Unit] is coherent with the broader asynchronous approach. Clearing recentErrors is unlikely to fail, so this seems sufficient.

app/models/annotation/AnnotationMutexService.scala (1)

27-27: LGTM! The changes improve error handling.

The conversion to Fox[Unit] return type is well-implemented, properly handling potential database operation failures when cleaning up expired mutexes.

Also applies to: 37-40

app/models/job/Worker.scala (1)

105-105: LGTM! The changes align with async worker monitoring.

The conversion to Fox[Unit] return type is appropriate for the worker liveness monitoring, properly handling potential database operation failures.

Also applies to: 114-118

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala (2)

33-33: LGTM! Improved parameter naming.

The rename from system to actorSystem enhances code clarity by being more specific about the parameter's purpose.


39-40: LGTM! Improved ticker configuration.

The changes enhance clarity and stability:

  • Renamed enabled to tickerEnabled for better consistency
  • Fixed interval of 2 seconds helps prevent overlapping ticks, addressing the PostgreSQL overload issue
app/controllers/WKRemoteDataStoreController.scala (2)

5-5: LGTM! Added necessary imports.

The imports support the new performance monitoring feature.

Also applies to: 36-36


189-204: LGTM! Improved type safety and added performance monitoring.

The changes enhance the method in several ways:

  • Type-safe request body validation using validateJson[List[InboxDataSource]]
  • Performance monitoring to identify slow updates (>30s)
  • Cleaner implementation by removing explicit JSON validation

Comment on lines +32 to +42
private def innerTick: Runnable = () => {
if (lastCompletionIsLongEnoughPast) {
if (isRunning.compareAndSet(false, true)) {
for {
_ <- tick().futureBox
_ = lastCompletionTimeMillis.set(Instant.now.epochMillis)
_ = isRunning.set(false)
} yield ()
}
}
()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure isRunning is released on exceptions.
If tick().futureBox fails, isRunning remains true, causing a permanent block. Use a recover or similar mechanism to reset isRunning in error cases.

Example fix:

for {
  _ <- tick().futureBox.recover {
+   case e =>
+     isRunning.set(false)
+     throw e
  }
  _ = lastCompletionTimeMillis.set(Instant.now.epochMillis)
  _ = isRunning.set(false)
} yield ()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private def innerTick: Runnable = () => {
if (lastCompletionIsLongEnoughPast) {
if (isRunning.compareAndSet(false, true)) {
for {
_ <- tick().futureBox
_ = lastCompletionTimeMillis.set(Instant.now.epochMillis)
_ = isRunning.set(false)
} yield ()
}
}
()
private def innerTick: Runnable = () => {
if (lastCompletionIsLongEnoughPast) {
if (isRunning.compareAndSet(false, true)) {
for {
- _ <- tick().futureBox
+ _ <- tick().futureBox.recover {
+ case e =>
+ isRunning.set(false)
+ throw e
+ }
_ = lastCompletionTimeMillis.set(Instant.now.epochMillis)
_ = isRunning.set(false)
} yield ()
}
}
()
}

Copy link
Contributor

@MichaelBuessemeyer MichaelBuessemeyer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this PR. 🎉

Hopefully this fixes the recent errors 🤞

As you renamed the system: ActorSystem injects, could you maybe do this as well for the other cases which do not use the interval ticking? I found a few other occurences which still use system: ActorSystem.

Besides this and my comment everything looks good and works 🎉

Comment on lines 29 to 31
private val lastCompletionTimeMillis = new AtomicLong(0)
private val isRunning = new java.util.concurrent.atomic.AtomicBoolean(false)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty nitpicky but this is a little inconsistent here. In the "header part" you import AtomicLong but not AtomicBoolean which you import inline here. I'd prefer to also import the AtomicBoolean in the header together with the AtomicLong.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/IntervalScheduler.scala (1)

32-43: ⚠️ Potential issue

Add error handling to ensure isRunning is released on exceptions.

The current implementation might leave isRunning in a locked state if tick().futureBox fails.

Apply this diff to ensure proper cleanup:

 private def innerTick: Runnable = () => {
   if (lastCompletionIsLongEnoughPast) {
     if (isRunning.compareAndSet(false, true)) {
       for {
-        _ <- tick().futureBox
+        _ <- tick().futureBox.recover {
+          case e =>
+            isRunning.set(false)
+            throw e
+        }
         _ = lastCompletionTimeMillis.set(Instant.now.epochMillis)
         _ = isRunning.set(false)
       } yield ()
     }
   }
   ()
 }
🧹 Nitpick comments (3)
app/controllers/WKRemoteDataStoreController.scala (1)

189-204: Excellent improvements to type safety and monitoring!

The changes enhance the code in several ways:

  1. Type-safe request body validation using List[InboxDataSource]
  2. Performance monitoring for long-running updates (>30s)
  3. Clear logging of active/inactive dataset counts

Consider adding metrics collection for the update duration to track performance trends over time. This would help identify gradual degradation in update performance.

webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/IntervalScheduler.scala (2)

48-54: Consider adding synchronization to the stop method.

The stop method might have a race condition when checking and updating the scheduled variable. Multiple concurrent calls to stop could lead to inconsistent state.

Consider this improvement:

- private def stop(): Future[Unit] = {
-   if (scheduled.isDefined) {
-     scheduled.foreach(_.cancel())
-     scheduled = None
-   }
-   Future.successful(())
- }
+ private def stop(): Future[Unit] = synchronized {
+   scheduled.foreach(_.cancel())
+   scheduled = None
+   Future.successful(())
+ }

61-64: Consider adding validation for negative intervals.

While the code checks if the interval is too small, it should also validate that it's not negative.

Consider this addition:

 if (tickerInterval < innerTickerInterval) {
   throw new IllegalArgumentException(
     s"IntervalScheduler was initialized with interval $tickerInterval. Only intervals larger than the inner ticker interval $innerTickerInterval are supported.")
 }
+if (tickerInterval.toMillis < 0) {
+  throw new IllegalArgumentException(
+    s"IntervalScheduler was initialized with negative interval $tickerInterval.")
+}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1edb450 and d4b05d7.

📒 Files selected for processing (9)
  • app/controllers/WKRemoteDataStoreController.scala (3 hunks)
  • app/models/voxelytics/LokiClient.scala (2 hunks)
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala (1 hunks)
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/IntervalScheduler.scala (1 hunks)
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceRepository.scala (1 hunks)
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala (2 hunks)
  • webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/TracingStoreModule.scala (1 hunks)
  • webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/TemporaryTracingStore.scala (1 hunks)
  • webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/TracingDataStore.scala (1 hunks)
✅ Files skipped from review due to trivial changes (4)
  • webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/TracingDataStore.scala
  • webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/TemporaryTracingStore.scala
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceRepository.scala
  • webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/TracingStoreModule.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: circleci_build
🔇 Additional comments (5)
webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala (2)

12-12: LGTM! Improved encapsulation and naming clarity.

The renaming from system to actorSystem and making it private improves code clarity and encapsulation. This change aligns with similar renaming across other files in the codebase.


17-17: LGTM! Binding correctly updated.

The binding has been correctly updated to use the renamed actorSystem variable while maintaining the same functionality.

app/models/voxelytics/LokiClient.scala (1)

23-23: LGTM! Parameter renaming improves clarity.

The renaming of system to actorSystem enhances code readability by making the parameter's purpose more explicit.

Also applies to: 46-46

app/controllers/WKRemoteDataStoreController.scala (1)

5-5: LGTM! Import statements support new functionality.

The added imports enable timing measurement and duration specification for monitoring update operations.

Also applies to: 36-36

webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/IntervalScheduler.scala (1)

18-29: LGTM! The changes improve type safety and scheduling control.

The renaming of system to actorSystem improves clarity, and the introduction of Fox[_] return type enables proper handling of asynchronous operations. The innerTickerInterval of 100ms provides a good safety buffer between ticks.

@fm3 fm3 merged commit 5bd4e22 into master Feb 24, 2025
3 checks passed
@fm3 fm3 deleted the ticker-await-futures branch February 24, 2025 11:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants