Skip to content

FFI support for versions and alternate tokio runtimes #13937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
with:
rust-version: stable
- name: Prepare cargo build
run: cargo check --profile ci --all-targets
run: cargo check --profile ci --all-targets --features integration-tests

# cargo check common, functions and substrait with no default features
linux-cargo-check-no-default-features:
Expand Down Expand Up @@ -92,8 +92,8 @@ jobs:
- name: Check workspace in debug mode
run: cargo check --profile ci --all-targets --workspace

- name: Check workspace with avro,json features
run: cargo check --profile ci --workspace --benches --features avro,json
- name: Check workspace with additional features
run: cargo check --profile ci --workspace --benches --features avro,json,integration-tests

- name: Check Cargo.lock for datafusion-cli
run: |
Expand Down Expand Up @@ -185,7 +185,7 @@ jobs:
with:
rust-version: stable
- name: Run tests (excluding doctests)
run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace
run: cargo test --profile ci --exclude datafusion-examples --exclude ffi_example_table_provider --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests
- name: Verify Working Directory Clean
run: git diff --exit-code

Expand Down Expand Up @@ -417,7 +417,7 @@ jobs:
- name: Run tests (excluding doctests)
shell: bash
run: |
cargo test --profile ci --lib --tests --bins --features avro,json,backtrace
cargo test --profile ci --lib --tests --bins --features avro,json,backtrace,integration-tests
cd datafusion-cli
cargo test --profile ci --lib --tests --bins --all-features

Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/rust_clippy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
# under the License.

set -ex
cargo clippy --all-targets --workspace --features avro,pyarrow -- -D warnings
cargo clippy --all-targets --workspace --features avro,pyarrow,integration-tests -- -D warnings
cd datafusion-cli
cargo clippy --all-targets --all-features -- -D warnings
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {

let table_provider = MemTable::try_new(schema, vec![batches]).unwrap();

FFI_TableProvider::new(Arc::new(table_provider), true)
FFI_TableProvider::new(Arc::new(table_provider), true, None)
}

#[export_root_module]
Expand Down
9 changes: 8 additions & 1 deletion datafusion/ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,25 @@ workspace = true
[lib]
name = "datafusion_ffi"
path = "src/lib.rs"
crate-type = ["cdylib", "rlib"]

[dependencies]
abi_stable = "0.11.3"
arrow = { workspace = true, features = ["ffi"] }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-ffi = { version = "0.5.0", features = ["abi_stable"] }
async-trait = { workspace = true }
datafusion = { workspace = true, default-features = false }
datafusion-proto = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
prost = { workspace = true }
semver = "1.0.24"
tokio = { workspace = true }

[dev-dependencies]
doc-comment = { workspace = true }
tokio = { workspace = true }

[features]
integration-tests = []
30 changes: 24 additions & 6 deletions datafusion/ffi/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion::{
execution::{SendableRecordBatchStream, TaskContext},
physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
};
use tokio::runtime::Runtime;

use crate::{
plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
Expand Down Expand Up @@ -71,6 +72,7 @@ unsafe impl Sync for FFI_ExecutionPlan {}
pub struct ExecutionPlanPrivateData {
pub plan: Arc<dyn ExecutionPlan>,
pub context: Arc<TaskContext>,
pub runtime: Option<Arc<Runtime>>,
}

unsafe extern "C" fn properties_fn_wrapper(
Expand All @@ -88,11 +90,14 @@ unsafe extern "C" fn children_fn_wrapper(
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
let plan = &(*private_data).plan;
let ctx = &(*private_data).context;
let runtime = &(*private_data).runtime;

let children: Vec<_> = plan
.children()
.into_iter()
.map(|child| FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx)))
.map(|child| {
FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
})
.collect();

children.into()
Expand All @@ -105,9 +110,10 @@ unsafe extern "C" fn execute_fn_wrapper(
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
let plan = &(*private_data).plan;
let ctx = &(*private_data).context;
let runtime = (*private_data).runtime.as_ref().map(Arc::clone);

match plan.execute(partition, Arc::clone(ctx)) {
Ok(rbs) => RResult::ROk(rbs.into()),
Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)),
Err(e) => RResult::RErr(
format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(),
),
Expand All @@ -129,7 +135,11 @@ unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_Execution
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
let plan_data = &(*private_data);

FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), Arc::clone(&plan_data.context))
FFI_ExecutionPlan::new(
Arc::clone(&plan_data.plan),
Arc::clone(&plan_data.context),
plan_data.runtime.clone(),
)
}

