
Distributed Transactions

ACID transactions across multiple shards and geographic regions

Overview

entropyDB provides full ACID guarantees for transactions spanning multiple shards, even across different data centers. Using the Calvin protocol, we achieve this without traditional two-phase commit overhead.

Key Capabilities

  • Cross-shard transactions with full ACID guarantees
  • Global linearizability across regions
  • No 2PC blocking or coordinator failures
  • Automatic failover and recovery

Cross-Shard Transactions

-- Transaction spans multiple shards automatically
BEGIN;

-- Shard 1: User in US-East
UPDATE users 
SET balance = balance - 100 
WHERE id = 123;  -- Shard key: id

-- Shard 2: User in US-West
UPDATE users 
SET balance = balance + 100 
WHERE id = 456;  -- Different shard

-- Shard 3: Order table (sharded by order_id)
INSERT INTO orders (user_id, amount)
VALUES (123, 100);

COMMIT;  -- Atomic across all shards

How It Works

  1. Global sequencer assigns transaction order
  2. Transaction broadcast to all involved shards
  3. Each shard executes in same order (deterministic)
  4. No voting or coordination needed
  5. Commit when all shards finish execution
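
The steps above can be illustrated with a toy, in-memory simulation. This is a minimal sketch of deterministic ordering, not entropyDB's implementation: the `Shard`, `Sequencer`, and `run` names are hypothetical, and replication, locking, and failure handling are left out.

```python
from dataclasses import dataclass, field
from itertools import count


@dataclass
class Shard:
    """Hypothetical shard: a key-value store plus the sequence numbers it applied."""
    name: str
    data: dict = field(default_factory=dict)
    applied: list = field(default_factory=list)

    def involved(self, writes):
        # A shard takes part only if it owns at least one key in the write set.
        return any(key in self.data for key in writes)

    def apply(self, seq, writes):
        # Apply the deltas for the keys this shard owns and record the sequence number.
        for key, delta in writes.items():
            if key in self.data:
                self.data[key] += delta
        self.applied.append(seq)


class Sequencer:
    """Hypothetical global sequencer: assigns one total order to all transactions."""
    def __init__(self):
        self._next = count(1)

    def order(self, writes):
        return next(self._next), writes


def run(sequencer, shards, transactions):
    # Step 1: sequence every transaction before anything executes.
    log = [sequencer.order(writes) for writes in transactions]
    # Steps 2-3: send each entry to the involved shards, always in sequence order.
    for seq, writes in log:
        for shard in shards:
            if shard.involved(writes):
                shard.apply(seq, writes)


east = Shard("us-east", {"user:123": 500})
west = Shard("us-west", {"user:456": 200})
run(Sequencer(), [east, west], [
    {"user:123": -100, "user:456": +100},  # cross-shard transfer
    {"user:456": +50},                     # single-shard update
])
```

Because every shard sees the same log in the same order, the shards need no vote to agree on the outcome; each one simply executes its part.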

Sharding Strategy

Hash-Based Sharding

-- Create sharded table
CREATE TABLE users (
  id BIGINT PRIMARY KEY,
  name TEXT,
  email TEXT,
  region TEXT
) PARTITION BY HASH (id);

-- entropyDB automatically creates shards
-- Shard 0: hash(id) % 16 = 0
-- Shard 1: hash(id) % 16 = 1
-- ...
-- Shard 15: hash(id) % 16 = 15

Best for: Even distribution, parallel queries, avoiding hotspots from sequential keys
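
To make the `hash(id) % 16` routing concrete, here is a minimal sketch of how a key could map to a shard. The hash function entropyDB actually uses is not specified here; SHA-256 and the `shard_for` helper are assumptions chosen only because they give a stable, well-mixed value.

```python
import hashlib

NUM_SHARDS = 16  # matches the 16 shards listed in the comments above


def shard_for(key: int, num_shards: int = NUM_SHARDS) -> int:
    """Map a key to a shard index (assumed hash: first 8 bytes of SHA-256)."""
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Count how many of 16,000 sequential ids land on each shard.
counts = [0] * NUM_SHARDS
for user_id in range(16_000):
    counts[shard_for(user_id)] += 1
```

Sequential ids end up spread across all shards in roughly equal numbers, which is what makes hash sharding a good fit for evenly distributed load.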

Range-Based Sharding

-- Create range-partitioned table
CREATE TABLE events (
  id BIGINT,
  timestamp TIMESTAMP,
  data JSONB
) PARTITION BY RANGE (timestamp);

-- Manual partition creation
CREATE TABLE events_2024_q1 
  PARTITION OF events 
  FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');

CREATE TABLE events_2024_q2 
  PARTITION OF events 
  FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');

Best for: Time-series data, sequential access, range queries
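
Routing a row to a range partition is a sorted-boundary lookup. The sketch below mirrors the two quarterly partitions defined above, assuming the lower bound is inclusive and the upper bound exclusive (as the adjoining `FROM`/`TO` values suggest); `partition_for` is a hypothetical helper, not an entropyDB API.

```python
from bisect import bisect_right
from datetime import datetime

# Sorted partition boundaries: each partition covers [BOUNDS[i], BOUNDS[i + 1]).
BOUNDS = [datetime(2024, 1, 1), datetime(2024, 4, 1), datetime(2024, 7, 1)]
NAMES = ["events_2024_q1", "events_2024_q2"]


def partition_for(ts: datetime):
    """Return the partition name for a timestamp, or None if no partition covers it."""
    i = bisect_right(BOUNDS, ts) - 1
    if i < 0 or i >= len(NAMES):
        return None
    return NAMES[i]
```

A timestamp outside every range returns `None` here, which corresponds to a row for which no partition has been created yet.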

Geographic Sharding

-- Shard by region for data locality
CREATE TABLE users (
  id BIGINT,
  region TEXT,
  data JSONB
) PARTITION BY LIST (region);

CREATE TABLE users_us 
  PARTITION OF users 
  FOR VALUES IN ('us-east', 'us-west');

CREATE TABLE users_eu 
  PARTITION OF users 
  FOR VALUES IN ('eu-west', 'eu-central');

Best for: Data residency, low latency, compliance requirements

Cross-Region Transactions

Transactions can span data centers with strong consistency:

-- User in US updates, data replicated to EU
BEGIN;

-- Primary in US-East
UPDATE users SET last_login = NOW() WHERE id = 123;

-- Insert into EU shard
INSERT INTO user_activity (user_id, region, activity)
VALUES (123, 'eu-west', 'page_view');

-- Both regions see consistent state
COMMIT;

Latency Considerations

  • Same Region: 5-10ms transaction latency
  • Cross-Region (US ↔ EU): 80-120ms (network RTT)
  • Global (US ↔ Asia): 150-250ms
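
These latencies bound how fast a single connection can commit when it issues transactions one after another. The following back-of-the-envelope sketch uses only the ranges listed above; `serial_txn_rate` is a hypothetical helper, and the model ignores pipelining and concurrent connections.

```python
# Latency ranges in milliseconds, taken from the list above.
LATENCY_MS = {
    "same region": (5, 10),
    "cross-region (US <-> EU)": (80, 120),
    "global (US <-> Asia)": (150, 250),
}


def serial_txn_rate(latency_ms: float) -> float:
    """Transactions per second for one connection that waits for each commit."""
    return 1000.0 / latency_ms


for scope, (low, high) in LATENCY_MS.items():
    slowest = serial_txn_rate(high)
    fastest = serial_txn_rate(low)
    print(f"{scope}: {slowest:.1f} to {fastest:.1f} txn/s per connection")
```

Under this model, a workload that needs high cross-region throughput has to rely on many concurrent connections rather than faster individual transactions.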

Read Strategies

Strong Reads (Default)

-- Always reads latest committed data
SELECT * FROM users WHERE id = 123;

-- Waits for any pending replication
-- Guarantees linearizability

Stale Reads (Lower Latency)

-- Read from nearest replica (may be slightly stale)
BEGIN;
SET TRANSACTION READ ONLY;  -- applies to the current transaction (PostgreSQL-style semantics)
SET entropy.read_mode = 'stale';

SELECT * FROM users WHERE id = 123;
COMMIT;

-- Typical staleness: 100-500ms
-- Much faster for geo-distributed reads

Bounded Staleness

-- Read data that's at most 5 seconds old
SET entropy.max_staleness = '5 seconds';

SELECT * FROM users WHERE id = 123;

-- Good balance: Lower latency, bounded inconsistency
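
One way to picture bounded staleness is as a replica-selection rule: read from the closest replica whose lag is within the bound, and fall back to the primary otherwise. The sketch below is an assumption about how such a rule could work, not a description of entropyDB's internals; `Replica` and `choose_replica` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Replica:
    name: str
    rtt_ms: float  # round-trip time from the client
    lag_ms: float  # replication lag behind the primary (0 for the primary itself)


def choose_replica(replicas, max_staleness_ms: float) -> Replica:
    """Pick the lowest-latency replica whose lag is within the staleness bound."""
    eligible = [r for r in replicas if r.lag_ms <= max_staleness_ms]
    if not eligible:
        raise ValueError("no replica satisfies the staleness bound")
    return min(eligible, key=lambda r: r.rtt_ms)


replicas = [
    Replica("us-east-primary", rtt_ms=90, lag_ms=0),
    Replica("eu-west-replica", rtt_ms=5, lag_ms=300),
    Replica("eu-central-replica", rtt_ms=15, lag_ms=8000),
]
```

With a 5-second bound the nearby `eu-west-replica` qualifies; tightening the bound to 100 ms leaves only the primary, at the cost of a cross-region round trip.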

Failure Handling

Shard Failure

When a shard fails during a transaction:

  • Transaction is automatically retried on a replica
  • Deterministic execution ensures the same result
  • Client may see a retry, but atomicity is preserved
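
If a retry surfaces to the application as a transient error, the client can wrap the transaction in a small retry loop. This is a generic sketch under that assumption: `ShardUnavailable` and `run_with_retry` are hypothetical names, not part of any entropyDB driver, and the wrapped transaction is assumed to be safe to re-run.

```python
import time


class ShardUnavailable(Exception):
    """Hypothetical transient error raised while a shard fails over."""


def run_with_retry(txn, attempts=3, base_delay_s=0.05, sleep=time.sleep):
    """Run txn(), retrying with exponential backoff on ShardUnavailable."""
    for attempt in range(attempts):
        try:
            return txn()
        except ShardUnavailable:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            sleep(base_delay_s * 2 ** attempt)
```

The `sleep` parameter exists so tests can replace the real delay with a no-op.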

Network Partition

During a network split:

  • Majority partition continues operating
  • Minority partition blocks writes (CP in CAP)
  • Reads continue with bounded staleness
  • Automatic reconciliation when partition heals
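
The majority rule behind this behavior is simple to state in code. The sketch below assumes a fixed, known set of nodes and a strict-majority quorum; the node names and the `can_accept_writes` helper are illustrative only.

```python
def has_majority(reachable: int, total: int) -> bool:
    """True if strictly more than half of the nodes are reachable."""
    return reachable > total // 2


def can_accept_writes(partition_nodes, all_nodes) -> bool:
    """A partition may accept writes only if it holds a strict majority of nodes."""
    members = set(all_nodes)
    visible = set(partition_nodes) & members
    return has_majority(len(visible), len(members))


nodes = ["n1", "n2", "n3", "n4", "n5"]
majority_side = ["n1", "n2", "n3"]
minority_side = ["n4", "n5"]
```

Because at most one side of any split can hold a strict majority, two partitions can never both accept writes. With an even number of nodes split in half, neither side can.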

Performance Optimization

1. Co-locate Related Data

Shard related tables on a shared key, such as tenant_id, so that records that are read and written together land on the same shard:

-- Shard by tenant_id to keep tenant data on same shard
CREATE TABLE orders (
  tenant_id INT,
  order_id BIGINT,
  ...
) PARTITION BY HASH (tenant_id);
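
The effect of the shard key choice can be checked with a quick simulation. This sketch assumes modulo placement over 16 shards and uses SHA-256 as a stand-in hash; `shard_for` is a hypothetical helper, not entropyDB's routing function.

```python
import hashlib


def shard_for(key: int, num_shards: int = 16) -> int:
    """Assumed routing: stable hash of the key, modulo the shard count."""
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Twenty orders that all belong to tenant 42.
orders = [(42, order_id) for order_id in range(1000, 1020)]

shards_by_tenant = {shard_for(tenant_id) for tenant_id, _ in orders}
shards_by_order = {shard_for(order_id) for _, order_id in orders}
```

Keyed by `tenant_id`, every order for the tenant lands on one shard, so a transaction over that tenant's orders stays single-shard. Keyed by `order_id`, the same rows scatter across several shards.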

2. Minimize Cross-Shard Transactions

Design the schema so that most transactions touch a single shard.

3. Use Async Replication for Reads

Read from local replicas with bounded staleness for better latency.

4. Batch Operations

Group multiple operations in a single transaction:

-- Better: Single transaction
BEGIN;
  INSERT INTO orders VALUES (...), (...), (...);
COMMIT;

-- Worse: Three separate transactions
INSERT INTO orders VALUES (...);
INSERT INTO orders VALUES (...);
INSERT INTO orders VALUES (...);

Monitoring

-- Check cross-shard transaction stats
SELECT 
  shard_id,
  COUNT(*) as total_txns,
  COUNT(*) FILTER (WHERE cross_shard) as cross_shard_txns,
  AVG(duration_ms) as avg_duration
FROM transaction_log
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY shard_id;

-- Monitor replication lag
SELECT 
  replica_region,
  primary_region,
  MAX(lag_ms) as max_lag,
  AVG(lag_ms) as avg_lag
FROM replication_stats
WHERE timestamp > NOW() - INTERVAL '5 minutes'
GROUP BY replica_region, primary_region;

Best Practices

1. Choose the Right Shard Key

Pick a shard key with high cardinality and even distribution.

2. Plan for Growth

Start with more shards than needed. Rebalancing is expensive.
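
To see why rebalancing is expensive, consider how many keys change shards when the shard count changes. This sketch assumes plain modulo placement (as in `hash(id) % 16`) with SHA-256 as a stand-in hash; entropyDB's actual rebalancing mechanism may differ.

```python
import hashlib


def stable_hash(key: int) -> int:
    """Stand-in hash: first 8 bytes of SHA-256 as an integer."""
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def fraction_moved(keys, old_shards: int, new_shards: int) -> float:
    """Fraction of keys whose shard index changes under modulo placement."""
    moved = sum(
        1 for k in keys
        if stable_hash(k) % old_shards != stable_hash(k) % new_shards
    )
    return moved / len(keys)


keys = range(10_000)
grow_by_one = fraction_moved(keys, 16, 17)  # add a single shard
doubling = fraction_moved(keys, 16, 32)     # double the shard count
```

Under these assumptions, adding one shard relocates roughly 16 of every 17 keys, while doubling relocates about half. Either way, a large share of the data has to move.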

3. Test Failure Scenarios

Regularly test shard failures and network partitions in staging.

4. Monitor Latency Percentiles

Track p50, p95, p99 latencies for cross-shard transactions.
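
Percentiles matter because averages hide tail latency. The sketch below computes nearest-rank percentiles over a made-up sample of transaction durations; the numbers are illustrative, not measurements.

```python
import math


def percentile(samples, p: int):
    """Nearest-rank percentile: smallest value with at least p% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


# Illustrative durations in ms: mostly fast, with a slow tail.
durations_ms = [10] * 90 + [50] * 5 + [200] * 4 + [900]

p50 = percentile(durations_ms, 50)
p95 = percentile(durations_ms, 95)
p99 = percentile(durations_ms, 99)
mean = sum(durations_ms) / len(durations_ms)
```

In this sample the mean sits far above the median and far below p99, so neither the average nor the typical request reveals what the slowest requests experience.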

5. Use Connection Pooling

Maintain connection pools to each shard for lower overhead.
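
A per-shard pool can be as simple as a bounded queue of open connections. This is a minimal sketch: `ShardPool` is a hypothetical class, and the `connect` factory stands in for whatever call your driver uses to open a connection.

```python
import queue
from contextlib import contextmanager


class ShardPool:
    """Fixed-size pool of connections to one shard."""

    def __init__(self, connect, size: int):
        # Open all connections up front and park them in the idle queue.
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(connect())

    @contextmanager
    def connection(self, timeout=None):
        # Blocks until a connection is free; raises queue.Empty on timeout.
        conn = self._idle.get(timeout=timeout)
        try:
            yield conn
        finally:
            self._idle.put(conn)  # always return the connection to the pool

    def idle_count(self) -> int:
        return self._idle.qsize()


# One pool per shard; strings stand in for real connection objects.
pools = {
    shard_id: ShardPool(lambda s=shard_id: f"conn-to-shard-{s}", size=2)
    for shard_id in range(4)
}

with pools[1].connection() as conn:
    borrowed = conn
    idle_while_borrowed = pools[1].idle_count()
```

Reusing connections avoids paying connection setup on every transaction, and the fixed size caps the number of connections each client holds per shard.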

Next Steps