Skip to content

Commit 09765db

Browse files
authored
feat: Bump hive_metastore to use pure rust thrift impl volo (#174)
1 parent 390cd51 commit 09765db

File tree

4 files changed

+59
-47
lines changed

4 files changed

+59
-47
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,5 +67,6 @@ typed-builder = "^0.18"
6767
url = "2"
6868
urlencoding = "2"
6969
uuid = "1.6.1"
70-
70+
volo-thrift = "0.9.2"
71+
hive_metastore = "0.0.2"
7172
tera = "1"

crates/catalog/hms/Cargo.toml

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,9 @@ license = { workspace = true }
2828
keywords = ["iceberg", "hive", "catalog"]
2929

3030
[dependencies]
31+
anyhow = { workspace = true }
3132
async-trait = { workspace = true }
32-
hive_metastore = "0.0.1"
33+
hive_metastore = { workspace = true }
3334
iceberg = { workspace = true }
34-
# the thrift upstream suffered from no regular rust release.
35-
#
36-
# [test-rs](https://github.com/tent-rs) is an organization that helps resolves this
37-
# issue. And [tent-thrift](https://github.com/tent-rs/thrift) is a fork of the thrift
38-
# crate, built from the thrift upstream with only version bumped.
39-
thrift = { package = "tent-thrift", version = "0.18.1" }
4035
typed-builder = { workspace = true }
36+
volo-thrift = { workspace = true }

crates/catalog/hms/src/catalog.rs

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717

1818
use super::utils::*;
1919
use async_trait::async_trait;
20-
use hive_metastore::{TThriftHiveMetastoreSyncClient, ThriftHiveMetastoreSyncClient};
20+
use hive_metastore::ThriftHiveMetastoreClient;
21+
use hive_metastore::ThriftHiveMetastoreClientBuilder;
2122
use iceberg::table::Table;
22-
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
23+
use iceberg::{
24+
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
25+
TableIdent,
26+
};
2327
use std::collections::HashMap;
2428
use std::fmt::{Debug, Formatter};
25-
use std::sync::{Arc, Mutex};
26-
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
27-
use thrift::transport::{
28-
ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, TIoChannel, WriteHalf,
29-
};
29+
use std::net::ToSocketAddrs;
3030
use typed_builder::TypedBuilder;
3131

3232
/// Hive metastore Catalog configuration.
@@ -35,24 +35,7 @@ pub struct HmsCatalogConfig {
3535
address: String,
3636
}
3737

38-
/// TODO: We only support binary protocol for now.
39-
type HmsClientType = ThriftHiveMetastoreSyncClient<
40-
TBinaryInputProtocol<TBufferedReadTransport<ReadHalf<thrift::transport::TTcpChannel>>>,
41-
TBinaryOutputProtocol<TBufferedWriteTransport<WriteHalf<thrift::transport::TTcpChannel>>>,
42-
>;
43-
44-
/// # TODO
45-
///
46-
/// we are using the same connection everytime, we should support connection
47-
/// pool in the future.
48-
struct HmsClient(Arc<Mutex<HmsClientType>>);
49-
50-
impl HmsClient {
51-
fn call<T>(&self, f: impl FnOnce(&mut HmsClientType) -> thrift::Result<T>) -> Result<T> {
52-
let mut client = self.0.lock().unwrap();
53-
f(&mut client).map_err(from_thrift_error)
54-
}
55-
}
38+
struct HmsClient(ThriftHiveMetastoreClient);
5639

5740
/// Hive metastore Catalog.
5841
pub struct HmsCatalog {
@@ -71,19 +54,29 @@ impl Debug for HmsCatalog {
7154
impl HmsCatalog {
7255
/// Create a new hms catalog.
7356
pub fn new(config: HmsCatalogConfig) -> Result<Self> {
74-
let mut channel = thrift::transport::TTcpChannel::new();
75-
channel
76-
.open(config.address.as_str())
77-
.map_err(from_thrift_error)?;
78-
let (i_chan, o_chan) = channel.split().map_err(from_thrift_error)?;
79-
let i_chan = TBufferedReadTransport::new(i_chan);
80-
let o_chan = TBufferedWriteTransport::new(o_chan);
81-
let i_proto = TBinaryInputProtocol::new(i_chan, true);
82-
let o_proto = TBinaryOutputProtocol::new(o_chan, true);
83-
let client = ThriftHiveMetastoreSyncClient::new(i_proto, o_proto);
57+
let address = config
58+
.address
59+
.as_str()
60+
.to_socket_addrs()
61+
.map_err(from_io_error)?
62+
.next()
63+
.ok_or_else(|| {
64+
Error::new(
65+
ErrorKind::Unexpected,
66+
format!("invalid address: {}", config.address),
67+
)
68+
})?;
69+
70+
let client = ThriftHiveMetastoreClientBuilder::new("hms")
71+
.address(address)
72+
// Framed thrift rpc is not enabled by default in HMS, we use
73+
// buffered instead.
74+
.make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
75+
.build();
76+
8477
Ok(Self {
8578
config,
86-
client: HmsClient(Arc::new(Mutex::new(client))),
79+
client: HmsClient(client),
8780
})
8881
}
8982
}
@@ -103,10 +96,17 @@ impl Catalog for HmsCatalog {
10396
let dbs = if parent.is_some() {
10497
return Ok(vec![]);
10598
} else {
106-
self.client.call(|client| client.get_all_databases())?
99+
self.client
100+
.0
101+
.get_all_databases()
102+
.await
103+
.map_err(from_thrift_error)?
107104
};
108105

109-
Ok(dbs.into_iter().map(NamespaceIdent::new).collect())
106+
Ok(dbs
107+
.into_iter()
108+
.map(|v| NamespaceIdent::new(v.into()))
109+
.collect())
110110
}
111111

112112
async fn create_namespace(

crates/catalog/hms/src/utils.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,28 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use anyhow::anyhow;
1819
use iceberg::{Error, ErrorKind};
20+
use std::fmt::Debug;
21+
use std::io;
1922

2023
/// Format a thrift error into iceberg error.
21-
pub fn from_thrift_error(error: thrift::Error) -> Error {
24+
pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
25+
where
26+
T: Debug,
27+
{
2228
Error::new(
2329
ErrorKind::Unexpected,
2430
"operation failed for hitting thrift error".to_string(),
2531
)
32+
.with_source(anyhow!("thrift error: {:?}", error))
33+
}
34+
35+
/// Format an io error into iceberg error.
36+
pub fn from_io_error(error: io::Error) -> Error {
37+
Error::new(
38+
ErrorKind::Unexpected,
39+
"operation failed for hitting io error".to_string(),
40+
)
2641
.with_source(error)
2742
}

0 commit comments

Comments
 (0)