Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ def main_impl():
args = parse_args(REQUIRED_CONFIG_KEYS)

limit = args.config.get('limit')
offset = args.config.get('offset')
conn_config = {
# Required config keys
'host': args.config['host'],
Expand All @@ -409,7 +410,8 @@ def main_impl():
'break_at_end_lsn': args.config.get('break_at_end_lsn', True),
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)),
'use_secondary': args.config.get('use_secondary', False),
'limit': int(limit) if limit else None
'limit': int(limit) if limit else None,
'offset': int(offset) if offset else None
}

if conn_config['use_secondary']:
Expand Down
13 changes: 11 additions & 2 deletions tap_postgres/sync_strategies/incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ def sync_table(conn_info, stream, state, desired_columns, md_map):
"replication_key_value": replication_key_value,
"schema_name": schema_name,
"table_name": stream['table_name'],
"limit": conn_info['limit']
"limit": conn_info['limit'],
"offset": conn_info['offset']
})
LOGGER.info('select statement: %s with itersize %s', select_sql, cur.itersize)
cur.execute(select_sql)
Expand Down Expand Up @@ -130,7 +131,15 @@ def _get_select_sql(params):
table_name = params['table_name']

limit_statement = f'LIMIT {params["limit"]}' if params["limit"] else ''
where_statement = f"WHERE {replication_key} >= '{replication_key_value}'::{replication_key_sql_datatype}" \

if not params["offset"] or not replication_key_value:
offset = ''
elif replication_key_sql_datatype.startswith('timestamp'):
offset = f' - interval \'{params["offset"]} seconds\''
else:
offset = f' - {params["offset"]}'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really clear on this

It can be used by integer ID based replication as well, in that case it will import the last offset number of rows before the latest replicated id.

do you have an example? meaning if we do incrementing serial primary key, we put a value like "1000" and if the last ID is 9999 we will only replicate up to ID 8999?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the above is correct, I think maybe the term offset can be avoided, because it's very similar to pagination terminologies (limit and offset) yet behaves differently, especially the interval one.

There are also 2 units here (interval uses seconds, and Incrementing ID uses count) and they are both just expressed as number (offset: N). This could be confusing, maybe we can separate to two different config?

skip_last_n_rows: 1000
skip_last_n_seconds: 900

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think i'll skip implementing skip_last_n_rows in that case, as we don't really need it right now, let's have only one new config value.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good! 👍


where_statement = f"WHERE {replication_key} >= '{replication_key_value}'::{replication_key_sql_datatype}{offset}" \
if replication_key_value else ""

select_sql = f"""
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def setUp(self) -> None:
'password': 'foo_pass',
'port': 12345,
'use_secondary': False,
'limit': None
'limit': None,
'offset': None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the offset testable with unit test?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my understanding after checking the unit test code is that without basically rewriting the test suite it's not testable, and i don't really feel the energy to learn python unit testing and creating a whole new test suite for this feature 😆

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, I'm guessing we don't run unit test in CI/CD for this repo as well, right? in that case even running it does not give a good benefit. But we probably have to be more careful in QA testing our stuff then

}
self.stream = {'tap_stream_id': 5, 'stream': 'bar', 'table_name': 'pg_tbl'}
self.md_map = {
Expand Down