diff --git a/.github/instructions/project-layout.instructions.md b/.github/instructions/project-layout.instructions.md new file mode 100644 index 0000000..88e8bea --- /dev/null +++ b/.github/instructions/project-layout.instructions.md @@ -0,0 +1,215 @@ +--- +applyTo: '**' +--- +# Adding New Redis Commands to Upstash Redis Python Package + +## Package Structure Overview + +The Upstash Redis Python package is organized as follows: + +``` +upstash_redis/ +├── __init__.py # Package exports +├── client.py # Synchronous Redis client +├── commands.py # Command implementations +├── commands.pyi # Type hints for commands +├── errors.py # Custom exceptions +├── format.py # Response formatters +├── http.py # HTTP client for Redis REST API +├── typing.py # Type definitions +├── utils.py # Utility functions +└── asyncio/ + ├── __init__.py + └── client.py # Asynchronous Redis client + +tests/ +├── commands/ # Command-specific tests organized by category +│ ├── hash/ # Hash command tests +│ ├── json/ # JSON command tests +│ ├── list/ # List command tests +│ ├── set/ # Set command tests +│ ├── sortedSet/ # Sorted set command tests +│ ├── string/ # String command tests +│ └── asyncio/ # Async versions of tests +``` + +## Steps to Add a New Redis Command + +### 1. Implement the Command in `commands.py` + +Add your command method to the `Commands` class: + +```python +# filepath: upstash_redis/commands.py +def your_new_command(self, key: str, *args, **kwargs) -> CommandsProtocol: + """ + Description of your command. + + Args: + key: The Redis key + *args: Additional arguments + **kwargs: Additional keyword arguments + + Returns: + CommandsProtocol: Command object for execution + """ + return self._execute_command("YOUR_REDIS_COMMAND", key, *args, **kwargs) +``` + +### 2. Add Type Hints in `commands.pyi` + +Update the type stub file with your command signature: + +```python +# filepath: upstash_redis/commands.pyi +def your_new_command(self, key: str, *args, **kwargs) -> CommandsProtocol: ... +``` + +### 3. Update Client Classes (if needed) + +If your command requires special handling, update both sync and async clients: + +```python +# filepath: upstash_redis/client.py +# Add any client-specific logic if needed + +# filepath: upstash_redis/asyncio/client.py +# Add async version if special handling is needed +``` + +### 4. Add Response Formatting (if needed) + +If your command returns data that needs special formatting, add a formatter in `format.py`: + +```python +# filepath: upstash_redis/format.py +def format_your_command_response(response: Any) -> YourReturnType: + """Format the response from your Redis command.""" + # Implementation here + pass +``` + +### 5. 
Write Comprehensive Tests + +Create test files in the appropriate category folder: + +```python +# filepath: tests/commands/{category}/test_your_new_command.py +import pytest +from upstash_redis import Redis + +def test_your_new_command_basic(): + """Test basic functionality of your new command.""" + redis = Redis.from_env() + result = redis.your_new_command("test_key", "arg1", "arg2") + # Add assertions + +def test_your_new_command_edge_cases(): + """Test edge cases and error conditions.""" + # Add edge case tests + +# If async support is needed: +# filepath: tests/commands/asyncio/test_your_new_command.py +import pytest +from upstash_redis.asyncio import Redis as AsyncRedis + +@pytest.mark.asyncio +async def test_your_new_command_async(): + """Test async version of your new command.""" + redis = AsyncRedis.from_env() + result = await redis.your_new_command("test_key", "arg1", "arg2") + # Add assertions +``` + +### 6. Update Package Exports (if needed) + +If you're adding a new public class or function, update `__init__.py`: + +```python +# filepath: upstash_redis/__init__.py +from upstash_redis.your_new_module import YourNewClass + +__all__ = ["AsyncRedis", "Redis", "YourNewClass"] +``` + +## Command Categories and Organization + +Commands are typically organized into these categories: + +- **String**: Basic key-value operations (`GET`, `SET`, etc.) +- **Hash**: Hash field operations (`HGET`, `HSET`, etc.) +- **List**: List operations (`LPUSH`, `RPOP`, etc.) +- **Set**: Set operations (`SADD`, `SREM`, etc.) +- **Sorted Set**: Sorted set operations (`ZADD`, `ZREM`, etc.) +- **JSON**: JSON operations (`JSON.GET`, `JSON.SET`, etc.) +- **Generic**: Key management (`DEL`, `EXISTS`, etc.) +- **Server**: Server management commands + +## Testing Guidelines + +1. **Test file naming**: `test_{command_name}.py` +2. **Test function naming**: `test_{command_name}_{scenario}` +3. **Include both positive and negative test cases** +4. **Test with different data types and edge cases** +5. **Add async tests if the command supports async operations** +6. **Use appropriate fixtures from `conftest.py`** + +## Example: Adding a New Hash Command + +Here's a complete example of adding a hypothetical `HMERGE` command: + +```python +# filepath: upstash_redis/commands.py +def hmerge(self, key: str, source_key: str) -> CommandsProtocol: + """ + Merge hash from source_key into key. 
+ + Args: + key: Destination hash key + source_key: Source hash key to merge from + + Returns: + CommandsProtocol: Command for execution + """ + return self._execute_command("HMERGE", key, source_key) +``` + +```python +# filepath: tests/commands/hash/test_hmerge.py +import pytest +from upstash_redis import Redis + +def test_hmerge_basic(redis_client): + """Test basic HMERGE functionality.""" + redis = redis_client + + # Setup + redis.hset("hash1", {"field1": "value1", "field2": "value2"}) + redis.hset("hash2", {"field3": "value3", "field2": "overwrite"}) + + # Execute + result = redis.hmerge("hash1", "hash2") + + # Verify + merged_hash = redis.hgetall("hash1") + assert merged_hash["field1"] == "value1" + assert merged_hash["field2"] == "overwrite" # Should be overwritten + assert merged_hash["field3"] == "value3" # Should be added +``` + +## Running Tests + +To run tests for your new command: + +```bash +# Run specific test file +pytest tests/commands/hash/test_your_command.py -v + +# Run all tests in a category +pytest tests/commands/hash/ -v + +# Run all tests +pytest tests/ -v +``` + +Follow this structure and you'll have a well-integrated Redis command that follows the package's conventions and patterns. \ No newline at end of file diff --git a/README.md b/README.md index 74eba75..0407ef7 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,58 @@ redis.bitfield_ro("test_key_2") \ .execute() ``` +### Redis Streams + +Redis Streams provide a powerful data structure for handling real-time data. The SDK supports all stream commands: + +```python +from upstash_redis import Redis + +redis = Redis.from_env() + +# Add entries to a stream +entry_id = redis.xadd("mystream", "*", {"field1": "value1", "field2": "value2"}) +print(f"Added entry: {entry_id}") + +# Read from stream +messages = redis.xread({"mystream": "0-0"}) +print(f"Messages: {messages}") + +# Create consumer group +redis.xgroup_create("mystream", "mygroup", "$") + +# Read as part of consumer group +messages = redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + +# Acknowledge processed messages +if messages: + message_ids = [msg[0] for msg in messages[0][1]] + redis.xack("mystream", "mygroup", *message_ids) + +# Get stream length +length = redis.xlen("mystream") +print(f"Stream length: {length}") +``` + +For async usage: + +```python +from upstash_redis.asyncio import Redis + +redis = Redis.from_env() + +async def stream_example(): + # Add entries to a stream + entry_id = await redis.xadd("mystream", "*", {"user": "alice", "action": "login"}) + + # Read from stream + messages = await redis.xread({"mystream": "0-0"}) + + # Consumer group operations + await redis.xgroup_create("mystream", "processors", "$") + messages = await redis.xreadgroup("processors", "worker1", {"mystream": ">"}) +``` + ### Custom commands If you want to run a command that hasn't been implemented, you can use the `execute` function of your client instance and pass the command as a `list`. 
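For example, here is a minimal sketch of running a raw command this way. The `XINFO STREAM` call is only illustrative: it assumes the `mystream` key created in the stream examples above, and your SDK version may already ship a dedicated wrapper for it, in which case prefer that wrapper.

```python
from upstash_redis import Redis

redis = Redis.from_env()

# Build the command as a list of strings, exactly as you would type it in redis-cli.
# XINFO STREAM is used here purely as an illustration of a raw command; it assumes
# the "mystream" key from the examples above already exists.
info = redis.execute(["XINFO", "STREAM", "mystream"])
print(info)
```

Since `execute` bypasses the command-specific wrappers, the response typically comes back in the raw form returned by the Upstash REST API, so any extra formatting is up to the caller.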
diff --git a/pyproject.toml b/pyproject.toml index 12edb17..9550021 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "upstash-redis" -version = "1.4.0" +version = "1.5.0" description = "Serverless Redis SDK from Upstash" license = "MIT" authors = ["Upstash ", "Zgîmbău Tudor "] diff --git a/tests/commands/asyncio/scripting/test_evalsha_ro.py b/tests/commands/asyncio/scripting/test_evalsha_ro.py index 1ccae6e..dd92a95 100644 --- a/tests/commands/asyncio/scripting/test_evalsha_ro.py +++ b/tests/commands/asyncio/scripting/test_evalsha_ro.py @@ -32,7 +32,8 @@ async def test_with_arguments(async_redis: Redis) -> None: @mark.asyncio async def test_with_keys_and_arguments(async_redis: Redis) -> None: - sha1_digest = await execute_on_http("SCRIPT", "LOAD", "return {ARGV[1], KEYS[1]}") + # Load the script using the same Redis client instance + sha1_digest = await async_redis.script_load("return {ARGV[1], KEYS[1]}") assert isinstance(sha1_digest, str) assert await async_redis.evalsha_ro(sha1_digest, keys=["a"], args=["b"]) == [ diff --git a/tests/commands/asyncio/stream/__init__.py b/tests/commands/asyncio/stream/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/commands/asyncio/stream/test_xack.py b/tests/commands/asyncio/stream/test_xack.py new file mode 100644 index 0000000..568efcc --- /dev/null +++ b/tests/commands/asyncio/stream/test_xack.py @@ -0,0 +1,127 @@ +import pytest +import pytest_asyncio + +from upstash_redis.asyncio import Redis + + +@pytest_asyncio.fixture(autouse=True) +async def flush_db(async_redis: Redis): + await async_redis.flushdb() + + +@pytest.mark.asyncio +async def test_xack_basic(async_redis: Redis): + """Test basic XACK functionality""" + # Add entries + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xadd("mystream", "*", {"field": "value2"}) + + # Create consumer group + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read messages as consumer + result = await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + entries = result[0][1] + + # Get message IDs + id1 = entries[0][0] + + # Acknowledge one message + acked = await async_redis.xack("mystream", "mygroup", id1) + assert acked == 1 + + # Check pending messages (should be 1 left) + pending = await async_redis.xpending("mystream", "mygroup") + assert pending[0] == 1 + + +@pytest.mark.asyncio +async def test_xack_multiple_ids(async_redis: Redis): + """Test XACK with multiple message IDs""" + # Add entries + for i in range(5): + await async_redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Create consumer group + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read messages as consumer + result = await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + entries = result[0][1] + + # Get first 3 message IDs + ids = [entry[0] for entry in entries[:3]] + + # Acknowledge multiple messages at once + acked = await async_redis.xack("mystream", "mygroup", *ids) + assert acked == 3 + + # Check pending messages (should be 2 left) + pending = await async_redis.xpending("mystream", "mygroup") + assert pending[0] == 2 + + +@pytest.mark.asyncio +async def test_xack_nonexistent_message(async_redis: Redis): + """Test XACK with non-existent message ID""" + # Add entry and create group + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Try to acknowledge non-existent message + acked = await 
async_redis.xack("mystream", "mygroup", "9999999999999-0") + assert acked == 0 + + +@pytest.mark.asyncio +async def test_xack_already_acknowledged(async_redis: Redis): + """Test XACK on already acknowledged message""" + # Add entry and create group + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read and acknowledge message + result = await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + message_id = result[0][1][0][0] + + # First acknowledgment + acked = await async_redis.xack("mystream", "mygroup", message_id) + assert acked == 1 + + # Second acknowledgment (already acked) + acked = await async_redis.xack("mystream", "mygroup", message_id) + assert acked == 0 + + +@pytest.mark.asyncio +async def test_xack_different_consumers(async_redis: Redis): + """Test XACK with messages from different consumers""" + # Add entries + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xadd("mystream", "*", {"field": "value2"}) + + # Create consumer group + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read with consumer1 + result1 = await async_redis.xreadgroup( + "mygroup", "consumer1", {"mystream": ">"}, count=1 + ) + id1 = result1[0][1][0][0] + + # Read with consumer2 + result2 = await async_redis.xreadgroup( + "mygroup", "consumer2", {"mystream": ">"}, count=1 + ) + id2 = result2[0][1][0][0] + + # Both consumers can acknowledge their respective messages + acked1 = await async_redis.xack("mystream", "mygroup", id1) + acked2 = await async_redis.xack("mystream", "mygroup", id2) + + assert acked1 == 1 + assert acked2 == 1 + + # No pending messages left + pending = await async_redis.xpending("mystream", "mygroup") + assert pending[0] == 0 diff --git a/tests/commands/asyncio/stream/test_xadd.py b/tests/commands/asyncio/stream/test_xadd.py new file mode 100644 index 0000000..ff8e363 --- /dev/null +++ b/tests/commands/asyncio/stream/test_xadd.py @@ -0,0 +1,151 @@ +import pytest +import pytest_asyncio + +from upstash_redis.asyncio import Redis + + +@pytest_asyncio.fixture(autouse=True) +async def flush_db(async_redis: Redis): + await async_redis.flushdb() + + +@pytest.mark.asyncio +async def test_xadd_basic(async_redis: Redis): + """Test basic XADD functionality""" + result = await async_redis.xadd( + "mystream", "*", {"field1": "value1", "field2": "value2"} + ) + assert isinstance(result, str) + assert len(result) > 0 + # Stream ID format should be timestamp-sequence + assert "-" in result + + +@pytest.mark.asyncio +async def test_xadd_with_specific_id(async_redis: Redis): + """Test XADD with a specific ID""" + stream_id = "1609459200000-0" + result = await async_redis.xadd("mystream", stream_id, {"field": "value"}) + assert result == stream_id + + +@pytest.mark.asyncio +async def test_xadd_with_maxlen_exact(async_redis: Redis): + """Test XADD with exact maxlen trimming""" + # Add multiple entries + for i in range(10): + await async_redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Add with maxlen=5 (exact) + await async_redis.xadd( + "mystream", "*", {"field": "new_value"}, maxlen=5, approximate_trim=False + ) + + # Check stream length + length = await async_redis.xlen("mystream") + assert length <= 5 + + +@pytest.mark.asyncio +async def test_xadd_with_maxlen_approximate(async_redis: Redis): + """Test XADD with approximate maxlen trimming""" + # Add multiple entries + for i in range(120): + await async_redis.xadd("mystream", "*", {"field": 
f"value{i}"}) + + # Add with approximate maxlen + await async_redis.xadd( + "mystream", "*", {"field": "new_value"}, maxlen=30, approximate_trim=True + ) + + # Check stream length (should be around 30, but may be slightly more due to approximation) + length = await async_redis.xlen("mystream") + assert length >= 110 + assert length <= 130 # Allow some variance for approximation + + +@pytest.mark.asyncio +async def test_xadd_with_nomkstream(async_redis: Redis): + """Test XADD with NOMKSTREAM option""" + # Should return None when stream doesn't exist and NOMKSTREAM is used + result = await async_redis.xadd( + "nonexistent", "*", {"field": "value"}, nomkstream=True + ) + assert result is None + + # Create the stream first + await async_redis.xadd("mystream", "*", {"field": "value1"}) + + # Now NOMKSTREAM should work + result = await async_redis.xadd( + "mystream", "*", {"field": "value2"}, nomkstream=True + ) + assert isinstance(result, str) + + +@pytest.mark.asyncio +async def test_xadd_with_minid(async_redis: Redis): + """Test XADD with MINID trimming""" + # Add some entries with known IDs + await async_redis.xadd("mystream", "1000-0", {"field": "old1"}) + await async_redis.xadd("mystream", "2000-0", {"field": "old2"}) + await async_redis.xadd("mystream", "3000-0", {"field": "old3"}) + + # Add new entry with MINID that should trim old entries + await async_redis.xadd( + "mystream", "4000-0", {"field": "new"}, minid="2500-0", approximate_trim=False + ) + + # Check that old entries before minid are removed + entries = await async_redis.xrange("mystream") + entry_ids = [entry[0] for entry in entries] + + # Should not contain entries with IDs less than 2500-0 + for entry_id in entry_ids: + timestamp = int(entry_id.split("-")[0]) + assert timestamp >= 2500 + + +@pytest.mark.asyncio +async def test_xadd_multiple_fields(async_redis: Redis): + """Test XADD with multiple fields""" + data = {"name": "John", "age": "30", "city": "New York", "score": "95.5"} + + result = await async_redis.xadd("mystream", "*", data) + assert isinstance(result, str) + + # Verify the data was stored correctly + entries = await async_redis.xrange("mystream") + assert len(entries) == 1 + + entry_id, entry_data = entries[0] + assert entry_id == result + + # Convert list to dict for easier comparison + entry_dict = {} + for i in range(0, len(entry_data), 2): + entry_dict[entry_data[i]] = entry_data[i + 1] + + assert entry_dict == data + + +@pytest.mark.asyncio +async def test_xadd_empty_data_should_fail(async_redis: Redis): + """Test that XADD with empty data should fail""" + with pytest.raises(Exception): + await async_redis.xadd("mystream", "*", {}) + + +@pytest.mark.asyncio +async def test_xadd_with_limit(async_redis: Redis): + """Test XADD with LIMIT option for trimming""" + # Add many entries + for i in range(100): + await async_redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Add with maxlen and limit + await async_redis.xadd("mystream", "*", {"field": "limited"}, maxlen=50, limit=10) + + # The stream should be trimmed but might not reach exactly 50 due to limit + length = await async_redis.xlen("mystream") + assert length > 50 # Should be more than 50 due to limit constraint diff --git a/tests/commands/asyncio/stream/test_xclaim.py b/tests/commands/asyncio/stream/test_xclaim.py new file mode 100644 index 0000000..b3a99d9 --- /dev/null +++ b/tests/commands/asyncio/stream/test_xclaim.py @@ -0,0 +1,86 @@ +import pytest +import pytest_asyncio + +from upstash_redis.asyncio import Redis + + 
+@pytest_asyncio.fixture(autouse=True) +async def flush_db(async_redis: Redis): + await async_redis.flushdb() + + +@pytest.mark.asyncio +async def test_xclaim_basic(async_redis: Redis): + """Test basic XCLAIM functionality""" + # Add entries and create group + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xadd("mystream", "*", {"field": "value2"}) + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read messages with consumer1 + result = await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + message_id = result[0][1][0][0] + + # Consumer2 claims the message from consumer1 + claimed = await async_redis.xclaim( + "mystream", + "mygroup", + "consumer2", + 0, # min_idle_time = 0 (claim immediately) + message_id, + ) + + assert isinstance(claimed, list) + if len(claimed) > 0: + assert claimed[0][0] == message_id + + +@pytest.mark.asyncio +async def test_xclaim_with_justid(async_redis: Redis): + """Test XCLAIM with JUSTID option""" + # Add entries and create group + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read message with consumer1 + result = await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + message_id = result[0][1][0][0] + + # Claim with justid option + claimed = await async_redis.xclaim( + "mystream", + "mygroup", + "consumer2", + 0, # min_idle_time + message_id, + justid=True, # return just IDs + ) + + assert isinstance(claimed, list) + + +@pytest.mark.asyncio +async def test_xclaim_multiple_ids(async_redis: Redis): + """Test XCLAIM with multiple message IDs""" + # Add multiple entries + for i in range(3): + await async_redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Create group and read messages + await async_redis.xgroup_create("mystream", "mygroup", "0") + result = await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + + # Get all message IDs + message_ids = [entry[0] for entry in result[0][1]] + + # Claim all messages at once + claimed = await async_redis.xclaim( + "mystream", + "mygroup", + "consumer2", + 0, # min_idle_time + *message_ids, # unpack all message IDs + ) + + assert isinstance(claimed, list) + assert len(claimed) >= len(message_ids) diff --git a/tests/commands/asyncio/stream/test_xlen.py b/tests/commands/asyncio/stream/test_xlen.py new file mode 100644 index 0000000..a70c8be --- /dev/null +++ b/tests/commands/asyncio/stream/test_xlen.py @@ -0,0 +1,69 @@ +import pytest +import pytest_asyncio + +from upstash_redis.asyncio import Redis + + +@pytest_asyncio.fixture(autouse=True) +async def flush_db(async_redis: Redis): + await async_redis.flushdb() + + +@pytest.mark.asyncio +async def test_xlen_basic(async_redis: Redis): + """Test basic XLEN functionality""" + # Initially, stream doesn't exist + length = await async_redis.xlen("mystream") + assert length == 0 + + # Add entries and check length + await async_redis.xadd("mystream", "*", {"field": "value1"}) + length = await async_redis.xlen("mystream") + assert length == 1 + + # Add more entries + await async_redis.xadd("mystream", "*", {"field": "value2"}) + await async_redis.xadd("mystream", "*", {"field": "value3"}) + length = await async_redis.xlen("mystream") + assert length == 3 + + +@pytest.mark.asyncio +async def test_xlen_after_deletion(async_redis: Redis): + """Test XLEN after deleting entries""" + # Add entries + await async_redis.xadd("mystream", "*", {"field": "value1"}) + id2 = await 
async_redis.xadd("mystream", "*", {"field": "value2"}) + await async_redis.xadd("mystream", "*", {"field": "value3"}) + + # Initial length + length = await async_redis.xlen("mystream") + assert length == 3 + + # Delete one entry + deleted = await async_redis.xdel("mystream", id2) + assert deleted == 1 + + # Length should decrease + length = await async_redis.xlen("mystream") + assert length == 2 + + +@pytest.mark.asyncio +async def test_xlen_after_trimming(async_redis: Redis): + """Test XLEN after trimming stream""" + # Add multiple entries + for i in range(130): + await async_redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Initial length + length = await async_redis.xlen("mystream") + assert length == 130 + + # Trim to 5 entries + trimmed = await async_redis.xtrim("mystream", maxlen=5) + assert trimmed >= 0 + + # Length should be 5 + length = await async_redis.xlen("mystream") + assert length == 5 diff --git a/tests/commands/asyncio/stream/test_xpending.py b/tests/commands/asyncio/stream/test_xpending.py new file mode 100644 index 0000000..f6aa701 --- /dev/null +++ b/tests/commands/asyncio/stream/test_xpending.py @@ -0,0 +1,125 @@ +import pytest +import pytest_asyncio + +from upstash_redis.asyncio import Redis + + +@pytest_asyncio.fixture(autouse=True) +async def flush_db(async_redis: Redis): + await async_redis.flushdb() + + +@pytest.mark.asyncio +async def test_xpending_summary(async_redis: Redis): + """Test XPENDING summary form""" + # Add entries and create group + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xadd("mystream", "*", {"field": "value2"}) + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read messages to create pending entries + await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + + # Get summary pending info + pending = await async_redis.xpending("mystream", "mygroup") + + assert isinstance(pending, list) + assert pending[0] == 2 # Should have 2 pending messages + + +@pytest.mark.asyncio +async def test_xpending_detailed(async_redis: Redis): + """Test XPENDING detailed form""" + # Add entries and create group + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xadd("mystream", "*", {"field": "value2"}) + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read messages to create pending entries + await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + + # Get detailed pending info - all three parameters are required + detailed = await async_redis.xpending("mystream", "mygroup", "-", "+", 10) + + assert isinstance(detailed, list) + assert len(detailed) == 2 # Should have 2 pending messages + + +@pytest.mark.asyncio +async def test_xpending_detailed_with_consumer(async_redis: Redis): + """Test XPENDING detailed form with specific consumer""" + # Add entries and create group + for i in range(3): + await async_redis.xadd("mystream", "*", {"field": f"value{i}"}) + + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read messages with different consumers + await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}, count=2) + await async_redis.xreadgroup("mygroup", "consumer2", {"mystream": ">"}, count=1) + + # Get detailed pending info for specific consumer + detailed = await async_redis.xpending( + "mystream", "mygroup", "-", "+", 10, "consumer1" + ) + + assert isinstance(detailed, list) + assert len(detailed) == 2 # consumer1 should have 2 pending messages + + +@pytest.mark.asyncio +async def 
test_xpending_detailed_with_idle(async_redis: Redis): + """Test XPENDING detailed form with IDLE parameter""" + # Add entries and create group + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read message to create pending entry + await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + + # Get detailed pending info with IDLE filter (0ms minimum idle time) + detailed = await async_redis.xpending("mystream", "mygroup", "-", "+", 10, idle=0) + + assert isinstance(detailed, list) + assert len(detailed) >= 1 + + +@pytest.mark.asyncio +async def test_xpending_partial_args_error(async_redis: Redis): + """Test that providing partial detailed args raises an error""" + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Providing only start should raise an error + with pytest.raises( + ValueError, match="start, end, and count must all be provided together" + ): + await async_redis.xpending("mystream", "mygroup", start="-") + + # Providing only start and end should raise an error + with pytest.raises( + ValueError, match="start, end, and count must all be provided together" + ): + await async_redis.xpending("mystream", "mygroup", start="-", end="+") + + # Providing only count should raise an error + with pytest.raises( + ValueError, match="start, end, and count must all be provided together" + ): + await async_redis.xpending("mystream", "mygroup", count=10) + + +@pytest.mark.asyncio +async def test_xpending_empty_pending_list(async_redis: Redis): + """Test XPENDING when no messages are pending""" + # Add entry and create group but don't read any messages + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xgroup_create("mystream", "mygroup", "$") # Start from end + + # Summary should show no pending messages + pending = await async_redis.xpending("mystream", "mygroup") + assert pending[0] == 0 + + # Detailed should return empty list + detailed = await async_redis.xpending("mystream", "mygroup", "-", "+", 10) + assert detailed == [] diff --git a/tests/commands/asyncio/stream/test_xread.py b/tests/commands/asyncio/stream/test_xread.py new file mode 100644 index 0000000..4f0bfe4 --- /dev/null +++ b/tests/commands/asyncio/stream/test_xread.py @@ -0,0 +1,88 @@ +import pytest +import pytest_asyncio + +from upstash_redis.asyncio import Redis + + +@pytest_asyncio.fixture(autouse=True) +async def flush_db(async_redis: Redis): + await async_redis.flushdb() + + +@pytest.mark.asyncio +async def test_xread_basic(async_redis: Redis): + """Test basic XREAD functionality""" + # Add some entries + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xadd("mystream", "*", {"field": "value2"}) + + # Read from beginning + result = await async_redis.xread({"mystream": "0-0"}) + + assert len(result) == 1 + stream_name, entries = result[0] + assert stream_name == "mystream" + assert len(entries) >= 2 + + +@pytest.mark.asyncio +async def test_xread_with_count(async_redis: Redis): + """Test XREAD with COUNT option""" + # Add multiple entries + for i in range(5): + await async_redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Read with count limit + result = await async_redis.xread({"mystream": "0-0"}, count=2) + stream_name, entries = result[0] + assert len(entries) == 2 + + +@pytest.mark.asyncio +async def test_xread_multiple_streams(async_redis: Redis): + """Test XREAD with multiple 
streams""" + # Add entries to multiple streams + await async_redis.xadd("stream1", "*", {"field": "value1"}) + await async_redis.xadd("stream2", "*", {"field": "value2"}) + + # Read from both streams + result = await async_redis.xread({"stream1": "0-0", "stream2": "0-0"}) + + assert len(result) == 2 + stream_names = [stream[0] for stream in result] + assert "stream1" in stream_names + assert "stream2" in stream_names + + +@pytest.mark.asyncio +async def test_xread_from_specific_id(async_redis: Redis): + """Test XREAD from a specific stream ID""" + # Add entries + await async_redis.xadd("mystream", "*", {"field": "value1"}) + id2 = await async_redis.xadd("mystream", "*", {"field": "value2"}) + id3 = await async_redis.xadd("mystream", "*", {"field": "value3"}) + + # Read from id2 onwards (should get id3) + result = await async_redis.xread({"mystream": id2}) + + stream_name, entries = result[0] + assert len(entries) == 1 + assert entries[0][0] == id3 + + +@pytest.mark.asyncio +async def test_xread_nonexistent_stream(async_redis: Redis): + """Test XREAD on nonexistent stream""" + result = await async_redis.xread({"nonexistent": "0-0"}) + assert result == [] + + +@pytest.mark.asyncio +async def test_xread_from_end(async_redis: Redis): + """Test XREAD from end of stream (no new messages)""" + # Add an entry + await async_redis.xadd("mystream", "*", {"field": "value1"}) + + # Read from $ (end of stream) - should return empty + result = await async_redis.xread({"mystream": "$"}) + assert result == [] diff --git a/tests/commands/asyncio/stream/test_xreadgroup.py b/tests/commands/asyncio/stream/test_xreadgroup.py new file mode 100644 index 0000000..0266a7c --- /dev/null +++ b/tests/commands/asyncio/stream/test_xreadgroup.py @@ -0,0 +1,136 @@ +import pytest +import pytest_asyncio + +from upstash_redis.asyncio import Redis + + +@pytest_asyncio.fixture(autouse=True) +async def flush_db(async_redis: Redis): + await async_redis.flushdb() + + +@pytest.mark.asyncio +async def test_xreadgroup_basic(async_redis: Redis): + """Test basic XREADGROUP functionality""" + # Add some entries + await async_redis.xadd("mystream", "*", {"field": "value1"}) + await async_redis.xadd("mystream", "*", {"field": "value2"}) + + # Create consumer group + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read as part of consumer group + result = await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + + assert len(result) == 1 + stream_name, entries = result[0] + assert stream_name == "mystream" + assert len(entries) == 2 + + +@pytest.mark.asyncio +async def test_xreadgroup_with_count(async_redis: Redis): + """Test XREADGROUP with COUNT option""" + # Add multiple entries + for i in range(5): + await async_redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Create consumer group + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read with count limit + result = await async_redis.xreadgroup( + "mygroup", "consumer1", {"mystream": ">"}, count=3 + ) + + stream_name, entries = result[0] + assert len(entries) == 3 + + +@pytest.mark.asyncio +async def test_xreadgroup_with_noack(async_redis: Redis): + """Test XREADGROUP with NOACK option""" + # Add an entry + await async_redis.xadd("mystream", "*", {"field": "value1"}) + + # Create consumer group + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read with NOACK - message should not be added to PEL + result = await async_redis.xreadgroup( + "mygroup", "consumer1", {"mystream": ">"}, noack=True + ) + + assert 
len(result) == 1 + stream_name, entries = result[0] + assert len(entries) == 1 + + # Check pending list should be empty due to NOACK + pending = await async_redis.xpending("mystream", "mygroup") + assert pending[0] == 0 + + +@pytest.mark.asyncio +async def test_xreadgroup_multiple_consumers(async_redis: Redis): + """Test XREADGROUP with multiple consumers""" + # Add multiple entries + for i in range(4): + await async_redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Create consumer group + await async_redis.xgroup_create("mystream", "mygroup", "0") + + # Read with first consumer + result1 = await async_redis.xreadgroup( + "mygroup", "consumer1", {"mystream": ">"}, count=2 + ) + + # Read with second consumer + result2 = await async_redis.xreadgroup( + "mygroup", "consumer2", {"mystream": ">"}, count=2 + ) + + # Both should get different messages + entries1 = result1[0][1] + entries2 = result2[0][1] + + assert len(entries1) == 2 + assert len(entries2) == 2 + + # Entries should be different + entry_ids1 = {entry[0] for entry in entries1} + entry_ids2 = {entry[0] for entry in entries2} + assert entry_ids1.isdisjoint(entry_ids2) + + +@pytest.mark.asyncio +async def test_xreadgroup_from_id(async_redis: Redis): + """Test XREADGROUP from specific ID""" + # Add entries + id1 = await async_redis.xadd("mystream", "*", {"field": "value1"}) + id2 = await async_redis.xadd("mystream", "*", {"field": "value2"}) + + # Create consumer group starting from id1 + await async_redis.xgroup_create("mystream", "mygroup", id1) + + # Read new messages (should only get id2) + result = await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + + stream_name, entries = result[0] + assert len(entries) == 1 + assert entries[0][0] == id2 + + +@pytest.mark.asyncio +async def test_xreadgroup_no_new_messages(async_redis: Redis): + """Test XREADGROUP when no new messages are available""" + # Add an entry + await async_redis.xadd("mystream", "*", {"field": "value1"}) + + # Create consumer group from end + await async_redis.xgroup_create("mystream", "mygroup", "$") + + # Try to read new messages (should be empty) + result = await async_redis.xreadgroup("mygroup", "consumer1", {"mystream": ">"}) + + assert result == [] diff --git a/tests/commands/stream/test_xack.py b/tests/commands/stream/test_xack.py new file mode 100644 index 0000000..c332429 --- /dev/null +++ b/tests/commands/stream/test_xack.py @@ -0,0 +1,157 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xack_acknowledge_messages(redis: Redis): + """Test acknowledging messages successfully""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add some messages to the stream + id1 = redis.xadd(stream_key, "*", {"field1": "value1"}) + id2 = redis.xadd(stream_key, "*", {"field2": "value2"}) + + # Create a consumer group + redis.xgroup_create(stream_key, group, "0") + + # Read messages with the consumer group (makes them pending) + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=2) + + # Acknowledge the messages + result = redis.xack(stream_key, group, id1, id2) + assert result == 2 + + +def test_xack_acknowledge_single_message(redis: Redis): + """Test acknowledging a single message""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add a message + message_id = redis.xadd(stream_key, "*", {"field": "value"}) + + # Create consumer group + redis.xgroup_create(stream_key, 
group, "0") + + # Read the message + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=1) + + # Acknowledge it + result = redis.xack(stream_key, group, message_id) + assert result == 1 + + +def test_xack_re_acknowledge_returns_zero(redis: Redis): + """Test that re-acknowledging a message returns 0""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add a message + message_id = redis.xadd(stream_key, "*", {"field": "value"}) + + # Create consumer group with MKSTREAM + redis.xgroup_create(stream_key, group, "0", mkstream=True) + + # Read the message + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=1) + + # Acknowledge once + result1 = redis.xack(stream_key, group, message_id) + assert result1 == 1 + + # Try to acknowledge again + result2 = redis.xack(stream_key, group, message_id) + assert result2 == 0 + + +def test_xack_nonexistent_message(redis: Redis): + """Test acknowledging non-existent messages returns 0""" + stream_key = "test_stream" + group = "test_group" + + # Add a message and create group + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + + # Try to acknowledge a non-existent message + result = redis.xack(stream_key, group, "9999999999999-0") + assert result == 0 + + +def test_xack_multiple_messages(redis: Redis): + """Test acknowledging multiple messages at once""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add multiple messages + ids = [] + for i in range(5): + message_id = redis.xadd(stream_key, "*", {"field": f"value{i}"}) + ids.append(message_id) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Read all messages + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=5) + + # Acknowledge all messages + result = redis.xack(stream_key, group, *ids) + assert result == 5 + + +def test_xack_partial_acknowledge(redis: Redis): + """Test acknowledging some messages out of many""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add multiple messages + ids = [] + for i in range(5): + message_id = redis.xadd(stream_key, "*", {"field": f"value{i}"}) + ids.append(message_id) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Read all messages + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=5) + + # Acknowledge only first 3 messages + result = redis.xack(stream_key, group, ids[0], ids[1], ids[2]) + assert result == 3 + + # Check that 2 messages are still pending + pending_info = redis.xpending(stream_key, group) + assert pending_info[0] == 2 # 2 messages still pending + + +def test_xack_wrong_group_name(redis: Redis): + """Test acknowledging with wrong group name""" + stream_key = "test_stream" + group = "test_group" + wrong_group = "wrong_group" + consumer = "test_consumer" + + # Add a message + message_id = redis.xadd(stream_key, "*", {"field": "value"}) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Read the message + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=1) + + response = redis.xack(stream_key, wrong_group, message_id) + assert response == 0 diff --git a/tests/commands/stream/test_xadd.py b/tests/commands/stream/test_xadd.py new file mode 100644 index 0000000..3ef5aa8 --- /dev/null +++ b/tests/commands/stream/test_xadd.py @@ -0,0 +1,135 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() 
+ + +def test_xadd_basic(redis: Redis): + """Test basic XADD functionality""" + result = redis.xadd("mystream", "*", {"field1": "value1", "field2": "value2"}) + assert isinstance(result, str) + assert len(result) > 0 + # Stream ID format should be timestamp-sequence + assert "-" in result + + +def test_xadd_with_specific_id(redis: Redis): + """Test XADD with a specific ID""" + stream_id = "1609459200000-0" + result = redis.xadd("mystream", stream_id, {"field": "value"}) + assert result == stream_id + + +def test_xadd_with_maxlen_exact(redis: Redis): + """Test XADD with exact maxlen trimming""" + # Add multiple entries + for i in range(10): + redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Add with maxlen=5 (exact) + redis.xadd( + "mystream", "*", {"field": "new_value"}, maxlen=5, approximate_trim=False + ) + + # Check stream length + length = redis.xlen("mystream") + assert length <= 5 + + +def test_xadd_with_maxlen_approximate(redis: Redis): + """Test XADD with approximate maxlen trimming""" + # Add multiple entries + for i in range(110): + redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Add with approximate maxlen + redis.xadd( + "mystream", "*", {"field": "new_value"}, maxlen=30, approximate_trim=True + ) + + # Check stream length (should be around 30, but may be slightly more due to approximation) + length = redis.xlen("mystream") + assert length >= 100 + assert length <= 120 # Allow some variance for approximation + + +def test_xadd_with_nomkstream(redis: Redis): + """Test XADD with NOMKSTREAM option""" + # Should return None when stream doesn't exist and NOMKSTREAM is used + result = redis.xadd("nonexistent", "*", {"field": "value"}, nomkstream=True) + assert result is None + + # Create the stream first + redis.xadd("mystream", "*", {"field": "value1"}) + + # Now NOMKSTREAM should work + result = redis.xadd("mystream", "*", {"field": "value2"}, nomkstream=True) + assert isinstance(result, str) + + +def test_xadd_with_minid(redis: Redis): + """Test XADD with MINID trimming""" + # Add some entries with known IDs + redis.xadd("mystream", "1000-0", {"field": "old1"}) + redis.xadd("mystream", "2000-0", {"field": "old2"}) + redis.xadd("mystream", "3000-0", {"field": "old3"}) + + # Add new entry with MINID that should trim old entries + redis.xadd( + "mystream", "4000-0", {"field": "new"}, minid="2500-0", approximate_trim=False + ) + + # Check that old entries before minid are removed + entries = redis.xrange("mystream") + entry_ids = [entry[0] for entry in entries] + + # Should not contain entries with IDs less than 2500-0 + for entry_id in entry_ids: + timestamp = int(entry_id.split("-")[0]) + assert timestamp >= 2500 + + +def test_xadd_multiple_fields(redis: Redis): + """Test XADD with multiple fields""" + data = {"name": "John", "age": "30", "city": "New York", "score": "95.5"} + + result = redis.xadd("mystream", "*", data) + assert isinstance(result, str) + + # Verify the data was stored correctly + entries = redis.xrange("mystream") + assert len(entries) == 1 + + entry_id, entry_data = entries[0] + assert entry_id == result + + # Convert list to dict for easier comparison + entry_dict = {} + for i in range(0, len(entry_data), 2): + entry_dict[entry_data[i]] = entry_data[i + 1] + + assert entry_dict == data + + +def test_xadd_empty_data_should_fail(redis: Redis): + """Test that XADD with empty data should fail""" + with pytest.raises(Exception): + redis.xadd("mystream", "*", {}) + + +def test_xadd_with_limit(redis: Redis): + """Test XADD with LIMIT option for 
trimming""" + # Add many entries + for i in range(100): + redis.xadd("mystream", "*", {"field": f"value{i}"}) + + # Add with maxlen and limit + redis.xadd("mystream", "*", {"field": "limited"}, maxlen=50, limit=10) + + # The stream should be trimmed but might not reach exactly 50 due to limit + length = redis.xlen("mystream") + assert length > 50 # Should be more than 50 due to limit constraint diff --git a/tests/commands/stream/test_xclaim.py b/tests/commands/stream/test_xclaim.py new file mode 100644 index 0000000..4be3413 --- /dev/null +++ b/tests/commands/stream/test_xclaim.py @@ -0,0 +1,290 @@ +import pytest +import time + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xclaim_basic(redis: Redis): + """Test basic XCLAIM functionality""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Add entry and setup consumer group + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + + # Consumer1 reads the message + result = redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=1) + message_id = result[0][1][0][0] + + # Consumer2 claims the message from consumer1 + claimed = redis.xclaim( + stream_key, + group, + consumer2, + 0, # min_idle_time = 0 (claim immediately) + message_id, + ) + + assert isinstance(claimed, list) + if len(claimed) > 0: + # Should return the claimed message info + assert claimed[0][0] == message_id + + +def test_xclaim_with_idle_time(redis: Redis): + """Test XCLAIM with minimum idle time""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Setup + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + + # Consumer1 reads message + result = redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=1) + message_id = result[0][1][0][0] + + # Wait a bit + time.sleep(1) + + # Consumer2 tries to claim with idle time requirement + claimed = redis.xclaim( + stream_key, + group, + consumer2, + 500, # 500ms minimum idle time + message_id, + ) + + # Should succeed since message has been idle long enough + assert isinstance(claimed, list) + + +def test_xclaim_justid_option(redis: Redis): + """Test XCLAIM with JUSTID option""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Setup + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + + # Consumer1 reads message + result = redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=1) + message_id = result[0][1][0][0] + + # Consumer2 claims with JUSTID + claimed = redis.xclaim(stream_key, group, consumer2, 0, message_id, justid=True) + + # With JUSTID, should return only the IDs + assert isinstance(claimed, list) + if len(claimed) > 0: + assert claimed[0] == message_id + + +def test_xclaim_multiple_messages(redis: Redis): + """Test XCLAIM with multiple messages""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Add multiple messages + message_ids = [] + for i in range(3): + entry_id = redis.xadd(stream_key, "*", {"field": f"value{i}"}) + message_ids.append(entry_id) + + # Setup group and consumer1 reads all + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=3) + + # Consumer2 claims all messages + claimed = 
redis.xclaim(stream_key, group, consumer2, 0, *message_ids) + + assert isinstance(claimed, list) + + +def test_xclaim_basic_functionality(redis: Redis): + """Test basic XCLAIM functionality""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Setup + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + + # Consumer1 reads message + result = redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=1) + message_id = result[0][1][0][0] + + # Consumer2 claims message + claimed = redis.xclaim( + stream_key, + group, + consumer2, + 0, # min_idle_time + message_id, + ) + + # Should succeed despite high idle time requirement + assert isinstance(claimed, list) + + +def test_xautoclaim_basic(redis: Redis): + """Test basic XAUTOCLAIM functionality""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Setup + redis.xadd(stream_key, "*", {"field": "value1"}) + redis.xadd(stream_key, "*", {"field": "value2"}) + redis.xgroup_create(stream_key, group, "0") + + # Consumer1 reads messages + redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=2) + + # Wait a bit to make messages idle + time.sleep(1) + + # Consumer2 auto-claims idle messages + result = redis.xautoclaim( + stream_key, + group, + consumer2, + 500, # 500ms minimum idle time + "0-0", # Start from beginning + count=10, + ) + + # Should return [next_start, claimed_messages] or [next_start, claimed_messages, deleted_message_ids] + assert isinstance(result, list) + assert len(result) >= 2 + + next_start = result[0] + claimed_messages = result[1] + # Third element (deleted_message_ids) may or may not be present depending on Redis version + deleted_message_ids = result[2] if len(result) > 2 else [] + + assert isinstance(next_start, str) + assert isinstance(claimed_messages, list) + assert isinstance(deleted_message_ids, list) + + +def test_xautoclaim_with_count_limit(redis: Redis): + """Test XAUTOCLAIM with count limit""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Add multiple messages + for i in range(5): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Setup group and consumer1 reads all + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=5) + + # Wait and auto-claim with count limit + time.sleep(1) + result = redis.xautoclaim(stream_key, group, consumer2, 500, "0-0", count=2) + + # Should respect count limit + claimed_messages = result[1] + assert len(claimed_messages) <= 2 + + +def test_xautoclaim_justid_option(redis: Redis): + """Test XAUTOCLAIM with JUSTID option""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Setup + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + + # Consumer1 reads, then consumer2 auto-claims with JUSTID + redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=1) + time.sleep(1) + + result = redis.xautoclaim(stream_key, group, consumer2, 500, "0-0", justid=True) + + # With JUSTID, claimed messages should be just IDs + assert isinstance(result, list) + assert ( + len(result) >= 2 + ) # [next_start, claimed_messages] or [next_start, claimed_messages, deleted_message_ids] + if len(result[1]) > 0: + # Each claimed item should be a string ID, not [id, data] pair + claimed_item = result[1][0] + assert 
isinstance(claimed_item, str) + + +def test_xautoclaim_no_idle_messages(redis: Redis): + """Test XAUTOCLAIM when no messages meet idle criteria""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Setup and immediately try to auto-claim + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=1) + + # Immediately try to claim with high idle requirement + result = redis.xautoclaim( + stream_key, + group, + consumer2, + 5000, # 5 seconds idle time required + "0-0", + ) + + # Should return empty claimed messages + assert isinstance(result, list) + assert len(result) >= 2 + + claimed_messages = result[1] + # Third element (deleted_message_ids) may or may not be present depending on Redis version + deleted_message_ids = result[2] if len(result) > 2 else [] + + assert claimed_messages == [] + assert isinstance(deleted_message_ids, list) + + +def test_xclaim_nonexistent_message(redis: Redis): + """Test XCLAIM with non-existent message ID""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Setup basic stream and group + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + + # Try to claim non-existent message + claimed = redis.xclaim(stream_key, group, consumer, 0, "9999999999999-0") + + # Should return empty or handle gracefully + assert isinstance(claimed, list) + # Non-existent messages typically result in empty claims diff --git a/tests/commands/stream/test_xdel.py b/tests/commands/stream/test_xdel.py new file mode 100644 index 0000000..1c47130 --- /dev/null +++ b/tests/commands/stream/test_xdel.py @@ -0,0 +1,171 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xdel_single_entry(redis: Redis): + """Test deleting a single entry from stream""" + key = "test_stream" + + # Add some entries + id1 = redis.xadd(key, "*", {"name": "Jane", "surname": "Austen"}) + id2 = redis.xadd(key, "*", {"name": "Toni", "surname": "Morrison"}) + + # Delete one entry + result = redis.xdel(key, id2) + assert result == 1 + + # Verify only one entry remains + entries = redis.xrange(key, "-", "+") + assert len(entries) == 1 + assert entries[0][0] == id1 + + +def test_xdel_multiple_entries(redis: Redis): + """Test deleting multiple entries from stream""" + key = "test_stream" + + # Add multiple entries + id1 = redis.xadd(key, "*", {"name": "Jane", "surname": "Austen"}) + id2 = redis.xadd(key, "*", {"name": "Toni", "surname": "Morrison"}) + id3 = redis.xadd(key, "*", {"name": "Agatha", "surname": "Christie"}) + id4 = redis.xadd(key, "*", {"name": "Ngozi", "surname": "Adichie"}) + + # Delete multiple entries at once + result = redis.xdel(key, id1, id2, id3) + assert result == 3 + + # Verify only one entry remains + entries = redis.xrange(key, "-", "+") + assert len(entries) == 1 + assert entries[0][0] == id4 + + +def test_xdel_nonexistent_entry(redis: Redis): + """Test deleting a non-existent entry returns 0""" + key = "test_stream" + + # Add an entry + redis.xadd(key, "*", {"field": "value"}) + + # Try to delete non-existent entry + result = redis.xdel(key, "9999999999999-0") + assert result == 0 + + +def test_xdel_from_nonexistent_stream(redis: Redis): + """Test deleting from non-existent stream returns 0""" + result = redis.xdel("nonexistent_stream", "1234567890-0") + assert result == 
0 + + +def test_xdel_all_entries(redis: Redis): + """Test deleting all entries from a stream""" + key = "test_stream" + + # Add several entries + ids = [] + for i in range(5): + entry_id = redis.xadd(key, "*", {"field": f"value{i}"}) + ids.append(entry_id) + + # Delete all entries + result = redis.xdel(key, *ids) + assert result == 5 + + # Stream should still exist but be empty + length = redis.xlen(key) + assert length == 0 + + # Range should return empty list + entries = redis.xrange(key, "-", "+") + assert entries == [] + + +def test_xdel_partial_success(redis: Redis): + """Test deleting mix of existing and non-existing entries""" + key = "test_stream" + + # Add some entries + id1 = redis.xadd(key, "*", {"field": "value1"}) + id2 = redis.xadd(key, "*", {"field": "value2"}) + + # Try to delete existing and non-existing entries + result = redis.xdel(key, id1, "9999999999999-0", id2, "8888888888888-0") + assert result == 2 # Only the 2 existing entries should be deleted + + # Stream should be empty now + length = redis.xlen(key) + assert length == 0 + + +def test_xdel_empty_stream(redis: Redis): + """Test deleting from empty stream""" + key = "test_stream" + + # Create empty stream by adding and deleting an entry + entry_id = redis.xadd(key, "*", {"field": "value"}) + redis.xdel(key, entry_id) + + # Try to delete from now-empty stream + result = redis.xdel(key, "1234567890-0") + assert result == 0 + + +def test_xdel_with_consumer_groups(redis: Redis): + """Test that XDEL works even when stream has consumer groups""" + key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries + id1 = redis.xadd(key, "*", {"field": "value1"}) + id2 = redis.xadd(key, "*", {"field": "value2"}) + + # Create consumer group + redis.xgroup_create(key, group, "0") + + # Read some messages + redis.xreadgroup(group, consumer, {key: ">"}, count=2) + + # Delete entries (this should work even with pending messages) + result = redis.xdel(key, id1, id2) + assert result == 2 + + # Stream should be empty + length = redis.xlen(key) + assert length == 0 + + +def test_xdel_preserves_stream_metadata(redis: Redis): + """Test that XDEL doesn't affect stream metadata like length correctly""" + key = "test_stream" + + # Add multiple entries + ids = [] + for i in range(10): + entry_id = redis.xadd(key, "*", {"field": f"value{i}"}) + ids.append(entry_id) + + initial_length = redis.xlen(key) + assert initial_length == 10 + + # Delete some entries + deleted_count = redis.xdel(key, ids[0], ids[2], ids[4]) + assert deleted_count == 3 + + # Check updated length + new_length = redis.xlen(key) + assert new_length == 7 + + # Verify remaining entries are correct + remaining_entries = redis.xrange(key, "-", "+") + remaining_ids = [entry[0] for entry in remaining_entries] + + expected_remaining = [ids[1], ids[3], ids[5], ids[6], ids[7], ids[8], ids[9]] + assert remaining_ids == expected_remaining diff --git a/tests/commands/stream/test_xgroup.py b/tests/commands/stream/test_xgroup.py new file mode 100644 index 0000000..96e32f2 --- /dev/null +++ b/tests/commands/stream/test_xgroup.py @@ -0,0 +1,198 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xgroup_create_basic(redis: Redis): + """Test creating a consumer group""" + key = "test_stream" + group = "test_group" + + # Create stream first + redis.xadd(key, "*", {"field": "value"}) + + # Create consumer group + result = redis.xgroup_create(key, group, "$") + assert 
result == "OK" + + +def test_xgroup_create_with_mkstream(redis: Redis): + """Test creating a consumer group with MKSTREAM""" + key = "test_stream" + group = "test_group" + + # Create group on non-existent stream using MKSTREAM + result = redis.xgroup_create(key, group, "$", mkstream=True) + assert result == "OK" + + # Verify stream exists now + length = redis.xlen(key) + assert length == 0 # Empty stream + + +def test_xgroup_create_without_mkstream_should_fail(redis: Redis): + """Test that creating group on non-existent stream fails without MKSTREAM""" + key = "nonexistent_stream" + group = "test_group" + + # This should raise an exception + with pytest.raises(Exception): + redis.xgroup_create(key, group, "$") + + +def test_xgroup_createconsumer(redis: Redis): + """Test creating a consumer in a group""" + key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Create stream and group first + redis.xadd(key, "*", {"field": "value"}) + redis.xgroup_create(key, group, "0") + + # Create consumer + result = redis.xgroup_createconsumer(key, group, consumer) + assert result == 1 # Consumer was created + + +def test_xgroup_delconsumer(redis: Redis): + """Test deleting a consumer from a group""" + key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Setup stream, group, and consumer + redis.xadd(key, "*", {"field": "value1"}) + redis.xadd(key, "*", {"field": "value2"}) + redis.xgroup_create(key, group, "0") + redis.xgroup_createconsumer(key, group, consumer) + + # Read some messages to create pending messages + redis.xreadgroup(group, consumer, {key: ">"}, count=2) + + # Delete consumer (should return number of pending messages) + result = redis.xgroup_delconsumer(key, group, consumer) + assert result >= 0 # Should return number of pending messages deleted + + +def test_xgroup_delconsumer_nonexistent(redis: Redis): + """Test deleting non-existent consumer returns 0""" + key = "test_stream" + group = "test_group" + + # Create stream and group + redis.xadd(key, "*", {"field": "value"}) + redis.xgroup_create(key, group, "0") + + # Try to delete non-existent consumer + result = redis.xgroup_delconsumer(key, group, "nonexistent_consumer") + assert result == 0 + + +def test_xgroup_destroy(redis: Redis): + """Test destroying a consumer group""" + key = "test_stream" + group = "test_group" + + # Create stream and group + redis.xadd(key, "*", {"field": "value"}) + redis.xgroup_create(key, group, "0") + + # Destroy the group + result = redis.xgroup_destroy(key, group) + assert result == 1 # Group was destroyed + + +def test_xgroup_destroy_nonexistent(redis: Redis): + """Test destroying non-existent group returns 0""" + key = "test_stream" + + # Create stream only + redis.xadd(key, "*", {"field": "value"}) + + # Try to destroy non-existent group + result = redis.xgroup_destroy(key, "nonexistent_group") + assert result == 0 + + +def test_xgroup_destroy_after_use(redis: Redis): + """Test destroying group after it has been used""" + key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Setup and use the group + redis.xadd(key, "*", {"field": "value1"}) + redis.xadd(key, "*", {"field": "value2"}) + redis.xgroup_create(key, group, "0") + redis.xreadgroup(group, consumer, {key: ">"}, count=2) + + # Destroy the group + result = redis.xgroup_destroy(key, group) + assert result == 1 + + +def test_xgroup_setid(redis: Redis): + """Test setting group's last delivered ID""" + key = "test_stream" + group = "test_group" + + # Create stream and group + 
redis.xadd(key, "*", {"field": "value1"}) + redis.xadd(key, "*", {"field": "value2"}) + redis.xgroup_create(key, group, "0") + + # Set ID to current end + result = redis.xgroup_setid(key, group, "$") + assert result == "OK" + + +def test_xgroup_setid_with_entries_read(redis: Redis): + """Test setting group ID with ENTRIESREAD""" + key = "test_stream" + group = "test_group" + + # Create stream and group + for i in range(5): + redis.xadd(key, "*", {"field": f"value{i}"}) + redis.xgroup_create(key, group, "0") + + # Set ID with entries_read + result = redis.xgroup_setid(key, group, "$", entries_read=3) + assert result == "OK" + + +def test_xgroup_workflow_complete(redis: Redis): + """Test complete workflow with multiple group operations""" + key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Add some data + for i in range(3): + redis.xadd(key, "*", {"field": f"value{i}"}) + + # Create group + redis.xgroup_create(key, group, "0") + + # Create consumers + redis.xgroup_createconsumer(key, group, consumer1) + redis.xgroup_createconsumer(key, group, consumer2) + + # Use the group + redis.xreadgroup(group, consumer1, {key: ">"}, count=2) + redis.xreadgroup(group, consumer2, {key: ">"}, count=1) + + # Clean up consumers + redis.xgroup_delconsumer(key, group, consumer1) + redis.xgroup_delconsumer(key, group, consumer2) + + # Destroy group + result = redis.xgroup_destroy(key, group) + assert result == 1 diff --git a/tests/commands/stream/test_xinfo.py b/tests/commands/stream/test_xinfo.py new file mode 100644 index 0000000..db61875 --- /dev/null +++ b/tests/commands/stream/test_xinfo.py @@ -0,0 +1,170 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xinfo_groups_empty_stream(redis: Redis): + """Test XINFO GROUPS on stream with no groups""" + stream_key = "test_stream" + + # Add entries to stream + for i in range(3): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Check groups (should be empty) + result = redis.xinfo_groups(stream_key) + assert result == [] + + +def test_xinfo_groups_with_groups(redis: Redis): + """Test XINFO GROUPS on stream with consumer groups""" + stream_key = "test_stream" + group = "test_group" + + # Add entries + for i in range(3): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Get group info + result = redis.xinfo_groups(stream_key) + assert len(result) == 1 + + # Result should be list of group info arrays + group_info = result[0] + assert isinstance(group_info, list) + + # Find group name in the info (format: [key, value, key, value, ...]) + group_info_dict = {} + for i in range(0, len(group_info), 2): + group_info_dict[group_info[i]] = group_info[i + 1] + + assert group_info_dict.get("name") == group + + +def test_xinfo_consumers_empty_group(redis: Redis): + """Test XINFO CONSUMERS on group with no consumers""" + stream_key = "test_stream" + group = "test_group" + + # Add entries and create group + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + + # Check consumers (should be empty) + result = redis.xinfo_consumers(stream_key, group) + assert result == [] + + +def test_xinfo_consumers_with_consumers(redis: Redis): + """Test XINFO CONSUMERS on group with active consumers""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries + for i in 
range(3): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Create group and consumer, then read messages + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer, {stream_key: ">"}) + + # Get consumer info + result = redis.xinfo_consumers(stream_key, group) + assert len(result) >= 1 + + # Check if our consumer is in the list + found_consumer = False + for consumer_info in result: + # Convert to dict + consumer_dict = {} + for i in range(0, len(consumer_info), 2): + consumer_dict[consumer_info[i]] = consumer_info[i + 1] + + if consumer_dict.get("name") == consumer: + found_consumer = True + # Should have pending messages + assert consumer_dict.get("pending", 0) >= 0 + + assert found_consumer + + +def test_xinfo_nonexistent_group(redis: Redis): + """Test XINFO CONSUMERS on non-existent group""" + stream_key = "test_stream" + nonexistent_group = "nonexistent_group" + + # Create stream but no group + redis.xadd(stream_key, "*", {"field": "value"}) + + response = redis.xinfo_consumers(stream_key, nonexistent_group) + assert response == [] + + +def test_xinfo_multiple_groups(redis: Redis): + """Test XINFO GROUPS with multiple consumer groups""" + stream_key = "test_stream" + group1 = "group1" + group2 = "group2" + + # Add entries + redis.xadd(stream_key, "*", {"field": "value"}) + + # Create multiple groups + redis.xgroup_create(stream_key, group1, "0") + redis.xgroup_create(stream_key, group2, "$") + + # Get groups info + result = redis.xinfo_groups(stream_key) + assert len(result) == 2 + + # Collect group names + group_names = set() + for group_info in result: + group_dict = {} + for i in range(0, len(group_info), 2): + group_dict[group_info[i]] = group_info[i + 1] + group_names.add(group_dict.get("name")) + + assert group1 in group_names + assert group2 in group_names + + +def test_xinfo_multiple_consumers(redis: Redis): + """Test XINFO CONSUMERS with multiple consumers""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Add entries + for i in range(4): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Create group and consumers + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=2) + redis.xreadgroup(group, consumer2, {stream_key: ">"}, count=2) + + # Get consumers info + result = redis.xinfo_consumers(stream_key, group) + assert len(result) >= 2 + + # Collect consumer names + consumer_names = set() + for consumer_info in result: + consumer_dict = {} + for i in range(0, len(consumer_info), 2): + consumer_dict[consumer_info[i]] = consumer_info[i + 1] + consumer_names.add(consumer_dict.get("name")) + + assert consumer1 in consumer_names + assert consumer2 in consumer_names diff --git a/tests/commands/stream/test_xlen.py b/tests/commands/stream/test_xlen.py new file mode 100644 index 0000000..95d582d --- /dev/null +++ b/tests/commands/stream/test_xlen.py @@ -0,0 +1,166 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xlen_basic(redis: Redis): + """Test basic XLEN functionality""" + key = "test_stream" + + # Add some entries + redis.xadd(key, "*", {"name": "Jane", "surname": "Austen"}) + redis.xadd(key, "*", {"name": "Toni", "surname": "Morrison"}) + redis.xadd(key, "*", {"name": "Hezarfen", "surname": "----"}) + + # Check length + result = redis.xlen(key) + assert result == 3 + + +def test_xlen_empty_stream(redis: Redis): + """Test XLEN 
on empty stream""" + key = "test_stream" + + # Create empty stream + redis.xadd(key, "*", {"field": "value"}) + redis.xdel(key, redis.xrange(key)[0][0]) + + # Check length + result = redis.xlen(key) + assert result == 0 + + +def test_xlen_nonexistent_stream(redis: Redis): + """Test XLEN on non-existent stream returns 0""" + result = redis.xlen("nonexistent_stream") + assert result == 0 + + +def test_xlen_after_operations(redis: Redis): + """Test XLEN after various stream operations""" + key = "test_stream" + + # Initially 0 + assert redis.xlen(key) == 0 + + # Add entries + id1 = redis.xadd(key, "*", {"field": "value1"}) + assert redis.xlen(key) == 1 + + id2 = redis.xadd(key, "*", {"field": "value2"}) + assert redis.xlen(key) == 2 + + id3 = redis.xadd(key, "*", {"field": "value3"}) + assert redis.xlen(key) == 3 + + # Delete an entry + redis.xdel(key, id2) + assert redis.xlen(key) == 2 + + # Delete all entries + redis.xdel(key, id1, id3) + assert redis.xlen(key) == 0 + + +def test_xlen_with_trimming(redis: Redis): + """Test XLEN with stream trimming""" + key = "test_stream" + + # Add many entries + for i in range(10): + redis.xadd(key, "*", {"field": f"value{i}"}) + + assert redis.xlen(key) == 10 + + # Add entry with trimming + redis.xadd(key, "*", {"field": "final"}, maxlen=5, approximate_trim=False) + + # Length should be at most 5 + length = redis.xlen(key) + assert length <= 5 + + +def test_xlen_large_stream(redis: Redis): + """Test XLEN with a larger number of entries""" + key = "test_stream" + + # Add many entries + entry_count = 100 + for i in range(entry_count): + redis.xadd(key, "*", {"index": str(i), "data": f"data_{i}"}) + + # Check length + result = redis.xlen(key) + assert result == entry_count + + +def test_xlen_with_consumer_groups(redis: Redis): + """Test that XLEN is not affected by consumer groups""" + key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries + for i in range(5): + redis.xadd(key, "*", {"field": f"value{i}"}) + + initial_length = redis.xlen(key) + assert initial_length == 5 + + # Create consumer group and read messages + redis.xgroup_create(key, group, "0") + redis.xreadgroup(group, consumer, {key: ">"}, count=3) + + # Length should remain the same + length_after_read = redis.xlen(key) + assert length_after_read == initial_length == 5 + + # Acknowledge some messages + entries = redis.xrange(key, "-", "+", count=3) + message_ids = [entry[0] for entry in entries] + redis.xack(key, group, *message_ids) + + # Length should still be the same + length_after_ack = redis.xlen(key) + assert length_after_ack == initial_length == 5 + + +def test_xlen_incremental(redis: Redis): + """Test XLEN incrementally as entries are added""" + key = "test_stream" + + expected_length = 0 + assert redis.xlen(key) == expected_length + + # Add entries one by one and check length + for i in range(20): + redis.xadd(key, "*", {"counter": str(i)}) + expected_length += 1 + assert redis.xlen(key) == expected_length + + +def test_xlen_multiple_streams(redis: Redis): + """Test XLEN on multiple different streams""" + stream1 = "stream1" + stream2 = "stream2" + stream3 = "stream3" + + # Add different numbers of entries to each stream + for i in range(3): + redis.xadd(stream1, "*", {"field": f"value{i}"}) + + for i in range(7): + redis.xadd(stream2, "*", {"field": f"value{i}"}) + + # stream3 remains empty + + # Check lengths + assert redis.xlen(stream1) == 3 + assert redis.xlen(stream2) == 7 + assert redis.xlen(stream3) == 0 + assert redis.xlen("nonexistent") == 0 
diff --git a/tests/commands/stream/test_xpending.py b/tests/commands/stream/test_xpending.py new file mode 100644 index 0000000..2a02352 --- /dev/null +++ b/tests/commands/stream/test_xpending.py @@ -0,0 +1,270 @@ +import pytest +import time + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xpending_basic_info(redis: Redis): + """Test basic XPENDING functionality""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Setup + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0", mkstream=True) + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=1) + + # Get basic pending info + result = redis.xpending(stream_key, group) + assert isinstance(result, list) + assert len(result) >= 4 + + # Format: [count, start_id, end_id, consumers_info] + pending_count = result[0] + assert pending_count > 0 + + +def test_xpending_detailed_info(redis: Redis): + """Test detailed XPENDING with range""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add multiple entries + for i in range(3): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Create group and read messages + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=3) + + # Get detailed pending info + result = redis.xpending(stream_key, group, "-", "+", 10) + assert isinstance(result, list) + + # Each pending message should have: [id, consumer, idle_time, delivery_count] + for pending_msg in result: + assert len(pending_msg) >= 4 + message_id = pending_msg[0] + consumer_name = pending_msg[1] + idle_time = pending_msg[2] + delivery_count = pending_msg[3] + + assert isinstance(message_id, str) + assert consumer_name == consumer + assert isinstance(idle_time, int) + assert delivery_count >= 1 + + +def test_xpending_with_idle_time(redis: Redis): + """Test XPENDING with idle time filter""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entry and read it + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0", mkstream=True) + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=1) + + # Wait a bit + time.sleep(1) + + # Get pending messages with idle time filter + pending_with_idle = redis.xpending(stream_key, group, "-", "+", 10, idle=500) + assert isinstance(pending_with_idle, list) + + # Should get messages that have been idle for at least 500ms + if len(pending_with_idle) > 0: + for pending_msg in pending_with_idle: + idle_time = pending_msg[2] + assert idle_time >= 500 + + +def test_xpending_no_idle_messages(redis: Redis): + """Test XPENDING when no messages meet idle criteria""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entry and read it immediately + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=1) + + # Immediately check for messages idle for 5000ms (should be none) + result = redis.xpending(stream_key, group, "-", "+", 10, idle=5000) + assert result == [] + + +def test_xpending_specific_consumer(redis: Redis): + """Test XPENDING filtered by specific consumer""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Add entries + for i in range(4): + redis.xadd(stream_key, "*", {"field": 
f"value{i}"}) + + # Create group and have different consumers read + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=2) + redis.xreadgroup(group, consumer2, {stream_key: ">"}, count=2) + + # Get pending for specific consumer + pending_consumer1 = redis.xpending( + stream_key, group, "-", "+", 10, consumer=consumer1 + ) + + # All returned messages should belong to consumer1 + for pending_msg in pending_consumer1: + assert pending_msg[1] == consumer1 + + +def test_xpending_empty_group(redis: Redis): + """Test XPENDING on group with no pending messages""" + stream_key = "test_stream" + group = "test_group" + + # Create empty group + redis.xadd(stream_key, "*", {"field": "value"}) + redis.xgroup_create(stream_key, group, "$") # Start from end + + # Check basic pending info (should show no pending) + result = redis.xpending(stream_key, group) + assert result[0] == 0 # No pending messages + + # Check detailed pending info (should be empty) + detailed = redis.xpending(stream_key, group, "-", "+", 10) + assert detailed == [] + + +def test_xpending_after_acknowledgment(redis: Redis): + """Test XPENDING after messages are acknowledged""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries and read them + ids = [] + for i in range(3): + entry_id = redis.xadd(stream_key, "*", {"field": f"value{i}"}) + ids.append(entry_id) + + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=3) + + # Check initial pending count + initial_pending = redis.xpending(stream_key, group) + assert initial_pending[0] == 3 + + # Acknowledge some messages + redis.xack(stream_key, group, ids[0], ids[1]) + + # Check pending count after acknowledgment + after_ack_pending = redis.xpending(stream_key, group) + assert after_ack_pending[0] == 1 # Only 1 message still pending + + +def test_xpending_nonexistent_stream(redis: Redis): + """Test XPENDING on non-existent stream""" + with pytest.raises(Exception): + redis.xpending("nonexistent_stream", "nonexistent_group") + + +def test_xpending_nonexistent_group(redis: Redis): + """Test XPENDING on non-existent group""" + stream_key = "test_stream" + + # Create stream but no group + redis.xadd(stream_key, "*", {"field": "value"}) + + with pytest.raises(Exception): + redis.xpending(stream_key, "nonexistent_group") + + +def test_xpending_range_bounds(redis: Redis): + """Test XPENDING with specific range bounds""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries with known IDs + id1 = "1000000000000-0" + id2 = "2000000000000-0" + id3 = "3000000000000-0" + + redis.xadd(stream_key, id1, {"field": "value1"}) + redis.xadd(stream_key, id2, {"field": "value2"}) + redis.xadd(stream_key, id3, {"field": "value3"}) + + # Create group and read all + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=3) + + # Get pending in specific range + pending_range = redis.xpending(stream_key, group, id1, id2, 10) + + # Should only get messages in the specified range + returned_ids = {pending_msg[0] for pending_msg in pending_range} + assert id1 in returned_ids + assert id2 in returned_ids + # id3 should not be included as it's outside the range + + +def test_xpending_count_limit(redis: Redis): + """Test XPENDING with count limit""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add many entries + for i in range(10): + 
redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Create group and read all + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer, {stream_key: ">"}, count=10) + + # Get pending with count limit + pending_limited = redis.xpending(stream_key, group, "-", "+", 5) + + # Should return at most 5 entries + assert len(pending_limited) <= 5 + + +def test_xpending_delivery_count(redis: Redis): + """Test XPENDING shows correct delivery count""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Add entry + redis.xadd(stream_key, "*", {"field": "value"}) + + # Create group and read with first consumer + redis.xgroup_create(stream_key, group, "0") + redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=1) + + # Check initial delivery count + pending1 = redis.xpending(stream_key, group, "-", "+", 10) + initial_delivery_count = pending1[0][3] + assert initial_delivery_count == 1 + + # Claim the message with another consumer (increases delivery count) + message_id = pending1[0][0] + redis.xclaim(stream_key, group, consumer2, 0, message_id) + + # Check updated delivery count + pending2 = redis.xpending(stream_key, group, "-", "+", 10) + updated_delivery_count = pending2[0][3] + assert updated_delivery_count >= initial_delivery_count diff --git a/tests/commands/stream/test_xrange.py b/tests/commands/stream/test_xrange.py new file mode 100644 index 0000000..84e966c --- /dev/null +++ b/tests/commands/stream/test_xrange.py @@ -0,0 +1,227 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xrange_basic(redis: Redis): + """Test basic XRANGE functionality""" + key = "test_stream" + + # Add entries + field1, field2 = "field1", "field2" + value1, value2 = "test_value1", "test_value2" + + redis.xadd(key, "*", {field1: value1, field2: value2}) + + # Get all entries + result = redis.xrange(key, "-", "+") + assert len(result) == 1 + + # Check structure [id, [field1, value1, field2, value2]] + entry_id, entry_data = result[0] + assert isinstance(entry_id, str) + assert isinstance(entry_data, list) + + # Convert to dict for easier testing + entry_dict = {} + for i in range(0, len(entry_data), 2): + entry_dict[entry_data[i]] = entry_data[i + 1] + + assert entry_dict[field1] == value1 + assert entry_dict[field2] == value2 + + +def test_xrange_with_count_limit(redis: Redis): + """Test XRANGE with COUNT limit""" + key = "test_stream" + + # Add multiple entries + for i in range(5): + redis.xadd(key, "*", {"field": f"value{i}"}) + + # Get only first 2 entries + result = redis.xrange(key, "-", "+", count=2) + assert len(result) == 2 + + # Verify they are the first entries (oldest) + for i, (entry_id, entry_data) in enumerate(result): + entry_dict = {} + for j in range(0, len(entry_data), 2): + entry_dict[entry_data[j]] = entry_data[j + 1] + assert entry_dict["field"] == f"value{i}" + + +def test_xrange_empty_stream(redis: Redis): + """Test XRANGE on empty stream""" + result = redis.xrange("nonexistent_stream", "-", "+") + assert result == [] + + +def test_xrange_specific_range(redis: Redis): + """Test XRANGE with specific ID range""" + key = "test_stream" + + # Add entries with specific IDs + id1 = "1000000000000-0" + id2 = "2000000000000-0" + id3 = "3000000000000-0" + + redis.xadd(key, id1, {"field": "value1"}) + redis.xadd(key, id2, {"field": "value2"}) + redis.xadd(key, id3, {"field": "value3"}) + + # Get range from id1 to id2 + 
result = redis.xrange(key, id1, id2) + assert len(result) == 2 + + returned_ids = [entry[0] for entry in result] + assert id1 in returned_ids + assert id2 in returned_ids + assert id3 not in returned_ids + + +def test_xrange_single_entry(redis: Redis): + """Test XRANGE targeting single entry""" + key = "test_stream" + + # Add entries + redis.xadd(key, "*", {"field": "value1"}) + id2 = redis.xadd(key, "*", {"field": "value2"}) + redis.xadd(key, "*", {"field": "value3"}) + + # Get only the middle entry + result = redis.xrange(key, id2, id2) + assert len(result) == 1 + assert result[0][0] == id2 + + +def test_xrange_many_fields(redis: Redis): + """Test XRANGE with entries having many fields""" + key = "test_stream" + + # Create entry with multiple fields + fields = {} + for i in range(10): + fields[f"field_{i}"] = f"value_{i}" + + entry_id = redis.xadd(key, "*", fields) + + # Retrieve and verify + result = redis.xrange(key, "-", "+") + assert len(result) == 1 + + returned_id, returned_data = result[0] + assert returned_id == entry_id + + # Convert to dict + returned_dict = {} + for i in range(0, len(returned_data), 2): + returned_dict[returned_data[i]] = returned_data[i + 1] + + assert returned_dict == fields + + +def test_xrange_progressive_entries(redis: Redis): + """Test XRANGE as entries are progressively added""" + key = "test_stream" + + # Add entries progressively and test range each time + for i in range(1, 6): + redis.xadd(key, "*", {"counter": str(i)}) + + result = redis.xrange(key, "-", "+") + assert len(result) == i + + # Verify all entries are present + counters = [] + for entry_id, entry_data in result: + entry_dict = {} + for j in range(0, len(entry_data), 2): + entry_dict[entry_data[j]] = entry_data[j + 1] + counters.append(int(entry_dict["counter"])) + + assert counters == list(range(1, i + 1)) + + +def test_xrange_after_deletion(redis: Redis): + """Test XRANGE after some entries are deleted""" + key = "test_stream" + + # Add entries + ids = [] + for i in range(5): + entry_id = redis.xadd(key, "*", {"field": f"value{i}"}) + ids.append(entry_id) + + # Delete middle entries + redis.xdel(key, ids[1], ids[3]) + + # Range should return remaining entries + result = redis.xrange(key, "-", "+") + assert len(result) == 3 + + returned_ids = [entry[0] for entry in result] + assert ids[0] in returned_ids + assert ids[2] in returned_ids + assert ids[4] in returned_ids + assert ids[1] not in returned_ids + assert ids[3] not in returned_ids + + +def test_xrange_with_trimmed_stream(redis: Redis): + """Test XRANGE with a stream that has been trimmed""" + key = "test_stream" + + # Add many entries + for i in range(10): + redis.xadd(key, "*", {"field": f"value{i}"}) + + # Trim stream + redis.xtrim(key, maxlen=5, approximate=False) + + # Range should show only remaining entries + result = redis.xrange(key, "-", "+") + assert len(result) <= 5 + + +def test_xrange_ordering(redis: Redis): + """Test that XRANGE returns entries in correct order""" + key = "test_stream" + + # Add entries + ids = [] + for i in range(5): + entry_id = redis.xadd(key, "*", {"order": str(i)}) + ids.append(entry_id) + + # Get all entries + result = redis.xrange(key, "-", "+") + + # Verify order is maintained (chronological) + returned_ids = [entry[0] for entry in result] + assert returned_ids == ids + + +def test_xrange_boundary_conditions(redis: Redis): + """Test XRANGE boundary conditions""" + key = "test_stream" + + # Add single entry + entry_id = redis.xadd(key, "*", {"field": "value"}) + + # Test exact boundaries + 
result = redis.xrange(key, entry_id, entry_id) + assert len(result) == 1 + assert result[0][0] == entry_id + + # Test exclusive boundaries (using different IDs) + result = redis.xrange(key, "0-0", entry_id) + assert len(result) == 1 + + result = redis.xrange(key, entry_id, "+") + assert len(result) == 1 diff --git a/tests/commands/stream/test_xread.py b/tests/commands/stream/test_xread.py new file mode 100644 index 0000000..5a545de --- /dev/null +++ b/tests/commands/stream/test_xread.py @@ -0,0 +1,238 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xread_basic(redis: Redis): + """Test basic XREAD functionality""" + stream_key = "test_stream" + + # Add entries + redis.xadd(stream_key, "*", {"field1": "value1", "field2": "value2"}) + + # Read from beginning + result = redis.xread({stream_key: "0-0"}) + assert len(result) == 1 + + # Result should be [stream_name, entries] + stream_name, entries = result[0] + assert stream_name == stream_key + assert len(entries) == 1 + + +def test_xread_multiple_entries(redis: Redis): + """Test XREAD with multiple entries""" + stream_key = "test_stream" + + # Add multiple entries + for i in range(3): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Read all entries + result = redis.xread({stream_key: "0-0"}) + stream_name, entries = result[0] + + assert len(entries) == 3 + + +def test_xread_with_count(redis: Redis): + """Test XREAD with COUNT option""" + stream_key = "test_stream" + + # Add multiple entries + for i in range(5): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Read with count limit + result = redis.xread({stream_key: "0-0"}, count=2) + stream_name, entries = result[0] + + assert len(entries) == 2 + + +def test_xread_multiple_streams(redis: Redis): + """Test XREAD with multiple streams""" + stream1 = "stream1" + stream2 = "stream2" + + # Add entries to both streams + redis.xadd(stream1, "*", {"field": "value1"}) + redis.xadd(stream2, "*", {"field": "value2"}) + redis.xadd(stream1, "*", {"field": "value3"}) + + # Read from both streams + result = redis.xread({stream1: "0-0", stream2: "0-0"}) + + # Should return data for both streams + assert len(result) <= 2 # May be 1 or 2 depending on implementation + + # Collect all returned streams + returned_streams = {stream_name for stream_name, entries in result} + + # At least one stream should be returned + assert len(returned_streams) >= 1 + + +def test_xread_from_specific_id(redis: Redis): + """Test XREAD starting from specific ID""" + stream_key = "test_stream" + + # Add entries + id1 = redis.xadd(stream_key, "*", {"field": "value1"}) + id2 = redis.xadd(stream_key, "*", {"field": "value2"}) + id3 = redis.xadd(stream_key, "*", {"field": "value3"}) + + # Read from id2 onwards (should get id3) + result = redis.xread({stream_key: id2}) + + if result: # May be empty if no new entries after id2 + stream_name, entries = result[0] + if len(entries) > 0: + # Should get entries after id2 + returned_ids = [entry[0] for entry in entries] + assert id3 in returned_ids + assert id1 not in returned_ids + assert id2 not in returned_ids + + +def test_xread_no_new_entries(redis: Redis): + """Test XREAD when no new entries exist""" + stream_key = "test_stream" + + # Add an entry + last_id = redis.xadd(stream_key, "*", {"field": "value"}) + + # Try to read from the last ID (should return empty or no data) + result = redis.xread({stream_key: last_id}) + + # Should either be empty or contain no 
entries for this stream + if result: + for stream_name, entries in result: + if stream_name == stream_key: + assert len(entries) == 0 + + +def test_xread_empty_stream(redis: Redis): + """Test XREAD on empty stream""" + result = redis.xread({"nonexistent_stream": "0-0"}) + + # Should return empty or no results + if result: + for stream_name, entries in result: + if stream_name == "nonexistent_stream": + assert len(entries) == 0 + + +def test_xread_dollar_id(redis: Redis): + """Test XREAD with $ ID (should get only new entries)""" + stream_key = "test_stream" + + # Add initial entries + redis.xadd(stream_key, "*", {"field": "old1"}) + redis.xadd(stream_key, "*", {"field": "old2"}) + + # Read from $ (should get nothing initially) + result = redis.xread({stream_key: "$"}) + + # Should return empty for this stream + if result: + for stream_name, entries in result: + if stream_name == stream_key: + assert len(entries) == 0 + + # Add new entry + redis.xadd(stream_key, "*", {"field": "new"}) + + # Now read from $ again (in a real scenario, this might still not work + # without blocking, but we'll test the ID format) + # This is more of a format validation test + + +def test_xread_with_mixed_existing_nonexisting_streams(redis: Redis): + """Test XREAD with mix of existing and non-existing streams""" + existing_stream = "existing_stream" + nonexisting_stream = "nonexisting_stream" + + # Add data to existing stream only + redis.xadd(existing_stream, "*", {"field": "value"}) + + # Read from both + result = redis.xread({existing_stream: "0-0", nonexisting_stream: "0-0"}) + + # Should get data for existing stream only + if result: + stream_names = [stream_name for stream_name, entries in result] + # Existing stream should be present + found_existing = False + for stream_name, entries in result: + if stream_name == existing_stream: + found_existing = True + assert len(entries) > 0 + elif stream_name == nonexisting_stream: + assert len(entries) == 0 + + # Should have found the existing stream + if len(result) > 0: + assert found_existing or nonexisting_stream not in stream_names + + +def test_xread_progressive_reading(redis: Redis): + """Test progressive reading as new entries are added""" + stream_key = "test_stream" + + # Start with empty stream + last_read_id = "0-0" + + # Add entries and read progressively + for i in range(3): + # Add new entry + new_id = redis.xadd(stream_key, "*", {"counter": str(i)}) + + # Read from last position + result = redis.xread({stream_key: last_read_id}) + + if result: + stream_name, entries = result[0] + if stream_name == stream_key and len(entries) > 0: + # Update last read position + last_read_id = entries[-1][0] + + # Verify we got the expected entry + for entry_id, entry_data in entries: + if entry_id == new_id: + entry_dict = {} + for j in range(0, len(entry_data), 2): + entry_dict[entry_data[j]] = entry_data[j + 1] + assert entry_dict["counter"] == str(i) + + +def test_xread_entry_format(redis: Redis): + """Test that XREAD returns entries in correct format""" + stream_key = "test_stream" + + # Add entry with known data + test_data = {"name": "John", "age": "30", "city": "NYC"} + entry_id = redis.xadd(stream_key, "*", test_data) + + # Read the entry + result = redis.xread({stream_key: "0-0"}) + assert len(result) == 1 + + stream_name, entries = result[0] + assert stream_name == stream_key + assert len(entries) == 1 + + returned_id, returned_data = entries[0] + assert returned_id == entry_id + + # Convert returned data to dict + returned_dict = {} + for i in range(0, 
len(returned_data), 2): + returned_dict[returned_data[i]] = returned_data[i + 1] + + assert returned_dict == test_data diff --git a/tests/commands/stream/test_xreadgroup.py b/tests/commands/stream/test_xreadgroup.py new file mode 100644 index 0000000..726b38f --- /dev/null +++ b/tests/commands/stream/test_xreadgroup.py @@ -0,0 +1,279 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xreadgroup_basic(redis: Redis): + """Test basic XREADGROUP functionality""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries + for i in range(3): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Read with consumer group + result = redis.xreadgroup(group, consumer, {stream_key: ">"}) + + assert len(result) == 1 + stream_name, entries = result[0] + assert stream_name == stream_key + assert len(entries) == 3 + + +def test_xreadgroup_with_count(redis: Redis): + """Test XREADGROUP with COUNT option""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries + for i in range(5): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Read with count limit + result = redis.xreadgroup(group, consumer, {stream_key: ">"}, count=2) + + stream_name, entries = result[0] + assert len(entries) == 2 + + +def test_xreadgroup_pending_messages(redis: Redis): + """Test reading pending messages with XREADGROUP""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries + for i in range(3): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Read new messages (makes them pending) + redis.xreadgroup(group, consumer, {stream_key: ">"}) + + # Read pending messages for this consumer + result2 = redis.xreadgroup(group, consumer, {stream_key: "0"}) + + # Should get the same messages again (they're pending) + if result2: + stream_name, entries = result2[0] + assert len(entries) >= 0 # May be 0 or more depending on implementation + + +def test_xreadgroup_noack_option(redis: Redis): + """Test XREADGROUP with NOACK option""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entry + redis.xadd(stream_key, "*", {"field": "value"}) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Read with NOACK (messages won't be added to PEL) + redis.xreadgroup(group, consumer, {stream_key: ">"}, noack=True) + + # Check pending messages (should be 0 due to NOACK) + pending_info = redis.xpending(stream_key, group) + assert pending_info[0] == 0 # No pending messages + + +def test_xreadgroup_multiple_streams(redis: Redis): + """Test XREADGROUP with multiple streams""" + stream1 = "stream1" + stream2 = "stream2" + group = "test_group" + consumer = "test_consumer" + + # Add entries to both streams + redis.xadd(stream1, "*", {"field": "value1"}) + redis.xadd(stream2, "*", {"field": "value2"}) + + # Create consumer groups for both streams + redis.xgroup_create(stream1, group, "0") + redis.xgroup_create(stream2, group, "0") + + # Read from both streams + result = redis.xreadgroup(group, consumer, {stream1: ">", stream2: ">"}) + + # Should get data from both streams + assert len(result) >= 1 + + # Collect stream names + 
stream_names = {stream_name for stream_name, entries in result} + + # Should have at least one of the streams + assert stream1 in stream_names or stream2 in stream_names + + +def test_xreadgroup_different_consumers(redis: Redis): + """Test XREADGROUP with different consumers in same group""" + stream_key = "test_stream" + group = "test_group" + consumer1 = "consumer1" + consumer2 = "consumer2" + + # Add entries + for i in range(4): + redis.xadd(stream_key, "*", {"field": f"value{i}"}) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Each consumer reads some messages + result1 = redis.xreadgroup(group, consumer1, {stream_key: ">"}, count=2) + result2 = redis.xreadgroup(group, consumer2, {stream_key: ">"}, count=2) + + # Both should get different messages + if result1 and result2: + _, entries1 = result1[0] + _, entries2 = result2[0] + + # Get message IDs + ids1 = {entry[0] for entry in entries1} + ids2 = {entry[0] for entry in entries2} + + # IDs should be different (load balancing) + assert len(ids1.intersection(ids2)) == 0 + + +def test_xreadgroup_acknowledge_workflow(redis: Redis): + """Test complete workflow with XREADGROUP and acknowledgment""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries + redis.xadd(stream_key, "*", {"field": "value1"}) + redis.xadd(stream_key, "*", {"field": "value2"}) + + # Create consumer group + redis.xgroup_create(stream_key, group, "0") + + # Read messages + result = redis.xreadgroup(group, consumer, {stream_key: ">"}) + stream_name, entries = result[0] + + # Acknowledge messages + message_ids = [entry[0] for entry in entries] + acked_count = redis.xack(stream_key, group, *message_ids) + + assert acked_count == len(message_ids) + + # Check pending count should be 0 + pending_info = redis.xpending(stream_key, group) + assert pending_info[0] == 0 + + +def test_xreadgroup_no_new_messages(redis: Redis): + """Test XREADGROUP when no new messages available""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add initial entry + redis.xadd(stream_key, "*", {"field": "value"}) + + # Create group starting from end + redis.xgroup_create(stream_key, group, "$") + + # Try to read new messages (should be empty) + result = redis.xreadgroup(group, consumer, {stream_key: ">"}) + + # Should return empty or no entries + if result: + for stream_name, entries in result: + if stream_name == stream_key: + assert len(entries) == 0 + + +def test_xreadgroup_from_beginning(redis: Redis): + """Test XREADGROUP starting from beginning of stream""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entries + expected_count = 3 + for i in range(expected_count): + redis.xadd(stream_key, "*", {"counter": str(i)}) + + # Create group from beginning + redis.xgroup_create(stream_key, group, "0") + + # Read all messages + result = redis.xreadgroup(group, consumer, {stream_key: ">"}) + + stream_name, entries = result[0] + assert len(entries) == expected_count + + +def test_xreadgroup_mkstream_behavior(redis: Redis): + """Test XREADGROUP behavior with streams created via MKSTREAM""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Create group with MKSTREAM (creates empty stream) + redis.xgroup_create(stream_key, group, "0", mkstream=True) + + # Try to read (should return empty) + result = redis.xreadgroup(group, consumer, {stream_key: ">"}) + + if result: + for stream_name, entries in result: + if 
stream_name == stream_key: + assert len(entries) == 0 + + # Add entry and try reading again + redis.xadd(stream_key, "*", {"field": "value"}) + result = redis.xreadgroup(group, consumer, {stream_key: ">"}) + + # Should now get the entry + stream_name, entries = result[0] + assert len(entries) == 1 + + +def test_xreadgroup_entry_format(redis: Redis): + """Test XREADGROUP returns entries in correct format""" + stream_key = "test_stream" + group = "test_group" + consumer = "test_consumer" + + # Add entry with known data + test_data = {"name": "Alice", "score": "100"} + entry_id = redis.xadd(stream_key, "*", test_data) + + # Create group and read + redis.xgroup_create(stream_key, group, "0") + result = redis.xreadgroup(group, consumer, {stream_key: ">"}) + + stream_name, entries = result[0] + assert len(entries) == 1 + + returned_id, returned_data = entries[0] + assert returned_id == entry_id + + # Convert to dict for comparison + returned_dict = {} + for i in range(0, len(returned_data), 2): + returned_dict[returned_data[i]] = returned_data[i + 1] + + assert returned_dict == test_data diff --git a/tests/commands/stream/test_xrevrange.py b/tests/commands/stream/test_xrevrange.py new file mode 100644 index 0000000..a61120c --- /dev/null +++ b/tests/commands/stream/test_xrevrange.py @@ -0,0 +1,281 @@ +import pytest + +from upstash_redis import Redis + + +@pytest.fixture(autouse=True) +def flush_db(redis: Redis): + redis.flushdb() + + +def test_xrevrange_basic(redis: Redis): + """Test basic XREVRANGE functionality""" + key = "test_stream" + + # Add entries in order + redis.xadd(key, "*", {"name": "Virginia", "surname": "Woolf"}) + redis.xadd(key, "*", {"name": "Jane", "surname": "Austen"}) + redis.xadd(key, "*", {"name": "Toni", "surname": "Morrison"}) + redis.xadd(key, "*", {"name": "Agatha", "surname": "Christie"}) + redis.xadd(key, "*", {"name": "Ngozi", "surname": "Adichie"}) + + # Get entries in reverse order + result = redis.xrevrange(key, "+", "-") + + assert len(result) == 5 + + # First entry should be the last one added (Ngozi Adichie) + first_entry_id, first_entry_data = result[0] + + # Convert to dict for easier comparison + first_entry_dict = {} + for i in range(0, len(first_entry_data), 2): + first_entry_dict[first_entry_data[i]] = first_entry_data[i + 1] + + assert first_entry_dict["name"] == "Ngozi" + assert first_entry_dict["surname"] == "Adichie" + + +def test_xrevrange_with_count_limit(redis: Redis): + """Test XREVRANGE with COUNT limit""" + key = "test_stream" + + # Add entries + redis.xadd(key, "*", {"name": "Virginia", "surname": "Woolf"}) + redis.xadd(key, "*", {"name": "Jane", "surname": "Austen"}) + redis.xadd(key, "*", {"name": "Toni", "surname": "Morrison"}) + redis.xadd(key, "*", {"name": "Agatha", "surname": "Christie"}) + redis.xadd(key, "*", {"name": "Ngozi", "surname": "Adichie"}) + + # Get only last 2 entries + result = redis.xrevrange(key, "+", "-", count=2) + + assert len(result) == 2 + + # Should get the two most recent entries in reverse order + entry1_dict = {} + entry1_data = result[0][1] + for i in range(0, len(entry1_data), 2): + entry1_dict[entry1_data[i]] = entry1_data[i + 1] + + entry2_dict = {} + entry2_data = result[1][1] + for i in range(0, len(entry2_data), 2): + entry2_dict[entry2_data[i]] = entry2_data[i + 1] + + # First should be Ngozi (most recent) + assert entry1_dict["name"] == "Ngozi" + assert entry1_dict["surname"] == "Adichie" + + # Second should be Agatha (second most recent) + assert entry2_dict["name"] == "Agatha" + assert 
entry2_dict["surname"] == "Christie" + + +def test_xrevrange_empty_stream(redis: Redis): + """Test XREVRANGE on empty stream""" + result = redis.xrevrange("nonexistent_stream", "+", "-") + assert result == [] + + +def test_xrevrange_single_entry(redis: Redis): + """Test XREVRANGE with single entry""" + key = "test_stream" + + # Add single entry + redis.xadd(key, "*", {"field": "value"}) + + # Get in reverse + result = redis.xrevrange(key, "+", "-") + + assert len(result) == 1 + entry_id, entry_data = result[0] + + # Convert to dict + entry_dict = {} + for i in range(0, len(entry_data), 2): + entry_dict[entry_data[i]] = entry_data[i + 1] + + assert entry_dict["field"] == "value" + + +def test_xrevrange_specific_range(redis: Redis): + """Test XREVRANGE with specific ID range""" + key = "test_stream" + + # Add entries with known IDs + id1 = "1000000000000-0" + id2 = "2000000000000-0" + id3 = "3000000000000-0" + id4 = "4000000000000-0" + + redis.xadd(key, id1, {"field": "value1"}) + redis.xadd(key, id2, {"field": "value2"}) + redis.xadd(key, id3, {"field": "value3"}) + redis.xadd(key, id4, {"field": "value4"}) + + # Get range from id3 to id1 (reverse order) + result = redis.xrevrange(key, id3, id1) + + # Should get id3, id2, id1 in that order + returned_ids = [entry[0] for entry in result] + + assert id3 in returned_ids + assert id2 in returned_ids + assert id1 in returned_ids + assert id4 not in returned_ids # Outside range + + # First returned should be id3 (highest in range) + assert returned_ids[0] == id3 + + +def test_xrevrange_reverse_order_verification(redis: Redis): + """Test that XREVRANGE returns entries in correct reverse chronological order""" + key = "test_stream" + + # Add entries with sequential data + entry_ids = [] + for i in range(5): + entry_id = redis.xadd(key, "*", {"order": str(i)}) + entry_ids.append(entry_id) + + # Get in reverse order + result = redis.xrevrange(key, "+", "-") + + # Should be in reverse chronological order + returned_ids = [entry[0] for entry in result] + + # Compare with expected reverse order + expected_reverse = list(reversed(entry_ids)) + assert returned_ids == expected_reverse + + # Verify data is also in reverse order + for i, (entry_id, entry_data) in enumerate(result): + entry_dict = {} + for j in range(0, len(entry_data), 2): + entry_dict[entry_data[j]] = entry_data[j + 1] + + # Should be in reverse order (4, 3, 2, 1, 0) + expected_order = str(4 - i) + assert entry_dict["order"] == expected_order + + +def test_xrevrange_compared_to_xrange(redis: Redis): + """Test XREVRANGE returns same entries as XRANGE but in reverse order""" + key = "test_stream" + + # Add multiple entries + for i in range(10): + redis.xadd(key, "*", {"index": str(i)}) + + # Get with XRANGE (normal order) + forward_result = redis.xrange(key, "-", "+") + + # Get with XREVRANGE (reverse order) + reverse_result = redis.xrevrange(key, "+", "-") + + # Should have same number of entries + assert len(forward_result) == len(reverse_result) + + # IDs should be the same but in reverse order + forward_ids = [entry[0] for entry in forward_result] + reverse_ids = [entry[0] for entry in reverse_result] + + assert forward_ids == list(reversed(reverse_ids)) + + +def test_xrevrange_after_deletions(redis: Redis): + """Test XREVRANGE after some entries are deleted""" + key = "test_stream" + + # Add entries + ids = [] + for i in range(5): + entry_id = redis.xadd(key, "*", {"field": f"value{i}"}) + ids.append(entry_id) + + # Delete middle entries + redis.xdel(key, ids[1], ids[3]) + + # Get 
remaining in reverse order
+    result = redis.xrevrange(key, "+", "-")
+
+    # Should only have entries 0, 2, 4 (in reverse order: 4, 2, 0)
+    assert len(result) == 3
+
+    returned_ids = [entry[0] for entry in result]
+    assert ids[4] in returned_ids  # Most recent remaining
+    assert ids[2] in returned_ids  # Middle remaining
+    assert ids[0] in returned_ids  # Oldest remaining
+
+    # Deleted entries should not be present
+    assert ids[1] not in returned_ids
+    assert ids[3] not in returned_ids
+
+    # Order should be 4, 2, 0
+    assert returned_ids[0] == ids[4]  # Most recent first
+
+
+def test_xrevrange_boundary_conditions(redis: Redis):
+    """Test XREVRANGE boundary conditions"""
+    key = "test_stream"
+
+    # Add single entry
+    entry_id = redis.xadd(key, "*", {"field": "value"})
+
+    # Test exact boundaries
+    result = redis.xrevrange(key, entry_id, entry_id)
+    assert len(result) == 1
+    assert result[0][0] == entry_id
+
+    # Test inclusive boundaries
+    result = redis.xrevrange(key, "+", entry_id)
+    assert len(result) == 1
+
+    result = redis.xrevrange(key, entry_id, "-")
+    assert len(result) == 1
+
+
+def test_xrevrange_with_trimmed_stream(redis: Redis):
+    """Test XREVRANGE on a stream that has been trimmed"""
+    key = "test_stream"
+
+    # Add many entries
+    for i in range(20):
+        redis.xadd(key, "*", {"field": f"value{i}"})
+
+    # Trim to keep only recent entries
+    redis.xtrim(key, maxlen=5, approximate=False)
+
+    # Get remaining in reverse order
+    result = redis.xrevrange(key, "+", "-")
+
+    # Should have at most 5 entries
+    assert len(result) <= 5
+
+    # Should be in reverse chronological order
+    if len(result) > 1:
+        # Compare timestamps to ensure reverse order
+        for i in range(len(result) - 1):
+            current_id = result[i][0]
+            next_id = result[i + 1][0]
+
+            current_timestamp = int(current_id.split("-")[0])
+            next_timestamp = int(next_id.split("-")[0])
+
+            assert current_timestamp >= next_timestamp
+
+
+def test_xrevrange_large_count(redis: Redis):
+    """Test XREVRANGE with count larger than stream size"""
+    key = "test_stream"
+
+    # Add few entries
+    for i in range(3):
+        redis.xadd(key, "*", {"field": f"value{i}"})
+
+    # Request more than available
+    result = redis.xrevrange(key, "+", "-", count=10)
+
+    # Should return all available entries
+    assert len(result) == 3
diff --git a/tests/commands/stream/test_xtrim.py b/tests/commands/stream/test_xtrim.py
new file mode 100644
index 0000000..f20a180
--- /dev/null
+++ b/tests/commands/stream/test_xtrim.py
@@ -0,0 +1,237 @@
+import pytest
+
+from upstash_redis import Redis
+
+
+@pytest.fixture(autouse=True)
+def flush_db(redis: Redis):
+    redis.flushdb()
+
+
+def test_xtrim_maxlen_exact(redis: Redis):
+    """Test XTRIM with exact MAXLEN"""
+    key = "test_stream"
+
+    # Add many entries
+    for i in range(50):
+        redis.xadd(key, "*", {"field": f"value{i}"})
+
+    # Trim to exact length
+    trimmed = redis.xtrim(key, maxlen=30, approximate=False)
+    assert isinstance(trimmed, int)
+    assert trimmed >= 0
+
+    # Check final length
+    length = redis.xlen(key)
+    assert length <= 30
+
+
+def test_xtrim_maxlen_approximate(redis: Redis):
+    """Test XTRIM with approximate MAXLEN"""
+    key = "test_stream"
+
+    # Add many entries
+    for i in range(120):
+        redis.xadd(key, "*", {"field": f"value{i}"})
+
+    # Trim approximately
+    trimmed = redis.xtrim(key, maxlen=30, approximate=True)
+    assert isinstance(trimmed, int)
+
+    # Approximate trimming (~) removes whole internal nodes only, so with just
+    # 120 entries it may trim far less than the requested maxlen, or not at all
+    length = redis.xlen(key)
+    assert length >= 110  # Little or no trimming expected
+    assert length <= 130  # Sanity upper bound (only 120 entries were added)
+
+
+def test_xtrim_with_limit(redis: Redis):
+    """Test XTRIM with LIMIT option"""
+    key = "test_stream"
+
+    # Add many entries
+    for i in range(100):
+        redis.xadd(key, "*", {"field": f"value{i}"})
+
+    # Trim with limit
+    redis.xtrim(key, maxlen=50, limit=10)
+
+    # With limit, trimming might not reach the target
+    length = redis.xlen(key)
+    assert length > 50  # Should be more than target due to limit
+
+
+def test_xtrim_minid(redis: Redis):
+    """Test XTRIM with MINID"""
+    key = "test_stream"
+
+    # Add entries with known timestamps
+    base_timestamp = 1000000000000
+    for i in range(20):
+        stream_id = f"{base_timestamp + i * 1000}-0"
+        redis.xadd(key, stream_id, {"field": f"value{i}"})
+
+    # Trim by minimum ID (remove entries older than midpoint)
+    mid_id = f"{base_timestamp + 10000}-0"
+    redis.xtrim(key, minid=mid_id, approximate=False)
+
+    # Check that old entries are removed
+    entries = redis.xrange(key, "-", "+")
+    for entry_id, _ in entries:
+        timestamp = int(entry_id.split("-")[0])
+        assert timestamp >= base_timestamp + 10000
+
+
+def test_xtrim_zero_length(redis: Redis):
+    """Test XTRIM with zero length removes everything"""
+    key = "test_stream"
+
+    # Add entries
+    for i in range(10):
+        redis.xadd(key, "*", {"field": f"value{i}"})
+
+    initial_length = redis.xlen(key)
+    assert initial_length == 10
+
+    # Trim to zero
+    redis.xtrim(key, maxlen=0, approximate=False)
+
+    # Should remove everything
+    final_length = redis.xlen(key)
+    assert final_length <= 1  # Might leave 1 entry in some implementations
+
+
+def test_xtrim_empty_stream(redis: Redis):
+    """Test XTRIM on empty stream"""
+    key = "test_stream"
+
+    # Create empty stream
+    redis.xadd(key, "*", {"field": "value"})
+    redis.xtrim(key, maxlen=0, approximate=False)
+
+    # Trim already empty stream
+    trimmed = redis.xtrim(key, maxlen=10, approximate=False)
+    assert trimmed == 0
+
+
+def test_xtrim_no_effect(redis: Redis):
+    """Test XTRIM when no trimming needed"""
+    key = "test_stream"
+
+    # Add few entries
+    for i in range(5):
+        redis.xadd(key, "*", {"field": f"value{i}"})
+
+    # Trim to larger size (should have no effect)
+    trimmed = redis.xtrim(key, maxlen=10, approximate=False)
+    assert trimmed == 0
+
+    # Length should remain unchanged
+    length = redis.xlen(key)
+    assert length == 5
+
+
+def test_xtrim_preserves_recent_entries(redis: Redis):
+    """Test that XTRIM preserves the most recent entries"""
+    key = "test_stream"
+
+    # Add entries with known data
+    entry_ids = []
+    for i in range(10):
+        entry_id = redis.xadd(key, "*", {"counter": str(i)})
+        entry_ids.append(entry_id)
+
+    # Trim to keep only 5 entries
+    redis.xtrim(key, maxlen=5, approximate=False)
+
+    # Check remaining entries (should be the last 5)
+    remaining = redis.xrange(key, "-", "+")
+    remaining_ids = [entry[0] for entry in remaining]
+
+    # Exact trimming keeps the newest entries, so each of the last 5 IDs
+    # should still be present
+    for recent_id in entry_ids[-5:]:
+        assert recent_id in remaining_ids
+
+
+def test_xtrim_with_consumer_groups(redis: Redis):
+    """Test XTRIM behavior with consumer groups"""
+    key = "test_stream"
+    group = "test_group"
+    consumer = "test_consumer"
+
+    # Add entries
+    for i in range(10):
+        redis.xadd(key, "*", {"field": f"value{i}"})
+
+    # Create consumer group and read some messages
+    redis.xgroup_create(key, group, "0")
+    redis.xreadgroup(group, consumer, {key: ">"}, count=5)
+
+    # Trim the stream
+    redis.xtrim(key, maxlen=5, approximate=False)
+
+    # Stream should be trimmed
+    length = redis.xlen(key)
+    assert length <= 5
+
+
+def test_xtrim_invalid_parameters(redis: Redis): + """Test XTRIM with invalid parameter combinations""" + key = "test_stream" + + # Add some entries + redis.xadd(key, "*", {"field": "value"}) + + # Should raise error when neither maxlen nor minid specified + with pytest.raises(ValueError): + redis.xtrim(key) # No maxlen or minid + + +def test_xtrim_minid_exact(redis: Redis): + """Test XTRIM MINID with exact trimming""" + key = "test_stream" + + # Add entries with predictable IDs + base_timestamp = 2000000000000 + ids = [] + for i in range(10): + stream_id = f"{base_timestamp + i * 1000}-0" + redis.xadd(key, stream_id, {"index": str(i)}) + ids.append(stream_id) + + # Trim by MINID (exact) + target_id = ids[5] # Keep entries from index 5 onwards + redis.xtrim(key, minid=target_id, approximate=False) + + # Verify only entries >= target_id remain + remaining = redis.xrange(key, "-", "+") + for entry_id, _ in remaining: + # Parse timestamp from ID + entry_timestamp = int(entry_id.split("-")[0]) + target_timestamp = int(target_id.split("-")[0]) + assert entry_timestamp >= target_timestamp + + +def test_xtrim_very_large_stream(redis: Redis): + """Test XTRIM on a larger stream""" + key = "test_stream" + + # Add many entries + entry_count = 200 + for i in range(entry_count): + redis.xadd(key, "*", {"index": str(i)}) + + initial_length = redis.xlen(key) + assert initial_length == entry_count + + # Trim to much smaller size + target_size = 20 + trimmed = redis.xtrim(key, maxlen=target_size, approximate=False) + + # Should have removed many entries + assert trimmed > 0 + + # Final size should be around target + final_length = redis.xlen(key) + assert final_length <= target_size diff --git a/upstash_redis/__init__.py b/upstash_redis/__init__.py index 268539d..8ea6207 100644 --- a/upstash_redis/__init__.py +++ b/upstash_redis/__init__.py @@ -1,4 +1,4 @@ -__version__ = "1.4.0" +__version__ = "1.5.0" from upstash_redis.asyncio.client import Redis as AsyncRedis from upstash_redis.client import Redis diff --git a/upstash_redis/commands.py b/upstash_redis/commands.py index 3bb5fb6..a833882 100644 --- a/upstash_redis/commands.py +++ b/upstash_redis/commands.py @@ -4641,6 +4641,555 @@ def script_load(self, script: str) -> ResponseT: return self.execute(command) + # Redis Streams Commands + + def xadd( + self, + key: str, + id: str, + data: Dict[str, Any], + maxlen: Optional[int] = None, + approximate_trim: bool = True, + nomkstream: bool = False, + minid: Optional[str] = None, + limit: Optional[int] = None, + ) -> ResponseT: + """ + Adds an entry to a stream. 
+ + Example: + ```python + stream_id = redis.xadd("mystream", "*", {"field1": "value1", "field2": "value2"}) + print(stream_id) # e.g., "1609459200000-0" + ``` + + See https://redis.io/commands/xadd + """ + command: List = ["XADD", key] + + # Handle NOMKSTREAM option + if nomkstream: + command.append("NOMKSTREAM") + + # Handle trimming options + if maxlen is not None: + command.append("MAXLEN") + if approximate_trim: + command.append("~") + else: + command.append("=") + command.append(str(maxlen)) + if limit is not None: + command.extend(["LIMIT", str(limit)]) + elif minid is not None: + command.append("MINID") + if approximate_trim: + command.append("~") + else: + command.append("=") + command.append(minid) + if limit is not None: + command.extend(["LIMIT", str(limit)]) + + # Add stream ID + command.append(id) + + # Add field-value pairs + for field, value in data.items(): + command.extend([field, str(value)]) + + return self.execute(command) + + def xack(self, key: str, group: str, *ids: str) -> ResponseT: + """ + Acknowledges one or more messages in a consumer group. + + Example: + ```python + acknowledged = redis.xack("mystream", "mygroup", "1609459200000-0", "1609459200001-0") + print(acknowledged) # 2 + ``` + + See https://redis.io/commands/xack + """ + command: List = ["XACK", key, group] + list(ids) + return self.execute(command) + + def xdel(self, key: str, *ids: str) -> ResponseT: + """ + Removes one or more entries from a stream. + + Example: + ```python + deleted = redis.xdel("mystream", "1609459200000-0", "1609459200001-0") + print(deleted) # 2 + ``` + + See https://redis.io/commands/xdel + """ + command: List = ["XDEL", key] + list(ids) + return self.execute(command) + + def xgroup_create( + self, + key: str, + group: str, + id: str = "$", + mkstream: bool = False, + ) -> ResponseT: + """ + Creates a new consumer group for a stream. + + Note: The ENTRIESREAD option is not supported in Upstash Redis. + + Example: + ```python + result = redis.xgroup_create("mystream", "mygroup", "0", mkstream=True) + print(result) # "OK" + ``` + + See https://redis.io/commands/xgroup-create + """ + command: List = ["XGROUP", "CREATE", key, group, id] + + if mkstream: + command.append("MKSTREAM") + + return self.execute(command) + + def xgroup_createconsumer(self, key: str, group: str, consumer: str) -> ResponseT: + """ + Creates a consumer in a consumer group. + + Example: + ```python + result = redis.xgroup_createconsumer("mystream", "mygroup", "myconsumer") + print(result) # 1 (consumer was created) or 0 (consumer already existed) + ``` + + See https://redis.io/commands/xgroup-createconsumer + """ + command: List = ["XGROUP", "CREATECONSUMER", key, group, consumer] + return self.execute(command) + + def xgroup_delconsumer(self, key: str, group: str, consumer: str) -> ResponseT: + """ + Deletes a consumer from a consumer group. + + Example: + ```python + pending_count = redis.xgroup_delconsumer("mystream", "mygroup", "myconsumer") + print(pending_count) # Number of pending messages deleted + ``` + + See https://redis.io/commands/xgroup-delconsumer + """ + command: List = ["XGROUP", "DELCONSUMER", key, group, consumer] + return self.execute(command) + + def xgroup_destroy(self, key: str, group: str) -> ResponseT: + """ + Destroys a consumer group. 
+ + Example: + ```python + result = redis.xgroup_destroy("mystream", "mygroup") + print(result) # 1 (group was destroyed) or 0 (group didn't exist) + ``` + + See https://redis.io/commands/xgroup-destroy + """ + command: List = ["XGROUP", "DESTROY", key, group] + return self.execute(command) + + def xgroup_setid( + self, key: str, group: str, id: str, entries_read: Optional[int] = None + ) -> ResponseT: + """ + Sets the consumer group last delivered ID to something else. + + Example: + ```python + result = redis.xgroup_setid("mystream", "mygroup", "$") + print(result) # "OK" + ``` + + See https://redis.io/commands/xgroup-setid + """ + command: List = ["XGROUP", "SETID", key, group, id] + + if entries_read is not None: + command.extend(["ENTRIESREAD", str(entries_read)]) + + return self.execute(command) + + def xinfo_consumers(self, key: str, group: str) -> ResponseT: + """ + Returns information about consumers in a consumer group. + + Example: + ```python + consumers = redis.xinfo_consumers("mystream", "mygroup") + print(consumers) # List of consumer information + ``` + + See https://redis.io/commands/xinfo-consumers + """ + command: List = ["XINFO", "CONSUMERS", key, group] + return self.execute(command) + + def xinfo_groups(self, key: str) -> ResponseT: + """ + Returns information about consumer groups in a stream. + + Example: + ```python + groups = redis.xinfo_groups("mystream") + print(groups) # List of group information + ``` + + See https://redis.io/commands/xinfo-groups + """ + command: List = ["XINFO", "GROUPS", key] + return self.execute(command) + + def xlen(self, key: str) -> ResponseT: + """ + Returns the length of a stream. + + Example: + ```python + length = redis.xlen("mystream") + print(length) # e.g., 42 + ``` + + See https://redis.io/commands/xlen + """ + command: List = ["XLEN", key] + return self.execute(command) + + def xpending( + self, + key: str, + group: str, + start: Optional[str] = None, + end: Optional[str] = None, + count: Optional[int] = None, + consumer: Optional[str] = None, + idle: Optional[int] = None, + ) -> ResponseT: + """ + Returns information about pending messages in a consumer group. + + :param key: The stream key + :param group: The consumer group name + :param start: Start ID for detailed info + :param end: End ID for detailed info + :param count: Number of entries to return + :param consumer: Filter by specific consumer + :param idle: Minimum idle time in milliseconds + + Note: start, end, and count must all be provided together for detailed information. + If none are provided, returns summary information. 
+ + Example: + ```python + # Get general pending info + pending = redis.xpending("mystream", "mygroup") + print(pending) # [count, start_id, end_id, consumers] + + # Get detailed pending info + detailed = redis.xpending("mystream", "mygroup", "-", "+", 10) + print(detailed) # List of detailed pending message info + + # Get detailed info for specific consumer + detailed = redis.xpending("mystream", "mygroup", "-", "+", 10, "consumer1") + ``` + + See https://redis.io/commands/xpending + """ + command: List = ["XPENDING", key, group] + + # Check if any of start, end, count are provided + detailed_args_provided = [start is not None, end is not None, count is not None] + + if any(detailed_args_provided): + # If any are provided, all must be provided + if not all(detailed_args_provided): + raise ValueError( + "start, end, and count must all be provided together for detailed XPENDING" + ) + + if idle is not None: + command.extend(["IDLE", str(idle)]) + command.extend([start, end, str(count)]) + if consumer is not None: + command.append(consumer) + + return self.execute(command) + + def xrange( + self, + key: str, + start: str = "-", + end: str = "+", + count: Optional[int] = None, + ) -> ResponseT: + """ + Returns entries from a stream within a range. + + Example: + ```python + entries = redis.xrange("mystream", "-", "+", count=10) + print(entries) # List of [id, fields] pairs + ``` + + See https://redis.io/commands/xrange + """ + command: List = ["XRANGE", key, start, end] + + if count is not None: + command.extend(["COUNT", str(count)]) + + return self.execute(command) + + def xread( + self, + streams: Dict[str, str], + count: Optional[int] = None, + block: Optional[int] = None, + ) -> ResponseT: + """ + Reads entries from one or more streams. + + Example: + ```python + # Non-blocking read + entries = redis.xread({"mystream": "0-0"}, count=10) + print(entries) # List of [stream, entries] pairs + + # Blocking read (not supported in Upstash Redis) + # entries = redis.xread({"mystream": "$"}, block=1000) + ``` + + See https://redis.io/commands/xread + """ + command: List = ["XREAD"] + + if count is not None: + command.extend(["COUNT", str(count)]) + + if block is not None: + command.extend(["BLOCK", str(block)]) + + command.append("STREAMS") + + # Add stream names + for stream_key in streams.keys(): + command.append(stream_key) + + # Add stream IDs + for stream_id in streams.values(): + command.append(stream_id) + + return self.execute(command) + + def xreadgroup( + self, + group: str, + consumer: str, + streams: Dict[str, str], + count: Optional[int] = None, + block: Optional[int] = None, + noack: bool = False, + ) -> ResponseT: + """ + Reads entries from streams using a consumer group. 
+ + Example: + ```python + # Read new messages + entries = redis.xreadgroup("mygroup", "myconsumer", {"mystream": ">"}, count=10) + print(entries) # List of [stream, entries] pairs + + # Read pending messages + pending = redis.xreadgroup("mygroup", "myconsumer", {"mystream": "0"}, count=10) + print(pending) + ``` + + See https://redis.io/commands/xreadgroup + """ + command: List = ["XREADGROUP", "GROUP", group, consumer] + + if count is not None: + command.extend(["COUNT", str(count)]) + + if block is not None: + command.extend(["BLOCK", str(block)]) + + if noack: + command.append("NOACK") + + command.append("STREAMS") + + # Add stream names + for stream_key in streams.keys(): + command.append(stream_key) + + # Add stream IDs + for stream_id in streams.values(): + command.append(stream_id) + + return self.execute(command) + + def xrevrange( + self, + key: str, + end: str = "+", + start: str = "-", + count: Optional[int] = None, + ) -> ResponseT: + """ + Returns entries from a stream within a range in reverse order. + + Example: + ```python + entries = redis.xrevrange("mystream", "+", "-", count=10) + print(entries) # List of [id, fields] pairs in reverse order + ``` + + See https://redis.io/commands/xrevrange + """ + command: List = ["XREVRANGE", key, end, start] + + if count is not None: + command.extend(["COUNT", str(count)]) + + return self.execute(command) + + def xtrim( + self, + key: str, + maxlen: Optional[int] = None, + approximate: bool = True, + minid: Optional[str] = None, + limit: Optional[int] = None, + ) -> ResponseT: + """ + Trims a stream to a specified size or minimum ID. + + Example: + ```python + # Trim by length + trimmed = redis.xtrim("mystream", maxlen=1000, approximate=True) + print(trimmed) # Number of entries removed + + # Trim by minimum ID + trimmed = redis.xtrim("mystream", minid="1609459200000-0") + print(trimmed) + ``` + + See https://redis.io/commands/xtrim + """ + command: List = ["XTRIM", key] + + if maxlen is not None: + command.append("MAXLEN") + if approximate: + command.append("~") + else: + command.append("=") + command.append(str(maxlen)) + elif minid is not None: + command.append("MINID") + if approximate: + command.append("~") + else: + command.append("=") + command.append(minid) + else: + raise ValueError("Either maxlen or minid must be specified") + + if limit is not None: + command.extend(["LIMIT", str(limit)]) + + return self.execute(command) + + def xclaim( + self, + key: str, + group: str, + consumer: str, + min_idle_time: int, + *ids: str, + justid: bool = False, + ) -> ResponseT: + """ + Changes ownership of pending messages to a different consumer. + + Note: Upstash Redis only supports basic XCLAIM functionality. + Advanced options like IDLE, TIME, RETRYCOUNT, FORCE, and LASTID are not supported. 
+ + :param key: The stream key + :param group: The consumer group name + :param consumer: The consumer name to claim messages for + :param min_idle_time: Minimum idle time in milliseconds + :param ids: Message IDs to claim (variable arguments) + :param justid: Return only message IDs instead of full messages + + Example: + ```python + claimed = redis.xclaim( + "mystream", "mygroup", "newconsumer", + 3600000, "1609459200000-0", "1609459200001-0" + ) + print(claimed) # List of claimed messages + ``` + + See https://redis.io/commands/xclaim + """ + command: List = ["XCLAIM", key, group, consumer, str(min_idle_time)] + command.extend(ids) + + if justid: + command.append("JUSTID") + + return self.execute(command) + + def xautoclaim( + self, + key: str, + group: str, + consumer: str, + min_idle_time: int, + start: str, + count: Optional[int] = None, + justid: bool = False, + ) -> ResponseT: + """ + Automatically claims pending messages that have been idle for too long. + + Example: + ```python + result = redis.xautoclaim( + "mystream", "mygroup", "myconsumer", + 3600000, "0-0", count=10 + ) + print(result) # [next_start, claimed_messages] + ``` + + See https://redis.io/commands/xautoclaim + """ + command: List = ["XAUTOCLAIM", key, group, consumer, str(min_idle_time), start] + + if count is not None: + command.extend(["COUNT", str(count)]) + + if justid: + command.append("JUSTID") + + return self.execute(command) + class JsonCommands: def __init__(self, client: Commands): diff --git a/upstash_redis/commands.pyi b/upstash_redis/commands.pyi index 6b461dd..43eb0cf 100644 --- a/upstash_redis/commands.pyi +++ b/upstash_redis/commands.pyi @@ -544,6 +544,103 @@ class Commands: self, flush_type: Optional[Literal["ASYNC", "SYNC"]] = None ) -> bool: ... def script_load(self, script: str) -> str: ... + def xadd( + self, + key: str, + id: str, + data: Mapping[str, ValueT], + maxlen: Optional[int] = None, + approximate_trim: bool = True, + nomkstream: bool = False, + minid: Optional[str] = None, + limit: Optional[int] = None, + ) -> str: ... + def xack(self, key: str, group: str, *ids: str) -> int: ... + def xautoclaim( + self, + key: str, + group: str, + consumer: str, + min_idle_time: int, + start: str = "0-0", + count: Optional[int] = None, + justid: Optional[bool] = None, + ) -> List[List[str]]: ... + def xclaim( + self, + key: str, + group: str, + consumer: str, + min_idle_time: int, + *ids: str, + justid: Optional[bool] = None, + ) -> List[List[Any]]: ... + def xdel(self, key: str, *ids: str) -> int: ... + def xgroup_create( + self, + key: str, + group: str, + id: str = "$", + mkstream: Optional[bool] = None, + ) -> bool: ... + def xgroup_createconsumer(self, key: str, group: str, consumer: str) -> int: ... + def xgroup_delconsumer(self, key: str, group: str, consumer: str) -> int: ... + def xgroup_destroy(self, key: str, group: str) -> int: ... + def xgroup_setid( + self, + key: str, + group: str, + id: str, + entries_read: Optional[int] = None, + ) -> bool: ... + def xinfo_consumers(self, key: str, group: str) -> List[List[Any]]: ... + def xinfo_groups(self, key: str) -> List[List[Any]]: ... + def xlen(self, key: str) -> int: ... + def xpending( + self, + key: str, + group: str, + start: Optional[str] = None, + end: Optional[str] = None, + count: Optional[int] = None, + consumer: Optional[str] = None, + idle: Optional[int] = None, + ) -> Any: ... + def xrange( + self, + key: str, + start: str = "-", + end: str = "+", + count: Optional[int] = None, + ) -> List[List[Any]]: ... 
+ def xread( + self, + streams: Dict[str, str], + count: Optional[int] = None, + ) -> List[List[Any]]: ... + def xreadgroup( + self, + group: str, + consumer: str, + streams: Dict[str, str], + count: Optional[int] = None, + noack: Optional[bool] = None, + ) -> List[List[Any]]: ... + def xrevrange( + self, + key: str, + end: str = "+", + start: str = "-", + count: Optional[int] = None, + ) -> List[List[Any]]: ... + def xtrim( + self, + key: str, + maxlen: Optional[int] = None, + approximate: Optional[bool] = None, + minid: Optional[str] = None, + limit: Optional[int] = None, + ) -> int: ... class AsyncCommands: def __init__(self): ... @@ -1094,6 +1191,105 @@ class AsyncCommands: self, flush_type: Optional[Literal["ASYNC", "SYNC"]] = None ) -> bool: ... async def script_load(self, script: str) -> str: ... + async def xadd( + self, + key: str, + id: str, + data: Mapping[str, ValueT], + maxlen: Optional[int] = None, + approximate_trim: bool = True, + nomkstream: bool = False, + minid: Optional[str] = None, + limit: Optional[int] = None, + ) -> str: ... + async def xack(self, key: str, group: str, *ids: str) -> int: ... + async def xautoclaim( + self, + key: str, + group: str, + consumer: str, + min_idle_time: int, + start: str = "0-0", + count: Optional[int] = None, + justid: Optional[bool] = None, + ) -> List[List[str]]: ... + async def xclaim( + self, + key: str, + group: str, + consumer: str, + min_idle_time: int, + *ids: str, + justid: Optional[bool] = None, + ) -> List[List[Any]]: ... + async def xdel(self, key: str, *ids: str) -> int: ... + async def xgroup_create( + self, + key: str, + group: str, + id: str = "$", + mkstream: Optional[bool] = None, + ) -> bool: ... + async def xgroup_createconsumer( + self, key: str, group: str, consumer: str + ) -> int: ... + async def xgroup_delconsumer(self, key: str, group: str, consumer: str) -> int: ... + async def xgroup_destroy(self, key: str, group: str) -> int: ... + async def xgroup_setid( + self, + key: str, + group: str, + id: str, + entries_read: Optional[int] = None, + ) -> bool: ... + async def xinfo_consumers(self, key: str, group: str) -> List[List[Any]]: ... + async def xinfo_groups(self, key: str) -> List[List[Any]]: ... + async def xlen(self, key: str) -> int: ... + async def xpending( + self, + key: str, + group: str, + start: Optional[str] = None, + end: Optional[str] = None, + count: Optional[int] = None, + consumer: Optional[str] = None, + idle: Optional[int] = None, + ) -> Any: ... + async def xrange( + self, + key: str, + start: str = "-", + end: str = "+", + count: Optional[int] = None, + ) -> List[List[Any]]: ... + async def xread( + self, + streams: Dict[str, str], + count: Optional[int] = None, + ) -> List[List[Any]]: ... + async def xreadgroup( + self, + group: str, + consumer: str, + streams: Dict[str, str], + count: Optional[int] = None, + noack: Optional[bool] = None, + ) -> List[List[Any]]: ... + async def xrevrange( + self, + key: str, + end: str = "+", + start: str = "-", + count: Optional[int] = None, + ) -> List[List[Any]]: ... + async def xtrim( + self, + key: str, + maxlen: Optional[int] = None, + approximate: Optional[bool] = None, + minid: Optional[str] = None, + limit: Optional[int] = None, + ) -> int: ... class BitFieldCommands: def __init__(self, client: Commands, key: str): ... 
diff --git a/upstash_redis/format.py b/upstash_redis/format.py
index df04501..cdca6d4 100644
--- a/upstash_redis/format.py
+++ b/upstash_redis/format.py
@@ -200,6 +200,32 @@ def format_sorted_set_response_with_score(res, command):
     return res
 
 
+
+def format_xread_response(res, command):
+    """Format XREAD/XREADGROUP response to convert stream entries."""
+    if res is None:
+        return []
+    if not res:
+        return res
+
+    # Return as-is to maintain Redis-compatible format
+    return res
+
+
+def format_xrange_response(res, command):
+    """Format XRANGE/XREVRANGE response to convert entries."""
+    if not res:
+        return res
+
+    # Return as-is to maintain Redis-compatible format
+    return res
+
+
+def format_xpending_response(res, command):
+    """Format XPENDING response."""
+    # Return as-is to maintain Redis-compatible format
+    return res
+
+
 FORMATTERS: Dict[str, Callable] = {
     "COPY": to_bool,
     "EXPIRE": to_bool,
@@ -268,6 +294,16 @@ def format_sorted_set_response_with_score(res, command):
     "LSET": ok_to_bool,
     "SCRIPT FLUSH": ok_to_bool,
     "SCRIPT EXISTS": to_bool_list,
+    "XREAD": format_xread_response,
+    "XREADGROUP": format_xread_response,
+    "XRANGE": format_xrange_response,
+    "XREVRANGE": format_xrange_response,
+    "XPENDING": format_xpending_response,
+    "XGROUP CREATE": ok_to_bool,
+    "XGROUP DESTROY": to_bool,
+    "XGROUP CREATECONSUMER": to_bool,
+    "XGROUP DELCONSUMER": to_bool,
+    "XGROUP SETID": ok_to_bool,
 }
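+
+
+# Illustrative sketch only (an assumption, not registered in FORMATTERS): the
+# stream responses above are returned in the Redis reply shape, where each
+# entry is [id, [field1, value1, field2, value2, ...]]. A helper along these
+# lines could pair the flat field list into a dict if a friendlier shape were
+# ever preferred over the Redis-compatible one.
+def pair_stream_entry_fields(entry):
+    """Convert a raw entry [id, [f1, v1, ...]] into [id, {f1: v1, ...}]."""
+    entry_id, flat_fields = entry
+    # zip even-indexed items (field names) with odd-indexed items (values)
+    paired = dict(zip(flat_fields[::2], flat_fields[1::2]))
+    return [entry_id, paired]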