Skip to content

vndee/redis-data-structures

Repository files navigation

🌟 Redis Data Structures

PyPI version PyPI Downloads Python 3.8+ codecov code linting tests License: MIT

A Python library providing high-level, Redis-backed data structures with a clean, Pythonic interface. Perfect for distributed systems, microservices, and any application requiring persistent, thread-safe data structures, especially in environments where multiple workers share the same data structure.

Redis Data Structures (1)

πŸ’‘ Examples

πŸ“‹ Table of Contents

✨ Features

  • Thread-safe data structures backed by Redis
  • Clean, Pythonic interface
  • Connection pooling, automatic retries, and circuit breaker pattern for fault tolerance
  • Type preservation for complex types
  • Automatic compression of large data using zlib (configurable)
  • >90% test coverage
  • Async support (coming soon)

πŸ“¦ Installation

pip install redis-data-structures

Note: Ensure that Redis is running for the library to function properly.

πŸš€ Quick Start

from redis_data_structures import Queue, Stack, Set, ConnectionManager

# Initialize connection
conn = ConnectionManager(host='localhost', port=6379, db=0)

# Create and use data structures
queue = Queue("tasks", connection_manager=conn)
queue.push({'id': 1, 'action': 'process'})

stack = Stack("commands", connection_manager=conn)
stack.push({'action': 'create'})

set_ds = Set("users", connection_manager=conn)
set_ds.add({'id': 'user1', 'name': 'Alice'})

You can also skip using ConnectionManager if the following environment variables are set:

  • REDIS_HOST
  • REDIS_PORT
  • REDIS_DB
  • REDIS_USERNAME
  • REDIS_PASSWORD
from redis_data_structures import Queue, Stack, Set

queue = Queue("test_queue")
stack = Stack("test_stack")
set_ds = Set("test_set")

Refer to initialization for more information.

πŸ“Š Data Structures

Structure Description Use Case
Queue FIFO queue Job processing, message passing
Stack LIFO stack Undo systems, execution contexts
Set Unique collection Membership testing, deduplication
Dict Python-like dictionary (key-value store) Caching, metadata storage
HashMap Key-value store Caching, metadata storage
PriorityQueue Priority-based queue Task scheduling
RingBuffer Fixed-size circular buffer Logs, metrics
Graph Graph with adjacency list Relationships, networks
Trie Prefix tree Autocomplete, spell checking
BloomFilter Probabilistic set Membership testing
Deque Double-ended queue Sliding windows

πŸ’» Usage Examples

Redis Data Structures (2)

from redis_data_structures import Queue

queue = Queue("tasks")

# Basic operations
queue.push({'id': 1, 'action': 'process'})
task = queue.pop()
size = queue.size()

stack = Stack("commands")
stack.push({'action': 'create'})
command = stack.pop()
size = stack.size()

set_ds = Set("users")
set_ds.add({'id': 'user1'})
exists = set_ds.contains({'id': 'user1'})
members = set_ds.members()

hash_map = HashMap("users")
hash_map.set('user:1', {'name': 'Alice', 'age': 30})
user = hash_map.get('user:1')
exists = hash_map.exists('user:1')

priority_queue = PriorityQueue("tasks")
priority_queue.push({'id': 1, 'priority': 1})
task = priority_queue.pop()
peek = priority_queue.peek()

...

For more examples, see examples.

πŸ”— Connection Management

from redis_data_structures import ConnectionManager
from datetime import timedelta

conn = ConnectionManager(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    retry_max_attempts=5,
    circuit_breaker_threshold=10,
    circuit_breaker_timeout=timedelta(minutes=5),
    ssl=True
)

# Reuse for multiple queues
pq1 = PriorityQueue("tasks", connection_manager=conn)
pq2 = PriorityQueue("tasks", connection_manager=conn)

stack = Stack("commands", connection_manager=conn)
set_ds = Set("users", connection_manager=conn)

πŸ” Complex Types

Redis Data Structures (3)

Redis Data Structures (4)

from redis_data_structures import LRUCache, HashMap, SerializableType
from datetime import datetime, timezone
from pydantic import BaseModel

