Skip to content

Commit dbef518

Browse files
committed
Add support for dbt with materialized views
1 parent 93b5087 commit dbef518

File tree

6 files changed

+147
-29
lines changed

6 files changed

+147
-29
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,8 @@ SELECT * FROM [TABLE] WHERE [JSON_COLUMN]->>'[JSON_KEY]' = '[JSON_VALUE]';
324324
- [x] Packaging in a Docker image
325325
- [x] Table compaction without Trino as a dependency
326326
- [x] Materialized views
327+
- [x] Transformations with dbt ([#25](https://github.com/BemiHQ/BemiDB/issues/25))
327328
- [ ] Partitioned tables ([#15](https://github.com/BemiHQ/BemiDB/issues/15))
328-
- [ ] Transformations with dbt ([#25](https://github.com/BemiHQ/BemiDB/issues/25))
329329

330330
Are you looking for real-time data syncing? Check out [BemiDB Cloud](https://bemidb.com), our managed data platform.
331331

src/common/common_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package common
22

33
const (
4-
VERSION = "1.1.0"
4+
VERSION = "1.2.0"
55

66
ENV_LOG_LEVEL = "BEMIDB_LOG_LEVEL"
77
ENV_DISABLE_ANONYMOUS_ANALYTICS = "BEMIDB_DISABLE_ANONYMOUS_ANALYTICS"

src/common/iceberg_catalog.go

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -243,12 +243,24 @@ func (catalog *IcebergCatalog) DropTable(icebergSchemaTable IcebergSchemaTable)
243243
PanicIfError(catalog.Config, err)
244244
}
245245

246-
func (catalog *IcebergCatalog) CreateMaterializedView(icebergSchemaTable IcebergSchemaTable, definition string) error {
246+
func (catalog *IcebergCatalog) CreateMaterializedView(icebergSchemaTable IcebergSchemaTable, definition string, ifNotExists bool) error {
247247
pgClient := catalog.newPostgresClient()
248248
defer pgClient.Close()
249249

250+
exists, err := catalog.doesMaterializedViewExist(pgClient, icebergSchemaTable)
251+
if err != nil {
252+
return err
253+
}
254+
if exists {
255+
if ifNotExists {
256+
return nil
257+
} else {
258+
return fmt.Errorf("materialized view %s already exists", icebergSchemaTable.String())
259+
}
260+
}
261+
250262
ctx := context.Background()
251-
_, err := pgClient.Exec(
263+
_, err = pgClient.Exec(
252264
ctx,
253265
"INSERT INTO iceberg_materialized_views (schema_name, table_name, definition) VALUES ($1, $2, $3)",
254266
icebergSchemaTable.Schema, icebergSchemaTable.Table, definition,
@@ -257,24 +269,47 @@ func (catalog *IcebergCatalog) CreateMaterializedView(icebergSchemaTable Iceberg
257269
return err
258270
}
259271

260-
func (catalog *IcebergCatalog) DropMaterializedView(icebergSchemaTable IcebergSchemaTable) error {
272+
// RenameMaterializedView sets the table_name of the iceberg_materialized_views
// row matching icebergSchemaTable's schema and table to newName. The schema
// column is not modified.
//
// The row is first looked up with doesMaterializedViewExist. If it is absent,
// nil is returned when missingOk is true; otherwise a "does not exist" error
// is returned. Errors from the lookup and from the UPDATE are returned as is.
//
// NOTE(review): the lookup and the UPDATE are issued as two separate
// statements; nothing visible here makes the pair atomic - confirm whether
// concurrent renames/drops need to be guarded against.
func (catalog *IcebergCatalog) RenameMaterializedView(icebergSchemaTable IcebergSchemaTable, newName string, missingOk bool) error {
261273
ctx := context.Background()
262-
263274
pgClient := catalog.newPostgresClient()
264275
defer pgClient.Close()
265276

266-
var exists bool
267-
err := pgClient.QueryRow(
277+
exists, err := catalog.doesMaterializedViewExist(pgClient, icebergSchemaTable)
278+
if err != nil {
279+
return err
280+
}
281+
if !exists {
282+
if missingOk {
283+
return nil
284+
} else {
285+
return fmt.Errorf("materialized view %s does not exist", icebergSchemaTable.String())
286+
}
287+
}
288+
289+
_, err = pgClient.Exec(
268290
ctx,
269-
"SELECT TRUE FROM iceberg_materialized_views WHERE schema_name=$1 AND table_name=$2",
270-
icebergSchemaTable.Schema, icebergSchemaTable.Table,
271-
).Scan(&exists)
291+
"UPDATE iceberg_materialized_views SET table_name=$1 WHERE schema_name=$2 AND table_name=$3",
292+
newName, icebergSchemaTable.Schema, icebergSchemaTable.Table,
293+
)
272294

295+
return err
296+
}
297+
298+
func (catalog *IcebergCatalog) DropMaterializedView(icebergSchemaTable IcebergSchemaTable, missingOk bool) error {
299+
ctx := context.Background()
300+
301+
pgClient := catalog.newPostgresClient()
302+
defer pgClient.Close()
303+
304+
exists, err := catalog.doesMaterializedViewExist(pgClient, icebergSchemaTable)
273305
if err != nil {
274-
if err.Error() == "no rows in result set" {
275-
return fmt.Errorf("materialized view %s does not exist", icebergSchemaTable.String())
306+
return err
307+
}
308+
if !exists {
309+
if missingOk {
310+
return nil
276311
} else {
277-
return fmt.Errorf("error checking materialized view existence: %w", err)
312+
return fmt.Errorf("materialized view %s does not exist", icebergSchemaTable.String())
278313
}
279314
}
280315

@@ -287,6 +322,24 @@ func (catalog *IcebergCatalog) DropMaterializedView(icebergSchemaTable IcebergSc
287322
return err
288323
}
289324

325+
// doesMaterializedViewExist reports whether iceberg_materialized_views holds a
// row whose schema_name and table_name match icebergSchemaTable. The query runs
// on the supplied pgClient, which this function neither creates nor closes.
//
// It returns (false, nil) when the query yields no row, and (false, err) with
// a wrapped error for any other failure.
func (catalog *IcebergCatalog) doesMaterializedViewExist(pgClient *PostgresClient, icebergSchemaTable IcebergSchemaTable) (bool, error) {
326+
var exists bool
327+
err := pgClient.QueryRow(
328+
context.Background(),
329+
"SELECT TRUE FROM iceberg_materialized_views WHERE schema_name=$1 AND table_name=$2",
330+
icebergSchemaTable.Schema, icebergSchemaTable.Table,
331+
).Scan(&exists)
332+
333+
if err != nil {
334+
// An error whose message is exactly "no rows in result set" is treated as
// "view not found" rather than as a failure.
// NOTE(review): matching on the message text is brittle (a wrapped or
// reworded error would be reported as a real failure). If the driver
// exposes a sentinel error for this case, errors.Is would be safer - confirm.
if err.Error() == "no rows in result set" {
335+
return false, nil
336+
} else {
337+
return false, fmt.Errorf("error checking materialized view existence: %w", err)
338+
}
339+
}
340+
return exists, nil
341+
}
342+
290343
// ---------------------------------------------------------------------------------------------------------------------
291344

292345
func (catalog *IcebergCatalog) newPostgresClient() *PostgresClient {

src/server/iceberg_writer.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,19 @@ func NewIcebergWriter(config *Config, storageS3 *common.StorageS3, serverDuckdbC
2020
}
2121
}
2222

23-
func (writer *IcebergWriter) CreateMaterializedView(icebergSchemaTable common.IcebergSchemaTable, remappedDefinitionQuery string) error {
24-
return writer.IcebergCatalog.CreateMaterializedView(icebergSchemaTable, remappedDefinitionQuery)
23+
// CreateMaterializedView forwards icebergSchemaTable, remappedDefinitionQuery
// and ifNotExists unchanged to IcebergCatalog.CreateMaterializedView and
// returns its error. No data is written here.
func (writer *IcebergWriter) CreateMaterializedView(icebergSchemaTable common.IcebergSchemaTable, remappedDefinitionQuery string, ifNotExists bool) error {
24+
return writer.IcebergCatalog.CreateMaterializedView(icebergSchemaTable, remappedDefinitionQuery, ifNotExists)
25+
}
26+
27+
// RenameMaterializedView renames a materialized view in two steps: it first
// asks IcebergCatalog.RenameMaterializedView to rename the catalog entry, and,
// if that returns no error, builds an IcebergTable for the old name and calls
// Rename(newName) on it.
//
// A catalog error is returned as is and the table rename is not attempted.
func (writer *IcebergWriter) RenameMaterializedView(icebergSchemaTable common.IcebergSchemaTable, newName string, missingOk bool) error {
28+
err := writer.IcebergCatalog.RenameMaterializedView(icebergSchemaTable, newName, missingOk)
29+
if err != nil {
30+
return err
31+
}
32+
33+
// NOTE(review): IcebergCatalog.RenameMaterializedView also returns nil when
// the view is missing and missingOk is true, so in that case Rename below is
// still invoked for a view that was not found - confirm this is intended.
icebergTable := common.NewIcebergTable(writer.Config.CommonConfig, writer.StorageS3, writer.ServerDuckdbClient, icebergSchemaTable)
34+
// NOTE(review): if Rename reports failure through a return value, it is
// discarded here and the catalog row stays renamed - confirm.
icebergTable.Rename(newName)
35+
return nil
2536
}
2637

2738
func (writer *IcebergWriter) RefreshMaterializedView(icebergSchemaTable common.IcebergSchemaTable, remappedDefinitionQuery string) error {
@@ -62,8 +73,8 @@ func (writer *IcebergWriter) RefreshMaterializedView(icebergSchemaTable common.I
6273
return nil
6374
}
6475

65-
func (writer *IcebergWriter) DropMaterializedView(icebergSchemaTable common.IcebergSchemaTable) error {
66-
err := writer.IcebergCatalog.DropMaterializedView(icebergSchemaTable)
76+
func (writer *IcebergWriter) DropMaterializedView(icebergSchemaTable common.IcebergSchemaTable, missingOk bool) error {
77+
err := writer.IcebergCatalog.DropMaterializedView(icebergSchemaTable, missingOk)
6778
if err != nil {
6879
return err
6980
}

src/server/query_handler_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,10 @@ func TestHandleQuery(t *testing.T) {
352352
"description": {"oid", "pnpubid", "pnnspid"},
353353
"types": {uint32ToString(pgtype.OIDOID), uint32ToString(pgtype.Int8OID), uint32ToString(pgtype.Int8OID)},
354354
},
355+
"SELECT * FROM pg_catalog.pg_rewrite": {
356+
"description": {"oid", "rulename", "ev_class", "ev_type", "ev_enabled", "is_instead", "ev_qual", "ev_action"},
357+
"types": {uint32ToString(pgtype.OIDOID), uint32ToString(pgtype.TextOID), uint32ToString(pgtype.Int8OID), uint32ToString(pgtype.TextOID), uint32ToString(pgtype.TextOID), uint32ToString(pgtype.BoolOID), uint32ToString(pgtype.TextOID), uint32ToString(pgtype.TextOID)},
358+
},
355359
"SELECT pubname, NULL, NULL FROM pg_catalog.pg_publication p JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid UNION SELECT pubname, pg_get_expr(pr.prqual, c.oid), (CASE WHEN pr.prattrs IS NOT NULL THEN (SELECT string_agg(attname, ', ') FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s, pg_catalog.pg_attribute WHERE attrelid = pr.prrelid AND attnum = prattrs[s]) ELSE NULL END) FROM pg_catalog.pg_publication p JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid UNION SELECT pubname, NULL, NULL FROM pg_catalog.pg_publication p ORDER BY 1": {
356360
"description": {"pubname", "NULL", "NULL"},
357361
"types": {uint32ToString(pgtype.TextOID), uint32ToString(pgtype.TextOID), uint32ToString(pgtype.TextOID)},
@@ -1310,6 +1314,11 @@ func TestHandleQuery(t *testing.T) {
13101314
"types": {uint32ToString(pgtype.Int4OID)},
13111315
"values": {"1"},
13121316
},
1317+
`WITH schema AS (SELECT pg_namespace.nspname AS name FROM pg_namespace WHERE nspname != 'information_schema' AND nspname NOT LIKE 'pg\_%') SELECT schema.name AS schema FROM schema GROUP BY schema ORDER BY schema LIMIT 1`: {
1318+
"description": {"schema_"},
1319+
"types": {uint32ToString(pgtype.TextOID)},
1320+
"values": {"postgres"},
1321+
},
13131322
})
13141323
})
13151324

src/server/query_remapper.go

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,16 +130,17 @@ func (remapper *QueryRemapper) remapStatements(statements []*pgQuery.RawStmt) ([
130130
case node.GetTransactionStmt() != nil:
131131
statements[i] = NOOP_QUERY_TREE.Stmts[0]
132132

133-
// CREATE MATERIALIZED VIEW
133+
// CREATE MATERIALIZED VIEW [IF NOT EXISTS] AS ... [WITH NO DATA]
134134
case node.GetCreateTableAsStmt() != nil:
135135
err := remapper.createMaterializedView(node)
136136
if err != nil {
137137
return nil, err
138138
}
139139
statements[i] = NOOP_QUERY_TREE.Stmts[0]
140140

141-
// DROP MATERIALIZED VIEW
142-
case node.GetDropStmt() != nil && node.GetDropStmt().RemoveType == pgQuery.ObjectType_OBJECT_MATVIEW:
141+
// DROP MATERIALIZED VIEW [IF EXISTS]
142+
case node.GetDropStmt() != nil &&
143+
(node.GetDropStmt().RemoveType == pgQuery.ObjectType_OBJECT_TABLE || node.GetDropStmt().RemoveType == pgQuery.ObjectType_OBJECT_MATVIEW):
143144
err := remapper.dropMaterializedViewFromNode(node)
144145
if err != nil {
145146
return nil, err
@@ -154,6 +155,15 @@ func (remapper *QueryRemapper) remapStatements(statements []*pgQuery.RawStmt) ([
154155
}
155156
statements[i] = NOOP_QUERY_TREE.Stmts[0]
156157

158+
// ALTER TABLE [IF EXISTS] ... RENAME TO ...
159+
case node.GetRenameStmt() != nil &&
160+
(node.GetRenameStmt().RenameType == pgQuery.ObjectType_OBJECT_TABLE || node.GetRenameStmt().RenameType == pgQuery.ObjectType_OBJECT_MATVIEW):
161+
err := remapper.renameMaterializedViewFromNode(node)
162+
if err != nil {
163+
return nil, err
164+
}
165+
statements[i] = NOOP_QUERY_TREE.Stmts[0]
166+
157167
// Unsupported query
158168
default:
159169
common.LogDebug(remapper.config.CommonConfig, "Query tree:", stmt, node)
@@ -554,8 +564,10 @@ func (remapper *QueryRemapper) createMaterializedView(node *pgQuery.Node) error
554564
return fmt.Errorf("couldn't read definition of CREATE MATERIALIZED VIEW: %w", err)
555565
}
556566

567+
ifNotExists := node.GetCreateTableAsStmt().IfNotExists
568+
557569
// Store the materialized view in the catalog
558-
err = remapper.IcebergWriter.CreateMaterializedView(icebergSchemaTable, definition)
570+
err = remapper.IcebergWriter.CreateMaterializedView(icebergSchemaTable, definition, ifNotExists)
559571
if err != nil {
560572
if strings.HasPrefix(err.Error(), "ERROR: duplicate key value violates unique constraint") {
561573
return fmt.Errorf("relation %s already exists", icebergSchemaTable.String())
@@ -568,7 +580,7 @@ func (remapper *QueryRemapper) createMaterializedView(node *pgQuery.Node) error
568580
if !node.GetCreateTableAsStmt().Into.SkipData {
569581
queryStatements, _, err := remapper.ParseAndRemapQuery(definition)
570582
if err != nil {
571-
deleteErr := remapper.IcebergWriter.DropMaterializedView(icebergSchemaTable)
583+
deleteErr := remapper.IcebergWriter.DropMaterializedView(icebergSchemaTable, true)
572584
if deleteErr != nil {
573585
return fmt.Errorf("couldn't remap definition of CREATE MATERIALIZED VIEW: %w (%w)", err, deleteErr)
574586
}
@@ -577,7 +589,7 @@ func (remapper *QueryRemapper) createMaterializedView(node *pgQuery.Node) error
577589

578590
err = remapper.IcebergWriter.RefreshMaterializedView(icebergSchemaTable, queryStatements[0])
579591
if err != nil {
580-
deleteErr := remapper.IcebergWriter.DropMaterializedView(icebergSchemaTable)
592+
deleteErr := remapper.IcebergWriter.DropMaterializedView(icebergSchemaTable, true)
581593
if deleteErr != nil {
582594
return fmt.Errorf("couldn't refresh materialized view: %w (%w)", err, deleteErr)
583595
}
@@ -590,21 +602,34 @@ func (remapper *QueryRemapper) createMaterializedView(node *pgQuery.Node) error
590602

591603
func (remapper *QueryRemapper) dropMaterializedViewFromNode(node *pgQuery.Node) error {
592604
var icebergSchemaTable common.IcebergSchemaTable
593-
nodeItems := node.GetDropStmt().Objects[0].GetList().Items
594-
if len(nodeItems) == 2 {
605+
dropStatement := node.GetDropStmt()
606+
nodeItems := dropStatement.Objects[0].GetList().Items
607+
608+
switch len(nodeItems) {
609+
case 3:
610+
if nodeItems[0].GetString_().Sval != remapper.config.Database {
611+
return fmt.Errorf("cross-database materialized view drop is not supported: %s", nodeItems[0].GetString_().Sval)
612+
}
613+
icebergSchemaTable = common.IcebergSchemaTable{
614+
Schema: nodeItems[1].GetString_().Sval,
615+
Table: nodeItems[2].GetString_().Sval,
616+
}
617+
case 2:
595618
icebergSchemaTable = common.IcebergSchemaTable{
596619
Schema: nodeItems[0].GetString_().Sval,
597620
Table: nodeItems[1].GetString_().Sval,
598621
}
599-
} else {
622+
case 1:
600623
icebergSchemaTable = common.IcebergSchemaTable{
601624
Schema: PG_SCHEMA_PUBLIC,
602625
Table: nodeItems[0].GetString_().Sval,
603626
}
627+
default:
628+
return errors.New("couldn't read DROP MATERIALIZED VIEW statement")
604629
}
605630

606-
// Delete the materialized view from the catalog
607-
err := remapper.IcebergWriter.DropMaterializedView(icebergSchemaTable)
631+
// Drop the materialized view from the catalog
632+
err := remapper.IcebergWriter.DropMaterializedView(icebergSchemaTable, dropStatement.MissingOk)
608633
if err != nil {
609634
return err
610635
}
@@ -648,6 +673,26 @@ func (remapper *QueryRemapper) refreshMaterializedViewFromNode(node *pgQuery.Nod
648673
return nil
649674
}
650675

676+
// renameMaterializedViewFromNode reads the target relation, the new name and
// the MissingOk flag from the node's RenameStmt and passes them to
// IcebergWriter.RenameMaterializedView.
//
// An empty schema name on the relation is replaced with PG_SCHEMA_PUBLIC.
// Any error from the writer is wrapped with "couldn't rename table" (the same
// wording is used regardless of the kind of object being renamed).
//
// NOTE(review): RenameStmt.Relation is dereferenced without a nil check -
// confirm callers only route statements here that carry a relation.
func (remapper *QueryRemapper) renameMaterializedViewFromNode(node *pgQuery.Node) error {
677+
icebergSchemaTable := common.IcebergSchemaTable{
678+
Schema: node.GetRenameStmt().Relation.Schemaname,
679+
Table: node.GetRenameStmt().Relation.Relname,
680+
}
681+
// Unqualified names fall back to the public schema.
if icebergSchemaTable.Schema == "" {
682+
icebergSchemaTable.Schema = PG_SCHEMA_PUBLIC
683+
}
684+
685+
renameStatement := node.GetRenameStmt()
686+
newName := renameStatement.Newname
687+
688+
err := remapper.IcebergWriter.RenameMaterializedView(icebergSchemaTable, newName, renameStatement.MissingOk)
689+
if err != nil {
690+
return fmt.Errorf("couldn't rename table: %w", err)
691+
}
692+
693+
return nil
694+
}
695+
651696
func (remapper *QueryRemapper) traceTreeTraversal(label string, indentLevel int) {
652697
common.LogTrace(remapper.config.CommonConfig, strings.Repeat(">", indentLevel), label)
653698
}

0 commit comments

Comments
 (0)