diff --git a/flow/connectors/mysql/qvalue_convert.go b/flow/connectors/mysql/qvalue_convert.go index a23a32a47f..7e9ef5f603 100644 --- a/flow/connectors/mysql/qvalue_convert.go +++ b/flow/connectors/mysql/qvalue_convert.go @@ -316,7 +316,7 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie if strings.HasPrefix(unsafeString, "0000-00-00") { return types.QValueTimestamp{Val: time.Unix(0, 0)}, nil } - val, err := time.Parse("2006-01-02 15:04:05.999999", unsafeString) + val, err := time.Parse("2006-01-02 15:04:05.999999", strings.ReplaceAll(unsafeString, "-00", "-01")) if err != nil { return nil, err } @@ -334,7 +334,7 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie if unsafeString == "0000-00-00" { return types.QValueDate{Val: time.Unix(0, 0)}, nil } - val, err := time.Parse(time.DateOnly, unsafeString) + val, err := time.Parse(time.DateOnly, strings.ReplaceAll(unsafeString, "-00", "-01")) if err != nil { return nil, err } @@ -469,13 +469,13 @@ func QValueFromMysqlRowEvent( return types.QValueTime{Val: tm}, nil case types.QValueKindDate: if val == "0000-00-00" { - return types.QValueDate{Val: time.Unix(0, 0)}, nil + return types.QValueDate{Val: time.Unix(0, 0).UTC()}, nil } - val, err := time.Parse(time.DateOnly, val) + val, err := time.Parse(time.DateOnly, strings.ReplaceAll(val, "-00", "-01")) if err != nil { return nil, err } - return types.QValueDate{Val: val}, nil + return types.QValueDate{Val: val.UTC()}, nil case types.QValueKindTimestamp: // 0000-00-00 ends up here if mytype == mysql.MYSQL_TYPE_TIME || mytype == mysql.MYSQL_TYPE_TIME2 { tm, err := processTime(val) @@ -485,13 +485,13 @@ func QValueFromMysqlRowEvent( return types.QValueTimestamp{Val: time.Unix(0, 0).UTC().Add(tm)}, nil } if strings.HasPrefix(val, "0000-00-00") { - return types.QValueTimestamp{Val: time.Unix(0, 0)}, nil + return types.QValueTimestamp{Val: time.Unix(0, 0).UTC()}, nil } - tm, err := time.Parse("2006-01-02 15:04:05.999999", val) + tm, err := time.Parse("2006-01-02 15:04:05.999999", strings.ReplaceAll(val, "-00", "-01")) if err != nil { return nil, err } - return types.QValueTimestamp{Val: tm}, nil + return types.QValueTimestamp{Val: tm.UTC()}, nil } } return nil, fmt.Errorf("unexpected type %T for mysql type %d, qkind %s", val, mytype, qkind) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 09e85faf86..7a088426ec 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -689,8 +689,7 @@ func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context, // prepare and execute the query var result bool - err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result) - if err != nil { + if err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result); err != nil { return false, fmt.Errorf("failed to execute query: %w", err) } diff --git a/flow/e2e/clickhouse/mysql_test.go b/flow/e2e/clickhouse/mysql_test.go index d85ea13f64..0e2864f7b3 100644 --- a/flow/e2e/clickhouse/mysql_test.go +++ b/flow/e2e/clickhouse/mysql_test.go @@ -88,7 +88,7 @@ func (s ClickHouseSuite) Test_MySQL_Time() { require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key",d,dt,tm,t) VALUES ('init','1935-01-01','1953-02-02 12:01:02','1973-02-02 13:01:02.123','14:21.654321'), ('init','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','00:00'), - ('init','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','-800:0:1')`, + ('init','2000-01-00','2000-00-01 00:00:00','2000-01-01 00:00:00.000','-800:0:1')`, quotedSrcFullName))) connectionGen := e2e.FlowConnectionGenerationConfig{ @@ -108,7 +108,7 @@ func (s ClickHouseSuite) Test_MySQL_Time() { require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key",d,dt,tm,t) VALUES ('cdc','1935-01-01','1953-02-02 12:01:02','1973-02-02 13:01:02.123','14:21.654321'), ('cdc','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','00:00'), - ('cdc','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','-800:0:1')`, + ('cdc','2000-01-00','2000-00-01 00:00:00','2000-01-01 00:00:00.000','-800:0:1')`, quotedSrcFullName))) e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",d,dt,tm,t") diff --git a/flow/go.mod b/flow/go.mod index 1aca06874c..4f387c9f78 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -30,7 +30,7 @@ require ( github.com/aws/smithy-go v1.22.4 github.com/cockroachdb/pebble v1.1.5 github.com/elastic/go-elasticsearch/v8 v8.18.1 - github.com/go-mysql-org/go-mysql v1.12.0 + github.com/go-mysql-org/go-mysql v1.12.1-0.20250706035254-4a082cf9bd9a github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.0 github.com/hamba/avro/v2 v2.29.0 @@ -89,7 +89,6 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.53.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect - github.com/Masterminds/semver v1.5.0 // indirect github.com/VividCortex/ewma v1.2.0 // indirect github.com/apache/arrow-go/v18 v18.3.1 // indirect github.com/apache/arrow/go/v15 v15.0.2 // indirect diff --git a/flow/go.sum b/flow/go.sum index 2c66571b73..77c0231529 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -82,8 +82,6 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.53.0/go.mod h1:jUZ5LYlw40WMd07qxcQJD5M40aUxrfwqQX1g7zxYnrQ= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 h1:Ron4zCA/yk6U7WOBXhTJcDpsUBG9npumK6xw2auFltQ= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0/go.mod h1:cSgYe11MCNYunTnRXrKiR/tHc0eoKjICUuWpNZoVCOo= -github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= -github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/PeerDB-io/glua64 v1.0.1 h1:biXLlFF/L5pnJCwDon7hkWkuQPozC8NjKS3J7Wzi69I= github.com/PeerDB-io/glua64 v1.0.1/go.mod h1:UHmAhniv61bJPMhQvxkpC7jXbn353dSbQviu83bgQVg= github.com/PeerDB-io/gluabit32 v1.0.2 h1:AGI1Z7dwDVotakpuOOuyTX4/QGi5HUYsipL/VfodmO4= @@ -278,8 +276,8 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-mysql-org/go-mysql v1.12.0 h1:tyToNggfCfl11OY7GbWa2Fq3ofyScO9GY8b5f5wAmE4= -github.com/go-mysql-org/go-mysql v1.12.0/go.mod h1:/XVjs1GlT6NPSf13UgXLv/V5zMNricTCqeNaehSBghs= +github.com/go-mysql-org/go-mysql v1.12.1-0.20250706035254-4a082cf9bd9a h1:xmHPT1ElX3AzFn8uSWlsJArgcjPjTt/KFFsaJe28qBY= +github.com/go-mysql-org/go-mysql v1.12.1-0.20250706035254-4a082cf9bd9a/go.mod h1:FQxw17uRbFvMZFK+dPtIPufbU46nBdrGaxOw0ac9MFs= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=