Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def __init__(
ignore_decode_errors=False,
verify_checksum=False,
enable_logging=True,
use_column_name_cache=False,
):
"""
Attributes:
Expand Down Expand Up @@ -230,6 +231,8 @@ def __init__(
verify_checksum: If true, verify events read from the binary log by examining checksums.
enable_logging: When set to True, logs various details helpful for debugging and monitoring
When set to False, logging is disabled to enhance performance.
use_column_name_cache: If true, enables caching of column names from INFORMATION_SCHEMA
for MySQL 5.7 compatibility when binlog metadata is missing. Default is False.
"""

self.__connection_settings = connection_settings
Expand All @@ -254,6 +257,7 @@ def __init__(
self.__ignore_decode_errors = ignore_decode_errors
self.__verify_checksum = verify_checksum
self.__optional_meta_data = False
self.__use_column_name_cache = use_column_name_cache

# We can't filter on packet level TABLE_MAP and rotate event because
# we need them for handling other operations
Expand Down Expand Up @@ -630,6 +634,7 @@ def fetchone(self):
self.__ignore_decode_errors,
self.__verify_checksum,
self.__optional_meta_data,
self.__use_column_name_cache,
)

if binlog_event.event_type == ROTATE_EVENT:
Expand Down
2 changes: 2 additions & 0 deletions pymysqlreplication/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def __init__(
ignore_decode_errors=False,
verify_checksum=False,
optional_meta_data=False,
enable_logging=False,
use_column_name_cache=False,
):
self.packet = from_packet
self.table_map = table_map
Expand Down
6 changes: 6 additions & 0 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def __init__(
ignore_decode_errors,
verify_checksum,
optional_meta_data,
enable_logging,
use_column_name_cache=False,
):
# -1 because we ignore the ok byte
self.read_bytes = 0
Expand All @@ -82,6 +84,8 @@ def __init__(

self.packet = from_packet
self.charset = ctl_connection.charset
self.enable_logging = enable_logging
self.use_column_name_cache = use_column_name_cache

# OK value
# timestamp
Expand Down Expand Up @@ -127,6 +131,8 @@ def __init__(
ignore_decode_errors=ignore_decode_errors,
verify_checksum=verify_checksum,
optional_meta_data=optional_meta_data,
enable_logging=enable_logging,
use_column_name_cache=use_column_name_cache,
)
if not self.event._processed:
self.event = None
Expand Down
64 changes: 62 additions & 2 deletions pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import struct
import decimal
import datetime
import logging

from pymysql.charset import charset_by_name
from enum import Enum
Expand All @@ -15,6 +16,10 @@
from .bitmap import BitCount, BitGet



# MySQL 5.7 compatibility: Cache for INFORMATION_SCHEMA column names
_COLUMN_NAME_CACHE = {}

class RowsEvent(BinLogEvent):
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
Expand Down Expand Up @@ -746,6 +751,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.__ignored_schemas = kwargs["ignored_schemas"]
self.__freeze_schema = kwargs["freeze_schema"]
self.__optional_meta_data = kwargs["optional_meta_data"]
self.__enable_logging = kwargs.get("enable_logging", False)
self.__use_column_name_cache = kwargs.get("use_column_name_cache", False)
# Post-Header
self.table_id = self._read_table_id()

Expand Down Expand Up @@ -909,12 +916,65 @@ def _get_optional_meta_data(self):

return optional_metadata


def _fetch_column_names_from_schema(self):
"""
Fetch column names from INFORMATION_SCHEMA for MySQL 5.7 compatibility.

Only executes if use_column_name_cache=True is enabled.
Uses module-level cache to avoid repeated queries.

Returns:
list: Column names in ORDINAL_POSITION order, or empty list
"""
# Only fetch if explicitly enabled (opt-in feature)
if not self.__use_column_name_cache:
return []

cache_key = f"{self.schema}.{self.table}"

# Check cache first
if cache_key in _COLUMN_NAME_CACHE:
return _COLUMN_NAME_CACHE[cache_key]

try:
query = """
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
ORDER BY ORDINAL_POSITION
"""
cursor = self._ctl_connection._get_table_information_cursor()
cursor.execute(query, (self.schema, self.table))
column_names = [row[0] for row in cursor.fetchall()]
cursor.close()

# Cache result
_COLUMN_NAME_CACHE[cache_key] = column_names

if self.__enable_logging and column_names:
logging.info(f"Cached column names for {cache_key}: {len(column_names)} columns")

return column_names
except Exception as e:
if self.__enable_logging:
logging.warning(f"Failed to fetch column names for {cache_key}: {e}")
# Cache empty result to avoid retry spam
_COLUMN_NAME_CACHE[cache_key] = []
return []

def _sync_column_info(self):
if not self.__optional_meta_data:
# If optional_meta_data is False Do not sync Event Time Column Schemas
column_names = self._fetch_column_names_from_schema()
if column_names and len(column_names) == self.column_count:
for column_idx in range(self.column_count):
self.columns[column_idx].name = column_names[column_idx]
return
if len(self.optional_metadata.column_name_list) == 0:
# May Be Now BINLOG_ROW_METADATA = FULL But Before Action BINLOG_ROW_METADATA Mode = MINIMAL
column_names = self._fetch_column_names_from_schema()
if column_names and len(column_names) == self.column_count:
for column_idx in range(self.column_count):
self.columns[column_idx].name = column_names[column_idx]
return
charset_pos = 0
enum_or_set_pos = 0
Expand Down