Back to Documentation

Time Series Queries

Advanced time-series analytics with window functions, aggregations, and downsampling

Overview

Time-series capabilities include:

  • Time Bucketing: Aggregate data into time intervals
  • Window Functions: Moving averages, LAG, LEAD
  • Gap Filling: Handle missing data points
  • Downsampling: Reduce data resolution
  • Continuous Aggregates: Materialized views for time data

Time Bucketing

-- Hourly temperature summary (average, maximum, minimum) over readings
-- newer than 24 hours, oldest bucket first.
SELECT
    time_bucket('1 hour', timestamp) AS hour,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp
FROM sensor_data
WHERE timestamp > NOW() - INTERVAL '24 hours'
GROUP BY time_bucket('1 hour', timestamp)
ORDER BY hour;

-- Per-sensor average and sample count in 5-minute buckets,
-- most recent bucket first.
SELECT
    time_bucket('5 minutes', timestamp) AS bucket,
    sensor_id,
    AVG(value) AS avg_value,
    COUNT(*) AS sample_count
FROM metrics
GROUP BY time_bucket('5 minutes', timestamp), sensor_id
ORDER BY bucket DESC;

-- Custom bucket size: request count and average response time per
-- 15-second bucket for log rows newer than one hour.
SELECT
    time_bucket('15 seconds', timestamp) AS bucket,
    COUNT(*) AS request_count,
    AVG(response_time) AS avg_response_time
FROM api_logs
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY time_bucket('15 seconds', timestamp)
ORDER BY bucket;

-- Daily sales totals, bucketed with a 6-hour interval passed as the
-- third (offset) argument of time_bucket.
SELECT
    time_bucket('1 day', timestamp, INTERVAL '6 hours') AS day,
    SUM(sales) AS total_sales
FROM transactions
GROUP BY time_bucket('1 day', timestamp, INTERVAL '6 hours');

Window Functions

-- Moving average over a 10-row window: the current row plus the nine
-- rows before it, in timestamp order.
-- NOTE(review): the window has no PARTITION BY; if sensor_data holds rows
-- for several sensors they are averaged together - confirm this is intended.
SELECT
    timestamp,
    temperature,
    AVG(temperature) OVER last_10_rows AS moving_avg_10
FROM sensor_data
WINDOW last_10_rows AS (
    ORDER BY timestamp
    ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
)
ORDER BY timestamp;

-- Previous price, next price, and change since the previous row for one
-- symbol. All three window calls share the same ordering, so it is
-- declared once in the WINDOW clause. LAG/LEAD use their default offset of 1.
SELECT
    timestamp,
    price,
    LAG(price) OVER by_time AS prev_price,
    LEAD(price) OVER by_time AS next_price,
    price - LAG(price) OVER by_time AS price_change
FROM stock_prices
WHERE symbol = 'AAPL'
WINDOW by_time AS (ORDER BY timestamp)
ORDER BY timestamp;

-- Running total of daily_sales in date order.
-- "ROWS UNBOUNDED PRECEDING" is shorthand for a frame that runs from the
-- first row through the current row.
SELECT
    date,
    daily_sales,
    SUM(daily_sales) OVER (
        ORDER BY date
        ROWS UNBOUNDED PRECEDING
    ) AS cumulative_sales
FROM sales
ORDER BY date;

-- Running minimum and maximum: the smallest and largest value seen from
-- the first row up to the current row, in timestamp order.
SELECT
    timestamp,
    value,
    MIN(value) OVER up_to_current AS running_min,
    MAX(value) OVER up_to_current AS running_max
FROM metrics
WINDOW up_to_current AS (
    ORDER BY timestamp
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
);

-- Rate of change: difference from the previous row's value divided by the
-- seconds elapsed since the previous row.
-- The first row has no predecessor, so its rate is NULL.
-- NULLIF turns a zero elapsed time (two rows sharing one timestamp) into
-- NULL, so the rate is NULL instead of raising a division-by-zero error.
SELECT
    timestamp,
    value,
    (value - LAG(value) OVER by_time)
        / NULLIF(
            EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER by_time)),
            0
        ) AS rate_per_second
FROM counters
WINDOW by_time AS (ORDER BY timestamp)
ORDER BY timestamp;

Gap Filling

