diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 3969d1b45d..38fa47c683 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "log/slog" - "maps" "slices" "github.com/jackc/pgx/v5" @@ -68,18 +67,13 @@ func (qe *QRepQueryExecutor) cursorToSchema( tx pgx.Tx, cursorName string, ) (types.QRecordSchema, error) { - type attId struct { - relid uint32 - num uint16 - } - rows, err := tx.Query(ctx, "FETCH 0 FROM "+cursorName) if err != nil { return types.QRecordSchema{}, fmt.Errorf("failed to fetch 0 for field descriptions: %w", err) } + defer rows.Close() fds := rows.FieldDescriptions() tableOIDset := make(map[uint32]struct{}) - nullPointers := make(map[attId]*bool, len(fds)) qfields := make([]types.QField, len(fds)) for i, fd := range fds { tableOIDset[fd.TableOID] = struct{}{} @@ -89,7 +83,7 @@ func (qe *QRepQueryExecutor) cursorToSchema( qfields[i] = types.QField{ Name: fd.Name, Type: ctype, - Nullable: false, + Nullable: true, Precision: precision, Scale: scale, } @@ -97,30 +91,9 @@ func (qe *QRepQueryExecutor) cursorToSchema( qfields[i] = types.QField{ Name: fd.Name, Type: ctype, - Nullable: false, + Nullable: true, } } - nullPointers[attId{ - relid: fd.TableOID, - num: fd.TableAttributeNumber, - }] = &qfields[i].Nullable - } - rows.Close() - tableOIDs := slices.Collect(maps.Keys(tableOIDset)) - - rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs) - if err != nil { - return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err) - } - - var att attId - if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error { - if nullPointer, ok := nullPointers[att]; ok { - *nullPointer = true - } - return nil - }); err != nil { - return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err) } return types.NewQRecordSchema(qfields), nil