Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions flow/connectors/mysql/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie
if strings.HasPrefix(unsafeString, "0000-00-00") {
return types.QValueTimestamp{Val: time.Unix(0, 0)}, nil
}
val, err := time.Parse("2006-01-02 15:04:05.999999", unsafeString)
val, err := time.Parse("2006-01-02 15:04:05.999999", strings.ReplaceAll(unsafeString, "-00", "-01"))
if err != nil {
return nil, err
}
Expand All @@ -334,7 +334,7 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie
if unsafeString == "0000-00-00" {
return types.QValueDate{Val: time.Unix(0, 0)}, nil
}
val, err := time.Parse(time.DateOnly, unsafeString)
val, err := time.Parse(time.DateOnly, strings.ReplaceAll(unsafeString, "-00", "-01"))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -469,13 +469,13 @@ func QValueFromMysqlRowEvent(
return types.QValueTime{Val: tm}, nil
case types.QValueKindDate:
if val == "0000-00-00" {
return types.QValueDate{Val: time.Unix(0, 0)}, nil
return types.QValueDate{Val: time.Unix(0, 0).UTC()}, nil
}
val, err := time.Parse(time.DateOnly, val)
val, err := time.Parse(time.DateOnly, strings.ReplaceAll(val, "-00", "-01"))
if err != nil {
return nil, err
}
return types.QValueDate{Val: val}, nil
return types.QValueDate{Val: val.UTC()}, nil
case types.QValueKindTimestamp: // 0000-00-00 ends up here
if mytype == mysql.MYSQL_TYPE_TIME || mytype == mysql.MYSQL_TYPE_TIME2 {
tm, err := processTime(val)
Expand All @@ -485,13 +485,13 @@ func QValueFromMysqlRowEvent(
return types.QValueTimestamp{Val: time.Unix(0, 0).UTC().Add(tm)}, nil
}
if strings.HasPrefix(val, "0000-00-00") {
return types.QValueTimestamp{Val: time.Unix(0, 0)}, nil
return types.QValueTimestamp{Val: time.Unix(0, 0).UTC()}, nil
}
tm, err := time.Parse("2006-01-02 15:04:05.999999", val)
tm, err := time.Parse("2006-01-02 15:04:05.999999", strings.ReplaceAll(val, "-00", "-01"))
if err != nil {
return nil, err
}
return types.QValueTimestamp{Val: tm}, nil
return types.QValueTimestamp{Val: tm.UTC()}, nil
}
}
return nil, fmt.Errorf("unexpected type %T for mysql type %d, qkind %s", val, mytype, qkind)
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,8 +689,7 @@ func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context,

// prepare and execute the query
var result bool
err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result)
if err != nil {
if err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result); err != nil {
return false, fmt.Errorf("failed to execute query: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/clickhouse/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s ClickHouseSuite) Test_MySQL_Time() {
require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key",d,dt,tm,t) VALUES
('init','1935-01-01','1953-02-02 12:01:02','1973-02-02 13:01:02.123','14:21.654321'),
('init','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','00:00'),
('init','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','-800:0:1')`,
('init','2000-01-00','2000-00-01 00:00:00','2000-01-01 00:00:00.000','-800:0:1')`,
quotedSrcFullName)))

connectionGen := e2e.FlowConnectionGenerationConfig{
Expand All @@ -108,7 +108,7 @@ func (s ClickHouseSuite) Test_MySQL_Time() {
require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key",d,dt,tm,t) VALUES
('cdc','1935-01-01','1953-02-02 12:01:02','1973-02-02 13:01:02.123','14:21.654321'),
('cdc','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','00:00'),
('cdc','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','-800:0:1')`,
('cdc','2000-01-00','2000-00-01 00:00:00','2000-01-01 00:00:00.000','-800:0:1')`,
quotedSrcFullName)))

e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",d,dt,tm,t")
Expand Down
3 changes: 1 addition & 2 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/aws/smithy-go v1.22.4
github.com/cockroachdb/pebble v1.1.5
github.com/elastic/go-elasticsearch/v8 v8.18.1
github.com/go-mysql-org/go-mysql v1.12.0
github.com/go-mysql-org/go-mysql v1.12.1-0.20250706035254-4a082cf9bd9a
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.0
github.com/hamba/avro/v2 v2.29.0
Expand Down Expand Up @@ -89,7 +89,6 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.53.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/apache/arrow-go/v18 v18.3.1 // indirect
github.com/apache/arrow/go/v15 v15.0.2 // indirect
Expand Down
6 changes: 2 additions & 4 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.53.0/go.mod h1:jUZ5LYlw40WMd07qxcQJD5M40aUxrfwqQX1g7zxYnrQ=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 h1:Ron4zCA/yk6U7WOBXhTJcDpsUBG9npumK6xw2auFltQ=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0/go.mod h1:cSgYe11MCNYunTnRXrKiR/tHc0eoKjICUuWpNZoVCOo=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
github.com/PeerDB-io/glua64 v1.0.1 h1:biXLlFF/L5pnJCwDon7hkWkuQPozC8NjKS3J7Wzi69I=
github.com/PeerDB-io/glua64 v1.0.1/go.mod h1:UHmAhniv61bJPMhQvxkpC7jXbn353dSbQviu83bgQVg=
github.com/PeerDB-io/gluabit32 v1.0.2 h1:AGI1Z7dwDVotakpuOOuyTX4/QGi5HUYsipL/VfodmO4=
Expand Down Expand Up @@ -278,8 +276,8 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-mysql-org/go-mysql v1.12.0 h1:tyToNggfCfl11OY7GbWa2Fq3ofyScO9GY8b5f5wAmE4=
github.com/go-mysql-org/go-mysql v1.12.0/go.mod h1:/XVjs1GlT6NPSf13UgXLv/V5zMNricTCqeNaehSBghs=
github.com/go-mysql-org/go-mysql v1.12.1-0.20250706035254-4a082cf9bd9a h1:xmHPT1ElX3AzFn8uSWlsJArgcjPjTt/KFFsaJe28qBY=
github.com/go-mysql-org/go-mysql v1.12.1-0.20250706035254-4a082cf9bd9a/go.mod h1:FQxw17uRbFvMZFK+dPtIPufbU46nBdrGaxOw0ac9MFs=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
Expand Down
Loading