-- Fill gaps with zeros: one row per hourly bucket over the last 24 hours,
-- reporting 0 for buckets that have no rows in metrics.
--
-- Both ends of the generated series are aligned with time_bucket so the
-- generated values are exactly the bucket starts produced by the subquery.
-- A series starting at an unaligned NOW() - 24 hours (e.g. 10:37:12) would
-- never equal an aligned bucket (10:00:00), and every row would report 0.
-- NOTE(review): assumes metrics.timestamp is timestamptz, matching NOW() - confirm.
SELECT
    bucket,
    COALESCE(m.value, 0) AS value
FROM generate_series(
    time_bucket('1 hour', NOW() - INTERVAL '24 hours'),
    time_bucket('1 hour', NOW()),
    INTERVAL '1 hour'
) AS bucket
LEFT JOIN (
    SELECT
        time_bucket('1 hour', timestamp) AS bucket,
        SUM(value) AS value
    FROM metrics
    -- Only aggregate rows that can match a generated bucket instead of
    -- summing the whole table.
    WHERE timestamp >= time_bucket('1 hour', NOW() - INTERVAL '24 hours')
    GROUP BY time_bucket('1 hour', timestamp)
) AS m USING (bucket)
ORDER BY bucket;

-- Linear interpolation of missing values.
-- Each row is paired with the value and timestamp of the rows immediately
-- before and after it. A NULL value is replaced by a point on the straight
-- line between those two neighbours when both are non-NULL; otherwise the
-- previous row's value is used (which may itself be NULL).
-- Only the directly adjacent rows are consulted, so a run of two or more
-- consecutive NULL rows is not interpolated.
WITH neighbours AS (
    SELECT
        timestamp,
        value,
        LAG(value) OVER by_time AS prev_value,
        LEAD(value) OVER by_time AS next_value,
        LAG(timestamp) OVER by_time AS prev_time,
        LEAD(timestamp) OVER by_time AS next_time
    FROM sensor_data
    WINDOW by_time AS (ORDER BY timestamp)
)
SELECT
    timestamp,
    COALESCE(
        value,
        CASE
            WHEN prev_value IS NOT NULL AND next_value IS NOT NULL THEN
                prev_value + (next_value - prev_value)
                    * EXTRACT(EPOCH FROM (timestamp - prev_time))
                    / EXTRACT(EPOCH FROM (next_time - prev_time))
            ELSE prev_value
        END
    ) AS interpolated_value
FROM neighbours;

