Skip to content

Commit e5adc84

Browse files
committed
[task #8987]add_to_date_function
Signed-off-by: tangruilin <[email protected]>
1 parent d4357ec commit e5adc84

File tree

12 files changed

+360
-2
lines changed

12 files changed

+360
-2
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use datafusion::arrow::array::StringArray;
21+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
22+
use datafusion::arrow::record_batch::RecordBatch;
23+
use datafusion::error::Result;
24+
use datafusion::prelude::*;
25+
26+
/// This example demonstrates how to use the to_timestamp series
27+
/// of functions in the DataFrame API as well as via sql.
28+
#[tokio::main]
29+
async fn main() -> Result<()> {
30+
// define a schema.
31+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
32+
33+
// define data.
34+
let batch = RecordBatch::try_new(
35+
schema,
36+
vec![Arc::new(StringArray::from(vec![
37+
"2020-09-08T13:42:29Z",
38+
"2020-09-08T13:42:29.190855-05:00",
39+
"2020-08-09 12:13:29",
40+
"2020-01-02",
41+
]))],
42+
)?;
43+
44+
// declare a new context. In spark API, this corresponds to a new spark SQLsession
45+
let ctx = SessionContext::new();
46+
47+
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
48+
ctx.register_batch("t", batch)?;
49+
let df = ctx.table("t").await?;
50+
51+
// use to_timestamp function to convert col 'a' to timestamp type using the default parsing
52+
let df = df.with_column("a", to_date(vec![col("a")]))?;
53+
54+
let df = df.select_columns(&["a"])?;
55+
56+
// print the results
57+
df.show().await?;
58+
59+
Ok(())
60+
}

datafusion/common/src/scalar/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use arrow::{
4545
compute::kernels::cast::{cast_with_options, CastOptions},
4646
datatypes::{
4747
i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType,
48-
Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
48+
Date32Type, Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
4949
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
5050
IntervalYearMonthType, TimeUnit, TimestampMicrosecondType,
5151
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
@@ -3293,6 +3293,12 @@ impl ScalarType<i64> for TimestampNanosecondType {
32933293
}
32943294
}
32953295

3296+
/// Allows `Date32Type` (days since the UNIX epoch, stored as `i32`) to be
/// used by generic code that builds `ScalarValue`s via the `ScalarType` trait.
impl ScalarType<i32> for Date32Type {
    // Wrap an optional day count into the corresponding Date32 scalar;
    // `None` produces a null `Date32` value.
    fn scalar(r: Option<i32>) -> ScalarValue {
        ScalarValue::Date32(r)
    }
}
3301+
32963302
#[cfg(test)]
32973303
mod tests {
32983304
use std::cmp::Ordering;

datafusion/expr/src/built_in_function.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,8 @@ pub enum BuiltinScalarFunction {
284284
ToTimestampSeconds,
285285
/// from_unixtime
286286
FromUnixtime,
287+
/// to_date
288+
ToDate,
287289
///now
288290
Now,
289291
///current_date
@@ -486,6 +488,7 @@ impl BuiltinScalarFunction {
486488
BuiltinScalarFunction::Upper => Volatility::Immutable,
487489
BuiltinScalarFunction::Struct => Volatility::Immutable,
488490
BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
491+
BuiltinScalarFunction::ToDate => Volatility::Immutable,
489492
BuiltinScalarFunction::ArrowTypeof => Volatility::Immutable,
490493
BuiltinScalarFunction::OverLay => Volatility::Immutable,
491494
BuiltinScalarFunction::Levenshtein => Volatility::Immutable,
@@ -797,6 +800,7 @@ impl BuiltinScalarFunction {
797800
BuiltinScalarFunction::ToTimestampMicros => Ok(Timestamp(Microsecond, None)),
798801
BuiltinScalarFunction::ToTimestampSeconds => Ok(Timestamp(Second, None)),
799802
BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)),
803+
BuiltinScalarFunction::ToDate => Ok(Date32),
800804
BuiltinScalarFunction::Now => {
801805
Ok(Timestamp(Nanosecond, Some("+00:00".into())))
802806
}
@@ -1096,6 +1100,7 @@ impl BuiltinScalarFunction {
10961100
BuiltinScalarFunction::FromUnixtime => {
10971101
Signature::uniform(1, vec![Int64], self.volatility())
10981102
}
1103+
BuiltinScalarFunction::ToDate => Signature::variadic_any(self.volatility()),
10991104
BuiltinScalarFunction::Digest => Signature::one_of(
11001105
vec![
11011106
Exact(vec![Utf8, Utf8]),
@@ -1544,6 +1549,7 @@ impl BuiltinScalarFunction {
15441549
BuiltinScalarFunction::ToTimestampSeconds => &["to_timestamp_seconds"],
15451550
BuiltinScalarFunction::ToTimestampNanos => &["to_timestamp_nanos"],
15461551
BuiltinScalarFunction::FromUnixtime => &["from_unixtime"],
1552+
BuiltinScalarFunction::ToDate => &["to_date"],
15471553

15481554
// hashing functions
15491555
BuiltinScalarFunction::Digest => &["digest"],

datafusion/expr/src/expr_fn.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,11 @@ scalar_expr!(
895895
datetime format,
896896
"converts a date, time, timestamp or duration to a string based on the provided format"
897897
);
898+
nary_scalar_expr!(
899+
ToDate,
900+
to_date,
901+
"converts string to date according to the given format"
902+
);
898903
nary_scalar_expr!(
899904
ToTimestamp,
900905
to_timestamp,

datafusion/physical-expr/src/datetime_expressions.rs

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ use datafusion_common::cast::{
5454
as_timestamp_nanosecond_array, as_timestamp_second_array,
5555
};
5656
use datafusion_common::{
57-
exec_err, not_impl_err, DataFusionError, Result, ScalarType, ScalarValue,
57+
exec_err, internal_datafusion_err, not_impl_err, DataFusionError, Result, ScalarType,
58+
ScalarValue,
5859
};
5960
use datafusion_expr::ColumnarValue;
6061

@@ -424,6 +425,84 @@ fn to_timestamp_impl<T: ArrowTimestampType + ScalarType<i64>>(
424425
}
425426
}
426427

428+
/// # Examples
429+
///
430+
/// ```
431+
/// # use std::sync::Arc;
432+
433+
/// # use datafusion::arrow::array::StringArray;
434+
/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
435+
/// # use datafusion::arrow::record_batch::RecordBatch;
436+
/// # use datafusion::error::Result;
437+
/// # use datafusion::prelude::*;
438+
439+
/// # #[tokio::main]
440+
/// # async fn main() -> Result<()> {
441+
/// // define a schema.
442+
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
443+
444+
/// // define data.
445+
/// let batch = RecordBatch::try_new(
446+
/// schema,
447+
/// vec![Arc::new(StringArray::from(vec![
448+
/// "2020-09-08T13:42:29Z",
449+
/// "2020-09-08T13:42:29.190855-05:00",
450+
/// "2020-08-09 12:13:29",
451+
/// "2020-01-02",
452+
/// ]))],
453+
/// )?;
454+
455+
/// // declare a new context. In spark API, this corresponds to a new spark SQLsession
456+
/// let ctx = SessionContext::new();
457+
458+
/// // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
459+
/// ctx.register_batch("t", batch)?;
460+
/// let df = ctx.table("t").await?;
461+
462+
/// // use to_timestamp function to convert col 'a' to timestamp type using the default parsing
463+
/// let df = df.with_column("a", to_date(vec![col("a")]))?;
464+
465+
/// let df = df.select_columns(&["a"])?;
466+
467+
/// // print the results
468+
/// df.show().await?;
469+
470+
/// Ok(())
471+
/// }
472+
/// ```
473+
pub fn to_date(args: &[ColumnarValue]) -> Result<ColumnarValue> {
474+
match args.len() {
475+
1 => handle::<Date32Type, _, Date32Type>(
476+
args,
477+
|s| {
478+
string_to_timestamp_nanos_shim(s)
479+
.map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000))
480+
.and_then(|v| {
481+
v.try_into().map_err(|_| {
482+
internal_datafusion_err!("Unable to cast to Date32 for converting from i64 to i32 failed")
483+
})
484+
})
485+
},
486+
"to_date",
487+
),
488+
n if n >= 2 => handle_multiple::<Date32Type, _, Date32Type, _>(
489+
args,
490+
|s, format| {
491+
string_to_timestamp_nanos_formatted(s, format)
492+
.map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000))
493+
.and_then(|v| {
494+
v.try_into().map_err(|_| {
495+
internal_datafusion_err!("Unable to cast to Date32 for converting from i64 to i32 failed")
496+
})
497+
})
498+
},
499+
|n| n,
500+
"to_date",
501+
),
502+
_ => exec_err!("Unsupported 0 argument count for function to_date"),
503+
}
504+
}
505+
427506
/// to_timestamp SQL function
428507
///
429508
/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**.
@@ -1567,6 +1646,36 @@ fn validate_to_timestamp_data_types(
15671646
None
15681647
}
15691648

1649+
/// to_date SQL function implementation
1650+
pub fn to_date_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
1651+
if args.is_empty() {
1652+
return exec_err!(
1653+
"to_date function requires 1 or more arguments, got {}",
1654+
args.len()
1655+
);
1656+
}
1657+
1658+
// validate that any args after the first one are Utf8
1659+
if args.len() > 1 {
1660+
if let Some(value) = validate_to_timestamp_data_types(args, "to_date") {
1661+
return value;
1662+
}
1663+
}
1664+
1665+
match args[0].data_type() {
1666+
DataType::Int32
1667+
| DataType::Int64
1668+
| DataType::Null
1669+
| DataType::Float64
1670+
| DataType::Date32
1671+
| DataType::Date64 => cast_column(&args[0], &DataType::Date32, None),
1672+
DataType::Utf8 => to_date(args),
1673+
other => {
1674+
exec_err!("Unsupported data type {:?} for function to_date", other)
1675+
}
1676+
}
1677+
}
1678+
15701679
/// to_timestamp() SQL function implementation
15711680
pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
15721681
if args.is_empty() {

datafusion/physical-expr/src/functions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ pub fn create_physical_fun(
531531
BuiltinScalarFunction::FromUnixtime => {
532532
Arc::new(datetime_expressions::from_unixtime_invoke)
533533
}
534+
BuiltinScalarFunction::ToDate => Arc::new(datetime_expressions::to_date_invoke),
534535
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
535536
DataType::Utf8 => {
536537
make_scalar_function_inner(string_expressions::initcap::<i32>)(args)

datafusion/proto/proto/datafusion.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/*
2+
23
* Licensed to the Apache Software Foundation (ASF) under one
34
* or more contributor license agreements. See the NOTICE file
45
* distributed with this work for additional information
@@ -679,6 +680,7 @@ enum ScalarFunction {
679680
ArrayReverse = 134;
680681
RegexpLike = 135;
681682
ToChar = 136;
683+
ToDate = 137;
682684
}
683685

684686
message ScalarFunctionNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
574574
ScalarFunction::Levenshtein => Self::Levenshtein,
575575
ScalarFunction::SubstrIndex => Self::SubstrIndex,
576576
ScalarFunction::FindInSet => Self::FindInSet,
577+
ScalarFunction::ToDate => Self::ToDate,
577578
}
578579
}
579580
}
@@ -1822,6 +1823,16 @@ pub fn parse_expr(
18221823
ScalarFunction::StructFun => {
18231824
Ok(struct_fun(parse_expr(&args[0], registry)?))
18241825
}
1826+
ScalarFunction::ToDate => {
1827+
let args: Vec<_> = args
1828+
.iter()
1829+
.map(|expr| parse_expr(expr, registry))
1830+
.collect::<std::result::Result<_, _>>()?;
1831+
Ok(Expr::ScalarFunction(expr::ScalarFunction::new(
1832+
BuiltinScalarFunction::ToDate,
1833+
args,
1834+
)))
1835+
}
18251836
}
18261837
}
18271838
ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { fun_name, args }) => {

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1552,6 +1552,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
15521552
BuiltinScalarFunction::Levenshtein => Self::Levenshtein,
15531553
BuiltinScalarFunction::SubstrIndex => Self::SubstrIndex,
15541554
BuiltinScalarFunction::FindInSet => Self::FindInSet,
1555+
BuiltinScalarFunction::ToDate => Self::ToDate,
15551556
};
15561557

15571558
Ok(scalar_function)

0 commit comments

Comments
 (0)