From 16375ed6047e7d9094fe891e5b28b8e5b4da4a57 Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Mon, 12 Aug 2024 20:57:52 -0700 Subject: [PATCH 01/10] feat(timestamp_ns): first commit --- crates/catalog/glue/src/schema.rs | 2 ++ crates/catalog/hms/src/schema.rs | 2 ++ crates/iceberg/src/arrow/schema.rs | 7 +++++++ crates/iceberg/src/avro/schema.rs | 3 +++ crates/iceberg/src/spec/datatypes.rs | 10 ++++++++++ crates/iceberg/src/spec/manifest.rs | 11 +++++++++++ 6 files changed, 35 insertions(+) diff --git a/crates/catalog/glue/src/schema.rs b/crates/catalog/glue/src/schema.rs index c349219f6..bb676e36e 100644 --- a/crates/catalog/glue/src/schema.rs +++ b/crates/catalog/glue/src/schema.rs @@ -164,6 +164,8 @@ impl SchemaVisitor for GlueSchemaBuilder { PrimitiveType::Double => "double".to_string(), PrimitiveType::Date => "date".to_string(), PrimitiveType::Timestamp => "timestamp".to_string(), + PrimitiveType::TimestampNs => "timestamp_ns".to_string(), + PrimitiveType::TimestamptzNs => "timestamptz_ns".to_string(), PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => { "string".to_string() } diff --git a/crates/catalog/hms/src/schema.rs b/crates/catalog/hms/src/schema.rs index fa7819d62..4012098c2 100644 --- a/crates/catalog/hms/src/schema.rs +++ b/crates/catalog/hms/src/schema.rs @@ -121,6 +121,8 @@ impl SchemaVisitor for HiveSchemaBuilder { PrimitiveType::Double => "double".to_string(), PrimitiveType::Date => "date".to_string(), PrimitiveType::Timestamp => "timestamp".to_string(), + PrimitiveType::TimestampNs => "timestamp_ns".to_string(), + PrimitiveType::TimestamptzNs => "timestamptz_ns".to_string(), PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => { "string".to_string() } diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index a69605e90..80a8d2846 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -577,6 +577,13 @@ impl SchemaVisitor for ToArrowSchemaConverter { // Timestampz always stored as UTC DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), )), + crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type( + DataType::Timestamp(TimeUnit::Nanosecond, None), + )), + crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type( + // Store timestamptz_ns as UTC + DataType::Timestamp(TimeUnit::Nanosecond, Some("+00.00".into())), + )), crate::spec::PrimitiveType::String => { Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8)) } diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index 7f8142745..cfcf38dea 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -229,6 +229,8 @@ impl SchemaVisitor for SchemaToAvroSchema { PrimitiveType::Time => AvroSchema::TimeMicros, PrimitiveType::Timestamp => AvroSchema::TimestampMicros, PrimitiveType::Timestamptz => AvroSchema::TimestampMicros, + PrimitiveType::TimestampNs => AvroSchema::TimestampNanos, + PrimitiveType::TimestamptzNs => AvroSchema::TimestampNanos, PrimitiveType::String => AvroSchema::String, PrimitiveType::Uuid => avro_fixed_schema(UUID_BYTES, Some(UUID_LOGICAL_TYPE))?, PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize, None)?, @@ -519,6 +521,7 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { AvroSchema::Date => Type::Primitive(PrimitiveType::Date), AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time), AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp), + AvroSchema::TimestampNanos => Type::Primitive(PrimitiveType::TimestampNs), AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean), AvroSchema::Int => Type::Primitive(PrimitiveType::Int), AvroSchema::Long => Type::Primitive(PrimitiveType::Long), diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index d8883878e..8b166618c 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -225,6 +225,10 @@ pub enum PrimitiveType { Timestamp, /// Timestamp in microsecond precision, with timezone Timestamptz, + /// Timestamp in nanosecond precision, without timezone + TimestampNs, + /// Timestamp in nanosecond precision with timezone + TimestamptzNs, /// Arbitrary-length character sequences encoded in utf-8 String, /// Universally Unique Identifiers, should use 16-byte fixed @@ -250,6 +254,8 @@ impl PrimitiveType { | (PrimitiveType::Time, PrimitiveLiteral::Time(_)) | (PrimitiveType::Timestamp, PrimitiveLiteral::Timestamp(_)) | (PrimitiveType::Timestamptz, PrimitiveLiteral::Timestamptz(_)) + | (PrimitiveType::TimestampNs, PrimitiveLiteral::TimestampNs()) + | (PrimitiveType::TimestamptzNs, PrimitiveLiteral::TimestamptzNs()) | (PrimitiveType::String, PrimitiveLiteral::String(_)) | (PrimitiveType::Uuid, PrimitiveLiteral::Uuid(_)) | (PrimitiveType::Fixed(_), PrimitiveLiteral::Fixed(_)) @@ -360,6 +366,8 @@ impl fmt::Display for PrimitiveType { PrimitiveType::Time => write!(f, "time"), PrimitiveType::Timestamp => write!(f, "timestamp"), PrimitiveType::Timestamptz => write!(f, "timestamptz"), + PrimitiveType::TimestampNs => write!(f, "timestamp_ns"), + PrimitiveType::TimestamptzNs => write!(f, "timestamptz_ns"), PrimitiveType::String => write!(f, "string"), PrimitiveType::Uuid => write!(f, "uuid"), PrimitiveType::Fixed(size) => write!(f, "fixed({})", size), @@ -1141,6 +1149,8 @@ mod tests { PrimitiveType::Time, PrimitiveType::Timestamp, PrimitiveType::Timestamptz, + PrimitiveType::TimestampNs, + PrimitiveType::TimestamptzNs, PrimitiveType::String, PrimitiveType::Uuid, PrimitiveType::Fixed(8), diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index e2f8251c1..8fcd6f036 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -1499,6 +1499,7 @@ mod tests { use std::sync::Arc; use tempfile::TempDir; + use tokio::io::unix::AsyncFdTryNewError; use super::*; use crate::io::FileIOBuilder; @@ -1570,6 +1571,11 @@ mod tests { "v_ts_ntz", Type::Primitive(PrimitiveType::Timestamp), )), + Arc::new(NestedField::optional( + 12, + "v_ts_ns_ntz", + Type::Primitive(PrimitiveType::TimestampNs + ))), ]) .build() .unwrap(), @@ -1678,6 +1684,11 @@ mod tests { "v_ts_ntz", Type::Primitive(PrimitiveType::Timestamp), )), + Arc::new(NestedField::optional( + 12, + "v_ts_ns_ntz", + Type::Primitive(PrimitiveType::TimestampNs + ))) ]) .build() .unwrap(), From 1c2e50c469acb54a608ec23208df4fe925a9ac5f Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Tue, 13 Aug 2024 08:24:09 -0700 Subject: [PATCH 02/10] feat(timestamp_ns): Add mappings for timestamp_ns/timestamptz_ns --- crates/iceberg/src/spec/datatypes.rs | 7 ++- crates/iceberg/src/spec/transform.rs | 11 +++-- crates/iceberg/src/spec/values.rs | 45 ++++++++++++++++++- crates/iceberg/src/transform/bucket.rs | 4 +- crates/iceberg/src/transform/identity.rs | 4 +- crates/iceberg/src/transform/temporal.rs | 10 ++++- crates/iceberg/src/transform/truncate.rs | 4 +- .../src/writer/file_writer/parquet_writer.rs | 16 ++++++- 8 files changed, 89 insertions(+), 12 deletions(-) diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 8b166618c..688d7fd85 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -254,8 +254,11 @@ impl PrimitiveType { | (PrimitiveType::Time, PrimitiveLiteral::Time(_)) | (PrimitiveType::Timestamp, PrimitiveLiteral::Timestamp(_)) | (PrimitiveType::Timestamptz, PrimitiveLiteral::Timestamptz(_)) - | (PrimitiveType::TimestampNs, PrimitiveLiteral::TimestampNs()) - | (PrimitiveType::TimestamptzNs, PrimitiveLiteral::TimestamptzNs()) + | (PrimitiveType::TimestampNs, PrimitiveLiteral::TimestampNs(_)) + | ( + PrimitiveType::TimestamptzNs, + PrimitiveLiteral::TimestamptzNs(_) + ) | (PrimitiveType::String, PrimitiveLiteral::String(_)) | (PrimitiveType::Uuid, PrimitiveLiteral::Uuid(_)) | (PrimitiveType::Fixed(_), PrimitiveLiteral::Fixed(_)) diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 54e2105ff..92de09392 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -159,6 +159,8 @@ impl Transform { | PrimitiveType::Time | PrimitiveType::Timestamp | PrimitiveType::Timestamptz + | PrimitiveType::TimestampNs + | PrimitiveType::TimestamptzNs | PrimitiveType::String | PrimitiveType::Uuid | PrimitiveType::Fixed(_) @@ -200,6 +202,8 @@ impl Transform { match p { PrimitiveType::Timestamp | PrimitiveType::Timestamptz + | PrimitiveType::TimestampNs + | PrimitiveType::TimestamptzNs | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Date)), _ => Err(Error::new( ErrorKind::DataInvalid, @@ -216,9 +220,10 @@ impl Transform { Transform::Hour => { if let Type::Primitive(p) = input_type { match p { - PrimitiveType::Timestamp | PrimitiveType::Timestamptz => { - Ok(Type::Primitive(PrimitiveType::Int)) - } + PrimitiveType::Timestamp + | PrimitiveType::Timestamptz + | PrimitiveType::TimestampNs + | PrimitiveType::TimestamptzNs => Ok(Type::Primitive(PrimitiveType::Int)), _ => Err(Error::new( ErrorKind::DataInvalid, format!("{input_type} is not a valid input type of {self} transform",), diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 8c2e4abe3..f513cae19 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -38,6 +38,7 @@ use serde::ser::SerializeStruct; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use serde_json::{Map as JsonMap, Number, Value as JsonValue}; +use timestamp::nanoseconds_to_datetime; use uuid::Uuid; use super::datatypes::{PrimitiveType, Type}; @@ -45,7 +46,7 @@ use crate::error::Result; use crate::spec::values::date::{date_from_naive_date, days_to_date, unix_epoch}; use crate::spec::values::time::microseconds_to_time; use crate::spec::values::timestamp::microseconds_to_datetime; -use crate::spec::values::timestamptz::microseconds_to_datetimetz; +use crate::spec::values::timestamptz::{microseconds_to_datetimetz, nanoseconds_to_datetimetz}; use crate::spec::MAX_DECIMAL_PRECISION; use crate::{ensure_data_valid, Error, ErrorKind}; @@ -73,6 +74,10 @@ pub enum PrimitiveLiteral { Timestamp(i64), /// Timestamp with timezone Timestamptz(i64), + /// Timestamp in nanosecond precision without timezone + TimestampNs(i64), + /// Timestamp in nanosecond precision with timezone + TimestamptzNs(i64), /// UTF-8 bytes (without length) String(String), /// 16-byte big-endian value @@ -332,6 +337,12 @@ impl Display for Datum { (_, PrimitiveLiteral::Timestamptz(val)) => { write!(f, "{}", microseconds_to_datetimetz(*val)) } + (_, PrimitiveLiteral::TimestampNs(val)) => { + write!(f, "{}", nanoseconds_to_datetime(*val)) + } + (_, PrimitiveLiteral::TimestamptzNs(val)) => { + write!(f, "{}", nanoseconds_to_datetimetz(*val)) + } (_, PrimitiveLiteral::String(val)) => write!(f, r#""{}""#, val), (_, PrimitiveLiteral::Uuid(val)) => write!(f, "{}", val), (_, PrimitiveLiteral::Fixed(val)) => display_bytes(val, f), @@ -406,6 +417,12 @@ impl Datum { PrimitiveType::Timestamptz => { PrimitiveLiteral::Timestamptz(i64::from_le_bytes(bytes.try_into()?)) } + PrimitiveType::TimestampNs => { + PrimitiveLiteral::TimestampNs(i64::from_le_bytes(bytes.try_into()?)) + } + PrimitiveType::TimestamptzNs => { + PrimitiveLiteral::TimestampNs(i64::from_le_bytes(bytes.try_into()?)) + } PrimitiveType::String => { PrimitiveLiteral::String(std::str::from_utf8(bytes)?.to_string()) } @@ -442,6 +459,8 @@ impl Datum { PrimitiveLiteral::Time(val) => ByteBuf::from(val.to_le_bytes()), PrimitiveLiteral::Timestamp(val) => ByteBuf::from(val.to_le_bytes()), PrimitiveLiteral::Timestamptz(val) => ByteBuf::from(val.to_le_bytes()), + PrimitiveLiteral::TimestampNs(val) => ByteBuf::from(val.to_be_bytes()), + PrimitiveLiteral::TimestamptzNs(val) => ByteBuf::from(val.to_be_bytes()), PrimitiveLiteral::String(val) => ByteBuf::from(val.as_bytes()), PrimitiveLiteral::Uuid(val) => ByteBuf::from(val.as_u128().to_be_bytes()), PrimitiveLiteral::Fixed(val) => ByteBuf::from(val.as_slice()), @@ -1805,6 +1824,16 @@ impl Literal { .format("%Y-%m-%dT%H:%M:%S%.f+00:00") .to_string(), )), + PrimitiveLiteral::TimestampNs(val) => Ok(JsonValue::String( + timestamp::nanoseconds_to_datetime(val) + .format("%Y-%m-%dT%H:%M:%S%.f") + .to_string(), + )), + PrimitiveLiteral::TimestamptzNs(val) => Ok(JsonValue::String( + timestamptz::microseconds_to_datetimetz(val) + .format("%Y-%m-%dT%H:%M:%S%.f+00:00") + .to_string(), + )), PrimitiveLiteral::String(val) => Ok(JsonValue::String(val.clone())), PrimitiveLiteral::Uuid(val) => Ok(JsonValue::String(val.to_string())), PrimitiveLiteral::Fixed(val) => Ok(JsonValue::String(val.iter().fold( @@ -1891,6 +1920,8 @@ impl Literal { PrimitiveLiteral::Date(any) => Box::new(any), PrimitiveLiteral::Time(any) => Box::new(any), PrimitiveLiteral::Timestamp(any) => Box::new(any), + PrimitiveLiteral::TimestampNs(any) => Box::new(any), + PrimitiveLiteral::TimestamptzNs(any) => Box::new(any), PrimitiveLiteral::Timestamptz(any) => Box::new(any), PrimitiveLiteral::Fixed(any) => Box::new(any), PrimitiveLiteral::Binary(any) => Box::new(any), @@ -1962,6 +1993,10 @@ mod timestamp { // This shouldn't fail until the year 262000 DateTime::from_timestamp_micros(micros).unwrap().naive_utc() } + + pub(crate) fn nanoseconds_to_datetime(nanos: i64) -> NaiveDateTime { + DateTime::from_timestamp_nanos(nanos).naive_utc() + } } mod timestamptz { @@ -1976,6 +2011,12 @@ mod timestamptz { DateTime::from_timestamp(secs, rem as u32 * 1_000).unwrap() } + + pub(crate) fn nanoseconds_to_datetimetz(nanos: i64) -> DateTime { + let (secs, rem) = (nanos / 1_000_000_000, nanos % 1_000_000_000); + + DateTime::from_timestamp(secs, rem as u32 * 1_000).unwrap() + } } mod _serde { @@ -2201,6 +2242,8 @@ mod _serde { super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), super::PrimitiveLiteral::Timestamptz(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampNs(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestamptzNs(v) => RawLiteralEnum::Long(v), super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), super::PrimitiveLiteral::Uuid(v) => { RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs index e19e5b841..843a525f4 100644 --- a/crates/iceberg/src/transform/bucket.rs +++ b/crates/iceberg/src/transform/bucket.rs @@ -254,7 +254,7 @@ mod test { use crate::expr::PredicateOperator; use crate::spec::PrimitiveType::{ Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, - Timestamptz, Uuid, + TimestampNs, Timestamptz, TimestamptzNs, Uuid, }; use crate::spec::Type::{Primitive, Struct}; use crate::spec::{Datum, NestedField, PrimitiveType, StructType, Transform, Type}; @@ -295,6 +295,8 @@ mod test { (Primitive(Time), Some(Primitive(Int))), (Primitive(Timestamp), Some(Primitive(Int))), (Primitive(Timestamptz), Some(Primitive(Int))), + (Primitive(TimestampNs), Some(Primitive(Int))), + (Primitive(TimestamptzNs), Some(Primitive(Int))), ( Struct(StructType::new(vec![NestedField::optional( 1, diff --git a/crates/iceberg/src/transform/identity.rs b/crates/iceberg/src/transform/identity.rs index e23ccffa9..68e5a0b1a 100644 --- a/crates/iceberg/src/transform/identity.rs +++ b/crates/iceberg/src/transform/identity.rs @@ -38,7 +38,7 @@ impl TransformFunction for Identity { mod test { use crate::spec::PrimitiveType::{ Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, - Timestamptz, Uuid, + TimestampNs, Timestamptz, TimestamptzNs, Uuid, }; use crate::spec::Type::{Primitive, Struct}; use crate::spec::{NestedField, StructType, Transform}; @@ -81,6 +81,8 @@ mod test { (Primitive(Time), Some(Primitive(Time))), (Primitive(Timestamp), Some(Primitive(Timestamp))), (Primitive(Timestamptz), Some(Primitive(Timestamptz))), + (Primitive(TimestampNs), Some(Primitive(TimestampNs))), + (Primitive(TimestamptzNs), Some(Primitive(TimestamptzNs))), ( Struct(StructType::new(vec![NestedField::optional( 1, diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index 44e96af94..e525f88f2 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -299,7 +299,7 @@ mod test { use crate::expr::PredicateOperator; use crate::spec::PrimitiveType::{ Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, - Timestamptz, Uuid, + TimestampNs, Timestamptz, TimestamptzNs, Uuid, }; use crate::spec::Type::{Primitive, Struct}; use crate::spec::{Datum, NestedField, PrimitiveType, StructType, Transform, Type}; @@ -342,6 +342,8 @@ mod test { (Primitive(Time), None), (Primitive(Timestamp), Some(Primitive(Date))), (Primitive(Timestamptz), Some(Primitive(Date))), + (Primitive(TimestampNs), Some(Primitive(Date))), + (Primitive(TimestamptzNs), Some(Primitive(Date))), ( Struct(StructType::new(vec![NestedField::optional( 1, @@ -392,6 +394,8 @@ mod test { (Primitive(Time), None), (Primitive(Timestamp), Some(Primitive(Date))), (Primitive(Timestamptz), Some(Primitive(Date))), + (Primitive(TimestampNs), Some(Primitive(Date))), + (Primitive(TimestamptzNs), Some(Primitive(Date))), ( Struct(StructType::new(vec![NestedField::optional( 1, @@ -442,6 +446,8 @@ mod test { (Primitive(Time), None), (Primitive(Timestamp), Some(Primitive(Date))), (Primitive(Timestamptz), Some(Primitive(Date))), + (Primitive(TimestampNs), Some(Primitive(Date))), + (Primitive(TimestamptzNs), Some(Primitive(Date))), ( Struct(StructType::new(vec![NestedField::optional( 1, @@ -492,6 +498,8 @@ mod test { (Primitive(Time), None), (Primitive(Timestamp), Some(Primitive(Int))), (Primitive(Timestamptz), Some(Primitive(Int))), + (Primitive(TimestampNs), Some(Primitive(Int))), + (Primitive(TimestamptzNs), Some(Primitive(Int))), ( Struct(StructType::new(vec![NestedField::optional( 1, diff --git a/crates/iceberg/src/transform/truncate.rs b/crates/iceberg/src/transform/truncate.rs index 04fe8445a..642e069ef 100644 --- a/crates/iceberg/src/transform/truncate.rs +++ b/crates/iceberg/src/transform/truncate.rs @@ -174,7 +174,7 @@ mod test { use crate::expr::PredicateOperator; use crate::spec::PrimitiveType::{ Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, - Timestamptz, Uuid, + TimestampNs, Timestamptz, TimestamptzNs, Uuid, }; use crate::spec::Type::{Primitive, Struct}; use crate::spec::{Datum, NestedField, PrimitiveType, StructType, Transform, Type}; @@ -219,6 +219,8 @@ mod test { (Primitive(Time), None), (Primitive(Timestamp), None), (Primitive(Timestamptz), None), + (Primitive(TimestampNs), None), + (Primitive(TimestamptzNs), None), ( Struct(StructType::new(vec![NestedField::optional( 1, diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index ef21f9d33..fe01af536 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -634,6 +634,18 @@ mod tests { .into(), NestedField::optional( 11, + "timestamp_ns", + Type::Primitive(PrimitiveType::TimestampNs), + ) + .into(), + NestedField::optional( + 12, + "timestamptz_ns", + Type::Primitive(PrimitiveType::TimestamptzNs), + ) + .into(), + NestedField::optional( + 13, "decimal", Type::Primitive(PrimitiveType::Decimal { precision: 10, @@ -641,8 +653,8 @@ mod tests { }), ) .into(), - NestedField::optional(12, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(), - NestedField::optional(13, "fixed", Type::Primitive(PrimitiveType::Fixed(10))) + NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(), + NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10))) .into(), ]) .build() From 16db5dbce2e76f1ba8295524020048509feed8bb Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Tue, 13 Aug 2024 08:33:35 -0700 Subject: [PATCH 03/10] feat(timestamp_ns): Remove unused dep --- crates/iceberg/src/spec/manifest.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 8fcd6f036..8cd04916f 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -1499,7 +1499,6 @@ mod tests { use std::sync::Arc; use tempfile::TempDir; - use tokio::io::unix::AsyncFdTryNewError; use super::*; use crate::io::FileIOBuilder; From 14af9b4325caa2982472753c8a78724c456cb0df Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Tue, 13 Aug 2024 08:49:04 -0700 Subject: [PATCH 04/10] feat(timestamp_ns): Fix unit test --- crates/iceberg/src/spec/datatypes.rs | 2 ++ crates/iceberg/src/transform/void.rs | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 688d7fd85..999a0dec3 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -1170,6 +1170,8 @@ mod tests { PrimitiveLiteral::Time(1), PrimitiveLiteral::Timestamp(1), PrimitiveLiteral::Timestamptz(1), + PrimitiveLiteral::TimestampNs(1), + PrimitiveLiteral::TimestamptzNs(1), PrimitiveLiteral::String("1".to_string()), PrimitiveLiteral::Uuid(Uuid::new_v4()), PrimitiveLiteral::Fixed(vec![1]), diff --git a/crates/iceberg/src/transform/void.rs b/crates/iceberg/src/transform/void.rs index 92d76b141..53e22140e 100644 --- a/crates/iceberg/src/transform/void.rs +++ b/crates/iceberg/src/transform/void.rs @@ -37,7 +37,7 @@ impl TransformFunction for Void { mod test { use crate::spec::PrimitiveType::{ Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, - Timestamptz, Uuid, + Timestamptz, TimestampNs, TimestamptzNs, Uuid, }; use crate::spec::Type::{Primitive, Struct}; use crate::spec::{NestedField, StructType, Transform}; @@ -81,6 +81,8 @@ mod test { (Primitive(Time), Some(Primitive(Time))), (Primitive(Timestamp), Some(Primitive(Timestamp))), (Primitive(Timestamptz), Some(Primitive(Timestamptz))), + (Primitive(TimestampNs), Some(Primitive(TimestampNs))), + (Primitive(TimestamptzNs), Some(Primitive(TimestamptzNs))), ( Struct(StructType::new(vec![NestedField::optional( 1, From c60ab826dbe4ffdb4740376cc534aec04431d587 Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Tue, 13 Aug 2024 11:36:47 -0700 Subject: [PATCH 05/10] feat(timestamp_ns): Fix test_all_type_for_write() --- crates/iceberg/src/arrow/schema.rs | 2 +- crates/iceberg/src/spec/values.rs | 34 ++++++++++++++++ .../src/writer/file_writer/parquet_writer.rs | 40 +++++++++++++------ 3 files changed, 63 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 80a8d2846..8f4e1b6d5 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -582,7 +582,7 @@ impl SchemaVisitor for ToArrowSchemaConverter { )), crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type( // Store timestamptz_ns as UTC - DataType::Timestamp(TimeUnit::Nanosecond, Some("+00.00".into())), + DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), )), crate::spec::PrimitiveType::String => { Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8)) diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index f513cae19..b99d78606 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -763,6 +763,23 @@ impl Datum { } } + /// Creates a timestamp from unix epoch in microseconds. + /// + /// Example: + /// + /// ```rust + /// use iceberg::spec::Datum; + /// let t = Datum::timestamp_micros(1000); + /// + /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.001"); + /// ``` + pub fn timestamp_nanos(value: i64) -> Self { + Self { + r#type: PrimitiveType::TimestampNs, + literal: PrimitiveLiteral::TimestampNs(value), + } + } + /// Creates a timestamp from [`DateTime`]. /// /// Example: @@ -821,6 +838,23 @@ impl Datum { } } + /// Creates a timestamp with timezone from unix epoch in microseconds. + /// + /// Example: + /// + /// ```rust + /// use iceberg::spec::Datum; + /// let t = Datum::timestamptz_micros(1000); + /// + /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.001 UTC"); + /// ``` + pub fn timestamptz_nanos(value: i64) -> Self { + Self { + r#type: PrimitiveType::TimestamptzNs, + literal: PrimitiveLiteral::TimestamptzNs(value), + } + } + /// Creates a timestamp with timezone from [`DateTime`]. /// Example: /// diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index fe01af536..a0414f837 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -337,6 +337,14 @@ impl MinMaxColAggregator { let convert_func = |v: i64| Result::::Ok(Datum::timestamptz_micros(v)); self.update_state::(field_id, &stat, convert_func) } + (PrimitiveType::TimestampNs, Statistics::Int64(stat,)) => { + let convert_func = |v: i64| Result::::Ok(Datum::timestamp_nanos(v)); + self.update_state::(field_id, &stat, convert_func) + } + (PrimitiveType::TimestamptzNs, Statistics::Int64(stat,)) => { + let convert_func = |v: i64| Result::::Ok(Datum::timestamptz_nanos(v)); + self.update_state::(field_id, &stat, convert_func) + } ( PrimitiveType::Decimal { precision: _, @@ -597,7 +605,7 @@ mod tests { use anyhow::Result; use arrow_array::types::Int64Type; use arrow_array::{ - ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray, + Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray }; use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef}; use arrow_select::concat::concat_batches; @@ -1106,11 +1114,17 @@ mod tests { .with_timezone_utc(), ) as ArrayRef; let col11 = Arc::new( + arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)]) + ) as ArrayRef; + let col12 = Arc::new( + arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)]).with_timezone_utc() + ) as ArrayRef; + let col13 = Arc::new( arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)]) .with_precision_and_scale(10, 5) .unwrap(), ) as ArrayRef; - let col12 = Arc::new( + let col14 = Arc::new( arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size( vec![ Some(Uuid::from_u128(0).as_bytes().to_vec()), @@ -1123,7 +1137,7 @@ mod tests { ) .unwrap(), ) as ArrayRef; - let col13 = Arc::new( + let col15 = Arc::new( arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size( vec![ Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), @@ -1137,7 +1151,7 @@ mod tests { .unwrap(), ) as ArrayRef; let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![ - col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, + col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15 ]) .unwrap(); @@ -1185,8 +1199,10 @@ mod tests { (8, Datum::time_micros(0).unwrap()), (9, Datum::timestamp_micros(0)), (10, Datum::timestamptz_micros(0)), + (11, Datum::timestamp_nanos(0)), + (12, Datum::timestamptz_nanos(0)), ( - 11, + 13, Datum::new( PrimitiveType::Decimal { precision: 10, @@ -1195,10 +1211,8 @@ mod tests { PrimitiveLiteral::Decimal(1) ) ), - (12, Datum::uuid(Uuid::from_u128(0))), - (13, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])), - (12, Datum::uuid(Uuid::from_u128(0))), - (13, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])), + (14, Datum::uuid(Uuid::from_u128(0))), + (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])), ]) ); assert_eq!( @@ -1215,8 +1229,10 @@ mod tests { (8, Datum::time_micros(3).unwrap()), (9, Datum::timestamp_micros(3)), (10, Datum::timestamptz_micros(3)), + (11, Datum::timestamp_nanos(3)), + (12, Datum::timestamptz_nanos(3)), ( - 11, + 13, Datum::new( PrimitiveType::Decimal { precision: 10, @@ -1225,9 +1241,9 @@ mod tests { PrimitiveLiteral::Decimal(100) ) ), - (12, Datum::uuid(Uuid::from_u128(3))), + (14, Datum::uuid(Uuid::from_u128(3))), ( - 13, + 15, Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]) ), ]) From 0b28660d831aae9800f81dfca1bd97d45939b1ad Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Wed, 14 Aug 2024 10:33:32 -0700 Subject: [PATCH 06/10] feat(timestamp_ns): fix test_transform_days_literal --- crates/iceberg/src/spec/values.rs | 10 +- crates/iceberg/src/transform/temporal.rs | 155 +++++++++++++++++- crates/iceberg/src/transform/void.rs | 2 +- .../src/writer/file_writer/parquet_writer.rs | 21 ++- 4 files changed, 167 insertions(+), 21 deletions(-) diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index b99d78606..424b83d4d 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -763,15 +763,15 @@ impl Datum { } } - /// Creates a timestamp from unix epoch in microseconds. + /// Creates a timestamp from unix epoch in nanoseconds. /// /// Example: /// /// ```rust /// use iceberg::spec::Datum; - /// let t = Datum::timestamp_micros(1000); + /// let t = Datum::timestamp_nanos(1000); /// - /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.001"); + /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.000001"); /// ``` pub fn timestamp_nanos(value: i64) -> Self { Self { @@ -838,13 +838,13 @@ impl Datum { } } - /// Creates a timestamp with timezone from unix epoch in microseconds. + /// Creates a timestamp with timezone from unix epoch in nanoseconds. /// /// Example: /// /// ```rust /// use iceberg::spec::Datum; - /// let t = Datum::timestamptz_micros(1000); + /// let t = Datum::timestamptz_nanos(1000); /// /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.001 UTC"); /// ``` diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index e525f88f2..64053497c 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow_arith::arity::binary; use arrow_arith::temporal::{date_part, DatePart}; use arrow_array::types::Date32Type; -use arrow_array::{Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray}; +use arrow_array::{Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray, TimestampNanosecondArray}; use arrow_schema::{DataType, TimeUnit}; use chrono::{DateTime, Datelike, Duration}; @@ -34,6 +34,8 @@ const HOUR_PER_SECOND: f64 = 1.0_f64 / 3600.0_f64; const UNIX_EPOCH_YEAR: i32 = 1970; /// One second in micros. const MICROS_PER_SECOND: i64 = 1_000_000; +/// One second in nanos. +const NANOS_PER_SECOND: i64 = 1_000_000_000; /// Extract a date or timestamp year, as years from 1970 #[derive(Debug)] @@ -41,7 +43,7 @@ pub struct Year; impl Year { #[inline] - fn timestamp_to_year(timestamp: i64) -> Result { + fn timestamp_to_year_micros(timestamp: i64) -> Result { Ok(DateTime::from_timestamp_micros(timestamp) .ok_or_else(|| { Error::new( @@ -52,6 +54,13 @@ impl Year { .year() - UNIX_EPOCH_YEAR) } + + #[inline] + fn timestamp_to_year_nanos(timestamp: i64) -> Result { + Ok(DateTime::from_timestamp_nanos(timestamp) + .year() + - UNIX_EPOCH_YEAR) + } } impl TransformFunction for Year { @@ -70,8 +79,10 @@ impl TransformFunction for Year { fn transform_literal(&self, input: &crate::spec::Datum) -> Result> { let val = match input.literal() { PrimitiveLiteral::Date(v) => Date32Type::to_naive_date(*v).year() - UNIX_EPOCH_YEAR, - PrimitiveLiteral::Timestamp(v) => Self::timestamp_to_year(*v)?, - PrimitiveLiteral::Timestamptz(v) => Self::timestamp_to_year(*v)?, + PrimitiveLiteral::Timestamp(v) => Self::timestamp_to_year_micros(*v)?, + PrimitiveLiteral::Timestamptz(v) => Self::timestamp_to_year_micros(*v)?, + PrimitiveLiteral::TimestampNs(v) => Self::timestamp_to_year_nanos(*v)?, + PrimitiveLiteral::TimestamptzNs(v) => Self::timestamp_to_year_nanos(*v)?, _ => { return Err(crate::Error::new( crate::ErrorKind::FeatureUnsupported, @@ -92,7 +103,7 @@ pub struct Month; impl Month { #[inline] - fn timestamp_to_month(timestamp: i64) -> Result { + fn timestamp_to_month_micros(timestamp: i64) -> Result { // date: aaaa-aa-aa // unix epoch date: 1970-01-01 // if date > unix epoch date, delta month = (aa - 1) + 12 * (aaaa-1970) @@ -112,6 +123,22 @@ impl Month { Ok(-delta) } } + + #[inline] + fn timestamp_to_month_nanos(timestamp: i64) -> Result { + // date: aaaa-aa-aa + // unix epoch date: 1970-01-01 + // if date > unix epoch date, delta month = (aa - 1) + 12 * (aaaa-1970) + // if date < unix epoch date, delta month = (12 - (aa - 1)) + 12 * (1970-aaaa-1) + let date = DateTime::from_timestamp_nanos(timestamp); + let unix_epoch_date = DateTime::from_timestamp_nanos(0); + if date > unix_epoch_date { + Ok((date.month0() as i32) + 12 * (date.year() - UNIX_EPOCH_YEAR)) + } else { + let delta = (12 - date.month0() as i32) + 12 * (UNIX_EPOCH_YEAR - date.year() - 1); + Ok(-delta) + } + } } impl TransformFunction for Month { @@ -142,8 +169,10 @@ impl TransformFunction for Month { (Date32Type::to_naive_date(*v).year() - UNIX_EPOCH_YEAR) * 12 + Date32Type::to_naive_date(*v).month0() as i32 } - PrimitiveLiteral::Timestamp(v) => Self::timestamp_to_month(*v)?, - PrimitiveLiteral::Timestamptz(v) => Self::timestamp_to_month(*v)?, + PrimitiveLiteral::Timestamp(v) => Self::timestamp_to_month_micros(*v)?, + PrimitiveLiteral::Timestamptz(v) => Self::timestamp_to_month_micros(*v)?, + PrimitiveLiteral::TimestampNs(v) => Self::timestamp_to_month_nanos(*v)?, + PrimitiveLiteral::TimestamptzNs(v) => Self::timestamp_to_month_nanos(*v)?, _ => { return Err(crate::Error::new( crate::ErrorKind::FeatureUnsupported, @@ -192,6 +221,35 @@ impl Day { Ok(days) } + + fn day_timestamp_nano(v: i64) -> Result { + let secs = v / NANOS_PER_SECOND; + + let (nanos, offset) = if v >= 0 { + let nanos = (v.rem_euclid(NANOS_PER_SECOND)) as u32; + let offset = 0i64; + (nanos, offset) + } else { + let v = v + 1; + let nanos = (v.rem_euclid(NANOS_PER_SECOND)) as u32; + let offset = 1i64; + (nanos, offset) + }; + + let delta = Duration::new(secs, nanos).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Failed to create 'TimeDelta' from seconds {} and nanos {}", + secs, nanos + ), + ) + })?; + + let days = (delta.num_days() - offset) as i32; + + Ok(days) + } } impl TransformFunction for Day { @@ -202,6 +260,11 @@ impl TransformFunction for Day { .downcast_ref::() .unwrap() .try_unary(|v| -> Result { Self::day_timestamp_micro(v) })?, + DataType::Timestamp(TimeUnit::Nanosecond, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .try_unary(|v| -> Result { Self::day_timestamp_nano(v) })?, DataType::Date32 => input .as_any() .downcast_ref::() @@ -225,6 +288,8 @@ impl TransformFunction for Day { PrimitiveLiteral::Date(v) => *v, PrimitiveLiteral::Timestamp(v) => Self::day_timestamp_micro(*v)?, PrimitiveLiteral::Timestamptz(v) => Self::day_timestamp_micro(*v)?, + PrimitiveLiteral::TimestampNs(v) => Self::day_timestamp_nano(*v)?, + PrimitiveLiteral::TimestamptzNs(v) => Self::day_timestamp_nano(*v)?, _ => { return Err(crate::Error::new( crate::ErrorKind::FeatureUnsupported, @@ -248,6 +313,11 @@ impl Hour { fn hour_timestamp_micro(v: i64) -> i32 { (v as f64 / 1000.0 / 1000.0 * HOUR_PER_SECOND) as i32 } + + #[inline] + fn hour_timestamp_nano(v: i64) -> i32 { + (v as f64 / 1000_000.0 / 1000.0 * HOUR_PER_SECOND) as i32 + } } impl TransformFunction for Hour { @@ -275,6 +345,8 @@ impl TransformFunction for Hour { let val = match input.literal() { PrimitiveLiteral::Timestamp(v) => Self::hour_timestamp_micro(*v), PrimitiveLiteral::Timestamptz(v) => Self::hour_timestamp_micro(*v), + PrimitiveLiteral::TimestampNs(v) => Self::hour_timestamp_nano(*v), + PrimitiveLiteral::TimestamptzNs(v) => Self::hour_timestamp_nano(*v), _ => { return Err(crate::Error::new( crate::ErrorKind::FeatureUnsupported, @@ -2323,6 +2395,47 @@ mod test { assert_eq!(res, expect); } + fn test_timestamp_ns_and_tz_transform( + time: &str, + transform: &BoxedTransformFunction, + expect: Datum, + ) { + let timestamp_ns = Datum::timestamp_nanos( + NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S.%f") + .unwrap() + .and_utc() + .timestamp_nanos_opt() + .unwrap(), + ); + let timestamptz_ns = Datum::timestamptz_nanos( + NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S.%f") + .unwrap() + .and_utc() + .timestamp_nanos_opt() + .unwrap(), + ); + let res = transform.transform_literal(×tamp_ns).unwrap().unwrap(); + assert_eq!(res, expect); + let res = transform + .transform_literal(×tamptz_ns) + .unwrap() + .unwrap(); + assert_eq!(res, expect); + } + + fn test_timestamp_ns_and_tz_transform_using_i64( + time: i64, + transform: &BoxedTransformFunction, + expect: Datum, + ) { + let timestamp_ns = Datum::timestamp_nanos(time); + let timestamptz_ns = Datum::timestamptz_nanos(time); + let res = transform.transform_literal(×tamp_ns).unwrap().unwrap(); + assert_eq!(res, expect); + let res = transform.transform_literal(×tamptz_ns).unwrap().unwrap(); + assert_eq!(res, expect); + } + #[test] fn test_transform_year_literal() { let year = Box::new(super::Year) as BoxedTransformFunction; @@ -2338,6 +2451,14 @@ mod test { Datum::int(1970 - super::UNIX_EPOCH_YEAR), ); test_timestamp_and_tz_transform("1969-01-01 00:00:00.00", &year, Datum::int(-1)); + + // Test TimestampNanosecond + test_timestamp_ns_and_tz_transform_using_i64( + 186280000000, + &year, + Datum::int(1970 - super::UNIX_EPOCH_YEAR), + ); + test_timestamp_ns_and_tz_transform("1969-01-01 00:00:00.00", &year, Datum::int(-1)); } #[test] @@ -2432,6 +2553,17 @@ mod test { test_timestamp_and_tz_transform("2017-12-01 00:00:00.00", &month, Datum::int(575)); test_timestamp_and_tz_transform("1970-01-01 00:00:00.00", &month, Datum::int(0)); test_timestamp_and_tz_transform("1969-12-31 00:00:00.00", &month, Datum::int(-1)); + + // Test TimestampNanosecond + test_timestamp_ns_and_tz_transform_using_i64( + 186280000000, + &month, + Datum::int((1970 - super::UNIX_EPOCH_YEAR) * 12), + ); + test_timestamp_ns_and_tz_transform("1969-12-01 23:00:00.00", &month, Datum::int(-1)); + test_timestamp_ns_and_tz_transform("2017-12-01 00:00:00.00", &month, Datum::int(575)); + test_timestamp_ns_and_tz_transform("1970-01-01 00:00:00.00", &month, Datum::int(0)); + test_timestamp_ns_and_tz_transform("1969-12-31 00:00:00.00", &month, Datum::int(-1)); } #[test] @@ -2523,6 +2655,11 @@ mod test { test_timestamp_and_tz_transform_using_i64(1512151975038194, &day, Datum::int(17501)); test_timestamp_and_tz_transform_using_i64(-115200000000, &day, Datum::int(-2)); test_timestamp_and_tz_transform("2017-12-01 10:30:42.123", &day, Datum::int(17501)); + + // Test TimestampNanosecond + test_timestamp_ns_and_tz_transform_using_i64(1512151975038194, &day, Datum::int(17)); + test_timestamp_ns_and_tz_transform_using_i64(-115200000000, &day, Datum::int(-1)); + test_timestamp_ns_and_tz_transform("2017-12-01 10:30:42.123", &day, Datum::int(17501)); } #[test] @@ -2591,5 +2728,9 @@ mod test { // Test TimestampMicrosecond test_timestamp_and_tz_transform("2017-12-01 18:00:00.00", &hour, Datum::int(420042)); test_timestamp_and_tz_transform("1969-12-31 23:00:00.00", &hour, Datum::int(-1)); + + // Test TimestampNanosecond + test_timestamp_ns_and_tz_transform("2017-12-01 18:00:00.00", &hour, Datum::int(420042)); + test_timestamp_ns_and_tz_transform("1969-12-31 23:00:00.00", &hour, Datum::int(-1)); } } diff --git a/crates/iceberg/src/transform/void.rs b/crates/iceberg/src/transform/void.rs index 53e22140e..5d429a593 100644 --- a/crates/iceberg/src/transform/void.rs +++ b/crates/iceberg/src/transform/void.rs @@ -37,7 +37,7 @@ impl TransformFunction for Void { mod test { use crate::spec::PrimitiveType::{ Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, - Timestamptz, TimestampNs, TimestamptzNs, Uuid, + TimestampNs, Timestamptz, TimestamptzNs, Uuid, }; use crate::spec::Type::{Primitive, Struct}; use crate::spec::{NestedField, StructType, Transform}; diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index a0414f837..38de5f14b 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -337,11 +337,11 @@ impl MinMaxColAggregator { let convert_func = |v: i64| Result::::Ok(Datum::timestamptz_micros(v)); self.update_state::(field_id, &stat, convert_func) } - (PrimitiveType::TimestampNs, Statistics::Int64(stat,)) => { + (PrimitiveType::TimestampNs, Statistics::Int64(stat)) => { let convert_func = |v: i64| Result::::Ok(Datum::timestamp_nanos(v)); self.update_state::(field_id, &stat, convert_func) } - (PrimitiveType::TimestamptzNs, Statistics::Int64(stat,)) => { + (PrimitiveType::TimestamptzNs, Statistics::Int64(stat)) => { let convert_func = |v: i64| Result::::Ok(Datum::timestamptz_nanos(v)); self.update_state::(field_id, &stat, convert_func) } @@ -605,7 +605,7 @@ mod tests { use anyhow::Result; use arrow_array::types::Int64Type; use arrow_array::{ - Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray + Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray, }; use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef}; use arrow_select::concat::concat_batches; @@ -1113,11 +1113,15 @@ mod tests { arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)]) .with_timezone_utc(), ) as ArrayRef; - let col11 = Arc::new( - arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)]) - ) as ArrayRef; + let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![ + Some(0), + Some(1), + None, + Some(3), + ])) as ArrayRef; let col12 = Arc::new( - arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)]).with_timezone_utc() + arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)]) + .with_timezone_utc(), ) as ArrayRef; let col13 = Arc::new( arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)]) @@ -1151,7 +1155,8 @@ mod tests { .unwrap(), ) as ArrayRef; let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![ - col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15 + col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, + col14, col15, ]) .unwrap(); From 673aace2945ca05aa90ba6479deec4609a83990c Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Wed, 14 Aug 2024 11:03:54 -0700 Subject: [PATCH 07/10] feat(timestamp_ns): fix math for timestamptz_nanos --- crates/iceberg/src/spec/values.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 424b83d4d..719fc0af2 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -421,7 +421,7 @@ impl Datum { PrimitiveLiteral::TimestampNs(i64::from_le_bytes(bytes.try_into()?)) } PrimitiveType::TimestamptzNs => { - PrimitiveLiteral::TimestampNs(i64::from_le_bytes(bytes.try_into()?)) + PrimitiveLiteral::TimestamptzNs(i64::from_le_bytes(bytes.try_into()?)) } PrimitiveType::String => { PrimitiveLiteral::String(std::str::from_utf8(bytes)?.to_string()) @@ -846,7 +846,7 @@ impl Datum { /// use iceberg::spec::Datum; /// let t = Datum::timestamptz_nanos(1000); /// - /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.001 UTC"); + /// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.000001 UTC"); /// ``` pub fn timestamptz_nanos(value: i64) -> Self { Self { @@ -1864,7 +1864,7 @@ impl Literal { .to_string(), )), PrimitiveLiteral::TimestamptzNs(val) => Ok(JsonValue::String( - timestamptz::microseconds_to_datetimetz(val) + timestamptz::nanoseconds_to_datetimetz(val) .format("%Y-%m-%dT%H:%M:%S%.f+00:00") .to_string(), )), @@ -2049,7 +2049,7 @@ mod timestamptz { pub(crate) fn nanoseconds_to_datetimetz(nanos: i64) -> DateTime { let (secs, rem) = (nanos / 1_000_000_000, nanos % 1_000_000_000); - DateTime::from_timestamp(secs, rem as u32 * 1_000).unwrap() + DateTime::from_timestamp(secs, rem as u32).unwrap() } } From 7ddfef7c6f418a48d90c562000bf6cf59465ee29 Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Wed, 14 Aug 2024 11:05:46 -0700 Subject: [PATCH 08/10] chore: formatting --- crates/iceberg/src/transform/temporal.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index 64053497c..0698d38b5 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -20,7 +20,9 @@ use std::sync::Arc; use arrow_arith::arity::binary; use arrow_arith::temporal::{date_part, DatePart}; use arrow_array::types::Date32Type; -use arrow_array::{Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray, TimestampNanosecondArray}; +use arrow_array::{ + Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray, TimestampNanosecondArray, +}; use arrow_schema::{DataType, TimeUnit}; use chrono::{DateTime, Datelike, Duration}; @@ -57,9 +59,7 @@ impl Year { #[inline] fn timestamp_to_year_nanos(timestamp: i64) -> Result { - Ok(DateTime::from_timestamp_nanos(timestamp) - .year() - - UNIX_EPOCH_YEAR) + Ok(DateTime::from_timestamp_nanos(timestamp).year() - UNIX_EPOCH_YEAR) } } @@ -2432,7 +2432,10 @@ mod test { let timestamptz_ns = Datum::timestamptz_nanos(time); let res = transform.transform_literal(×tamp_ns).unwrap().unwrap(); assert_eq!(res, expect); - let res = transform.transform_literal(×tamptz_ns).unwrap().unwrap(); + let res = transform + .transform_literal(×tamptz_ns) + .unwrap() + .unwrap(); assert_eq!(res, expect); } From ea31dabc043d731093f81fbd25547da6147e5b31 Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Wed, 14 Aug 2024 13:36:11 -0700 Subject: [PATCH 09/10] chore: formatting --- crates/iceberg/src/spec/values.rs | 12 +++++---- crates/iceberg/src/transform/temporal.rs | 32 ++++++++++++++++++------ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 0a6e6c0a8..3568d3dcd 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -1857,11 +1857,13 @@ impl Literal { .format("%Y-%m-%dT%H:%M:%S%.f") .to_string(), )), - (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(val)) => Ok(JsonValue::String( - timestamptz::nanoseconds_to_datetimetz(val) - .format("%Y-%m-%dT%H:%M:%S%.f+00:00") - .to_string(), - )), + (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(val)) => { + Ok(JsonValue::String( + timestamptz::nanoseconds_to_datetimetz(val) + .format("%Y-%m-%dT%H:%M:%S%.f+00:00") + .to_string(), + )) + } (PrimitiveType::String, PrimitiveLiteral::String(val)) => { Ok(JsonValue::String(val.clone())) } diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index 5a05ed438..6784568e8 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -81,10 +81,18 @@ impl TransformFunction for Year { (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => { Date32Type::to_naive_date(*v).year() - UNIX_EPOCH_YEAR } - (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => Self::timestamp_to_year_micros(*v)?, - (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => Self::timestamp_to_year_micros(*v)?, - (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => Self::timestamp_to_year_nanos(*v)?, - (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => Self::timestamp_to_year_nanos(*v)?, + (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => { + Self::timestamp_to_year_micros(*v)? + } + (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => { + Self::timestamp_to_year_micros(*v)? + } + (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => { + Self::timestamp_to_year_nanos(*v)? + } + (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => { + Self::timestamp_to_year_nanos(*v)? + } _ => { return Err(crate::Error::new( crate::ErrorKind::FeatureUnsupported, @@ -171,11 +179,15 @@ impl TransformFunction for Month { (Date32Type::to_naive_date(*v).year() - UNIX_EPOCH_YEAR) * 12 + Date32Type::to_naive_date(*v).month0() as i32 } - (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => Self::timestamp_to_month_micros(*v)?, + (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => { + Self::timestamp_to_month_micros(*v)? + } (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => { Self::timestamp_to_month_micros(*v)? } - (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => Self::timestamp_to_month_nanos(*v)?, + (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => { + Self::timestamp_to_month_nanos(*v)? + } (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => { Self::timestamp_to_month_nanos(*v)? } @@ -296,7 +308,9 @@ impl TransformFunction for Day { (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => { Self::day_timestamp_micro(*v)? } - (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => Self::day_timestamp_nano(*v)?, + (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => { + Self::day_timestamp_nano(*v)? + } (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => { Self::day_timestamp_nano(*v)? } @@ -357,7 +371,9 @@ impl TransformFunction for Hour { (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => { Self::hour_timestamp_micro(*v) } - (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => Self::hour_timestamp_nano(*v), + (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => { + Self::hour_timestamp_nano(*v) + } (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => { Self::hour_timestamp_nano(*v) } From dee3446b144f0b3ec789fd68f3d4821fde86ad80 Mon Sep 17 00:00:00 2001 From: Timothy Maloney Date: Wed, 14 Aug 2024 13:46:45 -0700 Subject: [PATCH 10/10] chore: Appease clippy --- crates/iceberg/src/transform/temporal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index 6784568e8..f326cfed6 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -340,7 +340,7 @@ impl Hour { #[inline] fn hour_timestamp_nano(v: i64) -> i32 { - (v as f64 / 1000_000.0 / 1000.0 * HOUR_PER_SECOND) as i32 + (v as f64 / 1_000_000.0 / 1000.0 * HOUR_PER_SECOND) as i32 } }