Skip to content

Commit 2354b74

Browse files
committed
feat: Implement TTL and expiration support for Redis commands
1 parent 55ed560 commit 2354b74

File tree

6 files changed

+648
-21
lines changed

6 files changed

+648
-21
lines changed

README.md

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Blobasaur is a high-performance, sharded blob storage server written in Rust. It
4040
- [Storage Compression](#storage-compression-1)
4141
- [Advanced Topics](#advanced-topics)
4242
- [Database Schema](#database-schema)
43+
- [TTL and Key Expiration](#ttl-and-key-expiration)
4344
- [Race Condition Handling](#race-condition-handling)
4445
- [Redis Cluster Compatibility](#redis-cluster-compatibility)
4546
- [Development](#development)
@@ -57,6 +58,7 @@ Blobasaur is a high-performance, sharded blob storage server written in Rust. It
5758
- **🗜️ Storage Compression**: Configurable compression with multiple algorithms (Gzip, Zstd, Lz4, Brotli)
5859
- **📊 Namespacing**: Hash-based namespacing using `HGET`/`HSET` commands for logical data organization
5960
- **🏷️ Metadata Tracking**: Automatic tracking of creation time, update time, expiration, and version numbers
61+
- **⏰ TTL Support**: Redis-compatible key expiration with `SET EX/PX`, `TTL`, and `EXPIRE` commands plus automatic background cleanup
6062
- **🌐 Redis Cluster Support**: Full cluster protocol support with automatic node discovery and client redirection
6163
- **⚙️ Highly Configurable**: Flexible configuration for shards, compression, batching, and performance tuning
6264

@@ -159,6 +161,11 @@ just build-linux
159161
```bash
160162
redis-cli -p 6379 SET mykey "Hello, World!"
161163
redis-cli -p 6379 GET mykey
164+
165+
# Test TTL functionality
166+
redis-cli -p 6379 SET session:123 "user_data" EX 60 # Expires in 60 seconds
167+
redis-cli -p 6379 TTL session:123 # Check remaining time
168+
redis-cli -p 6379 EXPIRE mykey 300 # Set 5 minute expiration
162169
```
163170
164171
## CLI Usage
@@ -260,12 +267,19 @@ level = 1
260267
261268
Blobasaur implements core Redis commands for blob operations:
262269
263-
- **`SET key value`**: Store or replace a blob
270+
- **`SET key value [EX seconds] [PX milliseconds]`**: Store or replace a blob with optional TTL
264271
```bash
272+
# Basic SET
265273
redis-cli SET mykey "Hello, World!"
274+
275+
# SET with TTL in seconds
276+
redis-cli SET mykey "Hello, World!" EX 60
277+
278+
# SET with TTL in milliseconds
279+
redis-cli SET mykey "Hello, World!" PX 30000
266280
```
267281
268-
- **`GET key`**: Retrieve a blob
282+
- **`GET key`**: Retrieve a blob (excludes expired keys)
269283
```bash
270284
redis-cli GET mykey
271285
```
@@ -275,11 +289,28 @@ Blobasaur implements core Redis commands for blob operations:
275289
redis-cli DEL mykey
276290
```
277291
278-
- **`EXISTS key`**: Check if a blob exists
292+
- **`EXISTS key`**: Check if a blob exists (excludes expired keys)
279293
```bash
280294
redis-cli EXISTS mykey
281295
```
282296
297+
- **`TTL key`**: Get the remaining time to live for a key
298+
```bash
299+
redis-cli TTL mykey
300+
# Returns:
301+
# -1 if key exists but has no expiration
302+
# -2 if key does not exist or has expired
303+
# positive number: remaining TTL in seconds
304+
```
305+
306+
- **`EXPIRE key seconds`**: Set expiration time for an existing key
307+
```bash
308+
redis-cli EXPIRE mykey 120 # Expire in 2 minutes
309+
# Returns:
310+
# 1 if expiration was set successfully
311+
# 0 if key does not exist
312+
```
313+
283314
### Namespaced Commands
284315
285316
Use namespaces to organize data into logical groups:
@@ -461,6 +492,32 @@ CREATE TABLE blobs (
461492
- Same schema as default table
462493
- Isolated from other namespaces
463494
495+
### TTL and Key Expiration
496+
497+
**Features:**
498+
- **Redis-Compatible TTL**: Full support for `SET EX/PX`, `TTL`, and `EXPIRE` commands
499+
- **Automatic Expiry**: All read operations (`GET`, `EXISTS`, `TTL`) automatically filter expired keys
500+
- **Background Cleanup**: Per-shard cleanup tasks run every 60 seconds to remove expired keys
501+
- **Efficient Storage**: Uses indexed `expires_at` timestamps for fast expiry queries
502+
503+
**Implementation Details:**
504+
- Expiration timestamps stored as Unix epoch seconds in `expires_at` column
505+
- Database indexes on `expires_at` for efficient cleanup queries
506+
- Background cleanup processes both main `blobs` table and namespaced tables
507+
- Race-free expiry checking: keys are considered expired at query time
508+
509+
**Usage Examples:**
510+
```bash
511+
# Set with 60 second expiration
512+
redis-cli SET session:abc123 "user_data" EX 60
513+
514+
# Check remaining TTL
515+
redis-cli TTL session:abc123
516+
517+
# Add expiration to existing key
518+
redis-cli EXPIRE permanent_key 3600
519+
```
520+
464521
### Race Condition Handling
465522
466523
**The Problem:** Async writes could cause GET requests to miss recently SET data.

src/main.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,17 @@ async fn run_server(config_path: &str) -> Result<()> {
160160
));
161161
}
162162

163+
// Spawn cleanup tasks for each shard
164+
let cleanup_interval_secs = 60; // Clean up expired keys every 60 seconds
165+
for i in 0..cfg.num_shards {
166+
let pool = shared_state.db_pools[i].clone();
167+
tokio::spawn(shard_manager::shard_cleanup_task(
168+
i,
169+
pool,
170+
cleanup_interval_secs,
171+
));
172+
}
173+
163174
// Start HTTP metrics server if enabled
164175
if let Some(handle) = prometheus_handle {
165176
let metrics_addr = cfg

src/redis/integration_tests.rs

Lines changed: 109 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ mod integration_tests {
8181
parsed_command,
8282
RedisCommand::Set {
8383
key: "mykey".to_string(),
84-
value: Bytes::from_static(b"hello world")
84+
value: Bytes::from_static(b"hello world"),
85+
ttl_seconds: None
8586
}
8687
);
8788
}
@@ -220,9 +221,10 @@ mod integration_tests {
220221

221222
match parsed_command {
222223
RedisCommand::Get { key } => assert_eq!(key, "key1"),
223-
RedisCommand::Set { key, value } => {
224+
RedisCommand::Set { key, value, ttl_seconds } => {
224225
assert_eq!(key, "key2");
225226
assert_eq!(value, Bytes::from_static(b"value"));
227+
assert_eq!(ttl_seconds, None);
226228
}
227229
RedisCommand::Del { key } => assert_eq!(key, "key3"),
228230
_ => panic!("Unexpected command"),
@@ -241,9 +243,10 @@ mod integration_tests {
241243
let parsed_command = parse_command(parsed_resp).unwrap();
242244

243245
match parsed_command {
244-
RedisCommand::Set { key, value } => {
246+
RedisCommand::Set { key, value, ttl_seconds } => {
245247
assert_eq!(key, "binary");
246248
assert_eq!(value, Bytes::copy_from_slice(binary_data));
249+
assert_eq!(ttl_seconds, None);
247250
}
248251
_ => panic!("Expected SET command"),
249252
}
@@ -267,9 +270,10 @@ mod integration_tests {
267270
let parsed_command = parse_command(parsed_resp).unwrap();
268271

269272
match parsed_command {
270-
RedisCommand::Set { key, value } => {
273+
RedisCommand::Set { key, value, ttl_seconds } => {
271274
assert_eq!(key, large_key);
272275
assert_eq!(value, Bytes::from(large_value.into_bytes()));
276+
assert_eq!(ttl_seconds, None);
273277
}
274278
_ => panic!("Expected SET command"),
275279
}
@@ -283,9 +287,10 @@ mod integration_tests {
283287
let parsed_command = parse_command(parsed_resp).unwrap();
284288

285289
match parsed_command {
286-
RedisCommand::Set { key, value } => {
290+
RedisCommand::Set { key, value, ttl_seconds } => {
287291
assert_eq!(key, "empty");
288292
assert_eq!(value, Bytes::new());
293+
assert_eq!(ttl_seconds, None);
289294
}
290295
_ => panic!("Expected SET command"),
291296
}
@@ -415,6 +420,105 @@ mod integration_tests {
415420
assert!(matches!(result, Err(ParseError::Invalid(_))));
416421
}
417422

423+
#[tokio::test]
424+
async fn test_set_with_ttl_integration() {
425+
// Test SET with EX option
426+
let set_ex_command = b"*5\r\n$3\r\nSET\r\n$8\r\nttl_test\r\n$5\r\nvalue\r\n$2\r\nEX\r\n$2\r\n60\r\n";
427+
let (parsed_resp, _) = parse_resp_with_remaining(set_ex_command).unwrap();
428+
let parsed_command = parse_command(parsed_resp).unwrap();
429+
match parsed_command {
430+
RedisCommand::Set { key, value, ttl_seconds } => {
431+
assert_eq!(key, "ttl_test");
432+
assert_eq!(value, Bytes::from_static(b"value"));
433+
assert_eq!(ttl_seconds, Some(60));
434+
}
435+
_ => panic!("Expected SET command with TTL"),
436+
}
437+
438+
// Test SET with PX option
439+
let set_px_command = b"*5\r\n$3\r\nSET\r\n$8\r\nttl_test\r\n$5\r\nvalue\r\n$2\r\nPX\r\n$5\r\n60000\r\n";
440+
let (parsed_resp, _) = parse_resp_with_remaining(set_px_command).unwrap();
441+
let parsed_command = parse_command(parsed_resp).unwrap();
442+
match parsed_command {
443+
RedisCommand::Set { key, value, ttl_seconds } => {
444+
assert_eq!(key, "ttl_test");
445+
assert_eq!(value, Bytes::from_static(b"value"));
446+
assert_eq!(ttl_seconds, Some(60)); // 60000ms = 60s
447+
}
448+
_ => panic!("Expected SET command with TTL"),
449+
}
450+
}
451+
452+
#[tokio::test]
453+
async fn test_ttl_command_integration() {
454+
let ttl_command = b"*2\r\n$3\r\nTTL\r\n$7\r\nmykey42\r\n";
455+
let mock_response = b":-1\r\n"; // Key exists but no expiration
456+
457+
let port = create_mock_server(vec![mock_response.to_vec()]).await;
458+
let addr = format!("127.0.0.1:{}", port);
459+
460+
let response = send_command_and_receive(&addr, ttl_command).await;
461+
assert_eq!(response, mock_response);
462+
463+
// Parse the command to verify our parser works
464+
let (parsed_resp, _) = parse_resp_with_remaining(ttl_command).unwrap();
465+
let parsed_command = parse_command(parsed_resp).unwrap();
466+
assert_eq!(
467+
parsed_command,
468+
RedisCommand::Ttl {
469+
key: "mykey42".to_string()
470+
}
471+
);
472+
}
473+
474+
#[tokio::test]
475+
async fn test_expire_command_integration() {
476+
let expire_command = b"*3\r\n$6\r\nEXPIRE\r\n$7\r\nmykey42\r\n$3\r\n120\r\n";
477+
let mock_response = b":1\r\n"; // Key exists and expiration was set
478+
479+
let port = create_mock_server(vec![mock_response.to_vec()]).await;
480+
let addr = format!("127.0.0.1:{}", port);
481+
482+
let response = send_command_and_receive(&addr, expire_command).await;
483+
assert_eq!(response, mock_response);
484+
485+
// Parse the command to verify our parser works
486+
let (parsed_resp, _) = parse_resp_with_remaining(expire_command).unwrap();
487+
let parsed_command = parse_command(parsed_resp).unwrap();
488+
assert_eq!(
489+
parsed_command,
490+
RedisCommand::Expire {
491+
key: "mykey42".to_string(),
492+
seconds: 120
493+
}
494+
);
495+
}
496+
497+
#[tokio::test]
498+
async fn test_ttl_edge_cases() {
499+
// Test TTL for non-existent key
500+
let ttl_command = b"*2\r\n$3\r\nTTL\r\n$11\r\nnonexistent\r\n";
501+
let mock_response = b":-2\r\n"; // Key doesn't exist
502+
503+
let port = create_mock_server(vec![mock_response.to_vec()]).await;
504+
let addr = format!("127.0.0.1:{}", port);
505+
506+
let response = send_command_and_receive(&addr, ttl_command).await;
507+
assert_eq!(response, mock_response);
508+
}
509+
510+
#[tokio::test]
511+
async fn test_expire_on_nonexistent_key() {
512+
let expire_command = b"*3\r\n$6\r\nEXPIRE\r\n$11\r\nnonexistent\r\n$2\r\n60\r\n";
513+
let mock_response = b":0\r\n"; // Key doesn't exist
514+
515+
let port = create_mock_server(vec![mock_response.to_vec()]).await;
516+
let addr = format!("127.0.0.1:{}", port);
517+
518+
let response = send_command_and_receive(&addr, expire_command).await;
519+
assert_eq!(response, mock_response);
520+
}
521+
418522
#[tokio::test]
419523
async fn test_case_insensitive_commands() {
420524
let test_cases = vec![

0 commit comments

Comments
 (0)