@@ -98,13 +98,13 @@ const (
bool_or(NOT pg_attribute.attnotnull): true if ANY column is nullable
*/
selectIndexes = `
-SELECT
+SELECT
index_relations.relname as index_name,
pg_am.amname as index_type,
pg_index.indisunique as unique,
array_agg(
-CASE WHEN pg_index.indkey[pos] != 0
-THEN pg_attribute.attname
+CASE WHEN pg_index.indkey[pos] != 0
+THEN pg_attribute.attname
END ORDER BY pos
) FILTER (WHERE pg_index.indkey[pos] != 0) as column_names,
array_agg(
@@ -114,11 +114,11 @@ const (
) FILTER (WHERE pg_index.indkey[pos] = 0) as expressions,
COALESCE(bool_or(NOT pg_attribute.attnotnull), false) as has_nullable_column
FROM pg_class table_relations
-JOIN pg_index ON table_relations.oid = pg_index.indrelid
+JOIN pg_index ON table_relations.oid = pg_index.indrelid
JOIN pg_class index_relations ON index_relations.oid = pg_index.indexrelid
-JOIN pg_am ON index_relations.relam = pg_am.oid
+JOIN pg_am ON index_relations.relam = pg_am.oid
JOIN generate_subscripts(pg_index.indkey, 1) AS pos ON true
-LEFT JOIN pg_attribute ON table_relations.oid = pg_attribute.attrelid
+LEFT JOIN pg_attribute ON table_relations.oid = pg_attribute.attrelid
AND pg_attribute.attnum = pg_index.indkey[pos]
AND NOT pg_attribute.attisdropped
WHERE table_relations.relname = $2
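
For context, here is a minimal Go sketch of the splitting behaviour in the index query above, assuming a hypothetical expression index created as CREATE INDEX ON users (lower(email), id). The helper name splitIndexKeys, the sample values, and the mapping of indkey entries to names are illustrative assumptions, not code from this change:

package main

import "fmt"

// splitIndexKeys mimics, in plain Go, what the two FILTER clauses in the query
// above do with pg_index.indkey: a non-zero entry names a table column (resolved
// through pg_attribute.attname), while a zero entry stands for an index expression.
func splitIndexKeys(indkey []int, attnames map[int]string, exprs []string) (columns, expressions []string) {
	exprPos := 0
	for _, attnum := range indkey {
		if attnum != 0 {
			columns = append(columns, attnames[attnum]) // indkey[pos] != 0 -> column_names
			continue
		}
		expressions = append(expressions, exprs[exprPos]) // indkey[pos] = 0 -> expressions
		exprPos++
	}
	return columns, expressions
}

func main() {
	// Hypothetical index: CREATE INDEX ON users (lower(email), id).
	// Its pg_index.indkey would be "0 1"; attnum 1 is the "id" column.
	columns, expressions := splitIndexKeys(
		[]int{0, 1},
		map[int]string{1: "id"},
		[]string{"lower(email)"},
	)
	fmt.Println(columns, expressions) // prints: [id] [lower(email)]
}

In that hypothetical case the query would report roughly column_names = {id}, expressions = {lower(email)}, and has_nullable_column = false, since id is NOT NULL via its primary key.
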
@@ -252,7 +252,7 @@ func (c *SchemaDetails) extractNames(ctx context.Context) error {
for schemaRs.Next() {
var schema string
if err := schemaRs.Scan(&schema); err != nil {
level.Error(c.logger).Log("msg", "failed to scan pg_namespace", "database", dbName, "err", err)
level.Error(c.logger).Log("msg", "failed to scan pg_namespace", "datname", dbName, "err", err)
break
}
schemas = append(schemas, schema)
@@ -269,7 +269,7 @@ func (c *SchemaDetails) extractNames(ctx context.Context) error {
}

if len(schemas) == 0 {
level.Info(c.logger).Log("msg", "no schema detected from pg_namespace", "database", dbName)
level.Info(c.logger).Log("msg", "no schema detected from pg_namespace", "datname", dbName)
return nil
}

@@ -278,15 +278,15 @@ func (c *SchemaDetails) extractNames(ctx context.Context) error {
for _, schema := range schemas {
rs, err := c.dbConnection.QueryContext(ctx, selectTableNames, schema)
if err != nil {
level.Error(c.logger).Log("msg", "failed to query tables", "database", dbName, "schema", schema, "err", err)
level.Error(c.logger).Log("msg", "failed to query tables", "datname", dbName, "schema", schema, "err", err)
break
}
defer rs.Close()

for rs.Next() {
var tableName string
if err := rs.Scan(&tableName); err != nil {
level.Error(c.logger).Log("msg", "failed to scan tables", "database", dbName, "schema", schema, "err", err)
level.Error(c.logger).Log("msg", "failed to scan tables", "datname", dbName, "schema", schema, "err", err)
break
}
tables = append(tables, &tableInfo{
@@ -309,14 +309,14 @@ func (c *SchemaDetails) extractNames(ctx context.Context) error {
}

if len(tables) == 0 {
level.Info(c.logger).Log("msg", "no tables detected from pg_tables", "database", dbName)
level.Info(c.logger).Log("msg", "no tables detected from pg_tables", "datname", dbName)
return nil
}

for _, table := range tables {
table, err = c.fetchTableDefinitions(ctx, table)
if err != nil {
level.Error(c.logger).Log("msg", "failed to get table definitions", "database", dbName, "schema", table.schema, "err", err)
level.Error(c.logger).Log("msg", "failed to get table definitions", "datname", dbName, "schema", table.schema, "err", err)
continue
}

@@ -336,13 +336,13 @@ func (c *SchemaDetails) extractNames(ctx context.Context) error {
func (c *SchemaDetails) fetchTableDefinitions(ctx context.Context, table *tableInfo) (*tableInfo, error) {
spec, err := c.fetchColumnsDefinitions(ctx, table.database, table.schema, table.tableName)
if err != nil {
level.Error(c.logger).Log("msg", "failed to analyze table spec", "database", table.database, "schema", table.schema, "table", table.tableName, "err", err)
level.Error(c.logger).Log("msg", "failed to analyze table spec", "datname", table.database, "schema", table.schema, "table", table.tableName, "err", err)
return table, err
}

jsonSpec, err := json.Marshal(spec)
if err != nil {
level.Error(c.logger).Log("msg", "failed to marshal table spec", "database", table.database, "schema", table.schema, "table", table.tableName, "err", err)
level.Error(c.logger).Log("msg", "failed to marshal table spec", "datname", table.database, "schema", table.schema, "table", table.tableName, "err", err)
return table, err
}
table.b64TableSpec = base64.StdEncoding.EncodeToString(jsonSpec)
@@ -354,7 +354,7 @@ func (c *SchemaDetails) fetchColumnsDefinitions(ctx context.Context, databaseNam
qualifiedTableName := fmt.Sprintf("%s.%s", schemaName, tableName)
colRS, err := c.dbConnection.QueryContext(ctx, selectColumnNames, qualifiedTableName)
if err != nil {
level.Error(c.logger).Log("msg", "failed to query table columns", "database", databaseName, "schema", schemaName, "table", tableName, "err", err)
level.Error(c.logger).Log("msg", "failed to query table columns", "datname", databaseName, "schema", schemaName, "table", tableName, "err", err)
return nil, err
}
defer colRS.Close()
@@ -366,7 +366,7 @@ func (c *SchemaDetails) fetchColumnsDefinitions(ctx context.Context, databaseNam
var columnDefault sql.NullString
var notNullable, isPrimaryKey bool
if err := colRS.Scan(&columnName, &columnType, &notNullable, &columnDefault, &identityGeneration, &isPrimaryKey); err != nil {
level.Error(c.logger).Log("msg", "failed to scan table columns", "database", databaseName, "schema", schemaName, "table", tableName, "err", err)
level.Error(c.logger).Log("msg", "failed to scan table columns", "datname", databaseName, "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

@@ -391,13 +391,13 @@ func (c *SchemaDetails) fetchColumnsDefinitions(ctx context.Context, databaseNam
}

if err := colRS.Err(); err != nil {
level.Error(c.logger).Log("msg", "failed to iterate over table columns result set", "database", databaseName, "schema", schemaName, "table", tableName, "err", err)
level.Error(c.logger).Log("msg", "failed to iterate over table columns result set", "datname", databaseName, "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

indexesRS, err := c.dbConnection.QueryContext(ctx, selectIndexes, schemaName, tableName)
if err != nil {
level.Error(c.logger).Log("msg", "failed to query indexes", "database", databaseName, "schema", schemaName, "table", tableName, "err", err)
level.Error(c.logger).Log("msg", "failed to query indexes", "datname", databaseName, "schema", schemaName, "table", tableName, "err", err)
return nil, err
}
defer indexesRS.Close()
@@ -408,7 +408,7 @@ func (c *SchemaDetails) fetchColumnsDefinitions(ctx context.Context, databaseNam
var columns, expressions pq.StringArray

if err := indexesRS.Scan(&indexName, &indexType, &unique, &columns, &expressions, &hasNullableColumn); err != nil {
level.Error(c.logger).Log("msg", "failed to scan indexes", "schema", schemaName, "table", tableName, "err", err)
level.Error(c.logger).Log("msg", "failed to scan indexes", "datname", databaseName, "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

@@ -426,7 +426,7 @@ func (c *SchemaDetails) fetchColumnsDefinitions(ctx context.Context, databaseNam
}

if err := indexesRS.Err(); err != nil {
level.Error(c.logger).Log("msg", "error during iterating over indexes", "schema", schemaName, "table", tableName, "err", err)
level.Error(c.logger).Log("msg", "error during iterating over indexes", "datname", databaseName, "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

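A side note on the log fields renamed throughout this change: with a go-kit logfmt logger, the new datname key would surface roughly as in the sketch below. The import paths, logger construction, and sample values are assumptions for illustration, not code taken from this repository.

package main

import (
	"errors"
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// Mirrors the renamed field in this diff: "datname" instead of "database".
	level.Error(logger).Log(
		"msg", "failed to query tables",
		"datname", "shop", // hypothetical database name
		"schema", "public",
		"err", errors.New("connection refused"),
	)
	// Expected logfmt output (approximately):
	// level=error msg="failed to query tables" datname=shop schema=public err="connection refused"
}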