diff --git a/spaces-cli/Cargo.toml b/spaces-cli/Cargo.toml index 53fd1d8b..8b0dbe03 100644 --- a/spaces-cli/Cargo.toml +++ b/spaces-cli/Cargo.toml @@ -8,16 +8,16 @@ description = "spacesvm cli for issuing RPC commands" license = "BSD-3-Clause" homepage = "https://avax.network" +[[bin]] +name = "spaces-cli" +path = "src/bin/spaces-cli/main.rs" + [dependencies] -avalanche-types = { version = "0.0.138", features = ["subnet"] } clap = { version = "4.0", features = ["derive"] } hex = "0.4.3" jsonrpc-core = "18.0.0" -jsonrpc-core-client = { version = "18.0.0" } -jsonrpc-client-transports = "18.0.0" -jsonrpc-derive = "18.0" log = "0.4.17" -spacesvm = { path = "../spacesvm" } serde = { version = "1.0.147", features = ["derive"] } serde_json = "1.0.87" -tokio = { version = "1.21.2", features = ["full"] } +spacesvm = { path = "../spacesvm" } +tokio = { version = "1.22.0", features = ["full"] } diff --git a/spaces-cli/src/bin/spaces-cli/main.rs b/spaces-cli/src/bin/spaces-cli/main.rs new file mode 100644 index 00000000..d1736293 --- /dev/null +++ b/spaces-cli/src/bin/spaces-cli/main.rs @@ -0,0 +1,110 @@ +use std::error; + +use clap::{Parser, Subcommand}; +use jsonrpc_core::futures; +use spacesvm::{ + api::{ + client::{claim_tx, delete_tx, get_or_create_pk, set_tx, Client, Uri}, + DecodeTxArgs, IssueTxArgs, ResolveArgs, + }, + chain::tx::{decoder, unsigned::TransactionData}, +}; + +#[derive(Subcommand, Debug)] +enum Command { + Claim { + space: String, + }, + Set { + space: String, + key: String, + value: String, + }, + Delete { + space: String, + key: String, + }, + Get { + space: String, + key: String, + }, + Ping {}, +} + +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct Cli { + /// Endpoint for RPC calls. + #[clap(long)] + endpoint: String, + + /// Private key file. + #[clap(long, default_value = ".spacesvm-cli-pk")] + private_key_file: String, + + /// Which subcommand to call. + #[command(subcommand)] + command: Command, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli = Cli::parse(); + + let secret_key = get_or_create_pk(&cli.private_key_file)?; + let uri = cli.endpoint.parse::()?; + let mut client = Client::new(uri); + + if let Command::Get { space, key } = &cli.command { + let resp = futures::executor::block_on(client.resolve(ResolveArgs { + space: space.as_bytes().to_vec(), + key: key.as_bytes().to_vec(), + })) + .map_err(|e| e.to_string())?; + log::debug!("resolve response: {:?}", resp); + + println!("{}", serde_json::to_string(&resp)?); + return Ok(()); + } + + if let Command::Ping {} = &cli.command { + let resp = futures::executor::block_on(client.ping()).map_err(|e| e.to_string())?; + + println!("{}", serde_json::to_string(&resp)?); + return Ok(()); + } + + // decode tx + let tx_data = command_to_tx(cli.command)?; + let resp = futures::executor::block_on(client.decode_tx(DecodeTxArgs { tx_data })) + .map_err(|e| e.to_string())?; + + let typed_data = &resp.typed_data; + + // create signature + let dh = decoder::hash_structured_data(typed_data)?; + let sig = secret_key.sign_digest(&dh.as_bytes())?; + + // issue tx + let resp = futures::executor::block_on(client.issue_tx(IssueTxArgs { + typed_data: resp.typed_data, + signature: sig.to_bytes().to_vec(), + })) + .map_err(|e| e.to_string())?; + println!("{}", serde_json::to_string(&resp)?); + + Ok(()) +} + +/// Takes a TX command and returns transaction data. +fn command_to_tx(command: Command) -> std::io::Result { + match command { + Command::Claim { space } => Ok(claim_tx(space)), + Command::Set { space, key, value } => Ok(set_tx(space, key, value.as_bytes().to_vec())), + Command::Delete { space, key } => Ok(delete_tx(space, key)), + _ => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "not a supported tx", + )), + } +} diff --git a/spaces-cli/src/main.rs b/spaces-cli/src/main.rs deleted file mode 100644 index 80615022..00000000 --- a/spaces-cli/src/main.rs +++ /dev/null @@ -1,194 +0,0 @@ -use std::{ - error, - fs::File, - io::{Result, Write}, - path::Path, -}; - -use avalanche_types::key; -use clap::{Parser, Subcommand}; -use jsonrpc_client_transports::{transports, RpcError}; -use jsonrpc_core::futures; -use spacesvm::{ - api::{ - DecodeTxArgs, IssueTxArgs, IssueTxResponse, PingResponse, ResolveArgs, ResolveResponse, - ServiceClient as Client, - }, - chain::tx::{decoder, tx::TransactionType, unsigned::TransactionData}, -}; - -#[derive(Parser)] -#[command(version, about, long_about = None)] -struct Cli { - /// Endpoint for RPC calls. - #[clap(long)] - endpoint: String, - - /// Private key file. - #[clap(long, default_value = ".spacesvm-cli-pk")] - private_key_file: String, - - /// Which subcommand to call. - #[command(subcommand)] - command: Command, -} - -#[derive(Subcommand, Debug)] -enum Command { - Claim { - space: String, - }, - Set { - space: String, - key: String, - value: String, - }, - Delete { - space: String, - key: String, - }, - Get { - space: String, - key: String, - }, - Ping {}, -} - -#[tokio::main] -async fn main() -> std::result::Result<(), Box> { - let cli = Cli::parse(); - - let secret_key = get_or_create_pk(&cli.private_key_file)?; - let connection = transports::http::connect::(&cli.endpoint); - let client = futures::executor::block_on(connection)?; - - // prints the value returned if available. - if let Command::Get { space, key } = &cli.command { - let resp = - futures::executor::block_on(get(&client, space, key)).map_err(|e| e.to_string())?; - log::debug!("{:?}", resp); - - println!("{}", String::from_utf8_lossy(&resp.value)); - return Ok(()); - } - - // returns on success and errors on failure - if let Command::Ping {} = &cli.command { - let resp = futures::executor::block_on(ping(&client)).map_err(|e| e.to_string())?; - log::debug!("{:?}", resp); - - return Ok(()); - } - - let tx = command_to_tx(cli.command)?; - - // prints the id of a successful transaction. - let resp = futures::executor::block_on(sign_and_submit(&client, &secret_key, tx)) - .map_err(|e| e.to_string())?; - println!("{}", resp.tx_id); - - Ok(()) -} - -/// Takes a TX command and returns transaction data. -fn command_to_tx(command: Command) -> Result { - match command { - Command::Claim { space } => Ok(claim_tx(space)), - Command::Set { space, key, value } => Ok(set_tx(space, key, value.as_bytes().to_vec())), - Command::Delete { space, key } => Ok(delete_tx(space, key)), - _ => Err(std::io::Error::new( - std::io::ErrorKind::Other, - "not a supported tx", - )), - } -} - -/// Returns a private key from a given path or creates new. -fn get_or_create_pk(path: &str) -> Result { - if !Path::new(path).try_exists()? { - let secret_key = key::secp256k1::private_key::Key::generate().unwrap(); - let mut f = File::create(path)?; - let hex = hex::encode(&secret_key.to_bytes()); - f.write_all(hex.as_bytes())?; - return Ok(secret_key); - } - let contents = std::fs::read_to_string(path)?; - let parsed = hex::decode(contents) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; - key::secp256k1::private_key::Key::from_bytes(&parsed) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) -} - -fn claim_tx(space: String) -> TransactionData { - TransactionData { - typ: TransactionType::Claim, - space, - key: "".to_string(), - value: vec![], - } -} - -fn set_tx(space: String, key: String, value: Vec) -> TransactionData { - TransactionData { - typ: TransactionType::Set, - space, - key, - value, - } -} - -fn delete_tx(space: String, key: String) -> TransactionData { - TransactionData { - typ: TransactionType::Delete, - space, - key, - value: vec![], - } -} - -async fn ping(client: &Client) -> Result { - let error_handling = - |e: RpcError| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()); - client.ping().await.map_err(error_handling) -} - -/// Decodes transaction signs the typed data ans issues tx returning IssueTxResponse. -async fn sign_and_submit( - client: &Client, - pk: &key::secp256k1::private_key::Key, - tx_data: TransactionData, -) -> Result { - let error_handling = - |e: RpcError| std::io::Error::new(std::io::ErrorKind::Other, dbg!(e).to_string()); - let resp = client - .decode_tx(DecodeTxArgs { tx_data }) - .await - .map_err(error_handling)?; - - let typed_data = &resp.typed_data; - - let dh = decoder::hash_structured_data(typed_data)?; - let sig = pk.sign_digest(&dh.as_bytes())?; - - client - .issue_tx(IssueTxArgs { - typed_data: resp.typed_data, - signature: sig.to_bytes().to_vec(), - }) - .await - .map_err(error_handling) -} - -/// Get returns a ResolveResponse. -async fn get(client: &Client, space: &str, key: &str) -> Result { - let error_handling = - |e: RpcError| std::io::Error::new(std::io::ErrorKind::Other, dbg!(e).to_string()); - - client - .resolve(ResolveArgs { - space: space.as_bytes().to_vec(), - key: key.as_bytes().to_vec(), - }) - .await - .map_err(error_handling) -} diff --git a/spacesvm/Cargo.toml b/spacesvm/Cargo.toml index 063a4a57..71dcfb7e 100644 --- a/spacesvm/Cargo.toml +++ b/spacesvm/Cargo.toml @@ -15,7 +15,7 @@ name = "spacesvm" path = "src/bin/spaces/main.rs" [dependencies] -avalanche-types = { version = "0.0.138", features = ["subnet"] } +avalanche-types = { version = "0.0.140", features = ["subnet"] } byteorder = "1.4.3" chrono = "0.4.22" crossbeam-channel = "0.5.6" @@ -27,6 +27,7 @@ eip-712 = "0.1.0" env_logger = "0.9.3" hex = "0.4.3" http = "0.2.8" +hyper = "0.14.23" jsonrpc-core = "18.0.0" jsonrpc-core-client = { version = "18.0.0" } jsonrpc-derive = "18.0" @@ -47,4 +48,8 @@ typetag = "0.2" [dev-dependencies] jsonrpc-tcp-server = "18.0.0" -futures-test = "0.3.24" \ No newline at end of file +futures-test = "0.3.24" + +[[test]] +name = "integration" +path = "tests/integration_tests.rs" diff --git a/spacesvm/src/api/client.rs b/spacesvm/src/api/client.rs new file mode 100644 index 00000000..9e85e02d --- /dev/null +++ b/spacesvm/src/api/client.rs @@ -0,0 +1,186 @@ +use std::{ + fs::File, + io::{Result, Write}, + path::Path, +}; + +use crate::{ + api::{ + DecodeTxArgs, DecodeTxResponse, IssueTxArgs, IssueTxResponse, PingResponse, ResolveArgs, + ResolveResponse, + }, + chain::tx::{tx::TransactionType, unsigned::TransactionData}, +}; +use avalanche_types::key; +use http::{Method, Request}; +use hyper::{body, client::HttpConnector, Body, Client as HyperClient}; +use jsonrpc_core::{Call, Id, MethodCall, Params, Version}; +use serde::de; + +pub use http::Uri; + +/// HTTP client for interacting with the API, assumes single threaded use. +pub struct Client { + id: u64, + client: HyperClient, + pub uri: Uri, +} + +impl Client { + pub fn new(uri: Uri) -> Self { + let client = HyperClient::new(); + Self { id: 0, client, uri } + } +} + +impl Client { + fn next_id(&mut self) -> Id { + let id = self.id; + self.id = id + 1; + Id::Num(id) + } + + /// Returns a serialized json request as string and the request id. + pub fn raw_request(&mut self, method: &str, params: &Params) -> (Id, String) { + let id = self.next_id(); + let request = jsonrpc_core::Request::Single(Call::MethodCall(MethodCall { + jsonrpc: Some(Version::V2), + method: method.to_owned(), + params: params.to_owned(), + id: id.clone(), + })); + ( + id, + serde_json::to_string(&request).expect("jsonrpc request should be serializable"), + ) + } + + /// Returns a PingResponse from client request. + pub async fn ping(&mut self) -> Result { + let (_id, json_request) = self.raw_request("ping", &Params::None); + let resp = self.post_de::(&json_request).await?; + + Ok(resp) + } + + /// Returns a DecodeTxResponse from client request. + pub async fn decode_tx(&mut self, args: DecodeTxArgs) -> Result { + let arg_bytes = serde_json::to_vec(&args)?; + let params: Params = serde_json::from_slice(&arg_bytes)?; + let (_id, json_request) = self.raw_request("decodeTx", ¶ms); + let resp = self.post_de::(&json_request).await?; + + Ok(resp) + } + + /// Returns a IssueTxResponse from client request. + pub async fn issue_tx(&mut self, args: IssueTxArgs) -> Result { + let arg_bytes = serde_json::to_vec(&args)?; + let params: Params = serde_json::from_slice(&arg_bytes)?; + let (_id, json_request) = self.raw_request("issueTx", ¶ms); + let resp = self.post_de::(&json_request).await?; + + Ok(resp) + } + + /// Returns a ResolveResponse from client request. + pub async fn resolve(&mut self, args: ResolveArgs) -> Result { + let arg_bytes = serde_json::to_vec(&args)?; + let params: Params = serde_json::from_slice(&arg_bytes)?; + let (_id, json_request) = self.raw_request("resolve", ¶ms); + let resp = self.post_de::(&json_request).await?; + + Ok(resp) + } + + /// Returns a deserialized response from client request. + pub async fn post_de(&self, json: &str) -> Result { + let req = Request::builder() + .method(Method::POST) + .uri(self.uri.to_string()) + .header("content-type", "application/json-rpc") + .body(Body::from(json.to_owned())) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("failed to create client request: {}", e), + ) + })?; + + let resp = self.client.request(req).await.map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("client post request failed: {}", e), + ) + })?; + + let bytes = body::to_bytes(resp.into_body()) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; + let resp = serde_json::from_slice(&bytes).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("failed to create client request: {}", e), + ) + })?; + + Ok(resp) + } +} + +pub fn claim_tx(space: String) -> TransactionData { + TransactionData { + typ: TransactionType::Claim, + space, + key: String::new(), + value: vec![], + } +} + +pub fn set_tx(space: String, key: String, value: Vec) -> TransactionData { + TransactionData { + typ: TransactionType::Set, + space, + key, + value, + } +} + +pub fn delete_tx(space: String, key: String) -> TransactionData { + TransactionData { + typ: TransactionType::Delete, + space, + key, + value: vec![], + } +} + +/// Returns a private key from a given path or creates new. +pub fn get_or_create_pk(path: &str) -> Result { + if !Path::new(path).try_exists()? { + let secret_key = key::secp256k1::private_key::Key::generate().unwrap(); + let mut f = File::create(path)?; + let hex = hex::encode(&secret_key.to_bytes()); + f.write_all(hex.as_bytes())?; + return Ok(secret_key); + } + + let contents = std::fs::read_to_string(path)?; + let parsed = hex::decode(contents) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; + key::secp256k1::private_key::Key::from_bytes(&parsed) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) +} + +#[tokio::test] +async fn test_raw_request() { + let mut cli = Client::new(Uri::from_static("http://test.url")); + let (id, _) = cli.raw_request("ping", &Params::None); + assert_eq!(id, jsonrpc_core::Id::Num(0)); + let (id, req) = cli.raw_request("ping", &Params::None); + assert_eq!(id, jsonrpc_core::Id::Num(1)); + assert_eq!( + req, + r#"{"jsonrpc":"2.0","method":"ping","params":null,"id":1}"# + ); +} diff --git a/spacesvm/src/api/mod.rs b/spacesvm/src/api/mod.rs index 506f3ba0..47059a2c 100644 --- a/spacesvm/src/api/mod.rs +++ b/spacesvm/src/api/mod.rs @@ -1,3 +1,4 @@ +pub mod client; pub mod service; use avalanche_types::ids; diff --git a/spacesvm/src/api/rpc.rs b/spacesvm/src/api/rpc.rs deleted file mode 100644 index 7741abcf..00000000 --- a/spacesvm/src/api/rpc.rs +++ /dev/null @@ -1,10 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] -struct Request { - jsonrpc: String, - method: String, - #[serde(skip_serializing_if = "Option::is_none")] - params: Option, - id: serde_json::Value, -} diff --git a/spacesvm/src/chain/tx/decoder.rs b/spacesvm/src/chain/tx/decoder.rs index e6b73fb3..9a5663be 100644 --- a/spacesvm/src/chain/tx/decoder.rs +++ b/spacesvm/src/chain/tx/decoder.rs @@ -34,7 +34,7 @@ pub struct TypedDataDomain { pub fn mini_kvvm_domain(_m: u64) -> TypedDataDomain { TypedDataDomain { - name: "MiniKvvm".to_string(), + name: "SpacesVm".to_string(), magic: "0x00".to_string(), // radix(m, 10).to_string(), } } diff --git a/spacesvm/tests/common.rs b/spacesvm/tests/common.rs new file mode 100644 index 00000000..06271641 --- /dev/null +++ b/spacesvm/tests/common.rs @@ -0,0 +1,40 @@ +use std::io::{Error, ErrorKind}; + +use avalanche_types::{ + self, + proto::{ + grpcutil::default_server, + rpcdb::database_server::{Database, DatabaseServer}, + }, +}; +use tokio::net::TcpListener; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::transport::Channel; + +pub async fn create_conn() -> Channel { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + Channel::builder(format!("http://{}", addr).parse().unwrap()) + .connect() + .await + .unwrap() +} + +pub async fn serve_test_database( + database: D, + listener: TcpListener, +) -> std::io::Result<()> +where + D: Database, +{ + default_server() + .add_service(DatabaseServer::new(database)) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .map_err(|e| { + Error::new( + ErrorKind::Other, + format!("failed to serve test database service: {}", e), + ) + }) +} diff --git a/spacesvm/tests/integration_tests.rs b/spacesvm/tests/integration_tests.rs new file mode 100644 index 00000000..b592fe86 --- /dev/null +++ b/spacesvm/tests/integration_tests.rs @@ -0,0 +1,2 @@ +mod common; +mod vm; diff --git a/spacesvm/tests/vm/client.rs b/spacesvm/tests/vm/client.rs new file mode 100644 index 00000000..326cc983 --- /dev/null +++ b/spacesvm/tests/vm/client.rs @@ -0,0 +1,226 @@ +use std::{ + collections::HashMap, + io::{Error, ErrorKind, Result}, +}; + +use avalanche_types::{self, ids, proto, subnet}; +use prost::bytes::Bytes; +use tokio::{net::TcpListener, sync::mpsc}; +use tonic::transport::Channel; + +/// Test Vm client which interacts with rpcchainvm server service. +pub struct Client { + inner: proto::vm::vm_client::VmClient, + pub stop_ch: tokio::sync::broadcast::Sender<()>, +} + +impl Client { + pub fn new(client_conn: Channel) -> Box { + // Initialize broadcast stop channel used to terminate gRPC servers during shutdown. + let (stop_ch, _): ( + tokio::sync::broadcast::Sender<()>, + tokio::sync::broadcast::Receiver<()>, + ) = tokio::sync::broadcast::channel(1); + + Box::new(Self { + inner: avalanche_types::proto::vm::vm_client::VmClient::new(client_conn), + stop_ch, + }) + } +} + +#[tonic::async_trait] +impl subnet::rpc::common::vm::Vm for Client { + async fn initialize( + &mut self, + _ctx: Option, + _db_manager: Box, + genesis_bytes: &[u8], + _upgrade_bytes: &[u8], + _config_bytes: &[u8], + _to_engine: mpsc::Sender, + _fxs: &[subnet::rpc::common::vm::Fx], + _app_sender: Box, + ) -> Result<()> { + // memdb wrapped in rpcdb + let db = subnet::rpc::database::rpcdb::server::Server::new( + subnet::rpc::database::memdb::Database::new(), + ); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + log::info!("starting rpcdb server"); + tokio::spawn(async move { + crate::common::serve_test_database(db, listener) + .await + .unwrap(); + }); + + let versiondb_servers = proto::vm::VersionedDbServer { + server_addr: addr.clone().to_string(), + version: "0.0.7".to_owned(), + }; + + let mut db_servers = Vec::with_capacity(1); + db_servers.push(versiondb_servers); + + let request = proto::vm::InitializeRequest { + network_id: 0, + subnet_id: Bytes::from(ids::Id::empty().to_vec()), + chain_id: Bytes::from(ids::Id::empty().to_vec()), + node_id: Bytes::from(ids::node::Id::empty().to_vec()), + x_chain_id: Bytes::from(ids::Id::empty().to_vec()), + avax_asset_id: Bytes::from(ids::Id::empty().to_vec()), + genesis_bytes: Bytes::from(genesis_bytes.to_vec()), + upgrade_bytes: Bytes::from(""), + config_bytes: Bytes::from(""), + db_servers, + server_addr: addr.to_string(), //dummy + }; + + // in this context we don't care about the response unless its an error + let resp = self.inner.initialize(request).await.map_err(|e| { + Error::new( + ErrorKind::Other, + format!("initialize request failed: {:?}", e), + ) + }); + assert!(resp.is_ok()); + Ok(()) + } + + async fn set_state(&self, _state: subnet::rpc::snow::State) -> Result<()> { + // TODO: + Ok(()) + } + + async fn shutdown(&self) -> Result<()> { + // TODO: + Ok(()) + } + + async fn version(&self) -> Result { + Ok(String::new()) + } + + async fn create_static_handlers( + &mut self, + ) -> Result> { + let resp = self + .inner + .create_static_handlers(proto::google::protobuf::Empty {}) + .await + .map_err(|e| { + Error::new( + ErrorKind::Other, + format!("create static handler request failed: {:?}", e), + ) + })?; + + let resp = resp.into_inner(); + + let mut http_handler: HashMap = + HashMap::new(); + + for h in resp.handlers.iter() { + let lock_option = + subnet::rpc::common::http_handler::LockOptions::try_from(h.lock_options) + .map_err(|_| Error::new(ErrorKind::Other, "invalid lock option"))?; + http_handler.insert( + h.prefix.clone(), + subnet::rpc::common::http_handler::HttpHandler { + lock_option, + handler: None, + server_addr: Some(h.server_addr.clone()), + }, + ); + } + + Ok(http_handler) + } + + async fn create_handlers( + &mut self, + ) -> Result> { + let resp = self + .inner + .create_handlers(proto::google::protobuf::Empty {}) + .await + .map_err(|e| { + Error::new( + ErrorKind::Other, + format!("create handler request failed: {:?}", e), + ) + })?; + + let resp = resp.into_inner(); + + let mut http_handler: HashMap = + HashMap::new(); + + for h in resp.handlers.iter() { + let lock_option = + subnet::rpc::common::http_handler::LockOptions::try_from(h.lock_options) + .map_err(|_| Error::new(ErrorKind::Other, "invalid lock option"))?; + http_handler.insert( + h.prefix.clone(), + subnet::rpc::common::http_handler::HttpHandler { + lock_option, + handler: None, + server_addr: Some(h.server_addr.clone()), + }, + ); + } + + Ok(http_handler) + } +} + +#[tonic::async_trait] +impl subnet::rpc::health::Checkable for Client { + async fn health_check(&self) -> Result> { + // TODO: + Ok(Vec::new()) + } +} + +#[tonic::async_trait] +impl subnet::rpc::common::vm::Connector for Client { + async fn connected(&self, _id: &ids::node::Id) -> Result<()> { + Ok(()) + } + + async fn disconnected(&self, _id: &ids::node::Id) -> Result<()> { + Ok(()) + } +} + +#[tonic::async_trait] +impl subnet::rpc::common::apphandler::AppHandler for Client { + async fn app_request( + &self, + _node_id: &ids::node::Id, + _request_id: u32, + _deadline: chrono::DateTime, + _request: &[u8], + ) -> Result<()> { + Ok(()) + } + + async fn app_request_failed(&self, _node_id: &ids::node::Id, _request_id: u32) -> Result<()> { + Ok(()) + } + + async fn app_response( + &self, + _node_id: &ids::node::Id, + _request_id: u32, + _response: &[u8], + ) -> Result<()> { + Ok(()) + } + + async fn app_gossip(&self, _node_id: &ids::node::Id, _msg: &[u8]) -> Result<()> { + Ok(()) + } +} diff --git a/spacesvm/tests/vm/mod.rs b/spacesvm/tests/vm/mod.rs new file mode 100644 index 00000000..4b7a378b --- /dev/null +++ b/spacesvm/tests/vm/mod.rs @@ -0,0 +1,219 @@ +pub mod client; + +use avalanche_types::{ids, subnet::rpc::utils}; +use jsonrpc_core::Params; +use spacesvm::{ + api::{client::claim_tx, DecodeTxArgs}, + vm::{self, PUBLIC_API_ENDPOINT}, +}; +use std::io::{Error, ErrorKind}; +use tokio::{ + sync::{ + broadcast::{Receiver, Sender}, + mpsc, + }, + time::{sleep, Duration}, +}; +use tonic::transport::Channel; + +#[tokio::test] +async fn test_api() { + // init logger + let _ = env_logger::builder() + .filter_level(log::LevelFilter::Info) + .is_test(true) + .try_init(); + + // setup stop channel for grpc services. + let (stop_ch_tx, stop_ch_rx): (Sender<()>, Receiver<()>) = tokio::sync::broadcast::channel(1); + let vm_server = avalanche_types::subnet::rpc::vm::server::Server::new( + Box::new(vm::ChainVm::new()), + stop_ch_tx, + ); + + // start Vm service + let vm_addr = utils::new_socket_addr(); + tokio::spawn(async move { + avalanche_types::subnet::rpc::plugin::serve_with_address(vm_server, vm_addr, stop_ch_rx) + .await + .expect("failed to start gRPC server"); + }); + log::info!("started subnet vm"); + + // wait for server to start + sleep(Duration::from_millis(100)).await; + + // create gRPC client for Vm client. + let client_conn = Channel::builder(format!("http://{}", vm_addr).parse().unwrap()) + .connect() + .await + .unwrap(); + + let mut vm_client = crate::vm::client::Client::new(client_conn); + + let mut versioned_dbs: Vec< + avalanche_types::subnet::rpc::database::manager::versioned_database::VersionedDatabase, + > = Vec::with_capacity(1); + versioned_dbs.push( + avalanche_types::subnet::rpc::database::manager::versioned_database::VersionedDatabase::new( + avalanche_types::subnet::rpc::database::memdb::Database::new(), + semver::Version::parse("0.0.7").unwrap(), + ), + ); + + let db_manager = + avalanche_types::subnet::rpc::database::manager::DatabaseManager::new_from_databases( + Vec::new(), + ); + let app_sender = MockAppSender::new(); + let (tx_engine, mut rx_engine): ( + mpsc::Sender, + mpsc::Receiver, + ) = mpsc::channel(1); + + tokio::spawn(async move { + while let Some(msg) = rx_engine.recv().await { + log::info!("engine received message: {:?}", msg); + } + }); + + let genesis_bytes = + "{\"author\":\"subnet creator\",\"welcome_message\":\"Hello from Rust VM!\"}".as_bytes(); + + let resp = vm_client + .initialize( + None, + db_manager, + genesis_bytes, + &[], + &[], + tx_engine, + &[()], + app_sender, + ) + .await; + + assert!(resp.is_ok()); + + // call create_handlers. + let resp = vm_client.create_handlers().await; + assert!(resp.is_ok()); + + let handlers = resp.unwrap(); + + // get the "/public" handler we assume it exists because it was created during initialize. + let handler = handlers.get(PUBLIC_API_ENDPOINT).unwrap(); + + let http_addr = handler.clone().server_addr.as_ref().unwrap(); + + // create client for http service which was started during create_handlers RPC. + let client_conn = Channel::builder(format!("http://{}", http_addr).parse().unwrap()) + .connect() + .await + .unwrap(); + + let mut client = spacesvm::api::client::Client::new(http::Uri::from_static("http://test.url")); + + // ping + let (_id, json_str) = client.raw_request("ping", &Params::None); + let req = http::request::Builder::new() + .body(json_str.as_bytes().to_vec()) + .unwrap(); + + // pass the http request to the serve_http_simple RPC. this same process + // takes place when the avalanchego router passes a request to the + // subnet process. this process also simulates a raw JSON request from + // curl or postman. + log::info!("sending http request over grpc"); + let mut http_client = avalanche_types::subnet::rpc::http::client::Client::new(client_conn); + let resp = http_client.serve_http_simple(req).await.map_err(|e| { + Error::new( + ErrorKind::Other, + format!("failed to initialize vm: {:?}", e), + ) + }); + + assert!(resp.is_ok()); + let resp = resp.unwrap(); + let body = std::str::from_utf8(&resp.body()).unwrap(); + log::info!("ping response {}", body); + + let tx_data = claim_tx("test_claim".to_owned()); + let arg_bytes = serde_json::to_value(&DecodeTxArgs { tx_data }).unwrap(); + + let (_id, json_str) = client.raw_request("decodeTx", &Params::Array(vec![arg_bytes])); + let req = http::request::Builder::new() + .body(json_str.as_bytes().to_vec()) + .unwrap(); + let resp = http_client.serve_http_simple(req).await.map_err(|e| { + Error::new( + ErrorKind::Other, + format!("failed to initialize vm: {:?}", e), + ) + }); + assert!(resp.is_ok()); + let resp = resp.unwrap(); + let body = std::str::from_utf8(&resp.body()).unwrap(); + log::info!("decode response {}", body); + + // TODO shutdown; +} + +#[derive(Clone)] +struct MockAppSender; + +impl MockAppSender { + fn new() -> Box { + Box::new(MockAppSender {}) + } +} + +#[tonic::async_trait] +impl avalanche_types::subnet::rpc::common::appsender::AppSender for MockAppSender { + async fn send_app_request( + &self, + _node_ids: ids::node::Set, + _request_id: u32, + _request: Vec, + ) -> std::io::Result<()> { + Ok(()) + } + + async fn send_app_response( + &self, + _node_if: ids::node::Id, + _request_id: u32, + _response: Vec, + ) -> std::io::Result<()> { + Ok(()) + } + + async fn send_app_gossip(&self, _msg: Vec) -> std::io::Result<()> { + Ok(()) + } + + async fn send_app_gossip_specific( + &self, + _node_ids: ids::node::Set, + _msg: Vec, + ) -> std::io::Result<()> { + Ok(()) + } + + async fn send_cross_chain_app_request( + &self, + _chain_id: ids::Id, + _request_id: u32, + _app_request_bytes: Vec, + ) -> std::io::Result<()> { + Ok(()) + } + async fn send_cross_chain_app_response( + &self, + _chain_id: ids::Id, + _request_id: u32, + _app_response_bytes: Vec, + ) -> std::io::Result<()> { + Ok(()) + } +}