Skip to content

Commit 8ec5100

Browse files
committed
[task #8987] add to_date function
Signed-off-by: tangruilin <[email protected]>
1 parent ca37ce3 commit 8ec5100

File tree

12 files changed

+328
-2
lines changed

12 files changed

+328
-2
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use datafusion::arrow::array::StringArray;
21+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
22+
use datafusion::arrow::record_batch::RecordBatch;
23+
use datafusion::error::Result;
24+
use datafusion::prelude::*;
25+
26+
/// This example demonstrates how to use the to_date series
27+
/// of functions in the DataFrame API as well as via sql.
28+
#[tokio::main]
29+
async fn main() -> Result<()> {
30+
// define a schema.
31+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
32+
33+
// define data.
34+
let batch = RecordBatch::try_new(
35+
schema,
36+
vec![Arc::new(StringArray::from(vec![
37+
"2020-09-08T13:42:29Z",
38+
"2020-09-08T13:42:29.190855-05:00",
39+
"2020-08-09 12:13:29",
40+
"2020-01-02",
41+
]))],
42+
)?;
43+
44+
// declare a new context. In spark API, this corresponds to a new spark SQLsession
45+
let ctx = SessionContext::new();
46+
47+
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
48+
ctx.register_batch("t", batch)?;
49+
let df = ctx.table("t").await?;
50+
51+
// use to_date function to convert col 'a' to timestamp type using the default parsing
52+
let df = df.with_column("a", to_date(vec![col("a")]))?;
53+
54+
let df = df.select_columns(&["a"])?;
55+
56+
// print the results
57+
df.show().await?;
58+
59+
Ok(())
60+
}

datafusion/common/src/scalar/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use arrow::{
4545
compute::kernels::cast::{cast_with_options, CastOptions},
4646
datatypes::{
4747
i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType,
48-
Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
48+
Date32Type, Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
4949
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
5050
IntervalYearMonthType, TimeUnit, TimestampMicrosecondType,
5151
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
@@ -3317,6 +3317,12 @@ impl ScalarType<i64> for TimestampNanosecondType {
33173317
}
33183318
}
33193319

3320+
impl ScalarType<i32> for Date32Type {
3321+
fn scalar(r: Option<i32>) -> ScalarValue {
3322+
ScalarValue::Date32(r)
3323+
}
3324+
}
3325+
33203326
#[cfg(test)]
33213327
mod tests {
33223328
use std::cmp::Ordering;

datafusion/expr/src/built_in_function.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,8 @@ pub enum BuiltinScalarFunction {
275275
ToTimestampSeconds,
276276
/// from_unixtime
277277
FromUnixtime,
278+
/// to_date
279+
ToDate,
278280
///now
279281
Now,
280282
///current_date
@@ -472,6 +474,7 @@ impl BuiltinScalarFunction {
472474
BuiltinScalarFunction::Upper => Volatility::Immutable,
473475
BuiltinScalarFunction::Struct => Volatility::Immutable,
474476
BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
477+
BuiltinScalarFunction::ToDate => Volatility::Immutable,
475478
BuiltinScalarFunction::ArrowTypeof => Volatility::Immutable,
476479
BuiltinScalarFunction::OverLay => Volatility::Immutable,
477480
BuiltinScalarFunction::Levenshtein => Volatility::Immutable,
@@ -781,6 +784,7 @@ impl BuiltinScalarFunction {
781784
BuiltinScalarFunction::ToTimestampMicros => Ok(Timestamp(Microsecond, None)),
782785
BuiltinScalarFunction::ToTimestampSeconds => Ok(Timestamp(Second, None)),
783786
BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)),
787+
BuiltinScalarFunction::ToDate => Ok(Date32),
784788
BuiltinScalarFunction::Now => {
785789
Ok(Timestamp(Nanosecond, Some("+00:00".into())))
786790
}
@@ -1058,6 +1062,7 @@ impl BuiltinScalarFunction {
10581062
BuiltinScalarFunction::FromUnixtime => {
10591063
Signature::uniform(1, vec![Int64], self.volatility())
10601064
}
1065+
BuiltinScalarFunction::ToDate => Signature::variadic_any(self.volatility()),
10611066
BuiltinScalarFunction::Digest => Signature::one_of(
10621067
vec![
10631068
Exact(vec![Utf8, Utf8]),
@@ -1490,6 +1495,7 @@ impl BuiltinScalarFunction {
14901495
BuiltinScalarFunction::ToTimestampSeconds => &["to_timestamp_seconds"],
14911496
BuiltinScalarFunction::ToTimestampNanos => &["to_timestamp_nanos"],
14921497
BuiltinScalarFunction::FromUnixtime => &["from_unixtime"],
1498+
BuiltinScalarFunction::ToDate => &["to_date"],
14931499

14941500
// hashing functions
14951501
BuiltinScalarFunction::Digest => &["digest"],

datafusion/expr/src/expr_fn.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,11 @@ scalar_expr!(
881881
datetime format,
882882
"converts a date, time, timestamp or duration to a string based on the provided format"
883883
);
884+
nary_scalar_expr!(
885+
ToDate,
886+
to_date,
887+
"converts string to date according to the given format"
888+
);
884889
nary_scalar_expr!(
885890
ToTimestamp,
886891
to_timestamp,

datafusion/physical-expr/src/datetime_expressions.rs

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ use datafusion_common::cast::{
5454
as_timestamp_nanosecond_array, as_timestamp_second_array,
5555
};
5656
use datafusion_common::{
57-
exec_err, not_impl_err, DataFusionError, Result, ScalarType, ScalarValue,
57+
exec_err, internal_datafusion_err, not_impl_err, DataFusionError, Result, ScalarType,
58+
ScalarValue,
5859
};
5960
use datafusion_expr::ColumnarValue;
6061

@@ -424,6 +425,84 @@ fn to_timestamp_impl<T: ArrowTimestampType + ScalarType<i64>>(
424425
}
425426
}
426427

428+
/// # Examples
429+
///
430+
/// ```ignore
431+
/// # use std::sync::Arc;
432+
433+
/// # use datafusion::arrow::array::StringArray;
434+
/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
435+
/// # use datafusion::arrow::record_batch::RecordBatch;
436+
/// # use datafusion::error::Result;
437+
/// # use datafusion::prelude::*;
438+
439+
/// # #[tokio::main]
440+
/// # async fn main() -> Result<()> {
441+
/// // define a schema.
442+
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
443+
444+
/// // define data.
445+
/// let batch = RecordBatch::try_new(
446+
/// schema,
447+
/// vec![Arc::new(StringArray::from(vec![
448+
/// "2020-09-08T13:42:29Z",
449+
/// "2020-09-08T13:42:29.190855-05:00",
450+
/// "2020-08-09 12:13:29",
451+
/// "2020-01-02",
452+
/// ]))],
453+
/// )?;
454+
455+
/// // declare a new context. In spark API, this corresponds to a new spark SQLsession
456+
/// let ctx = SessionContext::new();
457+
458+
/// // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
459+
/// ctx.register_batch("t", batch)?;
460+
/// let df = ctx.table("t").await?;
461+
462+
/// // use to_date function to convert col 'a' to timestamp type using the default parsing
463+
/// let df = df.with_column("a", to_date(vec![col("a")]))?;
464+
465+
/// let df = df.select_columns(&["a"])?;
466+
467+
/// // print the results
468+
/// df.show().await?;
469+
470+
/// # Ok(())
471+
/// # }
472+
/// ```
473+
pub fn to_date(args: &[ColumnarValue]) -> Result<ColumnarValue> {
474+
match args.len() {
475+
1 => handle::<Date32Type, _, Date32Type>(
476+
args,
477+
|s| {
478+
string_to_timestamp_nanos_shim(s)
479+
.map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000))
480+
.and_then(|v| {
481+
v.try_into().map_err(|_| {
482+
internal_datafusion_err!("Unable to cast to Date32 for converting from i64 to i32 failed")
483+
})
484+
})
485+
},
486+
"to_date",
487+
),
488+
n if n >= 2 => handle_multiple::<Date32Type, _, Date32Type, _>(
489+
args,
490+
|s, format| {
491+
string_to_timestamp_nanos_formatted(s, format)
492+
.map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000))
493+
.and_then(|v| {
494+
v.try_into().map_err(|_| {
495+
internal_datafusion_err!("Unable to cast to Date32 for converting from i64 to i32 failed")
496+
})
497+
})
498+
},
499+
|n| n,
500+
"to_date",
501+
),
502+
_ => exec_err!("Unsupported 0 argument count for function to_date"),
503+
}
504+
}
505+
427506
/// to_timestamp SQL function
428507
///
429508
/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**.
@@ -1567,6 +1646,36 @@ fn validate_to_timestamp_data_types(
15671646
None
15681647
}
15691648

1649+
/// to_date SQL function implementation
1650+
pub fn to_date_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
1651+
if args.is_empty() {
1652+
return exec_err!(
1653+
"to_date function requires 1 or more arguments, got {}",
1654+
args.len()
1655+
);
1656+
}
1657+
1658+
// validate that any args after the first one are Utf8
1659+
if args.len() > 1 {
1660+
if let Some(value) = validate_to_timestamp_data_types(args, "to_date") {
1661+
return value;
1662+
}
1663+
}
1664+
1665+
match args[0].data_type() {
1666+
DataType::Int32
1667+
| DataType::Int64
1668+
| DataType::Null
1669+
| DataType::Float64
1670+
| DataType::Date32
1671+
| DataType::Date64 => cast_column(&args[0], &DataType::Date32, None),
1672+
DataType::Utf8 => to_date(args),
1673+
other => {
1674+
exec_err!("Unsupported data type {:?} for function to_date", other)
1675+
}
1676+
}
1677+
}
1678+
15701679
/// to_timestamp() SQL function implementation
15711680
pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
15721681
if args.is_empty() {

datafusion/physical-expr/src/functions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ pub fn create_physical_fun(
523523
BuiltinScalarFunction::FromUnixtime => {
524524
Arc::new(datetime_expressions::from_unixtime_invoke)
525525
}
526+
BuiltinScalarFunction::ToDate => Arc::new(datetime_expressions::to_date_invoke),
526527
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
527528
DataType::Utf8 => {
528529
make_scalar_function_inner(string_expressions::initcap::<i32>)(args)

datafusion/proto/proto/datafusion.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/*
2+
23
* Licensed to the Apache Software Foundation (ASF) under one
34
* or more contributor license agreements. See the NOTICE file
45
* distributed with this work for additional information
@@ -682,6 +683,7 @@ enum ScalarFunction {
682683
ArrayReverse = 134;
683684
RegexpLike = 135;
684685
ToChar = 136;
686+
ToDate = 137;
685687
}
686688

687689
message ScalarFunctionNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
570570
ScalarFunction::Levenshtein => Self::Levenshtein,
571571
ScalarFunction::SubstrIndex => Self::SubstrIndex,
572572
ScalarFunction::FindInSet => Self::FindInSet,
573+
ScalarFunction::ToDate => Self::ToDate,
573574
}
574575
}
575576
}
@@ -1810,6 +1811,16 @@ pub fn parse_expr(
18101811
ScalarFunction::StructFun => {
18111812
Ok(struct_fun(parse_expr(&args[0], registry)?))
18121813
}
1814+
ScalarFunction::ToDate => {
1815+
let args: Vec<_> = args
1816+
.iter()
1817+
.map(|expr| parse_expr(expr, registry))
1818+
.collect::<std::result::Result<_, _>>()?;
1819+
Ok(Expr::ScalarFunction(expr::ScalarFunction::new(
1820+
BuiltinScalarFunction::ToDate,
1821+
args,
1822+
)))
1823+
}
18131824
}
18141825
}
18151826
ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { fun_name, args }) => {

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1549,6 +1549,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
15491549
BuiltinScalarFunction::Levenshtein => Self::Levenshtein,
15501550
BuiltinScalarFunction::SubstrIndex => Self::SubstrIndex,
15511551
BuiltinScalarFunction::FindInSet => Self::FindInSet,
1552+
BuiltinScalarFunction::ToDate => Self::ToDate,
15521553
};
15531554

15541555
Ok(scalar_function)

0 commit comments

Comments
 (0)