Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ddtrace/contrib/internal/redis/asyncio_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ async def instrumented_async_execute_cluster_pipeline(func, instance, args, kwar
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c.args, cmd_max_len=config.redis.cmd_max_length) for c in instance._command_stack]
# Try to access command_stack, fallback to _command_stack for backward compatibility
command_stack = getattr(instance, "command_stack", None)
if command_stack is None:
command_stack = getattr(instance, "_command_stack", [])

cmds = [stringify_cache_args(c.args, cmd_max_len=config.redis.cmd_max_length) for c in command_stack]
with _instrument_redis_execute_pipeline(pin, config.redis, cmds, instance):
return await func(*args, **kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
redis: Fix pipeline compatibility with redis >= 6.2.0 after ``_command_stack`` removal.
35 changes: 35 additions & 0 deletions tests/contrib/redis/test_redis_cluster_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,41 @@ async def test_pipeline(traced_redis_cluster):
assert span.get_metric("redis.pipeline_length") == 3


@pytest.mark.snapshot(wait_for_num_traces=1)
@pytest.mark.asyncio
async def test_pipeline_command_stack_count_matches_metric(redis_cluster):
patch()
try:
async with redis_cluster.pipeline(transaction=False) as p:
p.set("a", 1)
p.get("a")
await p.execute()
finally:
unpatch()


@pytest.mark.asyncio
async def test_pipeline_command_stack_parity_when_visible(traced_redis_cluster):
cluster, test_spans = traced_redis_cluster
async with cluster.pipeline(transaction=False) as p:
p.set("a", 1)
p.get("a")
queued = None
if hasattr(p, "command_stack"):
queued = len(p.command_stack)
elif hasattr(p, "_command_stack"):
queued = len(p._command_stack)
await p.execute()

traces = test_spans.pop_traces()
spans = traces[0]
span = spans[0]
assert span.resource == "SET\nGET"
assert span.get_metric("redis.pipeline_length") == 2
if queued is not None:
assert span.get_metric("redis.pipeline_length") == queued


@pytest.mark.skipif(redis.VERSION < (4, 3, 0), reason="redis.asyncio.cluster is not implemented in redis<4.3.0")
@pytest.mark.asyncio
async def test_patch_unpatch(redis_cluster):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[[
{
"name": "redis.command",
"service": "redis",
"resource": "SET\nGET",
"trace_id": 0,
"span_id": 1,
"parent_id": 0,
"type": "redis",
"error": 0,
"meta": {
"_dd.base_service": "tests.contrib.redis",
"_dd.p.dm": "-0",
"_dd.p.tid": "68b8917800000000",
"component": "redis",
"db.system": "redis",
"language": "python",
"redis.raw_command": "SET a 1\nGET a",
"runtime-id": "73c800bba8084530a15a15dbcea1a8bc",
"span.kind": "client"
},
"metrics": {
"_dd.measured": 1,
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"_sampling_priority_v1": 1,
"process_id": 4422,
"redis.pipeline_length": 2
},
"duration": 1411000,
"start": 1756926328130176000
}]]
Loading