-- Forward fill (carry last observation forward).
-- PostgreSQL does not accept LAST_VALUE(value IGNORE NULLS), so the fill is
-- done in two steps:
--   1. COUNT(value) counts only non-NULL values, so the running count stays
--      constant across a run of NULLs and increases at each real observation.
--      Every observation and the NULL rows that follow it share one fill_group.
--   2. Each fill_group holds at most one non-NULL value, so MAX(value) over
--      the group returns that observation for every row in the group.
-- Rows before the first observation have no value to carry and stay NULL.
WITH grouped AS (
    SELECT
        timestamp,
        value,
        COUNT(value) OVER (
            ORDER BY timestamp
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS fill_group
    FROM sensor_data
)
SELECT
    timestamp,
    MAX(value) OVER (PARTITION BY fill_group) AS filled_value
FROM grouped
ORDER BY timestamp;

Continuous Aggregates

-- Continuous aggregate: per-sensor hourly average, maximum, minimum and
-- row count over sensor_data, stored as a materialized view.
CREATE MATERIALIZED VIEW hourly_metrics
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', timestamp) AS hour,
    sensor_id,
    AVG(value) AS avg_value,
    MAX(value) AS max_value,
    MIN(value) AS min_value,
    COUNT(*) AS sample_count
FROM sensor_data
GROUP BY time_bucket('1 hour', timestamp), sensor_id;

-- Register a refresh policy for the hourly_metrics continuous aggregate.
-- NOTE(review): per the TimescaleDB policy API this presumably refreshes the
-- window from 3 hours ago up to 1 hour ago, once every hour - confirm.
SELECT add_continuous_aggregate_policy(
    'hourly_metrics',
    start_offset      => INTERVAL '3 hours',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

-- Query the continuous aggregate: the last 7 days of hourly rows for one
-- sensor, oldest first.
-- Columns are listed explicitly (in the order the view defines them) so the
-- result shape does not change silently if the view is later altered.
SELECT
    hour,
    sensor_id,
    avg_value,
    max_value,
    min_value,
    sample_count
FROM hourly_metrics
WHERE hour > NOW() - INTERVAL '7 days'
  AND sensor_id = 'sensor_123'
ORDER BY hour;

-- Create daily rollup from hourly: one row per sensor per day, built from
-- the rows of hourly_metrics.
CREATE MATERIALIZED VIEW daily_metrics AS
SELECT
    date_trunc('day', hour) AS day,
    sensor_id,
    -- Weight each hourly average by its sample count. A plain
    -- AVG(avg_value) gives an hour with 1 sample the same influence as an
    -- hour with 10,000 and does not equal the true daily mean.
    -- NULLIF guards the division should the summed count ever be zero.
    -- NOTE(review): sample_count is COUNT(*), which also counts rows whose
    -- value is NULL; the weighting is exact only if value is never NULL - confirm.
    SUM(avg_value * sample_count) / NULLIF(SUM(sample_count), 0) AS avg_value,
    MAX(max_value) AS max_value,
    MIN(min_value) AS min_value,
    SUM(sample_count) AS total_samples
FROM hourly_metrics
GROUP BY date_trunc('day', hour), sensor_id;

-- Refresh manually.
-- hourly_metrics is a continuous aggregate (created WITH
-- (timescaledb.continuous)), so it is refreshed with
-- refresh_continuous_aggregate over an explicit time window rather than with
-- REFRESH MATERIALIZED VIEW. The window here mirrors the refresh policy's
-- offsets (3 hours ago up to 1 hour ago).
CALL refresh_continuous_aggregate(
    'hourly_metrics',
    NOW() - INTERVAL '3 hours',
    NOW() - INTERVAL '1 hour'
);

-- daily_metrics is a plain materialized view, so the standard command applies.
-- (Adding CONCURRENTLY would also require a unique index on the view.)
REFRESH MATERIALIZED VIEW daily_metrics;

Downsampling

-- Downsample the last 30 days of high_frequency_data to one row per hour,
-- keeping the first and last value (ordered by timestamp) together with
-- the average, maximum and minimum.
SELECT
    time_bucket('1 hour', timestamp) AS hour,
    first(value, timestamp) AS first_value,
    last(value, timestamp) AS last_value,
    AVG(value) AS avg_value,
    MAX(value) AS max_value,
    MIN(value) AS min_value
FROM high_frequency_data
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY time_bucket('1 hour', timestamp);

-- Time-weighted average of value per hourly bucket.
-- NOTE(review): entropy_time_weighted_avg is not defined in this document;
-- presumably it is provided by the database or an extension - confirm.
SELECT
    time_bucket('1 hour', timestamp) AS hour,
    entropy_time_weighted_avg(value, timestamp) AS twa
FROM sensor_data
GROUP BY time_bucket('1 hour', timestamp);

-- Adaptive downsampling: suggest a storage interval for each 5-minute
-- bucket from how much its values vary.
-- bucket_stats computes the average and standard deviation per bucket.
WITH bucket_stats AS (
    SELECT
        time_bucket('5 minutes', timestamp) AS bucket,
        AVG(value) AS avg_value,
        STDDEV(value) AS stddev_value
    FROM metrics
    GROUP BY time_bucket('5 minutes', timestamp)
)
SELECT
    bucket,
    avg_value,
    CASE
        -- Low variability: a coarse interval is enough.
        WHEN stddev_value < 0.1 THEN '1 hour'
        WHEN stddev_value < 1.0 THEN '15 minutes'
        -- High variability (or a NULL standard deviation, for which neither
        -- comparison above is true): keep the original resolution.
        ELSE '5 minutes'
    END AS recommended_interval
FROM bucket_stats;

-- Retention and compression policies for sensor_data: a 7-day retention
-- policy and a 1-day compression policy. Neither call downsamples data.
-- NOTE(review): add_compression_policy presumably requires compression to be
-- enabled on sensor_data beforehand - confirm.
SELECT add_retention_policy(
    'sensor_data',
    drop_after => INTERVAL '7 days'
);
SELECT add_compression_policy(
    'sensor_data',
    compress_after => INTERVAL '1 day'
);

Best Practices

Performance

  • Use time_bucket for aggregations
  • Create indexes on timestamp columns
  • Use continuous aggregates for frequent queries
  • Implement data retention policies

Modeling

  • Partition tables by time
  • Use appropriate bucket sizes
  • Pre-aggregate when possible
  • Compress old data

Next Steps