Skip to content

Commit ce68406

Browse files
committed
Implement Lambda function versions (#5927)
1 parent 8e9f3c8 commit ce68406

File tree

4 files changed

+115
-19
lines changed

4 files changed

+115
-19
lines changed

scripts/sell_unused_slots.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def _list_contribution_lambda_functions(cls) -> list[Lambda]:
7171
"""
7272
return [
7373
lambda_
74-
for lambda_ in Lambdas().list_lambdas()
74+
for lambda_ in Lambdas().list_lambdas(deployment='ALL')
7575
if lambda_.is_contribution_lambda
7676
]
7777

src/azul/lambdas.py

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
)
1414

1515
from azul import (
16+
JSON,
1617
R,
1718
cache,
1819
config,
@@ -37,6 +38,7 @@ class Lambda:
3738
name: str
3839
role: str
3940
slot_location: Optional[str]
41+
version: str
4042

4143
@property
4244
def is_contribution_lambda(self) -> bool:
@@ -82,13 +84,15 @@ def has_notification_queue(handler) -> bool:
8284
def from_response(cls, response: 'FunctionConfigurationTypeDef') -> Self:
8385
name = response['FunctionName']
8486
role = response['Role']
87+
version = response['Version']
8588
try:
8689
slot_location = response['Environment']['Variables']['AZUL_TDR_SOURCE_LOCATION']
8790
except KeyError:
8891
slot_location = None
8992
return cls(name=name,
9093
role=role,
91-
slot_location=slot_location)
94+
slot_location=slot_location,
95+
version=version)
9296

9397
def __attrs_post_init__(self):
9498
if self.slot_location is None:
@@ -105,25 +109,65 @@ class Lambdas:
105109
def _lambda(self):
106110
return aws.lambda_
107111

108-
def list_lambdas(self) -> list[Lambda]:
112+
def list_lambdas(self,
113+
deployment: str
114+
) -> list[Lambda]:
115+
"""
116+
Return a list of AWS Lambda functions. Only the largest numbered version
117+
of each function will be included in the list.
118+
119+
:param deployment: Limit output to the specified deployment stage. If
120+
'ALL', functions from all deployments will be
121+
returned.
122+
"""
123+
paginator = self._lambda.get_paginator('list_functions')
124+
lambda_prefixes = None if deployment == 'ALL' else [
125+
config.qualified_resource_name(lambda_name, stage=deployment)
126+
for lambda_name in config.lambda_names()
127+
]
128+
functions: dict[str, JSON] = dict()
129+
for response in paginator.paginate(FunctionVersion='ALL'):
130+
for function in response['Functions']:
131+
version = function['Version']
132+
version = None if version == '$LATEST' else int(version)
133+
if version and (lambda_prefixes is None or any(
134+
function['FunctionName'].startswith(prefix)
135+
for prefix in lambda_prefixes
136+
)):
137+
name = function['FunctionName']
138+
previous_function = functions.get(name)
139+
if previous_function is None or version > int(previous_function['Version']):
140+
functions[name] = function
109141
return [
110142
Lambda.from_response(function)
111-
for response in self._lambda.get_paginator('list_functions').paginate()
112-
for function in response['Functions']
143+
for function in functions.values()
113144
]
114145

146+
def get_function(self, function_name: str) -> JSON:
147+
"""
148+
Return the Lambda client `get_function()` response for the largest
149+
numbered version of the Lambda function.
150+
"""
151+
paginator = self._lambda.get_paginator('list_versions_by_function')
152+
params = {'FunctionName': function_name}
153+
version = max([
154+
int(function['Version'])
155+
for response in paginator.paginate(**params)
156+
for function in response['Versions']
157+
if function['Version'] != '$LATEST'
158+
])
159+
return self._lambda.get_function(FunctionName=function_name,
160+
Qualifier=str(version))
161+
115162
def manage_lambdas(self, enabled: bool):
116-
paginator = self._lambda.get_paginator('list_functions')
117-
lambda_prefixes = [config.qualified_resource_name(lambda_infix) for lambda_infix in config.lambda_names()]
118-
assert all(lambda_prefixes)
119-
for lambda_page in paginator.paginate(FunctionVersion='ALL', MaxItems=500):
120-
for lambda_name in [metadata['FunctionName'] for metadata in lambda_page['Functions']]:
121-
if any(lambda_name.startswith(prefix) for prefix in lambda_prefixes):
122-
self.manage_lambda(lambda_name, enabled)
163+
for function in self.list_lambdas(deployment=config.deployment_stage):
164+
self.manage_lambda(function.name, enabled)
123165

124166
def manage_lambda(self, lambda_name: str, enable: bool):
125-
lambda_settings = self._lambda.get_function(FunctionName=lambda_name)
167+
lambda_settings = self.get_function(function_name=lambda_name)
126168
lambda_arn = lambda_settings['Configuration']['FunctionArn']
169+
# Lambda does not support adding tags to function aliases or versions
170+
lambda_arn, _, _ = lambda_arn.rpartition(':')
127171
lambda_tags = self._lambda.list_tags(Resource=lambda_arn)['Tags']
128172
lambda_name = lambda_settings['Configuration']['FunctionName']
129173
if enable:
@@ -132,13 +176,15 @@ def manage_lambda(self, lambda_name: str, enable: bool):
132176

133177
if original_concurrency_limit is not None:
134178
log.info(f'Setting concurrency limit for {lambda_name} back to {original_concurrency_limit}.')
179+
# Concurrency settings apply to the function as a whole,
180+
# including all published versions and the unpublished
181+
# version
135182
self._lambda.put_function_concurrency(FunctionName=lambda_name,
136183
ReservedConcurrentExecutions=original_concurrency_limit)
137184
else:
138185
log.info(f'Removed concurrency limit for {lambda_name}.')
139186
self._lambda.delete_function_concurrency(FunctionName=lambda_name)
140187

141-
lambda_arn = lambda_settings['Configuration']['FunctionArn']
142188
self._lambda.untag_resource(Resource=lambda_arn, TagKeys=[self.tag_name])
143189
else:
144190
log.warning(f'{lambda_name} is already enabled.')
@@ -156,7 +202,7 @@ def manage_lambda(self, lambda_name: str, enable: bool):
156202

157203
log.info(f'Setting concurrency limit for {lambda_name} to zero.')
158204
new_tag = {self.tag_name: repr(concurrency_limit)}
159-
self._lambda.tag_resource(Resource=lambda_settings['Configuration']['FunctionArn'], Tags=new_tag)
205+
self._lambda.tag_resource(Resource=lambda_arn, Tags=new_tag)
160206
self._lambda.put_function_concurrency(FunctionName=lambda_name, ReservedConcurrentExecutions=0)
161207
else:
162208
log.warning(f'{lambda_name} is already disabled.')
@@ -165,7 +211,7 @@ def reset_lambda_roles(self):
165211
client = self._lambda
166212
lambda_names = set(config.lambda_names())
167213

168-
for lambda_ in self.list_lambdas():
214+
for lambda_ in self.list_lambdas(deployment='ALL'):
169215
for lambda_name in lambda_names:
170216
if lambda_.name.startswith(config.qualified_resource_name(lambda_name)):
171217
other_lambda_name = one(lambda_names - {lambda_name})
@@ -174,6 +220,8 @@ def reset_lambda_roles(self):
174220
config.qualified_resource_name(other_lambda_name)
175221
)
176222
log.info('Temporarily updating %r to role %r', lambda_.name, temporary_role)
223+
# You can’t modify the configuration of a published version,
224+
# only the unpublished version
177225
client.update_function_configuration(FunctionName=lambda_.name,
178226
Role=temporary_role)
179227
log.info('Updating %r to role %r', lambda_.name, lambda_.role)

src/azul/queues.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -502,9 +502,14 @@ def _wait_for_queue_empty(self, queue: 'Queue'):
502502
time.sleep(3)
503503
queue.reload()
504504

505-
def _manage_sqs_push(self, function_name: str, queue: 'Queue', enable: bool):
505+
def _manage_sqs_push(self,
506+
function_name: str,
507+
function_version: str,
508+
queue: 'Queue',
509+
enable: bool):
506510
lambda_ = aws.lambda_
507-
response = lambda_.list_event_source_mappings(FunctionName=function_name,
511+
partial_arn = f'{function_name}:{function_version}'
512+
response = lambda_.list_event_source_mappings(FunctionName=partial_arn,
508513
EventSourceArn=queue.attributes['QueueArn'])
509514
mapping_uuid = one(response['EventSourceMappings'])['UUID']
510515

@@ -554,6 +559,11 @@ def manage_lambdas(self, queues: Mapping[str, 'Queue'], enable: bool):
554559
Enable or disable the readers and writers of the given queues.
555560
"""
556561
functions_by_queue = self.functions_by_queue()
562+
versions_by_function = {
563+
f.name: f.version
564+
for f in self._lambdas.list_lambdas(deployment=config.deployment_stage)
565+
}
566+
assert set(functions_by_queue.values()) <= set(versions_by_function)
557567

558568
with ThreadPoolExecutor(max_workers=len(queues)) as tpe:
559569
futures = []
@@ -567,10 +577,11 @@ def submit(f, *args, **kwargs):
567577
except KeyError:
568578
assert queue_name in config.fail_queue_names
569579
else:
580+
version = versions_by_function[function]
570581
if queue_name == config.notifications_queue.name:
571582
# Prevent new notifications from being added
572583
submit(self._manage_lambda, config.indexer_name, enable)
573-
submit(self._manage_sqs_push, function, queue, enable)
584+
submit(self._manage_sqs_push, function, version, queue, enable)
574585
self._handle_futures(futures)
575586
futures = [tpe.submit(self._wait_for_queue_idle, queue) for queue in queues.values()]
576587
self._handle_futures(futures)

src/azul/terraform.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from pathlib import (
1515
Path,
1616
)
17+
import re
1718
import subprocess
1819
from typing import (
1920
Mapping,
@@ -759,6 +760,10 @@ def tf_config(self, app_name):
759760
for _, resource in functions:
760761
assert 'layers' not in resource
761762
resource['layers'] = ['${aws_lambda_layer_version.dependencies.arn}']
763+
# Publishing a new Lambda function version each time lets us perform
764+
# an atomic update of the lambda function, avoiding a race condition
765+
# between the update of the function's configuration and its code.
766+
resource['publish'] = True
762767
env = config.es_endpoint_env(
763768
es_endpoint=(
764769
aws.es_endpoint
@@ -921,6 +926,38 @@ def tf_config(self, app_name):
921926
sqs_name, _ = config.unqualified_resource_name(resource_name, suffix)
922927
resource['event_source_arn'] = f'${{aws_sqs_queue.{sqs_name}.arn}}'
923928

929+
# Replace the references to unqualified Lambda function ARNs emitted by
930+
# Chalice with qualified ARNs. Note: `aws_lambda_permission` resources
931+
# doesn't support the qualified ARN syntax, instead it has a `qualifier`
932+
# argument.
933+
#
934+
locals[app_name] = re.sub(r'(aws_lambda_function\.[^\.\s]+)\.invoke_arn',
935+
r'\1.qualified_invoke_arn',
936+
json_str(locals[app_name]))
937+
if app_name == 'indexer':
938+
resource_arguments = [
939+
('aws_cloudwatch_event_target', 'arn'),
940+
('aws_lambda_event_source_mapping', 'function_name'),
941+
]
942+
elif app_name == 'service':
943+
resource_arguments = [
944+
('aws_cloudwatch_event_target', 'arn'),
945+
]
946+
else:
947+
assert False, app_name
948+
for resource_type, argument in resource_arguments:
949+
for _, resource in json_item_dicts(resources[resource_type]):
950+
value = json_str(resource[argument])
951+
assert value.endswith('.arn}'), resource
952+
resource[argument] = value.replace('.arn', '.qualified_arn')
953+
954+
# Add a qualifier argument to `aws_lambda_permission` resources
955+
#
956+
for _, resource in json_item_dicts(resources['aws_lambda_permission']):
957+
assert 'qualifier' not in resource, resource
958+
lambda_arn = resource['function_name']
959+
resource['qualifier'] = lambda_arn.replace('.arn', '.version')
960+
924961
return {
925962
'resource': resources,
926963
'data': data,

0 commit comments

Comments
 (0)