impl Clone for FFI_ExecutionPlan {
Expand All @@ -140,8 +150,16 @@ impl Clone for FFI_ExecutionPlan {

impl FFI_ExecutionPlan {
/// This function is called on the provider's side.
pub fn new(plan: Arc<dyn ExecutionPlan>, context: Arc<TaskContext>) -> Self {
let private_data = Box::new(ExecutionPlanPrivateData { plan, context });
pub fn new(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
runtime: Option<Arc<Runtime>>,
) -> Self {
let private_data = Box::new(ExecutionPlanPrivateData {
plan,
context,
runtime,
});

Self {
properties: properties_fn_wrapper,
Expand Down Expand Up @@ -357,7 +375,7 @@ mod tests {
let original_plan = Arc::new(EmptyExec::new(schema));
let original_name = original_plan.name().to_string();

let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx());
let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);

let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;

Expand Down
13 changes: 13 additions & 0 deletions datafusion/ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,18 @@ pub mod session_config;
pub mod table_provider;
pub mod table_source;

#[cfg(feature = "integration-tests")]
pub mod tests;

/// Returns the major version of the FFI implementation. If the API evolves,
/// we use the major version to identify compatibility over the unsafe
/// boundary. This call is intended to be used by implementers to validate
/// they have compatible libraries.
pub extern "C" fn version() -> u64 {
let version_str = env!("CARGO_PKG_VERSION");
let version = semver::Version::parse(version_str).expect("Invalid version string");
version.major
}

#[cfg(doctest)]
doc_comment::doctest!("../README.md", readme_example_test);
28 changes: 24 additions & 4 deletions datafusion/ffi/src/record_batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{ffi::c_void, task::Poll};
use std::{ffi::c_void, sync::Arc, task::Poll};

use abi_stable::{
std_types::{ROption, RResult, RString},
Expand All @@ -33,6 +33,7 @@ use datafusion::{
execution::{RecordBatchStream, SendableRecordBatchStream},
};
use futures::{Stream, TryStreamExt};
use tokio::runtime::Runtime;

use crate::arrow_wrappers::{WrappedArray, WrappedSchema};

Expand All @@ -58,20 +59,36 @@ pub struct FFI_RecordBatchStream {
pub private_data: *mut c_void,
}

pub struct RecordBatchStreamPrivateData {
pub rbs: SendableRecordBatchStream,
pub runtime: Option<Arc<Runtime>>,
}

impl From<SendableRecordBatchStream> for FFI_RecordBatchStream {
fn from(stream: SendableRecordBatchStream) -> Self {
Self::new(stream, None)
}
}

impl FFI_RecordBatchStream {
pub fn new(stream: SendableRecordBatchStream, runtime: Option<Arc<Runtime>>) -> Self {
let private_data = Box::into_raw(Box::new(RecordBatchStreamPrivateData {
rbs: stream,
runtime,
})) as *mut c_void;
FFI_RecordBatchStream {
poll_next: poll_next_fn_wrapper,
schema: schema_fn_wrapper,
private_data: Box::into_raw(Box::new(stream)) as *mut c_void,
private_data,
}
}
}

unsafe impl Send for FFI_RecordBatchStream {}

unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> WrappedSchema {
let stream = stream.private_data as *const SendableRecordBatchStream;
let private_data = stream.private_data as *const RecordBatchStreamPrivateData;
let stream = &(*private_data).rbs;

(*stream).schema().into()
}
Expand Down Expand Up @@ -106,7 +123,10 @@ unsafe extern "C" fn poll_next_fn_wrapper(
stream: &FFI_RecordBatchStream,
cx: &mut FfiContext,
) -> FfiPoll<ROption<RResult<WrappedArray, RString>>> {
let stream = stream.private_data as *mut SendableRecordBatchStream;
let private_data = stream.private_data as *mut RecordBatchStreamPrivateData;
let stream = &mut (*private_data).rbs;

let _guard = (*private_data).runtime.as_ref().map(|rt| rt.enter());

let poll_result = cx.with_context(|std_cx| {
(*stream)
Expand Down
21 changes: 18 additions & 3 deletions datafusion/ffi/src/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use datafusion_proto::{
protobuf::LogicalExprList,
};
use prost::Message;
use tokio::runtime::Runtime;

use crate::{
arrow_wrappers::WrappedSchema,
Expand Down Expand Up @@ -139,6 +140,9 @@ pub struct FFI_TableProvider {
/// Release the memory of the private data when it is no longer being used.
pub release: unsafe extern "C" fn(arg: &mut Self),

/// Return the major DataFusion version number of this provider.
pub version: unsafe extern "C" fn() -> u64,

/// Internal data. This is only to be accessed by the provider of the plan.
/// A [`ForeignExecutionPlan`] should never attempt to access this data.
pub private_data: *mut c_void,
Expand All @@ -149,6 +153,7 @@ unsafe impl Sync for FFI_TableProvider {}

struct ProviderPrivateData {
provider: Arc<dyn TableProvider + Send>,
runtime: Option<Arc<Runtime>>,
}

unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
Expand Down Expand Up @@ -216,6 +221,7 @@ unsafe extern "C" fn scan_fn_wrapper(
let private_data = provider.private_data as *mut ProviderPrivateData;
let internal_provider = &(*private_data).provider;
let session_config = session_config.clone();
let runtime = &(*private_data).runtime;

async move {
let config = match ForeignSessionConfig::try_from(&session_config) {
Expand Down Expand Up @@ -261,7 +267,11 @@ unsafe extern "C" fn scan_fn_wrapper(
Err(e) => return RResult::RErr(e.to_string().into()),
};

RResult::ROk(FFI_ExecutionPlan::new(plan, ctx.task_ctx()))
RResult::ROk(FFI_ExecutionPlan::new(
plan,
ctx.task_ctx(),
runtime.clone(),
))
}
.into_ffi()
}
Expand All @@ -273,9 +283,11 @@ unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {

unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider {
let old_private_data = provider.private_data as *const ProviderPrivateData;
let runtime = (*old_private_data).runtime.clone();

let private_data = Box::into_raw(Box::new(ProviderPrivateData {
provider: Arc::clone(&(*old_private_data).provider),
runtime,
})) as *mut c_void;

FFI_TableProvider {
Expand All @@ -285,6 +297,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_Table
supports_filters_pushdown: provider.supports_filters_pushdown,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
private_data,
}
}
Expand All @@ -300,8 +313,9 @@ impl FFI_TableProvider {
pub fn new(
provider: Arc<dyn TableProvider + Send>,
can_support_pushdown_filters: bool,
runtime: Option<Arc<Runtime>>,
) -> Self {
let private_data = Box::new(ProviderPrivateData { provider });
let private_data = Box::new(ProviderPrivateData { provider, runtime });

Self {
schema: schema_fn_wrapper,
Expand All @@ -313,6 +327,7 @@ impl FFI_TableProvider {
},
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
private_data: Box::into_raw(private_data) as *mut c_void,
}
}
Expand Down Expand Up @@ -463,7 +478,7 @@ mod tests {
let provider =
Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?);

let ffi_provider = FFI_TableProvider::new(provider, true);
let ffi_provider = FFI_TableProvider::new(provider, true, None);

let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into();

Expand Down
Loading