Skip to content

Commit c7888f4

Browse files
committed
Fix: Mirror message duplication due to retries on SQS throttling (#7419, PR #7426)
2 parents dbdfb60 + a5ace29 commit c7888f4

File tree

6 files changed

+72
-22
lines changed

6 files changed

+72
-22
lines changed

src/azul/azulclient.py

Lines changed: 6 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -262,7 +262,12 @@ def mirror_queue(self):
262262
return aws.sqs_queue(name)
263263

264264
def queue_mirror_messages(self, messages: Iterable[SQSMessage]) -> int:
265-
return self.queues.send_messages(self.mirror_queue(), messages)
265+
rate_limit = float(aws.sqs_fifo_rate_limit)
266+
if config.is_in_lambda:
267+
rate_limit /= config.mirroring_concurrency
268+
return self.queues.send_messages(self.mirror_queue(),
269+
messages,
270+
rate_limit=rate_limit)
266271

267272
def delete_all_indices(self, catalog: CatalogName):
268273
self.index_service.delete_indices(catalog)

src/azul/deployment.py

Lines changed: 8 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -745,6 +745,14 @@ def sqs_resource(self) -> 'SQSServiceResource':
745745
def sqs_queue(self, queue_name: str) -> 'Queue':
746746
return self.sqs_resource.get_queue_by_name(QueueName=queue_name)
747747

748+
#: The maximum number of SendMessage, ReceiveMessage, or DeleteMessage API
749+
#: calls per second supported for normal-throughput (as opposed to high-
750+
#: throughput) FIFO queues.
751+
#:
752+
#: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
753+
#:
754+
sqs_fifo_rate_limit = 300
755+
748756

749757
aws = AWS()
750758
del AWS

src/azul/indexer/__init__.py

Lines changed: 16 additions & 14 deletions
Original file line number · Diff line number · Diff line change
@@ -239,38 +239,40 @@ def parse(cls, prefix: str) -> Self:
239239
return cls(common=entry, partition=partition)
240240

241241
@classmethod
242-
def for_main_deployment(cls, num_subgraphs: int) -> Self:
242+
def for_main_deployment(cls, num_elements: int, partition_size: int) -> Self:
243243
"""
244-
A prefix that is expected to rarely exceed 8192 subgraphs per partition
244+
A prefix that divides a source containing the given number of elements
245+
(subgraphs, files, …) into partitions that rarely exceed the given size.
245246
246-
>>> str(Prefix.for_main_deployment(0))
247+
>>> n = 8192
248+
249+
>>> str(Prefix.for_main_deployment(0, n))
247250
Traceback (most recent call last):
248251
...
249252
ValueError: math domain error
250253
251-
>>> str(Prefix.for_main_deployment(1))
254+
>>> str(Prefix.for_main_deployment(1, n))
252255
'/0'
253256
254257
>>> cases = [-1, 0, 1, 2]
255258
256-
>>> n = 8192
257-
>>> [str(Prefix.for_main_deployment(n + i)) for i in cases]
259+
>>> [str(Prefix.for_main_deployment(n + i, n)) for i in cases]
258260
['/0', '/0', '/1', '/1']
259261
260262
Sources with this many bundles are very rare, so we have a generous
261263
margin of error surrounding this cutoff point
262264
263-
>>> n = 8192 * 16
264-
>>> [str(Prefix.for_main_deployment(n + i)) for i in cases]
265+
>>> m = n * 16
266+
>>> [str(Prefix.for_main_deployment(m + i, n)) for i in cases]
265267
['/1', '/1', '/2', '/2']
266268
"""
267-
partition = cls._prefix_length(num_subgraphs, 8192)
269+
partition = cls._prefix_length(num_elements, partition_size)
268270
return cls(common='', partition=partition)
269271

270272
@classmethod
271-
def for_lesser_deployment(cls, num_subgraphs: int) -> Self:
273+
def for_lesser_deployment(cls, num_elements: int) -> Self:
272274
"""
273-
A prefix that yields an average of approximately 24 subgraphs per
275+
A prefix that yields an average of approximately 24 elements per
274276
source, using an experimentally derived heuristic formula designed to
275277
minimize manual adjustment of the computed common prefixes. The
276278
partition prefix length is always 1, even though some partitions may be
@@ -294,9 +296,9 @@ def for_lesser_deployment(cls, num_subgraphs: int) -> Self:
294296
>>> [str(Prefix.for_lesser_deployment(n + i)) for i in cases]
295297
['e/1', 'f/1', '00/1', '10/1']
296298
"""
297-
digits = f'{num_subgraphs - 1:x}'[::-1]
298-
length = cls._prefix_length(num_subgraphs, 64)
299-
assert length < len(digits), num_subgraphs
299+
digits = f'{num_elements - 1:x}'[::-1]
300+
length = cls._prefix_length(num_elements, 64)
301+
assert length < len(digits), num_elements
300302
return cls(common=digits[:length], partition=1)
301303

302304
@classmethod

src/azul/indexer/mirror_controller.py

Lines changed: 15 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -28,6 +28,9 @@
2828
from azul.chalice import (
2929
LambdaMetric,
3030
)
31+
from azul.deployment import (
32+
aws,
33+
)
3134
from azul.digests import (
3235
Hasher,
3336
get_resumable_hasher,
@@ -131,7 +134,18 @@ def _mirror(self, action: MirrorAction, message: JSON):
131134
def mirror_source(self, catalog: CatalogName, source_json: JSON):
132135
plugin = self.repository_plugin(catalog)
133136
source = plugin.source_ref_cls.from_json(source_json)
134-
source = plugin.partition_source_for_mirroring(catalog, source)
137+
# The desired partition size depends on the maximum number of messages
138+
# we can send in one Lambda invocation, because queueing the individual
139+
# mirror_file messages turns out to dominate the running time of
140+
# handling a mirror_source message.
141+
partition_size = int(
142+
aws.sqs_fifo_rate_limit # max. # of SendMessage calls per second
143+
* self.client.queues.batch_size # number of messages per call
144+
* config.mirror_lambda_timeout # max. duration of the invocation
145+
/ config.mirroring_concurrency # number of concurrent invocations
146+
/ 2 # safety margin
147+
)
148+
source = plugin.partition_source_for_mirroring(catalog, source, partition_size)
135149
prefix = source.spec.prefix
136150
log.info('Queueing %d partitions of source %r in catalog %r',
137151
prefix.num_partitions, str(source.spec), catalog)

src/azul/plugins/__init__.py

Lines changed: 14 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -697,23 +697,26 @@ def partition_source_for_indexing(self,
697697
an updated copy of the source with a heuristically computed prefix that
698698
should be appropriate for indexing in the given catalog.
699699
"""
700-
return self._partition_source(catalog, source, self.count_bundles)
700+
partition_size = 8192
701+
return self._partition_source(catalog, source, self.count_bundles, partition_size)
701702

702703
def partition_source_for_mirroring(self,
703704
catalog: CatalogName,
704-
source: SOURCE_REF
705+
source: SOURCE_REF,
706+
partition_size: int,
705707
) -> SOURCE_REF:
706708
"""
707709
If the source already has a prefix, return the source. Otherwise, return
708710
an updated copy of the source with a heuristically computed prefix that
709711
should be appropriate for mirroring in the given catalog.
710712
"""
711-
return self._partition_source(catalog, source, self.count_files)
713+
return self._partition_source(catalog, source, self.count_files, partition_size)
712714

713715
def _partition_source(self,
714716
catalog: CatalogName,
715717
source: SOURCE_REF,
716-
counter: Callable[[SOURCE_SPEC], int]
718+
counter: Callable[[SOURCE_SPEC], int],
719+
partition_size: int
717720
) -> SOURCE_REF:
718721
if source.spec.prefix is None:
719722
count = counter(source.spec)
@@ -722,7 +725,13 @@ def _partition_source(self,
722725
# We use the "lesser" heuristic during IT to keep the cost and
723726
# performance of the tests within reasonable limits
724727
if is_main and not is_it:
725-
prefix = Prefix.for_main_deployment(count)
728+
# Sanity-check the partition size. We know the upper bound
729+
# caused some mirror Lambda invocations to time out. The lower
730+
# bound is hypothetical. It'll likely still work for mirroring
731+
# but we'd like to know if partitions get that small. For
732+
# indexing, the partition size is fixed at the upper bound.
733+
assert 512 <= partition_size <= 8192, partition_size
734+
prefix = Prefix.for_main_deployment(count, partition_size)
726735
else:
727736
prefix = Prefix.for_lesser_deployment(count)
728737
return source.with_prefix(prefix)

src/azul/queues.py

Lines changed: 13 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -194,11 +194,23 @@ def read_messages(self, queue: 'Queue') -> builtins.list['Message']:
194194
self._cleanup_messages(queue, messages)
195195
return messages
196196

197-
def send_messages(self, queue: 'Queue', messages: Iterable[SQSMessage]) -> int:
197+
def send_messages(self,
198+
queue: 'Queue',
199+
messages: Iterable[SQSMessage],
200+
rate_limit: float | None = None
201+
) -> int:
198202
num_messages = 0
199203
for batch in chunked(messages, self.batch_size):
200204
entries = [message.to_batch_entry(i) for i, message in enumerate(batch)]
205+
start = time.time()
201206
queue.send_messages(Entries=entries)
207+
if rate_limit is not None:
208+
period = 1 / rate_limit
209+
time_spent = time.time() - start
210+
time_to_sleep = period - time_spent
211+
if time_to_sleep > 0:
212+
log.debug('Sleeping %.3fs to prevent exceeding rate limit', time_to_sleep)
213+
time.sleep(time_to_sleep)
202214
num_messages += len(batch)
203215
return num_messages
204216

0 commit comments

Comments (0)