diff --git a/Cargo.toml b/Cargo.toml index 153316e60..351e1824f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "tonic-web", # Non-published crates "examples", "codegen", + "grpc", "interop", # Tests "tests/disable_comments", "tests/included_service", diff --git a/grpc/Cargo.toml b/grpc/Cargo.toml new file mode 100644 index 000000000..b27942297 --- /dev/null +++ b/grpc/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "grpc" +version = "0.9.0-alpha.1" +edition = "2021" +authors = ["gRPC Authors"] +license = "Apache-2.0" + +[dependencies] +url = "2.5.0" +tokio = { version = "1.37.0", features = ["sync"] } +tonic = { version = "0.13.0", path = "../tonic", default-features = false, features = ["codegen"] } diff --git a/grpc/LICENSE b/grpc/LICENSE new file mode 100644 index 000000000..d64569567 --- /dev/null +++ b/grpc/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/grpc/NOTICE.txt b/grpc/NOTICE.txt new file mode 100644 index 000000000..88316812f --- /dev/null +++ b/grpc/NOTICE.txt @@ -0,0 +1,13 @@ +Copyright 2025 gRPC authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/grpc/src/attributes.rs b/grpc/src/attributes.rs new file mode 100644 index 000000000..0e1c72468 --- /dev/null +++ b/grpc/src/attributes.rs @@ -0,0 +1,22 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/// A key-value store for arbitrary configuration data between multiple +/// pluggable components. +#[derive(Debug, Default, Clone)] +pub struct Attributes; diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs new file mode 100644 index 000000000..10cdc11ef --- /dev/null +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -0,0 +1,256 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +//! A utility which helps parent LB policies manage multiple children for the +//! purposes of forwarding channel updates. + +// TODO: This is mainly provided as a fairly complex example of the current LB +// policy in use. Complete tests must be written before it can be used in +// production. Also, support for the work scheduler is missing. + +use std::{collections::HashMap, error::Error, hash::Hash, mem, sync::Arc}; + +use crate::client::load_balancing::{ + ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState, WorkScheduler, +}; +use crate::client::name_resolution::{Address, ResolverUpdate}; + +use super::{Subchannel, SubchannelState}; + +// An LbPolicy implementation that manages multiple children. +pub struct ChildManager { + subchannel_child_map: HashMap, + children: Vec>, + shard_update: Box>, +} + +struct Child { + identifier: T, + policy: Box, + state: LbState, +} + +/// A collection of data sent to a child of the ChildManager. +pub struct ChildUpdate { + /// The identifier the ChildManager should use for this child. + pub child_identifier: T, + /// The builder the ChildManager should use to create this child if it does + /// not exist. + pub child_policy_builder: Box, + /// The relevant ResolverUpdate to send to this child. + pub child_update: ResolverUpdate, +} + +// TODO: convert to a trait? +/// Performs the operation of sharding an aggregate ResolverUpdate into one or +/// more ChildUpdates. Called automatically by the ChildManager when its +/// resolver_update method is called. +pub type ResolverUpdateSharder = + fn( + ResolverUpdate, + ) -> Result>>, Box>; + +impl ChildManager { + /// Creates a new ChildManager LB policy. shard_update is called whenever a + /// resolver_update operation occurs. + pub fn new(shard_update: Box>) -> Self { + Self { + subchannel_child_map: HashMap::default(), + children: Vec::default(), + shard_update, + } + } + + /// Returns data for all current children. + pub fn child_states(&mut self) -> impl Iterator { + self.children + .iter() + .map(|child| (&child.identifier, &child.state)) + } + + // Called to update all accounting in the ChildManager from operations + // performed by a child policy on the WrappedController that was created for + // it. child_idx is an index into the children map for the relevant child. + // + // TODO: this post-processing step can be eliminated by capturing the right + // state inside the WrappedController, however it is fairly complex. Decide + // which way is better. + fn resolve_child_controller( + &mut self, + channel_controller: WrappedController, + child_idx: usize, + ) { + // Add all created subchannels into the subchannel_child_map. + for csc in channel_controller.created_subchannels { + self.subchannel_child_map.insert(csc, child_idx); + } + // Update the tracked state if the child produced an update. + if let Some(state) = channel_controller.picker_update { + self.children[child_idx].state = state; + }; + } +} + +impl LbPolicy for ChildManager { + fn resolver_update( + &mut self, + resolver_update: ResolverUpdate, + config: Option<&LbConfig>, + channel_controller: &mut dyn ChannelController, + ) -> Result<(), Box> { + // First determine if the incoming update is valid. + let child_updates = (self.shard_update)(resolver_update)?; + + // Replace self.children with an empty vec. + let mut old_children = vec![]; + mem::swap(&mut self.children, &mut old_children); + + // Replace the subchannel map with an empty map. + let mut old_subchannel_child_map = HashMap::new(); + mem::swap( + &mut self.subchannel_child_map, + &mut old_subchannel_child_map, + ); + // Reverse the old subchannel map. + let mut old_child_subchannels_map: HashMap> = HashMap::new(); + for (subchannel, child_idx) in old_subchannel_child_map { + old_child_subchannels_map + .entry(child_idx) + .or_default() + .push(subchannel); + } + + // Build a map of the old children from their IDs for efficient lookups. + let old_children = old_children + .into_iter() + .enumerate() + .map(|(old_idx, e)| (e.identifier, (e.policy, e.state, old_idx))); + let mut old_children: HashMap = old_children.collect(); + + // Split the child updates into the IDs and builders, and the + // ResolverUpdates. + let (ids_builders, updates): (Vec<_>, Vec<_>) = child_updates + .map(|e| ((e.child_identifier, e.child_policy_builder), e.child_update)) + .unzip(); + + // Transfer children whose identifiers appear before and after the + // update, and create new children. Add entries back into the + // subchannel map. + for (new_idx, (identifier, builder)) in ids_builders.into_iter().enumerate() { + if let Some((policy, state, old_idx)) = old_children.remove(&identifier) { + for subchannel in old_child_subchannels_map + .remove(&old_idx) + .into_iter() + .flatten() + { + self.subchannel_child_map.insert(subchannel, new_idx); + } + self.children.push(Child { + identifier, + state, + policy, + }); + } else { + let policy = builder.build(LbPolicyOptions { + work_scheduler: Arc::new(UnimplWorkScheduler {}), + }); + let state = LbState::initial(); + self.children.push(Child { + identifier, + state, + policy, + }); + }; + } + + // Anything left in old_children will just be Dropped and cleaned up. + + // Call resolver_update on all children. + let mut updates = updates.into_iter(); + for child_idx in 0..self.children.len() { + let child = &mut self.children[child_idx]; + let child_update = updates.next().unwrap(); + let mut channel_controller = WrappedController::new(channel_controller); + let _ = child + .policy + .resolver_update(child_update, config, &mut channel_controller); + self.resolve_child_controller(channel_controller, child_idx); + } + Ok(()) + } + + fn subchannel_update( + &mut self, + subchannel: &Subchannel, + state: &SubchannelState, + channel_controller: &mut dyn ChannelController, + ) { + // Determine which child created this subchannel. + let child_idx = *self.subchannel_child_map.get(subchannel).unwrap(); + let policy = &mut self.children[child_idx].policy; + // Wrap the channel_controller to track the child's operations. + let mut channel_controller = WrappedController::new(channel_controller); + // Call the proper child. + policy.subchannel_update(subchannel, state, &mut channel_controller); + self.resolve_child_controller(channel_controller, child_idx); + } + + fn work(&mut self, _channel_controller: &mut dyn ChannelController) { + todo!(); + } +} + +struct WrappedController<'a> { + channel_controller: &'a mut dyn ChannelController, + created_subchannels: Vec, + picker_update: Option, +} + +impl<'a> WrappedController<'a> { + fn new(channel_controller: &'a mut dyn ChannelController) -> Self { + Self { + channel_controller, + created_subchannels: vec![], + picker_update: None, + } + } +} + +impl ChannelController for WrappedController<'_> { + fn new_subchannel(&mut self, address: &Address) -> Subchannel { + let subchannel = self.channel_controller.new_subchannel(address); + self.created_subchannels.push(subchannel.clone()); + subchannel + } + + fn update_picker(&mut self, update: LbState) { + self.picker_update = Some(update); + } + + fn request_resolution(&mut self) { + self.channel_controller.request_resolution(); + } +} + +pub struct UnimplWorkScheduler; + +impl WorkScheduler for UnimplWorkScheduler { + fn schedule_work(&self) { + todo!(); + } +} diff --git a/grpc/src/client/load_balancing/mod.rs b/grpc/src/client/load_balancing/mod.rs new file mode 100644 index 000000000..91b04f903 --- /dev/null +++ b/grpc/src/client/load_balancing/mod.rs @@ -0,0 +1,267 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +pub mod child_manager; + +use std::{any::Any, error::Error, hash::Hash, sync::Arc}; + +use tonic::{metadata::MetadataMap, Status}; + +use crate::client::{ + name_resolution::{Address, ResolverUpdate}, + service::Request, + ConnectivityState, +}; + +/// A collection of data configured on the channel that is constructing this +/// LbPolicy. +pub struct LbPolicyOptions { + /// A hook into the channel's work scheduler that allows the LbPolicy to + /// request the ability to perform operations on the ChannelController. + pub work_scheduler: Arc, +} + +/// Used to asynchronously request a call into the LbPolicy's work method if +/// the LbPolicy needs to provide an update without waiting for an update +/// from the channel first. +pub trait WorkScheduler: Send + Sync { + // Schedules a call into the LbPolicy's work method. If there is already a + // pending work call that has not yet started, this may not schedule another + // call. + fn schedule_work(&self); +} + +/// An LB policy factory that produces LbPolicy instances used by the channel +/// to manage connections and pick connections for RPCs. +pub trait LbPolicyBuilder: Send + Sync { + /// Builds and returns a new LB policy instance. + /// + /// Note that build must not fail. Any optional configuration is delivered + /// via the LbPolicy's resolver_update method. + /// + /// An LbPolicy instance is assumed to begin in a Connecting state that + /// queues RPCs until its first update. + fn build(&self, options: LbPolicyOptions) -> Box; + + /// Reports the name of the LB Policy. + fn name(&self) -> &'static str; + + /// Parses the JSON LB policy configuration into an internal representation. + /// + /// LB policies do not need to accept a configuration, in which case the + /// default implementation returns Ok(None). + fn parse_config( + &self, + _config: &str, + ) -> Result, Box> { + Ok(None) + } +} + +/// An LB policy instance. +/// +/// LB policies are responsible for creating connections (modeled as +/// Subchannels) and producing Picker instances for picking connections for +/// RPCs. +pub trait LbPolicy: Send { + /// Called by the channel when the name resolver produces a new set of + /// resolved addresses or a new service config. + fn resolver_update( + &mut self, + update: ResolverUpdate, + config: Option<&LbConfig>, + channel_controller: &mut dyn ChannelController, + ) -> Result<(), Box>; + + /// Called by the channel when any subchannel created by the LB policy + /// changes state. + fn subchannel_update( + &mut self, + subchannel: &Subchannel, + state: &SubchannelState, + channel_controller: &mut dyn ChannelController, + ); + + /// Called by the channel in response to a call from the LB policy to the + /// WorkScheduler's request_work method. + fn work(&mut self, channel_controller: &mut dyn ChannelController); +} + +/// Controls channel behaviors. +pub trait ChannelController: Send + Sync { + /// Creates a new subchannel in IDLE state. + fn new_subchannel(&mut self, address: &Address) -> Subchannel; + + /// Provides a new snapshot of the LB policy's state to the channel. + fn update_picker(&mut self, update: LbState); + + /// Signals the name resolver to attempt to re-resolve addresses. Typically + /// used when connections fail, indicating a possible change in the overall + /// network configuration. + fn request_resolution(&mut self); +} + +/// Represents the current state of a Subchannel. +#[derive(Clone)] +pub struct SubchannelState { + /// The connectivity state of the subchannel. See SubChannel for a + /// description of the various states and their valid transitions. + pub connectivity_state: ConnectivityState, + // Set if connectivity state is TransientFailure to describe the most recent + // connection error. None for any other connectivity_state value. + pub last_connection_error: Option>, +} + +/// A convenience wrapper for an LB policy's configuration object. +pub struct LbConfig { + config: Box, +} + +impl LbConfig { + /// Create a new LbConfig wrapper containing the provided config. + pub fn new(config: Box) -> Self { + LbConfig { config } + } + + /// Converts the wrapped configuration into the type used by the LbPolicy. + pub fn into(&self) -> Option<&T> { + self.config.downcast_ref::() + } +} + +/// A Picker is responsible for deciding what Subchannel to use for any given +/// request. A Picker is only used once for any RPC. If pick() returns Queue, +/// the channel will queue the RPC until a new Picker is produced by the +/// LbPolicy, and will call pick() on the new Picker for the request. +/// +/// Pickers are always paired with a ConnectivityState which the channel will +/// expose to applications so they can predict what might happens when +/// performing RPCs: +/// +/// If the ConnectivityState is Idle, the Picker should ensure connections are +/// initiated by the LbPolicy that produced the Picker, and return a Queue +/// result so the request is attempted the next time a Picker is produced. +/// +/// If the ConnectivityState is Connecting, the Picker should return a Queue +/// result and continue to wait for pending connections. +/// +/// If the ConnectivityState is Ready, the Picker should return a Ready +/// Subchannel. +/// +/// If the ConnectivityState is TransientFailure, the Picker should return an +/// Err with an error that describes why connections are failing. +pub trait Picker: Send + Sync { + /// Picks a connection to use for the request. + /// + /// This function should not block. If the Picker needs to do blocking or + /// time-consuming work to service this request, it should return Queue, and + /// the Pick call will be repeated by the channel when a new Picker is + /// produced by the LbPolicy. + fn pick(&self, request: &Request) -> PickResult; +} + +pub enum PickResult { + /// Indicates the Subchannel in the Pick should be used for the request. + Pick(Pick), + /// Indicates the LbPolicy is attempting to connect to a server to use for + /// the request. + Queue, + /// Indicates that the request should fail with the included error status + /// (with the code converted to UNAVAILABLE). If the RPC is wait-for-ready, + /// then it will not be terminated, but instead attempted on a new picker if + /// one is produced before it is cancelled. + Fail(Status), + /// Indicates that the request should fail with the included status + /// immediately, even if the RPC is wait-for-ready. The channel will + /// convert the status code to INTERNAL if it is not a valid code for the + /// gRPC library to produce, per [gRFC A54]. + /// + /// [gRFC A54]: + /// https://github.com/grpc/proposal/blob/master/A54-restrict-control-plane-status-codes.md + Drop(Status), +} + +/// Data provided by the LB policy. +#[derive(Clone)] +pub struct LbState { + pub connectivity_state: super::ConnectivityState, + pub picker: Arc, +} + +impl LbState { + /// Returns a generic initial LbState which is Connecting and a picker which + /// queues all picks. + pub fn initial() -> Self { + Self { + connectivity_state: ConnectivityState::Connecting, + picker: Arc::new(QueuingPicker {}), + } + } +} + +/// A collection of data used by the channel for routing a request. +pub struct Pick { + /// The Subchannel for the request. + pub subchannel: Subchannel, + // Metadata to be added to existing outgoing metadata. + pub metadata: MetadataMap, +} + +/// A Subchannel represents a method of communicating with a server which may be +/// connected or disconnected many times across its lifetime. +/// +/// - Subchannels start IDLE. +/// +/// - IDLE transitions to CONNECTING when connect() is called. +/// +/// - CONNECTING transitions to READY on success or TRANSIENT_FAILURE on error. +/// +/// - READY transitions to IDLE when the connection is lost. +/// +/// - TRANSIENT_FAILURE transitions to CONNECTING when the reconnect backoff +/// timer has expired. This timer scales exponentially and is reset when the +/// subchannel becomes READY. +/// +/// When a Subchannel is dropped, it is disconnected, and no subsequent state +/// updates will be provided for it to the LB policy. +#[derive(Clone, Debug)] +pub struct Subchannel; + +impl Hash for Subchannel { + fn hash(&self, _state: &mut H) { + todo!() + } +} + +impl PartialEq for Subchannel { + fn eq(&self, _other: &Self) -> bool { + todo!() + } +} + +impl Eq for Subchannel {} + +/// QueuingPicker always returns Queue. LB policies that are not actively +/// Connecting should not use this picker. +pub struct QueuingPicker {} + +impl Picker for QueuingPicker { + fn pick(&self, _request: &Request) -> PickResult { + PickResult::Queue + } +} diff --git a/grpc/src/client/mod.rs b/grpc/src/client/mod.rs new file mode 100644 index 000000000..8bc693e4f --- /dev/null +++ b/grpc/src/client/mod.rs @@ -0,0 +1,41 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +pub(crate) mod load_balancing; +pub(crate) mod name_resolution; +pub mod service; +pub mod service_config; + +/// A representation of the current state of a gRPC channel, also used for the +/// state of subchannels (individual connections within the channel). +/// +/// A gRPC channel begins in the Idle state. When an RPC is attempted, the +/// channel will automatically transition to Connecting. If connections to a +/// backend service are available, the state becomes Ready. Otherwise, if RPCs +/// would fail due to a lack of connections, the state becomes TransientFailure +/// and continues to attempt to reconnect. +/// +/// Channels may re-enter the Idle state if they are unused for longer than +/// their configured idleness timeout. +#[derive(Copy, Clone, PartialEq, Debug)] +pub enum ConnectivityState { + Idle, + Connecting, + Ready, + TransientFailure, +} diff --git a/grpc/src/client/name_resolution/mod.rs b/grpc/src/client/name_resolution/mod.rs new file mode 100644 index 000000000..05c5cf795 --- /dev/null +++ b/grpc/src/client/name_resolution/mod.rs @@ -0,0 +1,189 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +//! Name Resolution for gRPC. +//! +//! Name Resolution is the process by which a channel's target is converted into +//! network addresses (typically IP addresses) used by the channel to connect to +//! a service. +use core::fmt; + +use std::{ + error::Error, + fmt::{Display, Formatter}, + hash::Hash, + sync::Arc, +}; + +use tokio::sync::Notify; +use tonic::async_trait; +use url::Url; + +use crate::attributes::Attributes; + +use super::service_config::ServiceConfig; + +/// A name resolver factory that produces Resolver instances used by the channel +/// to resolve network addresses for the target URI. +pub trait ResolverBuilder: Send + Sync { + /// Builds and returns a new name resolver instance. + /// + /// Note that build must not fail. Instead, an erroring Resolver may be + /// returned that calls ChannelController.update() with an Err value. + fn build( + &self, + target: Url, + resolve_now: Arc, + options: ResolverOptions, + ) -> Box; + + /// Reports the URI scheme handled by this name resolver. + fn scheme(&self) -> &'static str; + + /// Returns the default authority for a channel using this name resolver and + /// target. This is typically the same as the service's name. By default, + /// the default_authority method automatically returns the path portion of + /// the target URI, with the leading prefix removed. + fn default_authority(&self, target: &Url) -> String { + let path = target.path(); + path.strip_prefix("/").unwrap_or(path).to_string() + } +} + +/// A collection of data configured on the channel that is constructing this +/// name resolver. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct ResolverOptions { + /// The authority that will be used for the channel by default. This + /// contains either the result of the default_authority method of this + /// ResolverBuilder, or another string if the channel was configured to + /// override the default. + authority: String, +} + +#[async_trait] +/// A collection of operations a Resolver may perform on the channel which +/// constructed it. +pub trait ChannelController: Send + Sync { + /// Parses the provided JSON service config. + fn parse_config(&self, config: &str) -> Result>; // TODO + + /// Notifies the channel about the current state of the name resolver. If + /// an error value is returned, the name resolver should attempt to + /// re-resolve, if possible. The resolver is responsible for applying an + /// appropriate backoff mechanism to avoid overloading the system or the + /// remote resolver. + async fn update(&self, update: ResolverUpdate) -> Result<(), Box>; +} + +/// A name resolver update expresses the current state of the resolver. +pub enum ResolverUpdate { + /// Indicates the name resolver encountered an error. + Err(Box), + /// Indicates the name resolver produced a valid result. + Data(ResolverData), +} + +/// Data provided by the name resolver to the channel. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct ResolverData { + /// A list of endpoints which each identify a logical host serving the + /// service indicated by the target URI. + pub endpoints: Vec, + /// The service config which the client should use for communicating with + /// the service. + pub service_config: Option, + // Optional data which may be used by the LB Policy or channel. + pub attributes: Attributes, +} + +/// An Endpoint is an address or a collection of addresses which reference one +/// logical server. Multiple addresses may be used if there are multiple ways +/// which the server can be reached, e.g. via IPv4 and IPv6 addresses. +#[derive(Debug, Default, Clone)] +#[non_exhaustive] +pub struct Endpoint { + /// The list of addresses used to connect to the server. + pub addresses: Vec
, + /// Optional data which may be used by the LB policy or channel. + pub attributes: Attributes, +} + +impl Eq for Endpoint {} + +impl PartialEq for Endpoint { + fn eq(&self, _other: &Self) -> bool { + todo!() + } +} + +impl Hash for Endpoint { + fn hash(&self, _state: &mut H) { + todo!() + } +} + +/// An Address is an identifier that indicates how to connect to a server. +#[derive(Debug, Default, Clone)] +#[non_exhaustive] +pub struct Address { + /// The address type is used to identify what kind of transport to create + /// when connecting to this address. Typically TCP_IP_ADDRESS_TYPE. + pub address_type: String, // TODO: &'static str? + /// The address itself is passed to the transport in order to create a + /// connection to it. + pub address: String, + // Optional data which the transport may use for the connection. + pub attributes: Attributes, +} + +impl Eq for Address {} + +impl PartialEq for Address { + fn eq(&self, _other: &Self) -> bool { + todo!() + } +} + +impl Hash for Address { + fn hash(&self, _state: &mut H) { + todo!() + } +} + +impl Display for Address { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}:{}", self.address_type, self.address) + } +} + +/// Indicates the address is an IPv4 or IPv6 address that should be connected to +/// via TCP/IP. +pub static TCP_IP_ADDRESS_TYPE: &str = "tcp"; + +/// A name resolver instance. +#[async_trait] +pub trait Resolver: Send + Sync { + /// The entry point of the resolver. Will only be called once by the + /// channel. Should not return unless the resolver never will need to + /// update its state. The future will be dropped when the channel shuts + /// down or enters idle mode. + async fn run(&mut self, channel_controller: Box); +} diff --git a/grpc/src/client/service.rs b/grpc/src/client/service.rs new file mode 100644 index 000000000..d1d3c0534 --- /dev/null +++ b/grpc/src/client/service.rs @@ -0,0 +1,23 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/// A gRPC Request. +pub struct Request; + +/// A gRPC Response. +pub struct Response; diff --git a/grpc/src/client/service_config.rs b/grpc/src/client/service_config.rs new file mode 100644 index 000000000..3639c03e5 --- /dev/null +++ b/grpc/src/client/service_config.rs @@ -0,0 +1,22 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/// An in-memory representation of a service config, usually provided to gRPC as +/// a JSON object. +#[derive(Debug, Default)] +pub(crate) struct ServiceConfig; diff --git a/grpc/src/lib.rs b/grpc/src/lib.rs new file mode 100644 index 000000000..064ce6077 --- /dev/null +++ b/grpc/src/lib.rs @@ -0,0 +1,31 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +//! The official Rust implementation of [gRPC], a high performance, open source, +//! universal RPC framework +//! +//! This version is in progress and not recommended for any production use. All +//! APIs are unstable. Proceed at your own risk. +//! +//! [gRPC]: https://grpc.io + +#![allow(dead_code)] + +pub mod client; + +pub(crate) mod attributes;