Skip to content

Commit c3aab43

Browse files
authored
Do not use slow query from INFORMATION_SCHEMA (PeerDB-io#3573)
When the `INFORMATION_SCHEMA` table balloons, it leads to very slow query execution. In our case, the queries end up taking ~5 minutes, which drastically slows down the ingest. The proposal in this PR is to switch to running `SHOW COLUMNS IN TABLE...` and then querying the result of that statement. This problem and its solution are also described in a Snowflake community article (https://community.snowflake.com/s/article/Select-the-list-of-columns-in-the-table-without-using-information-schema). An alternative is to continue using the `INFORMATION_SCHEMA` but keep a cache for all the tables in the sync; however, I think that would be trickier to implement.
1 parent a799c6e commit c3aab43

File tree

2 files changed

+28
-12
lines changed

2 files changed

+28
-12
lines changed

flow/connectors/snowflake/qrep.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/aws/aws-sdk-go-v2/aws"
1111
"github.com/aws/aws-sdk-go-v2/service/s3"
1212
"github.com/jackc/pgx/v5/pgtype"
13+
"github.com/snowflakedb/gosnowflake"
1314

1415
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
1516
"github.com/PeerDB-io/peerdb/flow/generated/protos"
@@ -182,14 +183,22 @@ func (c *SnowflakeConnector) getColsFromTable(ctx context.Context, tableName str
182183
return nil, fmt.Errorf("failed to parse table name: %w", err)
183184
}
184185

185-
rows, err := c.QueryContext(
186-
ctx,
187-
getTableSchemaSQL,
188-
strings.ToUpper(schemaTable.Schema),
189-
strings.ToUpper(schemaTable.Table),
190-
)
191-
if err != nil {
192-
return nil, fmt.Errorf("failed to execute query: %w", err)
186+
fq := fmt.Sprintf("%s.%s", strings.ToUpper(schemaTable.Schema), strings.ToUpper(schemaTable.Table))
187+
188+
channel := make(chan string, 1)
189+
ctxWithOpt := gosnowflake.WithQueryIDChan(ctx, channel)
190+
rows, err := c.QueryContext(ctxWithOpt, getTableColumnListSQL, fq)
191+
if err != nil || rows.Err() != nil {
192+
return nil, fmt.Errorf("failed to run getTableColumnList query: %w", err)
193+
}
194+
defer rows.Close()
195+
196+
qid := <-channel
197+
198+
rows, err = c.QueryContext(ctx, getTableSchemaSQL, qid)
199+
200+
if err != nil || rows.Err() != nil {
201+
return nil, fmt.Errorf("failed to run getTableSchema query: %w", err)
193202
}
194203
defer rows.Close()
195204

@@ -200,9 +209,13 @@ func (c *SnowflakeConnector) getColsFromTable(ctx context.Context, tableName str
200209
if err := rows.Scan(&colName, &colType, &numericPrecision, &numericScale); err != nil {
201210
return nil, fmt.Errorf("failed to scan row: %w", err)
202211
}
212+
parsedColType := colType.String
213+
if colType.String == "FIXED" {
214+
parsedColType = "NUMBER"
215+
}
203216
cols = append(cols, SnowflakeTableColumn{
204217
ColumnName: colName.String,
205-
ColumnType: colType.String,
218+
ColumnType: parsedColType,
206219
NumericPrecision: numericPrecision.Int32,
207220
NumericScale: numericScale.Int32,
208221
})

flow/connectors/snowflake/snowflake.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,12 @@ const (
6060
ARRAY_AGG(DISTINCT _PEERDB_UNCHANGED_TOAST_COLUMNS) FROM %s.%s WHERE
6161
_PEERDB_BATCH_ID = %d AND _PEERDB_RECORD_TYPE != 2
6262
GROUP BY _PEERDB_DESTINATION_TABLE_NAME`
63-
getTableSchemaSQL = `SELECT COLUMN_NAME, DATA_TYPE, NUMERIC_PRECISION, NUMERIC_SCALE FROM INFORMATION_SCHEMA.COLUMNS
64-
WHERE UPPER(TABLE_SCHEMA)=? AND UPPER(TABLE_NAME)=? ORDER BY ORDINAL_POSITION`
65-
63+
getTableColumnListSQL = `SHOW COLUMNS IN TABLE IDENTIFIER(?)`
64+
getTableSchemaSQL = `SELECT "column_name",
65+
parse_json("data_type"):type::string AS type,
66+
parse_json("data_type"):precision::string AS precision,
67+
parse_json("data_type"):scale::string AS scale
68+
FROM table(result_scan(?))`
6669
checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES
6770
WHERE TABLE_SCHEMA=? and TABLE_NAME=?`
6871
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"

0 commit comments

Comments
 (0)