Implement mergeable queue loop #265

Merged: 1 commit merged into rust-lang:main from add-mergeable-queue-loop on Apr 23, 2025

Conversation

@Sakib25800 (Contributor) commented Apr 2, 2025

Prerequisite to #247.

This PR implements the mergeable queue processing loop.

  • Enqueues PRs onto the mergeable queue after pushes to the base branch
  • Enqueues PRs onto the mergeable queue after a PR edit
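For context, here is a minimal sketch of the shape of such a processing loop. It uses a plain tokio mpsc channel and made-up names (MergeableQueueItem, the println placeholder), so it is only an illustration, not the actual implementation in this PR:

```rust
use tokio::sync::mpsc;

/// Hypothetical payload: which PR should have its mergeable state re-checked.
#[derive(Debug)]
struct MergeableQueueItem {
    repo: String,
    pr_number: u64,
}

/// Sketch of the processing loop: drain items until every sender is dropped,
/// re-checking each PR's mergeability in turn.
async fn consume_mergeable_queue(mut rx: mpsc::Receiver<MergeableQueueItem>) {
    while let Some(item) = rx.recv().await {
        // The real service would query GitHub here, persist the PR's
        // mergeable state, and re-enqueue if GitHub has not computed it yet.
        println!("re-checking mergeability of {}#{}", item.repo, item.pr_number);
    }
}
```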

@Sakib25800 Sakib25800 force-pushed the add-mergeable-queue-loop branch from 76e3e15 to 0c58413 Compare April 3, 2025 18:26
@Sakib25800 Sakib25800 marked this pull request as ready for review April 3, 2025 21:43
@Sakib25800 (Contributor, Author) commented Apr 3, 2025

I've not actually written much (low level) async Rust code, so sorry for the dumb mistakes that this PR probably has.

Though, this has been the most fun PR to work on so far even with the skill issues 🫠.

@Kobzol (Member) left a comment

This is a complex one! I left some comments.

@Sakib25800 Sakib25800 force-pushed the add-mergeable-queue-loop branch from 4cb42d0 to 4bbb38c Compare April 12, 2025 19:12
@Sakib25800 (Contributor, Author) commented Apr 12, 2025

Since this PR has been reworked a few times 😓, most of these comments no longer apply and should be resolved.

Side note: I've got exams soon, so I'll be slow to respond.

@@ -616,6 +626,7 @@ impl BorsTester {
     async fn finish(self, bors: JoinHandle<()>) -> GitHubState {
         // Make sure that the event channel senders are closed
         drop(self.app);
+        self.mergeable_queue_tx.close();
@Sakib25800 (Contributor, Author) commented Apr 12, 2025

I wanted mergeable_queue to behave like an mpsc channel, returning None when all senders are dropped.

But consume_mergeable_queue holds onto a sender so it can re‑enqueue items, thus keeping one sender reference alive indefinitely.

So I ended up opting for this explicit .close() approach (though I feel it doesn't look right).
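To make the problem concrete, here is a minimal sketch using a plain tokio mpsc channel as a stand-in for the custom queue; the helper names are made up for illustration:

```rust
use tokio::sync::mpsc::{Receiver, Sender};

// The consumer keeps a Sender so that it can re-enqueue items, but that very
// Sender keeps the channel open, so `recv()` can never return `None` and the
// loop never terminates on its own.
async fn consume(mut rx: Receiver<u64>, retry_tx: Sender<u64>) {
    while let Some(pr) = rx.recv().await {
        if !is_mergeability_known(pr).await {
            // Re-enqueueing requires keeping `retry_tx` alive forever.
            let _ = retry_tx.send(pr).await;
        }
    }
    // Unreachable for as long as `retry_tx` exists.
}

// Placeholder for the real GitHub query.
async fn is_mergeability_known(_pr: u64) -> bool {
    true
}
```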

Member

If we used a normal tokio queue, we could use https://docs.rs/tokio/1.44.2/tokio/sync/mpsc/struct.Sender.html#method.downgrade. Since you already store an Arc in the sender, I guess that you could emulate it with a "WeakSender" variant that only stores a Weak pointer.
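For reference, a standalone sketch of how that downgrade/upgrade pattern behaves with tokio's own channel (the needs_retry helper is made up):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(16);

    // The consumer keeps only a WeakSender, which does not keep the
    // channel open on its own.
    let weak_tx: mpsc::WeakSender<u64> = tx.downgrade();

    let consumer = tokio::spawn(async move {
        while let Some(pr) = rx.recv().await {
            // Upgrade only when a re-enqueue is needed; once every strong
            // Sender is gone, upgrade() returns None and the send is skipped.
            if needs_retry(pr) {
                if let Some(strong) = weak_tx.upgrade() {
                    let _ = strong.send(pr).await;
                }
            }
        }
        // Reached once the last strong Sender has been dropped.
    });

    tx.send(42).await.unwrap();
    drop(tx);
    consumer.await.unwrap();
}

fn needs_retry(_pr: u64) -> bool {
    false
}
```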

Member

Or, an alternative design could be that dequeue returns you both the item and a sender, and then before calling dequeue you drop the sender, so there should not be any outstanding senders.

Contributor Author

I'll go for the WeakSender approach. I realised that checking whether Arc::strong_count(&self.inner) == 2 is a little brittle.

Member

Yeah that is a bit sketchy. I'll try to play around with it locally to see if we can figure out something better.

Member

I would go with one of the following:

  • Store a weak sender Rc in the receiver. When a message is received, try to upgrade the weak reference to a strong one and return it together with the message. If it can't be upgraded, stop the reading loop. When a sender is dropped, it has to notify the reader, otherwise the reader might never realize that the last sender was removed.
  • Just add a message type to the reader that tells it to quit.

Honestly, I would just go with option 2. Dealing with the notify, reference counters, weak/strong references, etc. seems like something that is too low-level for bors. We don't need high performance here, we just need to be sure that it is rock solid. With an explicit message type, that should be the easiest. We will need to make sure that the retry logic can deal with a failing send (in case the receiver has been stopped in the meantime), but that shouldn't be hard.
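A minimal sketch of what option 2 could look like, with made-up message and payload names rather than the actual bors types:

```rust
use tokio::sync::mpsc;

// Illustrative message type carried by the queue.
enum MergeableQueueMessage {
    /// Re-check the mergeable state of a PR.
    Check { repo: String, pr_number: u64 },
    /// Sent only by tests to shut the processing loop down.
    Quit,
}

async fn consume_mergeable_queue(mut rx: mpsc::Receiver<MergeableQueueMessage>) {
    while let Some(message) = rx.recv().await {
        match message {
            MergeableQueueMessage::Check { repo, pr_number } => {
                // ...query GitHub and update the PR's mergeable state here,
                // re-enqueueing if it has not been computed yet...
                let _ = (repo, pr_number);
            }
            MergeableQueueMessage::Quit => break,
        }
    }
}
```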

Contributor Author

Option 2 sounds like what I had initially, but with an explicit message type for shutdown. I've now gone with this.

Option 1 looks quite complicated and not worth it, considering shutdowns are only needed for tests.

@Kobzol (Member) left a comment

The design with the closing boolean isn't super nice, and it's tricky to figure out all the possible interleavings for the dequeue method. An alternative could be to use weak senders, or create an explicit Stop message that can be sent to the queue to stop it. But even with the boolean I think that the implementation is fine, and it will run forever in the live service anyway, so it's not a real issue outside of tests.

@Kobzol (Member) commented Apr 23, 2025

Thanks! We don't actually need the closed boolean anymore if we consider the shutdown to be more of an "interrupt" that closes the queue only once (it interrupts the reader, which uses dequeue as an iterator), since that is how the queue is supposed to be used. But we can keep it, it doesn't really matter. If you could please squash the commits, we can finally merge this :) Great work!

- Implement mergeable queue loop
- Add to queue on PR edit base change
- Add to queue on push to branch
@Sakib25800 Sakib25800 force-pushed the add-mergeable-queue-loop branch from 6f0406b to b4c28c3 Compare April 23, 2025 08:27
@Sakib25800 (Contributor, Author) commented

Squashed.

Also removed the closed boolean.

@Kobzol (Member) left a comment

Thank you for putting up with all my review comments 😆 Really awesome job.

@Kobzol Kobzol added this pull request to the merge queue Apr 23, 2025
Merged via the queue into rust-lang:main with commit c888eda Apr 23, 2025
2 checks passed