class User(SerializableType):
    """Example of a custom Redis data type using standard class."""

    def __init__(self, name: str, joined: datetime):
        """Initialize the User object."""
        self.name = name
        self.joined = joined

    def to_dict(self) -> dict:
        """Convert the User object to a dictionary."""
        return {
            "name": self.name,
            "joined": self.joined.isoformat(),  # Convert datetime to string
        }

    @classmethod
    def from_dict(cls, data: dict) -> "User":
        """Create a User object from a dictionary."""
        return cls(
            name=data["name"],
            joined=datetime.fromisoformat(data["joined"]),  # Convert string back to datetime
        )

    def __eq__(self, other) -> bool:
        """Override __eq__ for proper equality comparison."""
        return (
            isinstance(other, User) and 
            self.name == other.name and 
            self.joined == other.joined
        )

    def __str__(self) -> str:
        """Return a string representation of the User object."""
        return f"User(name='{self.name}', joined={self.joined.isoformat()})"


class Address(BaseModel):
    """Nested Pydantic model for demonstration."""

    street: str
    city: str
    country: str
    postal_code: Optional[str] = None


class UserModel(BaseModel):
    """Example of a Pydantic model - works automatically with Redis structures."""

    name: str
    email: str
    age: int = Field(gt=0, lt=150)
    joined: datetime
    address: Optional[Address] = None
    tags: Set[str] = set()

    def __str__(self) -> str:
        """Return a string representation of the UserModel instance."""
        return f"UserModel(name='{self.name}', email='{self.email}', age={self.age})"


# Initialize data structures
cache = LRUCache("test_cache", capacity=1000)  # Using default connection settings
hash_map = HashMap("type_demo_hash")  # Using default connection settings

# Example 1: Basic Python Types
data = {
    "string": "hello",
    "integer": 42,
    "float": 3.14,
    "boolean": True,
    "none": None,
}
for key, value in data.items():
    hash_map.set(key, value)
    result = hash_map.get(key)

# Example 2: Collections
collections = {
    "tuple": (1, "two", 3.0),
    "list": [1, 2, 3, "four"],
    "set": {1, 2, 3, 4},
    "dict": {"a": 1, "b": 2},
}
for key, value in collections.items():
    hash_map.set(key, value)
    result = hash_map.get(key)

# Example 3: DateTime Types
now = datetime.now(timezone.utc)
hash_map.set("datetime", now)
result = hash_map.get("datetime")

# Example 4: Custom Type
user = User("John Doe", datetime.now(timezone.utc))
hash_map.set("custom_user", user)
result = hash_map.get("custom_user")

# Example 5: Pydantic Models
user_model = UserModel(
    name="Jane Smith",
    email="[email protected]",
    age=30,
    joined=datetime.now(timezone.utc),
    address=Address(
        street="123 Main St",
        city="New York",
        country="USA",
        postal_code="10001",
    ),
    tags={"developer", "python"},
)

# Store in different data structures
cache.put("pydantic_user", user_model)
hash_map.set("pydantic_user", user_model)

# Retrieve and verify
cache_result = cache.get("pydantic_user")
hash_result = hash_map.get("pydantic_user")

# Example 6: Nested Structures
nested_data = {
    "user": user,
    "model": user_model,
    "list": [1, user, user_model],
    "tuple": (user, user_model),
    "dict": {
        "user": user,
        "model": user_model,
        "date": now,
    },
}
hash_map.set("nested", nested_data)
result = hash_map.get("nested")

Important Note for Distributed Systems: In scenarios where some processes only consume data (without storing any), you need to manually register types before deserializing since the type registering is only automatically done when storing data. This is common in worker processes, read-only replicas, or monitoring systems. Example:

from redis_data_structures import RedisDataStructure

# In consumer processes, register types before reading data
redis_structure = RedisDataStructure(key="my_key")

# Register your custom types
redis_structure.register_types(User)  # For SerializableType classes
redis_structure.register_types(UserModel)  # For Pydantic models

# Register multiple types at once
redis_structure.register_types([User, UserModel])  # For multiple types

# Now you can safely deserialize data
user = hash_map.get("custom_user")  # Will correctly deserialize as User instance
model = hash_map.get("pydantic_user")  # Will correctly deserialize as UserModel instance

See type preservation for more information.

🀝 Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit changes (git commit -m 'Add amazing feature')
  4. Push to branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

πŸ“ License

This project is licensed under the MIT License - see the LICENSE file for details.

About

A Python package providing Redis-backed data structures for building scalable and resilient applications.

Resources

License

Code of conduct

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published