Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions examples/hash_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Redis Hash Operations
======================

Demonstrates working with Redis hashes -- lightweight dictionaries stored
under a single key. Common use-cases include user profiles, product
catalogs, and session data.

Commands covered:
- HSET / HGET / HGETALL
- HMGET for fetching multiple fields
- HINCRBY for atomic field increments
- HDEL / HEXISTS / HKEYS / HVALS

Prerequisites:
pip install redis

A running Redis server (default: localhost:6379).
Start one with Docker:
docker run -d -p 6379:6379 redis
"""

import redis


def main() -> None:
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# ------------------------------------------------------------------
# 1. HSET -- create a hash with multiple fields in one call
# ------------------------------------------------------------------
r.hset(
"user:1000",
mapping={
"name": "Alice",
"email": "alice@example.com",
"login_count": 0,
"verified": "true",
},
)
print("Created user:1000")

# ------------------------------------------------------------------
# 2. HGET -- read a single field
# ------------------------------------------------------------------
name = r.hget("user:1000", "name")
print(f"HGET name -> {name}") # Alice

# ------------------------------------------------------------------
# 3. HMGET -- read several fields at once
# ------------------------------------------------------------------
fields = r.hmget("user:1000", ["name", "email"])
print(f"HMGET -> {fields}") # ['Alice', 'alice@example.com']

# ------------------------------------------------------------------
# 4. HGETALL -- return every field-value pair as a dict
# ------------------------------------------------------------------
user = r.hgetall("user:1000")
print(f"HGETALL -> {user}")

# ------------------------------------------------------------------
# 5. HINCRBY -- atomically increment a numeric field
# ------------------------------------------------------------------
r.hincrby("user:1000", "login_count", 1)
r.hincrby("user:1000", "login_count", 1)
logins = r.hget("user:1000", "login_count")
print(f"login_count -> {logins}") # 2

# ------------------------------------------------------------------
# 6. HEXISTS / HKEYS / HVALS -- inspect hash structure
# ------------------------------------------------------------------
exists = r.hexists("user:1000", "email")
print(f"HEXISTS email -> {exists}") # True

keys = r.hkeys("user:1000")
print(f"HKEYS -> {keys}")

vals = r.hvals("user:1000")
print(f"HVALS -> {vals}")

# ------------------------------------------------------------------
# 7. HDEL -- remove specific fields from the hash
# ------------------------------------------------------------------
r.hdel("user:1000", "verified")
remaining = r.hgetall("user:1000")
print(f"After HDEL -> {remaining}")

# ------------------------------------------------------------------
# Cleanup
# ------------------------------------------------------------------
r.delete("user:1000")
print("Cleanup complete.")


if __name__ == "__main__":
main()
110 changes: 110 additions & 0 deletions examples/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""
Redis Pub/Sub Pattern
======================

Demonstrates the publish/subscribe messaging pattern with redis-py.

This script runs both a subscriber and a publisher in the same process
using threads so you can see the full flow without opening two terminals.

Commands covered:
- SUBSCRIBE / PSUBSCRIBE (pattern-based)
- PUBLISH
- Unsubscribing and cleaning up

Prerequisites:
pip install redis

A running Redis server (default: localhost:6379).
Start one with Docker:
docker run -d -p 6379:6379 redis
"""

import threading
import time

import redis


def subscriber(ready_event: threading.Event) -> None:
"""Listen for messages on the 'notifications' channel."""
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
pubsub = r.pubsub()

# Subscribe to a specific channel
pubsub.subscribe("notifications")

# Subscribe to a pattern -- receives messages from any channel
# whose name starts with "events."
pubsub.psubscribe("events.*")

print("[subscriber] Subscribed to 'notifications' and 'events.*'")

# Signal the main thread that we are ready to receive messages.
ready_event.set()

# listen() yields messages as they arrive. The first messages are
# subscription confirmations (type "subscribe" / "psubscribe").
for message in pubsub.listen():
msg_type = message["type"]

if msg_type in ("subscribe", "psubscribe"):
# Confirmation of subscription -- nothing to process.
continue

if msg_type in ("message", "pmessage"):
channel = message["channel"]
data = message["data"]
print(f"[subscriber] Received on '{channel}': {data}")

