Back to Documentation

Python SDK

High-performance Python client for entropyDB with async support and connection pooling

Overview

Key features of the Python SDK:

  • Connection Pooling: Efficient connection management
  • Async/Await: Native asyncio support
  • Type Safety: Full type hints support
  • ORM Interface: Optional object-relational mapping
  • Vector Operations: First-class vector support

Installation

pip install entropydb

Quick Start

from entropydb import connect

# Connect to database
conn = connect(
    host="localhost",
    port=5432,
    database="entropydb",
    user="entropy",
    password="your_password"
)

# Or use connection string
conn = connect("postgresql://entropy:password@localhost:5432/entropydb")

# Execute query
result = conn.execute("SELECT * FROM users LIMIT 10")
for row in result:
    print(row)

# Close connection
conn.close()

Connection Management

# Context Manager (recommended)
from entropydb import connect

with connect("postgresql://localhost/entropydb") as conn:
    result = conn.execute("SELECT COUNT(*) FROM users")
    print(f"Total users: {result.fetchone()[0]}")
# Connection automatically closed

# Connection Pool
from entropydb import ConnectionPool

pool = ConnectionPool(
    "postgresql://localhost/entropydb",
    min_connections=2,
    max_connections=10
)

# Use connection from pool
with pool.get_connection() as conn:
    result = conn.execute("SELECT * FROM products")
    for row in result:
        print(row)
    
# Close all connections
pool.close()

Query Execution

# Execute and fetch all
result = conn.execute("SELECT * FROM users")
rows = result.fetchall()

# Execute and fetch one
result = conn.execute("SELECT * FROM users WHERE id = %s", (1,))
row = result.fetchone()

# Execute and iterate
result = conn.execute("SELECT * FROM products")
for row in result:
    print(f"{row['name']}: ${row['price']}")

# Parameterized queries (positional)
conn.execute(
    "INSERT INTO users (username, email) VALUES (%s, %s)",
    ("john_doe", "john@example.com")
)

# Named parameters
conn.execute(
    "INSERT INTO users (username, email) VALUES (%(username)s, %(email)s)",
    {"username": "jane_doe", "email": "jane@example.com"}
)

# Batch operations
users = [
    ("user1", "user1@example.com"),
    ("user2", "user2@example.com"),
    ("user3", "user3@example.com"),
]

conn.execute_batch(
    "INSERT INTO users (username, email) VALUES (%s, %s)",
    users
)

Transactions

# Manual transaction control
conn.begin()
try:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    conn.commit()
except Exception as e:
    conn.rollback()
    print(f"Transaction failed: {e}")

# Context manager (recommended)
with conn.transaction():
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
# Auto-commit on success, auto-rollback on exception

Working with JSON

import json

# Insert JSON data
metadata = {
    "tags": ["featured", "electronics"],
    "specs": {"cpu": "Intel i7", "ram": "16GB"}
}

conn.execute(
    "INSERT INTO products (name, metadata) VALUES (%s, %s)",
    ("Laptop", json.dumps(metadata))
)

# Query JSON fields
result = conn.execute("""
    SELECT 
        name,
        metadata->>'brand' as brand,
        metadata->'specs'->>'cpu' as cpu
    FROM products
    WHERE metadata @> '{"tags": ["featured"]}'
""")

for row in result:
    print(f"{row['name']} - {row['brand']} - {row['cpu']}")

Vector Operations

import numpy as np
from entropydb.vector import Vector

# Create embedding
embedding = np.random.rand(768).tolist()

# Insert vector
conn.execute(
    "INSERT INTO products (name, embedding) VALUES (%s, %s)",
    ("Product 1", Vector(embedding))
)

# Similarity search
query_embedding = np.random.rand(768).tolist()
result = conn.execute("""
    SELECT 
        name,
        1 - (embedding <=> %s) as similarity
    FROM products
    ORDER BY embedding <=> %s
    LIMIT 10
""", (Vector(query_embedding), Vector(query_embedding)))

for row in result:
    print(f"{row['name']}: {row['similarity']:.4f}")

ORM-Style API

from entropydb import Table, Column, Integer, Text, JSONB

# Define table
class User(Table):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(Text, nullable=False)
    email = Column(Text, nullable=False)
    metadata = Column(JSONB)

# Query using ORM
users = User.query(conn).filter(
    User.username.like('%john%')
).limit(10).all()

# Insert using ORM
new_user = User(
    username='alice',
    email='alice@example.com',
    metadata={'role': 'admin'}
)
new_user.save(conn)

# Update
user = User.query(conn).get(1)
user.email = 'newemail@example.com'
user.save(conn)

Async Support

import asyncio
from entropydb import async_connect

async def main():
    # Async connection
    conn = await async_connect("postgresql://localhost/entropydb")
    
    # Async queries
    result = await conn.execute("SELECT * FROM users")
    async for row in result:
        print(row)
    
    # Async transaction
    async with conn.transaction():
        await conn.execute("INSERT INTO users (username) VALUES (%s)", ("test",))
    
    await conn.close()

# Run
asyncio.run(main())

Error Handling

from entropydb import (
    entropyDBError,
    ConnectionError,
    QueryError,
    IntegrityError
)

try:
    conn = connect("postgresql://localhost/entropydb")
    conn.execute("INSERT INTO users (username) VALUES (%s)", ("duplicate",))
except ConnectionError as e:
    print(f"Connection failed: {e}")
except IntegrityError as e:
    print(f"Constraint violation: {e}")
except QueryError as e:
    print(f"Query error: {e}")
except entropyDBError as e:
    print(f"Database error: {e}")
finally:
    if conn:
        conn.close()

Configuration Options

from entropydb import connect

conn = connect(
    host="localhost",
    port=5432,
    database="entropydb",
    user="entropy",
    password="password",
    
    # Connection options
    connect_timeout=10,
    statement_timeout=30000,  # milliseconds
    
    # SSL options
    sslmode="require",
    sslcert="/path/to/client-cert.pem",
    sslkey="/path/to/client-key.pem",
    sslrootcert="/path/to/ca-cert.pem",
    
    # Performance options
    prepare_threshold=5,  # Prepared statement threshold
    fetch_size=1000,  # Row fetch size
)

Best Practices

Connection Management

  • • Use connection pooling
  • • Always close connections
  • • Use context managers
  • • Set appropriate timeouts

Query Optimization

  • • Use parameterized queries
  • • Batch insert operations
  • • Use transactions properly
  • • Handle errors gracefully

Next Steps