Skip to content

Implemented a fallback parameter validation for DbClusterParameterGroup #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 79 additions & 7 deletions pkg/resource/db_cluster_parameter_group/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,17 @@ const (
)

var (
// cache of parameter defaults
// cache of parameter defaults for cluster parameters
cachedParamMeta = util.ParamMetaCache{
Cache: map[string]map[string]util.ParamMeta{},
}

// cache of parameter defaults for instance parameters
// Used as fallback when cluster parameter validation fails
instanceParamMeta = util.ParamMetaCache{
Cache: map[string]map[string]util.ParamMeta{},
}

errParameterGroupJustCreated = fmt.Errorf("parameter group just got created")
requeueWaitWhileCreating = ackrequeue.NeededAfter(
errParameterGroupJustCreated,
Expand Down Expand Up @@ -208,6 +214,11 @@ func sdkTagsFromResourceTags(
// RDS does not have a DeleteParameter or DeleteParameterFromParameterGroup API
// call. Instead, you need to call ResetDBClusterParameterGroup with a list of
// DB Cluster Parameters that you want RDS to reset to a default value.
//
// Note(rushmash91): This function uses fallback parameter validation to work around an AWS API
// limitation where DescribeEngineDefaultClusterParameters may not return all valid
// cluster parameters (e.g., MySQL logging parameters like slow_query_log). See
// getParameterMeta() for details on the fallback mechanism.
func (rm *resourceManager) syncParameters(
ctx context.Context,
desired *resource,
Expand Down Expand Up @@ -307,9 +318,7 @@ func (rm *resourceManager) resetParameters(
// default to this if something goes wrong looking up parameter
// defaults
applyMethod := svcsdktypes.ApplyMethodImmediate
pMeta, err = cachedParamMeta.Get(
ctx, *family, paramName, rm.getFamilyParameters,
)
pMeta, err = rm.getParameterMeta(ctx, *family, paramName)
if err != nil {
return err
}
Expand Down Expand Up @@ -360,9 +369,7 @@ func (rm *resourceManager) modifyParameters(
for paramName, paramValue := range toModify {
// default to "immediate" if something goes wrong looking up defaults
applyMethod := svcsdktypes.ApplyMethodImmediate
pMeta, err = cachedParamMeta.Get(
ctx, *family, paramName, rm.getFamilyParameters,
)
pMeta, err = rm.getParameterMeta(ctx, *family, paramName)
if err != nil {
return err
}
Expand Down Expand Up @@ -433,3 +440,68 @@ func (rm *resourceManager) getFamilyParameters(
}
return familyMeta, nil
}

// getInstanceFamilyParameters calls the RDS DescribeEngineDefaultParameters API
// to retrieve the set of parameter information for a DB instance parameter
// group family, following the pagination marker until every page is read.
//
// This is used as a fallback when cluster parameter validation fails, as some
// valid cluster parameters may only be listed in the instance parameter
// defaults.
//
// The returned map is keyed by parameter name. An API error aborts the walk
// and is returned unchanged together with a nil map.
func (rm *resourceManager) getInstanceFamilyParameters(
	ctx context.Context,
	family string,
) (map[string]util.ParamMeta, error) {
	var marker *string
	familyMeta := map[string]util.ParamMeta{}

	for {
		resp, err := rm.sdkapi.DescribeEngineDefaultParameters(
			ctx,
			&svcsdk.DescribeEngineDefaultParametersInput{
				DBParameterGroupFamily: aws.String(family),
				Marker:                 marker,
			},
		)
		rm.metrics.RecordAPICall("GET", "DescribeEngineDefaultParameters", err)
		if err != nil {
			return nil, err
		}
		// The response fields are pointers; guard before dereferencing so a
		// sparse response cannot panic the reconciler.
		if resp == nil || resp.EngineDefaults == nil {
			break
		}
		for _, param := range resp.EngineDefaults.Parameters {
			if param.ParameterName == nil {
				// Without a name there is no key to store the entry under.
				continue
			}
			familyMeta[*param.ParameterName] = util.ParamMeta{
				// Missing modifiability is treated as "not modifiable".
				IsModifiable: param.IsModifiable != nil && *param.IsModifiable,
				// Only an explicit static apply type marks the parameter as
				// non-dynamic; a missing apply type is treated as dynamic.
				IsDynamic: param.ApplyType == nil || *param.ApplyType != applyTypeStatic,
			}
		}
		marker = resp.EngineDefaults.Marker
		// A nil or empty marker both mean there are no further pages.
		if marker == nil || *marker == "" {
			break
		}
	}
	return familyMeta, nil
}

// getParameterMeta looks up the metadata for paramName within the given
// parameter group family, consulting two caches in order:
//
//  1. cachedParamMeta, populated through rm.getFamilyParameters
//     (cluster-level engine defaults);
//  2. instanceParamMeta, populated through rm.getInstanceFamilyParameters
//     (instance-level engine defaults).
//
// The second lookup exists because DescribeEngineDefaultClusterParameters may
// not return all valid cluster parameters (e.g., MySQL logging parameters like
// slow_query_log).
//
// When both lookups fail, the error from the cluster-level lookup is the one
// returned; the instance-level error is discarded.
func (rm *resourceManager) getParameterMeta(
	ctx context.Context,
	family string,
	paramName string,
) (*util.ParamMeta, error) {
	clusterMeta, clusterErr := cachedParamMeta.Get(
		ctx, family, paramName, rm.getFamilyParameters,
	)
	if clusterErr != nil {
		// Cluster-level lookup failed: give the instance-level defaults a
		// chance before reporting the failure.
		fallbackMeta, fallbackErr := instanceParamMeta.Get(
			ctx, family, paramName, rm.getInstanceFamilyParameters,
		)
		if fallbackErr != nil {
			// Neither source knows the parameter; surface the original
			// (cluster-level) error.
			return nil, clusterErr
		}
		return fallbackMeta, nil
	}
	return clusterMeta, nil
}
40 changes: 36 additions & 4 deletions test/e2e/db_cluster_parameter_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,50 @@ def get(db_cluster_parameter_group_name):
return None

def get_parameters(db_cluster_parameter_group_name):
"""Returns a dict containing the paramters of a given parameter group
"""Returns a dict containing the parameters of a given parameter group

If no such DB cluster parameter group exists, returns None.
If no such DB cluster parameter group exists, returns empty list.
"""
c = boto3.client('rds')
try:
resp = c.describe_db_cluster_parameters(
DBClusterParameterGroupName=db_cluster_parameter_group_name,
)
return resp['Parameters']
except c.exceptions.DBClusterParameterGroupNotFoundFault:
return None
except c.exceptions.DBParameterGroupNotFoundFault:
return []

def get_user_defined_parameters(db_cluster_parameter_group_name):
    """Returns a list of the user-defined parameters of a given cluster
    parameter group.

    Calls DescribeDBClusterParameters with Source="user" to get only
    user-defined parameters (like the controller does), following the
    pagination marker until every page has been collected.

    If no such DB cluster parameter group exists, returns an empty list.
    """
    c = boto3.client('rds')
    request = {
        'DBClusterParameterGroupName': db_cluster_parameter_group_name,
        'Source': 'user',
    }
    all_parameters = []
    try:
        while True:
            resp = c.describe_db_cluster_parameters(**request)
            all_parameters.extend(resp.get('Parameters', []))

            # A missing *or empty* marker means there are no more pages.
            # Checking only for key presence would re-request the first page
            # forever if the service returned an empty marker.
            marker = resp.get('Marker')
            if not marker:
                break
            request['Marker'] = marker
    except c.exceptions.DBParameterGroupNotFoundFault:
        return []
    return all_parameters

def get_tags(db_cluster_parameter_group_arn):
"""Returns a dict containing the DB cluster parameter group's tag records
Expand Down
36 changes: 36 additions & 0 deletions test/e2e/db_parameter_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,42 @@ def get_parameters(db_parameter_group_name):
return None


def get_engine_default_parameters(db_parameter_group_family):
    """Returns a list of the engine default parameters for a given parameter
    group family.

    This function calls DescribeEngineDefaultParameters to get the default
    parameter metadata that's used as fallback validation in cluster
    parameter groups, following the pagination marker until every page has
    been collected.

    If the lookup fails for any reason, the failure is logged and None is
    returned.
    """
    import logging

    c = boto3.client('rds')
    request = {'DBParameterGroupFamily': db_parameter_group_family}
    all_parameters = []
    try:
        while True:
            resp = c.describe_engine_default_parameters(**request)
            engine_defaults = resp['EngineDefaults']
            all_parameters.extend(engine_defaults.get('Parameters', []))

            # A missing *or empty* marker means there are no more pages.
            # Checking only for key presence would re-request the first page
            # forever if the service returned an empty marker.
            marker = engine_defaults.get('Marker')
            if not marker:
                break
            request['Marker'] = marker
    except Exception:
        # Deliberately best-effort: callers check for None. Log the cause so
        # a failed lookup can still be diagnosed.
        logging.exception(
            "DescribeEngineDefaultParameters failed for family %r",
            db_parameter_group_family,
        )
        return None
    return all_parameters


def get_tags(db_parameter_group_arn):
"""Returns a dict containing the DB parameter group's tag records from the
RDS API.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: rds.services.k8s.aws/v1alpha1
kind: DBClusterParameterGroup
metadata:
name: $DB_CLUSTER_PARAMETER_GROUP_NAME
spec:
name: $DB_CLUSTER_PARAMETER_GROUP_NAME
description: $DB_CLUSTER_PARAMETER_GROUP_DESC
family: $DB_CLUSTER_PARAMETER_GROUP_FAMILY
parameterOverrides:
slow_query_log: "$PARAM_SLOW_QUERY_LOG_VALUE"
long_query_time: "$PARAM_LONG_QUERY_TIME_VALUE"
log_queries_not_using_indexes: "$PARAM_LOG_QUERIES_NOT_USING_INDEXES_VALUE"
111 changes: 111 additions & 0 deletions test/e2e/tests/test_db_cluster_parameter_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from e2e import service_marker, CRD_GROUP, CRD_VERSION, load_rds_resource
from e2e.replacement_values import REPLACEMENT_VALUES
from e2e import db_cluster_parameter_group
from e2e import db_parameter_group
from e2e import tag
from e2e import condition

Expand Down Expand Up @@ -77,6 +78,49 @@ def aurora_mysql57_cluster_param_group():
db_cluster_parameter_group.wait_until_deleted(resource_name)


@pytest.fixture
def aurora_mysql80_logging_cluster_param_group():
    """Creates a DBClusterParameterGroup custom resource in the
    "aurora-mysql8.0" family that overrides three MySQL logging parameters
    (slow_query_log, long_query_time, log_queries_not_using_indexes).

    Yields a tuple of (resource reference, custom resource as observed after
    creation, resource name). On teardown the custom resource is deleted on a
    best-effort basis and the fixture waits until the parameter group is gone.
    """
    resource_name = random_suffix_name("aurora-mysql8-logging", 32)

    replacements = REPLACEMENT_VALUES.copy()
    replacements["DB_CLUSTER_PARAMETER_GROUP_NAME"] = resource_name
    replacements["DB_CLUSTER_PARAMETER_GROUP_DESC"] = "Test MySQL logging parameters for Aurora MySQL 8.0"
    replacements["DB_CLUSTER_PARAMETER_GROUP_FAMILY"] = "aurora-mysql8.0"
    replacements["PARAM_SLOW_QUERY_LOG_VALUE"] = "1"
    replacements["PARAM_LONG_QUERY_TIME_VALUE"] = "10"
    replacements["PARAM_LOG_QUERIES_NOT_USING_INDEXES_VALUE"] = "1"

    resource_data = load_rds_resource(
        "db_cluster_parameter_group_aurora_mysql8.0_logging",
        additional_replacements=replacements,
    )
    logging.debug(resource_data)

    # Create the k8s resource
    ref = k8s.CustomResourceReference(
        CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL,
        resource_name, namespace="default",
    )
    k8s.create_custom_resource(ref, resource_data)
    cr = k8s.wait_resource_consumed_by_controller(ref)
    time.sleep(CREATE_WAIT_AFTER_SECONDS)

    assert cr is not None
    assert k8s.get_resource_exists(ref)

    yield ref, cr, resource_name

    # Best-effort delete: the resource may already be gone by the time
    # teardown runs. Only ordinary errors are suppressed, so that
    # KeyboardInterrupt/SystemExit still stop the test run, and the cause is
    # logged rather than silently dropped.
    try:
        _, deleted = k8s.delete_custom_resource(ref, 3, 10)
        assert deleted
        time.sleep(DELETE_WAIT_AFTER_SECONDS)
    except Exception:
        logging.debug(
            "Ignoring failure while deleting %s during teardown",
            resource_name,
            exc_info=True,
        )

    db_cluster_parameter_group.wait_until_deleted(resource_name)


@service_marker
@pytest.mark.canary
class TestDBClusterParameterGroup:
Expand Down Expand Up @@ -161,3 +205,70 @@ def test_crud_aurora_mysql5_7(self, aurora_mysql57_cluster_param_group):
assert "ParameterValue" in tp, f"No ParameterValue in parameter of name 'aurora_read_replica_read_committed': {tp}"
assert tp["ParameterValue"] == "ON", f"Wrong value for parameter of name 'aurora_read_replica_read_committed': {tp}"
assert found == 2, f"Did not find parameters with names 'aurora_binlog_read_buffer_size' and 'aurora_read_replica_read_committed': {test_params}"

def test_mysql_logging_parameters(self, aurora_mysql80_logging_cluster_param_group):
    """Checks that a cluster parameter group overriding the MySQL logging
    parameters is accepted and applied.

    Steps:
      1. the parameter group exists in RDS;
      2. the three logging parameters are present, with metadata, in the
         instance-level engine defaults;
      3. the custom resource status lists all three overrides;
      4. after waiting, the user-defined parameters in RDS carry the
         expected values.
    """
    ref, cr, resource_name = aurora_mysql80_logging_cluster_param_group

    logging_param_names = (
        "slow_query_log",
        "long_query_time",
        "log_queries_not_using_indexes",
    )

    # 1. The parameter group must exist in RDS.
    latest = db_cluster_parameter_group.get(resource_name)
    assert latest is not None

    # 2. The logging parameters must be listed in the instance-level defaults.
    # NOTE(review): this queries the "mysql8.0" family while the fixture
    # creates the group in "aurora-mysql8.0" -- confirm this is the family
    # the controller's fallback lookup is expected to use.
    instance_defaults = db_parameter_group.get_engine_default_parameters("mysql8.0")
    assert instance_defaults is not None, "Failed to get instance-level engine defaults"

    fallback_params = [
        entry for entry in instance_defaults
        if entry["ParameterName"] in logging_param_names
    ]

    # Log debug info about found parameters
    found_param_names = [entry['ParameterName'] for entry in fallback_params]
    logging.debug(f"Found fallback parameters: {found_param_names}")

    assert len(fallback_params) == 3, f"Expected 3 MySQL logging parameters in instance defaults, found {len(fallback_params)}: {found_param_names}"

    for param in fallback_params:
        for field in ("ParameterName", "IsModifiable", "ApplyType"):
            assert field in param, f"Missing {field} in fallback parameter: {param}"

    # 3. The custom resource status must report every override.
    assert 'status' in cr
    assert 'parameterOverrideStatuses' in cr['status']

    param_names = [
        status['parameterName']
        for status in cr['status']['parameterOverrideStatuses']
    ]
    for name in logging_param_names:
        assert name in param_names, f"{name} parameter missing from status: {param_names}"

    # 4. Additional wait for AWS RDS parameter propagation before reading
    # the values back; changes can take several minutes to become visible
    # via the API.
    logging.debug("Waiting additional time for AWS RDS parameter propagation...")
    time.sleep(MODIFY_WAIT_AFTER_SECONDS)

    latest_params = db_cluster_parameter_group.get_user_defined_parameters(resource_name)
    assert latest_params is not None, "Failed to get user-defined cluster parameters"

    test_params = [
        entry for entry in latest_params
        if entry["ParameterName"] in logging_param_names
    ]

    # Values the fixture wrote into the parameter overrides.
    expected_initial_values = {
        "slow_query_log": "1",
        "long_query_time": "10",
        "log_queries_not_using_indexes": "1"
    }

    for tp in test_params:
        param_name = tp["ParameterName"]
        assert param_name in expected_initial_values, f"Unexpected parameter: {param_name}"
        expected_value = expected_initial_values[param_name]
        assert tp["ParameterValue"] == expected_value, \
            f"Wrong value for {param_name}: expected {expected_value}, got {tp['ParameterValue']}"

    assert len(test_params) == len(expected_initial_values), \
        f"Expected {len(expected_initial_values)} parameters, found {len(test_params)}: {test_params}"