# Use a sentinel value to know when to stop.
if data == "QUIT":
break

pubsub.unsubscribe()
pubsub.punsubscribe()
pubsub.close()
print("[subscriber] Unsubscribed and closed.")


def publisher(ready_event: threading.Event) -> None:
"""Publish a few messages after the subscriber is ready."""
# Wait until the subscriber thread has set up its subscriptions.
ready_event.wait()
# Small extra delay to ensure subscription is fully registered.
time.sleep(0.1)

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Publish to the exact channel
count = r.publish("notifications", "Hello from publisher!")
print(f"[publisher] Published to 'notifications' ({count} receivers)")

# Publish to a pattern-matched channel
count = r.publish("events.login", "user:42 logged in")
print(f"[publisher] Published to 'events.login' ({count} receivers)")

count = r.publish("events.purchase", "order:99 placed")
print(f"[publisher] Published to 'events.purchase' ({count} receivers)")

# Send the sentinel so the subscriber exits cleanly.
r.publish("notifications", "QUIT")


def main() -> None:
ready = threading.Event()

sub_thread = threading.Thread(target=subscriber, args=(ready,))
pub_thread = threading.Thread(target=publisher, args=(ready,))

sub_thread.start()
pub_thread.start()

pub_thread.join()
sub_thread.join()

print("Done.")


if __name__ == "__main__":
main()
83 changes: 83 additions & 0 deletions examples/string_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Basic Redis String Operations
==============================

Demonstrates the most common Redis string commands using redis-py:
- SET / GET for storing and retrieving values
- MSET / MGET for bulk operations
- INCR / DECR for atomic counters
- Key expiration with EX and PX options

Prerequisites:
pip install redis

A running Redis server (default: localhost:6379).
Start one with Docker:
docker run -d -p 6379:6379 redis
"""

import redis


def main() -> None:
# Connect to Redis. decode_responses=True ensures we get str instead
# of bytes for every response.
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# ------------------------------------------------------------------
# 1. SET and GET -- store and retrieve a single value
# ------------------------------------------------------------------
r.set("greeting", "Hello, Redis!")
value = r.get("greeting")
print(f"GET greeting -> {value}") # Hello, Redis!

# ------------------------------------------------------------------
# 2. SET with expiration -- key auto-deletes after the TTL
# ------------------------------------------------------------------
# EX sets the expiry in seconds; PX would set it in milliseconds.
r.set("temp_key", "I will expire", ex=60)
ttl = r.ttl("temp_key")
print(f"TTL temp_key -> {ttl}s") # ~60

# ------------------------------------------------------------------
# 3. SET with NX / XX flags
# ------------------------------------------------------------------
# NX: only set if the key does NOT exist (useful for locking)
r.set("bike:1", "Deimos")
result_nx = r.set("bike:1", "Ares", nx=True)
print(f"SET bike:1 NX -> {result_nx}") # None (key already exists)

# XX: only set if the key DOES exist
result_xx = r.set("bike:1", "Ares", xx=True)
print(f"SET bike:1 XX -> {result_xx}") # True

# ------------------------------------------------------------------
# 4. MSET and MGET -- set/get multiple keys in one round-trip
# ------------------------------------------------------------------
r.mset({"color:1": "red", "color:2": "green", "color:3": "blue"})
colors = r.mget(["color:1", "color:2", "color:3"])
print(f"MGET colors -> {colors}") # ['red', 'green', 'blue']

# ------------------------------------------------------------------
# 5. Atomic counters -- INCR / INCRBY / DECR / DECRBY
# ------------------------------------------------------------------
r.set("page_views", 0)
r.incr("page_views") # 1
r.incrby("page_views", 10) # 11
r.decr("page_views") # 10
views = r.get("page_views")
print(f"page_views -> {views}") # 10

# ------------------------------------------------------------------
# Cleanup -- remove keys created by this example
# ------------------------------------------------------------------
r.delete(
"greeting", "temp_key", "bike:1",
"color:1", "color:2", "color:3",
"page_views",
)
print("Cleanup complete.")


if __name__ == "__main__":
main()