diff --git a/arrow/extensions/extensions.go b/arrow/extensions/extensions.go index 04566c75..6f13aa64 100644 --- a/arrow/extensions/extensions.go +++ b/arrow/extensions/extensions.go @@ -21,11 +21,12 @@ import ( ) var canonicalExtensionTypes = []arrow.ExtensionType{ - NewBool8Type(), - NewUUIDType(), - &OpaqueType{}, &JSONType{}, + &OpaqueType{}, + &TimestampWithOffsetType{}, &VariantType{}, + NewBool8Type(), + NewUUIDType(), } func init() { diff --git a/arrow/extensions/timestamp_with_offset.go b/arrow/extensions/timestamp_with_offset.go new file mode 100644 index 00000000..f84ad3c8 --- /dev/null +++ b/arrow/extensions/timestamp_with_offset.go @@ -0,0 +1,354 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extensions + +import ( + _ "bytes" + "fmt" + "reflect" + "strings" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/internal/json" +) + +// TimestampWithOffsetType represents a timestamp column that stores a timezone offset per row instead of +// applying the same timezone offset to the entire column. +type TimestampWithOffsetType struct { + arrow.ExtensionBase +} + +// Whether the storageType is compatible with TimestampWithOffset. +// +// Returns (time_unit, ok). If ok is false, time unit is garbage. +func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit, bool) { + timeUnit := arrow.Second + switch t := storageType.(type) { + case *arrow.StructType: + if t.NumFields() != 2 { + return timeUnit, false + } + + maybeTimestamp := t.Field(0); + maybeOffset := t.Field(1); + + timestampOk := false + switch timestampType := maybeTimestamp.Type.(type) { + case *arrow.TimestampType: + if timestampType.TimeZone == "UTC" { + timestampOk = true + timeUnit = timestampType.TimeUnit() + } + default: + } + + ok := maybeTimestamp.Name == "timestamp" && + timestampOk && + !maybeTimestamp.Nullable && + maybeOffset.Name == "offset_minutes" && + arrow.TypeEqual(maybeOffset.Type, arrow.PrimitiveTypes.Int16) && + !maybeOffset.Nullable + + return timeUnit, ok + default: + return timeUnit, false + } +} + + +// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the underlying storage type set correctly to +// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any TimeUnit. +func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType { + return &TimestampWithOffsetType{ + ExtensionBase: arrow.ExtensionBase{ + Storage: arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: unit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: arrow.PrimitiveTypes.Int16, + Nullable: false, + }, + ), + }, + } +} + +func (b *TimestampWithOffsetType) ArrayType() reflect.Type { return reflect.TypeOf(TimestampWithOffsetArray{}) } + +func (b *TimestampWithOffsetType) ExtensionName() string { return "arrow.timestamp_with_offset" } + +func (b *TimestampWithOffsetType) String() string { return fmt.Sprintf("extension<%s>", b.ExtensionName()) } + +func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`, e.ExtensionName(), e.Serialize())), nil +} + +func (b *TimestampWithOffsetType) Serialize() string { return "" } + +func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data string) (arrow.ExtensionType, error) { + timeUnit, ok := isDataTypeCompatible(storageType) + if !ok { + return nil, fmt.Errorf("invalid storage type for TimestampWithOffsetType: %s", storageType.Name()) + } + return NewTimestampWithOffsetType(timeUnit), nil +} + +func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType) bool { + return b.ExtensionName() == other.ExtensionName() +} + +func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit { + return b.ExtensionBase.Storage.(*arrow.StructType).Fields()[0].Type.(*arrow.TimestampType).TimeUnit() +} + +func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator) array.Builder { + return NewTimestampWithOffsetBuilder(mem, b.TimeUnit()) +} + +// TimestampWithOffsetArray is a simple array of struct +type TimestampWithOffsetArray struct { + array.ExtensionArrayBase +} + +func (a *TimestampWithOffsetArray) String() string { + var o strings.Builder + o.WriteString("[") + for i := 0; i < a.Len(); i++ { + if i > 0 { + o.WriteString(" ") + } + switch { + case a.IsNull(i): + o.WriteString(array.NullValueStr) + default: + fmt.Fprintf(&o, "\"%s\"", a.Value(i)) + } + } + o.WriteString("]") + return o.String() +} + +func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16, unit arrow.TimeUnit) time.Time { + hours := offsetMinutes / 60; + minutes := offsetMinutes % 60 + if minutes < 0 { + minutes = -minutes + } + + loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes), int(offsetMinutes) * 60) + return utcTimestamp.ToTime(unit).In(loc) +} + +func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp, int16) { + // naive "bitwise" conversion to UTC, keeping the underlying date the same + utc := t.UTC() + naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + offsetMinutes := int16(naiveUtc.Sub(t).Minutes()) + // SAFETY: unit MUST have been validated to a valid arrow.TimeUnit value before + // this function. Otherwise, ignoring this error is not safe. + timestamp, _ := arrow.TimestampFromTime(utc, unit) + return timestamp, offsetMinutes +} + +// Get the raw arrow values at the given index +// +// SAFETY: the value at i must not be nil +func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, int16, arrow.TimeUnit) { + structs := a.Storage().(*array.Struct) + + timestampField := structs.Field(0) + timestamps := timestampField.(*array.Timestamp) + offsets := structs.Field(1).(*array.Int16) + + timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit + utcTimestamp := timestamps.Value(i) + offsetMinutes := offsets.Value(i) + + return utcTimestamp, offsetMinutes, timeUnit +} + +func (a *TimestampWithOffsetArray) Value(i int) time.Time { + if a.IsNull(i) { + return time.Unix(0, 0) + } + utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i) + return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) +} + +func (a *TimestampWithOffsetArray) Values() []time.Time { + values := make([]time.Time, a.Len()) + for i := range a.Len() { + val := a.Value(i) + values[i] = val + } + return values +} + +func (a *TimestampWithOffsetArray) ValueStr(i int) string { + switch { + case a.IsNull(i): + return array.NullValueStr + default: + return a.Value(i).String() + } +} + +func (a *TimestampWithOffsetArray) MarshalJSON() ([]byte, error) { + values := make([]interface{}, a.Len()) + for i := 0; i < a.Len(); i++ { + if a.IsValid(i) { + utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i) + values[i] = timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit) + } else { + values[i] = nil + } + } + return json.Marshal(values) +} + +func (a *TimestampWithOffsetArray) GetOneForMarshal(i int) interface{} { + if a.IsNull(i) { + return nil + } + return a.Value(i) +} + +// TimestampWithOffsetBuilder is a convenience builder for the TimestampWithOffset extension type, +// allowing arrays to be built with boolean values rather than the underlying storage type. +type TimestampWithOffsetBuilder struct { + *array.ExtensionBuilder + + // The layout used to parse any timestamps from strings. Defaults to time.RFC3339 + Layout string + unit arrow.TimeUnit +} + +// NewTimestampWithOffsetBuilder creates a new TimestampWithOffsetBuilder, exposing a convenient and efficient interface +// for writing time.Time values to the underlying storage array. +func NewTimestampWithOffsetBuilder(mem memory.Allocator, unit arrow.TimeUnit) *TimestampWithOffsetBuilder { + return &TimestampWithOffsetBuilder{ + unit: unit, + Layout: time.RFC3339, + ExtensionBuilder: array.NewExtensionBuilder(mem, NewTimestampWithOffsetType(unit)), + } +} + +func (b *TimestampWithOffsetBuilder) Append(v time.Time) { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) + + structBuilder.Append(true) + structBuilder.FieldBuilder(0).(*array.TimestampBuilder).Append(timestamp) + structBuilder.FieldBuilder(1).(*array.Int16Builder).Append(int16(offsetMinutes)) +} + +func (b *TimestampWithOffsetBuilder) UnsafeAppend(v time.Time) { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) + + structBuilder.Append(true) + structBuilder.FieldBuilder(0).(*array.TimestampBuilder).UnsafeAppend(timestamp) + structBuilder.FieldBuilder(1).(*array.Int16Builder).UnsafeAppend(int16(offsetMinutes)) +} + +// By default, this will try to parse the string using the RFC3339 layout. +// +// You can change the default layout by using builder.SetLayout() +func (b *TimestampWithOffsetBuilder) AppendValueFromString(s string) error { + if s == array.NullValueStr { + b.AppendNull() + return nil + } + + parsed, err := time.Parse(b.Layout, s) + if err != nil { + return err + } + + b.Append(parsed) + return nil +} + +func (b *TimestampWithOffsetBuilder) AppendValues(values []time.Time, valids []bool) { + structBuilder := b.ExtensionBuilder.Builder.(*array.StructBuilder) + timestamps := structBuilder.FieldBuilder(0).(*array.TimestampBuilder); + offsets := structBuilder.FieldBuilder(1).(*array.Int16Builder); + + structBuilder.AppendValues(valids) + + for i, v := range values { + timestamp, offsetMinutes := fieldValuesFromTime(v, b.unit) + + // SAFETY: by this point we know all buffers have available space given the earlier + // call to structBuilder.AppendValues which calls Reserve internally + if valids[i] { + timestamps.UnsafeAppend(timestamp) + offsets.UnsafeAppend(offsetMinutes) + } else { + timestamps.UnsafeAppendBoolToBitmap(false) + offsets.UnsafeAppendBoolToBitmap(false) + } + } +} + +func (b *TimestampWithOffsetBuilder) UnmarshalOne(dec *json.Decoder) error { + tok, err := dec.Token() + if err != nil { + return fmt.Errorf("failed to decode json: %w", err) + } + + switch raw := tok.(type) { + case string: + t, err := time.Parse(b.Layout, raw) + if err != nil { + return fmt.Errorf("failed to parse string \"%s\" as time.Time using layout \"%s\"", raw, b.Layout) + } + b.Append(t) + case nil: + b.AppendNull() + default: + return fmt.Errorf("expected date string") + } + + return nil +} + +func (b *TimestampWithOffsetBuilder) Unmarshal(dec *json.Decoder) error { + for dec.More() { + if err := b.UnmarshalOne(dec); err != nil { + return err + } + } + return nil +} + +var ( + _ arrow.ExtensionType = (*TimestampWithOffsetType)(nil) + _ array.CustomExtensionBuilder = (*TimestampWithOffsetType)(nil) + _ array.ExtensionArray = (*TimestampWithOffsetArray)(nil) + _ array.Builder = (*TimestampWithOffsetBuilder)(nil) +) diff --git a/arrow/extensions/timestamp_with_offset_test.go b/arrow/extensions/timestamp_with_offset_test.go new file mode 100644 index 00000000..9c198afd --- /dev/null +++ b/arrow/extensions/timestamp_with_offset_test.go @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extensions_test + +import ( + "bytes" + _ "bytes" + "fmt" + "strings" + _ "strings" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/extensions" + "github.com/apache/arrow-go/v18/arrow/ipc" + _ "github.com/apache/arrow-go/v18/arrow/ipc" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/internal/json" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testTimeUnit = arrow.Microsecond + +var testDate1 = time.Date(2025, 01, 01, 00, 00, 00, 00, time.FixedZone("UTC+00:00", 0)) + +var testZone1 = time.FixedZone("UTC-08:00", -8*60*60) +var testDate2 = testDate1.In(testZone1) + +var testZone2 = time.FixedZone("UTC+06:00", +6*60*60) +var testDate3 = testDate1.In(testZone2) + +func TestTimestampWithOffsetTypeBasics(t *testing.T) { + typ := extensions.NewTimestampWithOffsetType(testTimeUnit) + + assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName()) + assert.True(t, typ.ExtensionEquals(typ)) + + assert.True(t, arrow.TypeEqual(typ, typ)) + assert.True(t, arrow.TypeEqual( + arrow.StructOf( + arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{ + Unit: testTimeUnit, + TimeZone: "UTC", + }, + Nullable: false, + }, + arrow.Field{ + Name: "offset_minutes", + Type: arrow.PrimitiveTypes.Int16, + Nullable: false, + }, + ), + typ.StorageType())) + + assert.Equal(t, "extension", typ.String()) +} + +func TestTimestampWithOffsetExtensionBuilder(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + builder := extensions.NewTimestampWithOffsetBuilder(mem, testTimeUnit) + builder.Append(testDate1) + builder.AppendNull() + builder.Append(testDate2) + builder.Append(testDate3) + + // it should build the array with the correct size + arr := builder.NewArray() + typedArr := arr.(*extensions.TimestampWithOffsetArray) + assert.Equal(t, 4, arr.Data().Len()) + defer arr.Release() + + // typedArr.Value(i) should return values adjusted for their original timezone + assert.Equal(t, testDate1, typedArr.Value(0)) + assert.Equal(t, testDate2, typedArr.Value(2)) + assert.Equal(t, testDate3, typedArr.Value(3)) + + // storage TimeUnit should be the same as we pass in to the builder, and storage timezone should be UTC + timestampStructField := typedArr.Storage().(*array.Struct).Field(0) + timestampStructDataType := timestampStructField.DataType().(*arrow.TimestampType) + assert.Equal(t, timestampStructDataType.Unit, testTimeUnit) + assert.Equal(t, timestampStructDataType.TimeZone, "UTC") + + // stored values should be equivalent to the raw values in UTC + timestampsArr := timestampStructField.(*array.Timestamp) + assert.Equal(t, testDate1.In(time.UTC), timestampsArr.Value(0).ToTime(testTimeUnit)) + assert.Equal(t, testDate2.In(time.UTC), timestampsArr.Value(2).ToTime(testTimeUnit)) + assert.Equal(t, testDate3.In(time.UTC), timestampsArr.Value(3).ToTime(testTimeUnit)) + + // the array should encode itself as JSON and string + arrStr := arr.String() + assert.Equal(t, fmt.Sprintf(`["%[1]s" (null) "%[2]s" "%[3]s"]`, testDate1, testDate2, testDate3), arrStr) + jsonStr, err := json.Marshal(arr) + assert.NoError(t, err) + + // roundtripping from JSON with array.FromJSON should work + roundtripped, _, err := array.FromJSON(mem, extensions.NewTimestampWithOffsetType(testTimeUnit), bytes.NewReader(jsonStr)) + defer roundtripped.Release() + assert.NoError(t, err) + assert.Truef(t, array.Equal(arr, roundtripped), "expected %s got %s", arr, roundtripped) +} + +func TestTimestampWithOffsetExtensionRecordBuilder(t *testing.T) { + schema := arrow.NewSchema([]arrow.Field{ + { + Name: "timestamp_with_offset", + Nullable: true, + Type: extensions.NewTimestampWithOffsetType(testTimeUnit), + }, + }, nil) + builder := array.NewRecordBuilder(memory.DefaultAllocator, schema) + defer builder.Release() + + fieldBuilder := builder.Field(0).(*extensions.TimestampWithOffsetBuilder) + + // append a simple time.Time + fieldBuilder.Append(testDate1) + + // append a null and 2 time.Time all at once + values := []time.Time{ + time.Unix(0, 0).In(time.UTC), + testDate2, + testDate3, + } + valids := []bool{false, true, true} + fieldBuilder.AppendValues(values, valids) + + // append a value from RFC3339 string + fieldBuilder.AppendValueFromString(testDate1.Format(time.RFC3339)) + + // append value formatted in a different string layout + fieldBuilder.Layout = time.RFC3339Nano + fieldBuilder.AppendValueFromString(testDate2.Format(time.RFC3339Nano)) + + record := builder.NewRecordBatch() + + // Record batch should JSON-encode values containing per-row timezone info + json, err := record.MarshalJSON() + require.NoError(t, err) + expect := `[{"timestamp_with_offset":"2025-01-01T00:00:00Z"} +,{"timestamp_with_offset":null} +,{"timestamp_with_offset":"2024-12-31T16:00:00-08:00"} +,{"timestamp_with_offset":"2025-01-01T06:00:00+06:00"} +,{"timestamp_with_offset":"2025-01-01T00:00:00Z"} +,{"timestamp_with_offset":"2024-12-31T16:00:00-08:00"} +]` + require.Equal(t, expect, string(json)) + + // Record batch roundtrip to JSON should work + roundtripped, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, bytes.NewReader(json)) + require.NoError(t, err) + defer roundtripped.Release() + require.Equal(t, schema, roundtripped.Schema()) + assert.Truef(t, array.RecordEqual(record, roundtripped), "expected %s\n\ngot %s", record, roundtripped) +} + +func TestTimestampWithOffsetTypeBatchIPCRoundTrip(t *testing.T) { + raw := `["2025-01-01T00:00:00Z",null,"2024-12-31T16:00:00-08:00","2025-01-01T06:00:00+06:00","2025-01-01T00:00:00Z","2024-12-31T16:00:00-08:00"]` + typ := extensions.NewTimestampWithOffsetType(testTimeUnit) + + arr, _, err := array.FromJSON(memory.DefaultAllocator, typ, strings.NewReader(raw)) + require.NoError(t, err) + defer arr.Release() + + batch := array.NewRecordBatch(arrow.NewSchema([]arrow.Field{{Name: "timestamp_with_offset", Type: typ, Nullable: true}}, nil), []arrow.Array{arr}, -1) + defer batch.Release() + + var written arrow.RecordBatch + { + var buf bytes.Buffer + wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema())) + require.NoError(t, wr.Write(batch)) + require.NoError(t, wr.Close()) + + rdr, err := ipc.NewReader(&buf) + require.NoError(t, err) + written, err = rdr.Read() + require.NoError(t, err) + written.Retain() + defer written.Release() + rdr.Release() + } + + assert.Truef(t, batch.Schema().Equal(written.Schema()), "expected: %s\n\ngot: %s", + batch.Schema(), written.Schema()) + + assert.Truef(t, array.RecordEqual(batch, written), "expected: %s\n\ngot: %s", + batch, written) +}