28 changes: 22 additions & 6 deletions tap_postgres/stream_utils.py
@@ -77,7 +77,8 @@ def refresh_streams_schema(conn_config: Dict, streams: List[Dict]):
# For every stream dictionary, update the schema and metadata from the new discovery
for idx, stream in enumerate(streams):
# update schema
streams[idx]['schema'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['schema'])
discovered_stream = new_discovery[stream['tap_stream_id']]
streams[idx]['schema'] = merge_stream_schema(stream, discovered_stream)

# Update metadata
#
@@ -86,17 +87,32 @@ def refresh_streams_schema(conn_config: Dict, streams: List[Dict]):
md_map = metadata.to_map(stream['metadata'])
meta = md_map.get(())

for idx_met, metadatum in enumerate(new_discovery[stream['tap_stream_id']]['metadata']):
for idx_met, metadatum in enumerate(discovered_stream['metadata']):
if not metadatum['breadcrumb']:
meta.update(new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'])
new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'] = meta
meta.update(discovered_stream['metadata'][idx_met]['metadata'])
discovered_stream['metadata'][idx_met]['metadata'] = meta

# 2nd step: now copy all the metadata from the updated new discovery to the original stream
streams[idx]['metadata'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['metadata'])
streams[idx]['metadata'] = copy.deepcopy(discovered_stream['metadata'])

LOGGER.debug('Updated streams schemas %s', streams)
LOGGER.info("The first stream is %s", streams)


def merge_stream_schema(stream, discovered_stream):
"""
When db discovery happens, the stream schema (inferred from the catalog) is replaced with the newly discovered schema.
That would discard any schema overrides specified in the config YAML file.
This function merges the schema from the catalog into the schema from the discovery,
so the tap can keep up with schema evolution while still retaining the user-configured schema.
"""
discovered_schema = copy.deepcopy(discovered_stream['schema'])
for column, column_schema in stream['schema']['properties'].items():
if column in discovered_schema['properties'] and column_schema != discovered_schema['properties'][column]:
override = copy.deepcopy(stream['schema']['properties'][column])
LOGGER.info('Overriding schema for %s.%s with %s', stream['tap_stream_id'], column, override)
discovered_schema['properties'][column].update(override)
return discovered_schema
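
For illustration only (not part of the diff, and with hypothetical column names), a minimal usage sketch of merge_stream_schema: a user-configured override for an existing column is merged on top of the discovered type, while columns that only exist in the discovery are kept untouched.

# Hypothetical example; 'payload' and 'created_at' are illustrative column names.
stream = {
    'tap_stream_id': 'public-orders',
    'schema': {'properties': {'payload': {'type': ['null', 'object'],
                                          'properties': {'name': {'type': 'string'}}}}},
}
discovered_stream = {
    'schema': {'properties': {'payload': {'type': ['null', 'object']},
                              'created_at': {'type': ['null', 'string']}}},
}

merged = merge_stream_schema(stream, discovered_stream)
# merged['properties']['payload'] now also carries the configured nested 'properties',
# while the newly discovered 'created_at' column is preserved as-is.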

def any_logical_streams(streams, default_replication_method):
"""
Checks if streams list contains any stream with log_based method
156 changes: 144 additions & 12 deletions tests/integration/test_streams_utils.py
@@ -8,9 +8,9 @@
from tap_postgres import stream_utils

try:
from tests.utils import get_test_connection, ensure_test_table, get_test_connection_config
from tests.utils import get_test_connection, ensure_test_table, get_test_connection_config, alter_schema_test_table
except ImportError:
from utils import get_test_connection, ensure_test_table, get_test_connection_config
from utils import get_test_connection, ensure_test_table, get_test_connection_config, alter_schema_test_table


def do_not_dump_catalog(catalog):
Expand All @@ -25,13 +25,17 @@ class TestInit(unittest.TestCase):
table_name = 'CHICKEN TIMES'

def setUp(self):
table_spec = {"columns": [{"name": "id", "type": "integer", "primary_key": True, "serial": True},
{"name": '"character-varying_name"', "type": "character varying"},
{"name": '"varchar-name"', "type": "varchar(28)"},
{"name": 'char_name', "type": "char(10)"},
{"name": '"text-name"', "type": "text"}],
"name": self.table_name}

table_spec = {
"columns": [
{"name": "id", "type": "integer", "primary_key": True, "serial": True},
{"name": '"character-varying_name"', "type": "character varying"},
{"name": '"varchar-name"', "type": "varchar(28)"},
{"name": 'char_name', "type": "char(10)"},
{"name": '"text-name"', "type": "text"},
{"name": "nested_json", "type": "jsonb"},
],
"name": self.table_name
}
ensure_test_table(table_spec)

def test_refresh_streams_schema(self):
@@ -42,7 +46,22 @@ def test_refresh_streams_schema(self):
'table_name': self.table_name,
'stream': self.table_name,
'tap_stream_id': f'public-{self.table_name}',
'schema': [],
'schema': {
'type': 'object',
'properties': {
'nested_json': {
'type': ['null', 'object'],
'properties': {
'name': {
'type': 'string'
},
'val': {
'type': 'string'
}
}
}
}
},
'metadata': [
{
'breadcrumb': [],
@@ -86,14 +105,127 @@ def test_refresh_streams_schema(self):
'selected-by-default': True},
('properties', 'char_name'): {'selected-by-default': True,
'inclusion': 'available',
'sql-datatype': 'character'}})
'sql-datatype': 'character'},
('properties', 'nested_json'): {'selected-by-default': True,
'inclusion': 'available',
'sql-datatype': 'jsonb'}})

self.assertEqual({'properties': {'id': {'type': ['integer'],
'maximum': 2147483647,
'minimum': -2147483648},
'character-varying_name': {'type': ['null', 'string']},
'varchar-name': {'type': ['null', 'string'], 'maxLength': 28},
'char_name': {'type': ['null', 'string'], 'maxLength': 10},
'text-name': {'type': ['null', 'string']}},
'text-name': {'type': ['null', 'string']},
'nested_json': {'type': ['null', 'object'],
'properties': {
'name': {'type': 'string'},
'val': {'type': 'string'},
}
}
},
'type': 'object',
'definitions': BASE_RECURSIVE_SCHEMAS}, streams[0].get('schema'))

def test_refresh_streams_schema_aware_schema_evolution(self):
table_spec = {
"columns": [
{"name": "newcol", "type": "integer", "is_new_col": True}
],
"name": self.table_name
}
alter_schema_test_table(table_spec)
@khoaanguyenn khoaanguyenn Apr 10, 2024
Suggested change
table_spec = {
"columns": [
{"name": "newcol", "type": "integer", "is_new_col": True}
],
"name": self.table_name
}
alter_schema_test_table(table_spec)
table_spec = {
"columns": [
{"name": "newcol", "type": "integer"}
],
"name": self.table_name
}
add_columns(table_spec)

Would it be simpler to explicitly call add_columns instead of the generic alter_schema_test_table method?

As a reviewer, I had to read through alter_schema_test_table to figure out that in this case it is actually adding new columns 🤣

Collaborator Author
Okay sir, I will change this and fix it in my next commit 😄


conn_config = get_test_connection_config()

streams = [
{
'table_name': self.table_name,
'stream': self.table_name,
'tap_stream_id': f'public-{self.table_name}',
'schema': {
'type': 'object',
'properties': {
'nested_json': {
'type': ['null', 'object'],
'properties': {
'name': {
'type': 'string'
},
'val': {
'type': 'string'
}
}
}
}
},
'metadata': [
{
'breadcrumb': [],
'metadata': {
'replication-method': 'LOG_BASED',
'table-key-properties': ['some_id'],
'row-count': 1000,
}
}
]
}
]

stream_utils.refresh_streams_schema(conn_config, streams)

self.assertEqual(len(streams), 1)
self.assertEqual(self.table_name, streams[0].get('table_name'))
self.assertEqual(self.table_name, streams[0].get('stream'))

streams[0]['metadata'].sort(key=lambda md: md['breadcrumb'])

self.assertEqual(metadata.to_map(streams[0]['metadata']), {
(): {'table-key-properties': ['id'],
'database-name': 'postgres',
'schema-name': 'public',
'is-view': False,
'row-count': 0,
'replication-method': 'LOG_BASED'
},
('properties', 'character-varying_name'): {'inclusion': 'available',
'sql-datatype': 'character varying',
'selected-by-default': True},
('properties', 'id'): {'inclusion': 'automatic',
'sql-datatype': 'integer',
'selected-by-default': True},
('properties', 'varchar-name'): {'inclusion': 'available',
'sql-datatype': 'character varying',
'selected-by-default': True},
('properties', 'text-name'): {'inclusion': 'available',
'sql-datatype': 'text',
'selected-by-default': True},
('properties', 'char_name'): {'selected-by-default': True,
'inclusion': 'available',
'sql-datatype': 'character'},
('properties', 'nested_json'): {'selected-by-default': True,
'inclusion': 'available',
'sql-datatype': 'jsonb'},
('properties', 'newcol'): {'selected-by-default': True,
'inclusion': 'available',
'sql-datatype': 'integer'}})

self.assertEqual({'properties': {'id': {'type': ['integer'],
'maximum': 2147483647,
'minimum': -2147483648},
'character-varying_name': {'type': ['null', 'string']},
'varchar-name': {'type': ['null', 'string'], 'maxLength': 28},
'char_name': {'type': ['null', 'string'], 'maxLength': 10},
'text-name': {'type': ['null', 'string']},
'nested_json': {'type': ['null', 'object'],
'properties': {
'name': {'type': 'string'},
'val': {'type': 'string'},
}
},
'newcol': {'type': ['null', 'integer'],
'minimum': -2147483648,
'maximum': 2147483647}
},
'type': 'object',
'definitions': BASE_RECURSIVE_SCHEMAS}, streams[0].get('schema'))
19 changes: 19 additions & 0 deletions tests/utils.py
@@ -132,6 +132,25 @@ def ensure_test_table(table_spec, target_db='postgres'):
LOGGER.info("create table sql: %s", sql)
cur.execute(sql)

def build_alter_table_sql(table, col_spec):
sqls = []
if altered_name:=col_spec.get('change_name'):
sqls.append("ALTER TABLE {} RENAME COLUMN {} TO {}".format(table, col_spec['name'], altered_name))
if altered_type:=col_spec.get('is_change_type'):
sqls.append("ALTER TABLE {} ALTER COLUMN {} TYPE {}".format(table, col_spec['name'], altered_type))
if col_spec.get("is_new_col"):
sqls.append("ALTER TABLE {} ADD {} {}".format(table, col_spec['name'], col_spec['type']))
return sqls
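
For illustration only (hypothetical column specs, not part of the diff), the helper returns one ALTER statement per requested change:

# Hypothetical col_specs showing the SQL build_alter_table_sql would return.
build_alter_table_sql('"CHICKEN TIMES"', {'name': 'old_name', 'change_name': 'new_name'})
# -> ['ALTER TABLE "CHICKEN TIMES" RENAME COLUMN old_name TO new_name']
build_alter_table_sql('"CHICKEN TIMES"', {'name': 'size', 'is_change_type': 'bigint'})
# -> ['ALTER TABLE "CHICKEN TIMES" ALTER COLUMN size TYPE bigint']
build_alter_table_sql('"CHICKEN TIMES"', {'name': 'newcol', 'type': 'integer', 'is_new_col': True})
# -> ['ALTER TABLE "CHICKEN TIMES" ADD newcol integer']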

Nits: I understand that we will test more cases, but we don't actually use all of them now, nor will we in the future. Thus, I believe we should keep only what we need for now and add more utilities in a later PR 👍

Collaborator Author
I did this for exactly the reason you've specified. If we only care about adding another column, maybe we can remove this function and merge its logic into an add_columns function 😄
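
A minimal sketch of what that merged add_columns helper might look like (hypothetical, assuming the dict-style column specs and the existing helpers in tests/utils.py such as get_test_connection, quote_ident and LOGGER):

# Hypothetical merged helper; relies on the module's existing imports (psycopg2, quote_ident, LOGGER).
def add_columns(table_spec, target_db='postgres'):
    with get_test_connection(target_db) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            table = quote_ident(table_spec['name'], cur)
            for col_spec in table_spec['columns']:
                sql = "ALTER TABLE {} ADD {} {}".format(table, col_spec['name'], col_spec['type'])
                LOGGER.info("alter table sql: %s", sql)
                cur.execute(sql)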


def alter_schema_test_table(table_spec, target_db='postgres'):
with get_test_connection(target_db) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
table = table_spec['name']
for col_spec in table_spec['columns']:
for sql in build_alter_table_sql(quote_ident(table, cur), col_spec):
LOGGER.info("alter table sql: %s", sql)
cur.execute(sql)
@khoaanguyenn khoaanguyenn Apr 10, 2024
Suggested change
def alter_schema_test_table(table_spec, target_db='postgres'):
with get_test_connection(target_db) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
table = table_spec['name']
for col_spec in table_spec['columns']:
for sql in build_alter_table_sql(quote_ident(table, cur), col_spec):
LOGGER.info("alter table sql: %s", sql)
cur.execute(sql)
def add_columns(table_spec, target_db='postgres'):
with get_test_connection(target_db) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
table = table_spec['name']
for col_name, col_type in table_spec['columns']:
sql = "ALTER TABLE {} ADD {} {}".format(table_name, col_name, col_type)
LOGGER.info("alter table sql: %s", sql)
cur.execute(sql)

Nits


def unselect_column(our_stream, col):
md = metadata.to_map(our_stream['metadata'])
md.get(('properties', col))['selected'] = False