Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::sync::mpsc::{self, UnboundedSender};
use crate::contract::{ClientResponsesReceiver, ContractHandlerEvent};
use crate::message::{NodeEvent, QueryResult};
use crate::node::OpManager;
use crate::operations::{get, put, update, OpError};
use crate::operations::{get, put, subscribe, update, OpError};
use crate::{config::GlobalExecutor, contract::StoreResponse};

// pub(crate) mod admin_endpoints; // TODO: Add axum dependencies
Expand Down Expand Up @@ -932,19 +932,17 @@ async fn process_open_request(
"Starting new SUBSCRIBE network operation via RequestRouter",
);

let op_id = crate::node::subscribe(
op_manager.clone(),
key,
Some(client_id),
)
.await
.inspect_err(|err| {
tracing::error!("Subscribe error: {}", err);
})?;
// Phase 2 fix: Use the transaction_id from RequestRouter for consistency
let op = subscribe::start_op_with_id(key, transaction_id);
subscribe::request_subscribe(&op_manager, op)
.await
.inspect_err(|err| {
tracing::error!("Subscribe error: {}", err);
})?;

tracing::debug!(
request_id = %request_id,
transaction_id = %op_id,
transaction_id = %transaction_id,
operation = "subscribe",
"Request-Transaction correlation"
);
Expand Down
19 changes: 8 additions & 11 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,24 +308,21 @@ impl<T> From<SendError<T>> for OpError {

/// If the contract is not found, it will try to get it first if the `try_get` parameter is set.
async fn start_subscription_request(op_manager: &OpManager, key: ContractKey) {
// Check if we are the optimal location for this contract
// If we are and no other peers are suitable, skip subscription for now
// TODO: In the future, still find next-best peers for redundancy (issue #1793)
// Phase 1 fix for issue #1848: Allow nodes at optimal location to subscribe to next-best peers
// This prevents isolation when a node is at the optimal network position
let own_location = op_manager.ring.connection_manager.own_location();
let closest = op_manager
.ring
.closest_potentially_caching(&key, [&own_location.peer].as_slice());

if closest.is_none() {
// No other peers available for caching
// Check if we should be caching this locally
if op_manager.ring.should_seed(&key) {
tracing::debug!(
%key,
"Skipping subscription - node is optimal location and no other peers available"
);
return;
}
tracing::warn!(
%key,
"No remote peers available for subscription - node may become isolated"
);
// Still attempt subscription in case new peers join later
// The subscribe operation will handle the no-peers case appropriately
}

let sub_op = subscribe::start_op(key);
Expand Down
21 changes: 16 additions & 5 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,30 @@ pub(crate) fn start_op(key: ContractKey) -> SubscribeOp {
SubscribeOp { id, state }
}

/// Creates a [`SubscribeOp`] for `key` that carries the caller-supplied
/// transaction `id` (phase 2 fix for issue #1848).
///
/// The same `id` is stored both on the operation itself and inside its
/// initial `SubscribeState::PrepareRequest` state.
pub(crate) fn start_op_with_id(key: ContractKey, id: Transaction) -> SubscribeOp {
    SubscribeOp {
        id,
        state: Some(SubscribeState::PrepareRequest { id, key }),
    }
}

/// Request to subscribe to value changes from a contract.
pub(crate) async fn request_subscribe(
op_manager: &OpManager,
sub_op: SubscribeOp,
) -> Result<(), OpError> {
if let Some(SubscribeState::PrepareRequest { id, key }) = &sub_op.state {
// Find a remote peer to handle the subscription
const EMPTY: &[PeerId] = &[];
let target = match op_manager.ring.closest_potentially_caching(key, EMPTY) {
// Phase 1 fix for issue #1848: Properly find next-best peer for subscription
// Exclude ourselves to ensure we find a remote peer, even if we're at optimal location
let own_location = op_manager.ring.connection_manager.own_location();
let skip_self = [own_location.peer.clone()];
let target = match op_manager
.ring
.closest_potentially_caching(key, &skip_self[..])
{
Some(peer) => peer,
None => {
// No remote peers available
tracing::debug!(%key, "No remote peers available for subscription");
// No remote peers available - this may happen when node is isolated
tracing::warn!(%key, "No remote peers available for subscription - node may be isolated");
return Err(RingError::NoCachingPeers(*key).into());
}
};
Expand Down
Loading