Skip to content

Consolidate Example: dataframe_output.rs into dataframe.rs #13877

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ cargo run --example dataframe
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data
- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
Expand Down
67 changes: 67 additions & 0 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::config::CsvOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::DataFusionError;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
Expand All @@ -29,13 +33,19 @@ use tempfile::tempdir;
/// * [read_parquet]: execute queries against parquet files
/// * [read_csv]: execute queries against csv files
/// * [read_memory]: execute queries against in-memory arrow data
///
/// This example demonstrates the various methods to write out a DataFrame to local storage.
/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example
/// using a remote object store.
/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
#[tokio::main]
async fn main() -> Result<()> {
// The SessionContext is the main high level API for interacting with DataFusion
let ctx = SessionContext::new();
read_parquet(&ctx).await?;
read_csv(&ctx).await?;
read_memory(&ctx).await?;
write_out(&ctx).await?;
Ok(())
}

Expand Down Expand Up @@ -139,3 +149,60 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> {

Ok(())
}

/// Use the DataFrame API to:
/// 1. Write out a DataFrame to a table
/// 2. Write out a DataFrame to a parquet file
/// 3. Write out a DataFrame to a csv file
/// 4. Write out a DataFrame to a json file
async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionError> {
let mut df = ctx.sql("values ('a'), ('b'), ('c')").await.unwrap();

// Ensure the column names and types match the target table
df = df.with_column_renamed("column1", "tablecol1").unwrap();

ctx.sql(
"create external table
test(tablecol1 varchar)
stored as parquet
location './datafusion-examples/test_table/'",
)
.await?
.collect()
.await?;

// This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c').
// The behavior of write_table depends on the TableProvider's implementation
// of the insert_into method.
df.clone()
.write_table("test", DataFrameWriteOptions::new())
.await?;

df.clone()
.write_parquet(
"./datafusion-examples/test_parquet/",
DataFrameWriteOptions::new(),
None,
)
.await?;

df.clone()
.write_csv(
"./datafusion-examples/test_csv/",
// DataFrameWriteOptions contains options which control how data is written
// such as compression codec
DataFrameWriteOptions::new(),
Some(CsvOptions::default().with_compression(CompressionTypeVariant::GZIP)),
)
.await?;

df.clone()
.write_json(
"./datafusion-examples/test_json/",
DataFrameWriteOptions::new(),
None,
)
.await?;

Ok(())
}
78 changes: 0 additions & 78 deletions datafusion-examples/examples/dataframe_output.rs

This file was deleted.

Loading