Handling Late-Arriving Data in ClickHouse Views
Late-arriving data remains one of the most persistent failure modes in high-throughput analytics pipelines. When events arrive out of chronological order, cross partition boundaries, or are backfilled hours after their logical timestamp, standard materialized views frequently produce incorrect aggregations, duplicate counters, or silent metric divergence. Successfully handling late-arriving data in ClickHouse requires a deliberate combination of storage engine selection, deterministic versioning, partition-aware ingestion routing, and automated reconciliation workflows. This guide provides production-grade patterns, exact DDL configurations, diagnostic queries, and Python ETL automation strategies to guarantee materialized view accuracy under real-world data latency conditions.
Root Cause: Stateless MV Execution & Block Boundaries
ClickHouse materialized views are not background refresh jobs or scheduled aggregators. They function as stateless INSERT triggers that execute synchronously when data lands in the source table. Each incoming block is transformed, aggregated, and appended to the target table. When a late-arriving event lands in a partition that has already been processed, the materialized view treats it as a new row.
For SummingMergeTree targets, this inflates counters. For AggregatingMergeTree, it accumulates duplicate intermediate states. Background merges eventually collapse rows that share a sorting key, but deduplication by version or sign only applies to engines configured for it (ReplacingMergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree). Merges also only combine parts within the same partition, and their timing is not guaranteed. Relying on implicit merge timing introduces non-deterministic query results across rolling windows, directly violating SLA-bound reporting requirements.
Late data fundamentally breaks the assumption that ingestion order equals event order. Without explicit watermarking and idempotent upsert patterns, materialized views will produce divergent results until background merges catch up, which may never happen under sustained high-throughput ingestion.
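The trigger-like behavior described above can be illustrated with a small in-memory simulation. This is only a sketch of the semantics, not ClickHouse code: mv_transform and target_rows are hypothetical stand-ins for the view's SELECT and the target table.

```python
from collections import defaultdict


def mv_transform(block):
    """Aggregate one inserted block on its own, as the view's SELECT would."""
    counts = defaultdict(int)
    for event_type, event_date in block:
        counts[(event_type, event_date)] += 1
    return list(counts.items())


# Stand-in for the target table: the view only ever appends to it.
target_rows = []

on_time_block = [("click", "2024-05-01"), ("click", "2024-05-01")]
late_block = [("click", "2024-05-01")]  # same logical day, arrives hours later

# The late block is delivered twice (for example, a retried batch).
for block in (on_time_block, late_block, late_block):
    target_rows.extend(mv_transform(block))

# Summing the appended rows, as a SummingMergeTree merge would, counts the
# retried late event twice: three distinct events were sent, four are reported.
totals = defaultdict(int)
for key, n in target_rows:
    totals[key] += n
```

Each block is aggregated in isolation, so the target holds three partial rows for one key, and nothing in the append path can tell a retry from a genuinely new event.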
Architectural Mitigation: Versioning & Watermarking
The most reliable mitigation strategy decouples raw ingestion from downstream aggregation using a two-tier TO pattern. Instead of attaching a materialized view directly to a landing table, route data through a staging layer that explicitly tracks logical event timestamps, ingestion timestamps, and a monotonically increasing version identifier.
This architecture aligns with established Incremental Refresh Strategies by ensuring that late events are processed as explicit state updates rather than blind appends. The target table should utilize ReplacingMergeTree(version) or AggregatingMergeTree paired with SimpleAggregateFunction(anyLast, ...) to guarantee deterministic conflict resolution during background merges. By enforcing a strict versioning contract, you transform out-of-order arrivals into predictable, idempotent updates.
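To make the conflict-resolution rule concrete, here is a minimal sketch of what a ReplacingMergeTree(version) merge does to rows that share a sorting key. The function name replacing_merge and the dict-based rows are illustrative assumptions. The real merge runs inside ClickHouse on parts of the same partition.

```python
def replacing_merge(rows, key_fields, version_field="version"):
    """Keep one row per sorting key: the one with the highest version.

    On equal versions the row seen last wins, mirroring the engine's
    "most recently inserted" tie-break.
    """
    survivors = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        current = survivors.get(key)
        if current is None or row[version_field] >= current[version_field]:
            survivors[key] = row
    return list(survivors.values())


rows = [
    {"user_id": "u1", "event_date": "2024-05-01", "status": "final", "version": 3},
    # Late arrival carrying an older version: it must not overwrite version 3.
    {"user_id": "u1", "event_date": "2024-05-01", "status": "stale", "version": 2},
    {"user_id": "u2", "event_date": "2024-05-01", "status": "only", "version": 1},
]

merged = replacing_merge(rows, key_fields=("user_id", "event_date"))
```

Because the outcome depends only on the version values and not on arrival order, replaying the same rows in any order yields the same surviving state.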
Production-Ready DDL & Configuration
The following DDL establishes a versioned ingestion pipeline. Note the explicit version column and the TO clause that routes data away from the raw table.
-- 1. Raw ingestion table (partitioned by ingestion date for operational isolation)
CREATE TABLE analytics.events_raw
(
    event_id UUID,
    event_ts DateTime64(3, 'UTC'),
    ingestion_ts DateTime64(3, 'UTC') DEFAULT now64(3, 'UTC'),
    event_type LowCardinality(String),
    user_id String,
    metrics Map(String, Float64),
    version UInt64
)
ENGINE = MergeTree()
PARTITION BY toDate(ingestion_ts)
ORDER BY (event_type, user_id, event_ts)
SETTINGS index_granularity = 8192;

-- 2. Aggregated target table with deterministic versioning
CREATE TABLE analytics.events_agg
(
    event_type LowCardinality(String),
    user_id String,
    event_date Date,
    event_count SimpleAggregateFunction(sum, UInt64),
    last_event_ts SimpleAggregateFunction(max, DateTime64(3, 'UTC')),
    version UInt64
)
ENGINE = ReplacingMergeTree(version)
PARTITION BY event_date
ORDER BY (event_type, user_id, event_date)
SETTINGS allow_nullable_key = 0;

-- 3. Materialized View routing and aggregation
CREATE MATERIALIZED VIEW analytics.mv_events_agg
TO analytics.events_agg AS
SELECT
    event_type,
    user_id,
    toDate(event_ts) AS event_date,
    1 AS event_count,
    event_ts AS last_event_ts,
    version
FROM analytics.events_raw
WHERE event_ts >= (now() - INTERVAL 30 DAY);
Key configuration notes:
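- The version column must be generated upstream or during ingestion. Higher values always win during merges.
- ReplacingMergeTree does not guarantee immediate deduplication. Queries must account for unmerged parts or use FINAL selectively.
- ReplacingMergeTree keeps the entire row with the highest version for each sorting key. It does not apply the sum or max declared by the SimpleAggregateFunction columns, so event_count holds the value from the winning row rather than a running total. If counters must accumulate across inserts, use AggregatingMergeTree for the target instead.
- The WHERE clause in the MV acts as a soft watermark, preventing extremely stale backfills from triggering unnecessary target writes. It is evaluated at insert time, so rows older than the window are silently skipped by the view while still landing in the raw table. Adjust the window based on your SLA tolerance.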
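Because the view drops rows outside the watermark without raising an error, it can help for the ETL layer to apply the same predicate and report how many rows will not reach the aggregate. The sketch below mirrors the 30-day window from the DDL. The names WATERMARK, passes_watermark, and split_by_watermark are hypothetical helpers, not part of any ClickHouse client.

```python
from datetime import datetime, timedelta, timezone

# Mirrors "INTERVAL 30 DAY" in the materialized view's WHERE clause.
WATERMARK = timedelta(days=30)


def passes_watermark(event_ts, now=None, watermark=WATERMARK):
    """True if the view would accept a row with this event timestamp."""
    now = now or datetime.now(timezone.utc)
    return event_ts >= now - watermark


def split_by_watermark(event_timestamps, now=None):
    """Partition timestamps into (accepted, skipped) relative to the watermark."""
    accepted, skipped = [], []
    for ts in event_timestamps:
        (accepted if passes_watermark(ts, now) else skipped).append(ts)
    return accepted, skipped


fixed_now = datetime(2024, 6, 1, tzinfo=timezone.utc)
accepted, skipped = split_by_watermark(
    [
        datetime(2024, 5, 20, tzinfo=timezone.utc),  # 12 days old
        datetime(2024, 4, 1, tzinfo=timezone.utc),   # 61 days old
    ],
    now=fixed_now,
)
```

Logging the size of the skipped list per batch gives an early signal that a backfill is older than the view will aggregate and needs a direct rebuild instead.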
Python ETL Integration & Deterministic Watermarking
Python ETL pipelines must enforce deterministic version generation before inserting into the raw table. Relying on ClickHouse defaults for versioning introduces race conditions during parallel ingestion.
import time
from datetime import datetime, timezone

from clickhouse_driver import Client

client = Client(host='clickhouse-cluster', port=9000, database='analytics')


def generate_version(offset: int = 0) -> int:
    """
    Returns the ingestion wall-clock time in nanoseconds as the row version.
    A row ingested later receives a higher version than an earlier one, so
    late arrivals win during ReplacingMergeTree merges. This assumes the
    ingestion workers have synchronized clocks that do not step backwards.
    The value fits comfortably in UInt64.
    """
    return time.time_ns() + offset


def ingest_events(events: list[dict]) -> None:
    payload = []
    for e in events:
        payload.append((
            e['event_id'],
            # event_ts is assumed to be epoch seconds; send a timezone-aware
            # datetime so the DateTime64 column is written unambiguously.
            datetime.fromtimestamp(e['event_ts'], tz=timezone.utc),
            e['event_type'],
            e['user_id'],
            e['metrics'],
            generate_version(),
        ))
    client.execute(
        "INSERT INTO analytics.events_raw "
        "(event_id, event_ts, event_type, user_id, metrics, version) VALUES",
        payload,
        # Disable insert deduplication so replayed batches are not dropped.
        settings={'insert_deduplicate': 0},
    )
For historical backfills, add a fixed offset to the version (e.g., + 1_000_000_000) so that replayed rows carry a higher version than the rows they replace. ReplacingMergeTree keeps the row with the highest version for each sorting key, so the replayed state wins once parts merge or when the table is read with FINAL, without a table-wide OPTIMIZE. The offset must be larger than the spread of versions already written for the same key.
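A minimal sketch of the backfill offset, assuming versions are plain integers stored in a UInt64 column. BACKFILL_OFFSET, backfill_version, and the generation parameter are illustrative names. The offset value is the example figure from the text and has to be sized to your own version scheme.

```python
UINT64_MAX = 2**64 - 1
BACKFILL_OFFSET = 1_000_000_000  # example value; must exceed the live version spread per key


def backfill_version(original_version: int, generation: int = 1) -> int:
    """Return a version that outranks the original for the same sorting key.

    `generation` lets a second or third replay outrank earlier replays.
    """
    if generation < 1:
        raise ValueError("generation must be >= 1")
    version = original_version + generation * BACKFILL_OFFSET
    if version > UINT64_MAX:
        raise OverflowError("version does not fit in UInt64")
    return version


live_version = 1_714_521_600_000
first_replay = backfill_version(live_version)
second_replay = backfill_version(live_version, generation=2)
```

Keeping the generation explicit makes repeated backfills idempotent per run: re-executing the same generation produces the same versions instead of stacking offsets.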
Diagnostic Queries & Monitoring
Operational visibility into late-arriving data requires targeted queries against system.parts and custom watermark checks. The following diagnostics should be scheduled via cron or Airflow:
-- Detect late-arriving events (ingestion_ts later than event_ts by more than 24 hours)
SELECT
    toDate(event_ts) AS event_date,
    count() AS late_rows,
    max(dateDiff('hour', event_ts, ingestion_ts)) AS max_latency_hours
FROM analytics.events_raw
WHERE ingestion_ts > event_ts + INTERVAL 24 HOUR
GROUP BY event_date
ORDER BY event_date DESC;

-- Monitor unmerged parts in the aggregated target
SELECT
    partition,
    count() AS part_count,
    sum(rows) AS total_rows,
    max(modification_time) AS last_merge_ts
FROM system.parts
WHERE table = 'events_agg' AND database = 'analytics' AND active = 1
GROUP BY partition
HAVING part_count > 5
ORDER BY last_merge_ts ASC;
Threshold tuning should trigger alerts when part_count exceeds 10 per partition or when max_latency_hours breaches your reporting SLA. Automated reconciliation jobs can then compare raw row counts against aggregated sums to flag divergence early.
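The reconciliation and alerting checks can be sketched as two small pure functions that operate on query results already fetched into dictionaries. The function names, the dictionary shapes, and the default limit of 10 parts are assumptions for illustration. How the counts are fetched is left to the orchestration layer.

```python
def find_divergence(raw_counts, agg_counts, tolerance=0):
    """Compare per-key raw row counts with aggregated counts.

    Returns {key: (raw, agg)} for every key whose absolute difference
    exceeds `tolerance`, including keys present on only one side.
    """
    diverged = {}
    for key in raw_counts.keys() | agg_counts.keys():
        raw = raw_counts.get(key, 0)
        agg = agg_counts.get(key, 0)
        if abs(raw - agg) > tolerance:
            diverged[key] = (raw, agg)
    return diverged


def partitions_over_limit(part_counts, limit=10):
    """Return the partitions whose active part count exceeds `limit`, sorted."""
    return sorted(p for p, n in part_counts.items() if n > limit)


raw = {"2024-05-01": 1000, "2024-05-02": 980}
agg = {"2024-05-01": 1000, "2024-05-02": 1010, "2024-05-03": 5}
diverged = find_divergence(raw, agg)
noisy = partitions_over_limit({"2024-05-01": 4, "2024-05-02": 12})
```

Keeping the comparison separate from the database calls makes the thresholds easy to unit test and to tune per table.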
Operational Fallbacks & View Recovery
When background merges fall behind ingestion velocity, or when a pipeline outage causes massive backfills, manual intervention may be required. Avoid running OPTIMIZE TABLE ... FINAL on production workloads; it forces synchronous merges across all partitions, causing severe CPU and I/O spikes. Instead, use partition-scoped optimization:
OPTIMIZE TABLE analytics.events_agg PARTITION '2024-05-01' FINAL;
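When several partitions received late data, the same statement can be generated per partition rather than falling back to a table-wide merge. The helper below is a hypothetical sketch that only builds the SQL strings. It assumes the target is partitioned by a Date column, as in the DDL above, and leaves execution and pacing to the caller.

```python
from datetime import date


def optimize_statements(table, partitions):
    """Build one partition-scoped OPTIMIZE ... FINAL statement per date.

    Duplicates are removed and the partitions are emitted oldest first.
    """
    return [
        f"OPTIMIZE TABLE {table} PARTITION '{p.isoformat()}' FINAL"
        for p in sorted(set(partitions))
    ]


statements = optimize_statements(
    "analytics.events_agg",
    [date(2024, 5, 2), date(2024, 5, 1), date(2024, 5, 2)],
)
```

Running the statements one at a time, with a pause between them, keeps the merge load bounded to a single partition at any moment.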
For catastrophic state divergence, implement a fallback reconciliation chain:
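- Stop background merges on the target: SYSTEM STOP MERGES analytics.events_agg; This does not pause the materialized view, which keeps firing on every insert into the raw table. Pause upstream ingestion, or detach the view with DETACH TABLE analytics.mv_events_agg, for the duration of the rebuild.
- Truncate the target table.
- Re-run the aggregation via a direct INSERT INTO ... SELECT from the raw table, ordered by version.
- Resume merges: SYSTEM START MERGES analytics.events_agg; Reattach the view with ATTACH TABLE analytics.mv_events_agg if it was detached.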
This recovery pattern integrates seamlessly into broader Materialized View Management & Sync Automation frameworks, ensuring that pipeline orchestration tools can trigger deterministic resets without manual SQL intervention.
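One way an orchestration tool might drive the fallback chain is sketched below. The run_recovery function and its execute parameter are hypothetical: execute stands for any callable that runs a single SQL statement (for example, a thin wrapper around a client's execute method). REBUILD_SQL is an assumed rebuild statement that mirrors the view's SELECT without the watermark filter.

```python
# Assumed rebuild statement: same projection as the view, full raw history.
REBUILD_SQL = (
    "INSERT INTO analytics.events_agg "
    "SELECT event_type, user_id, toDate(event_ts) AS event_date, "
    "1 AS event_count, event_ts AS last_event_ts, version "
    "FROM analytics.events_raw ORDER BY version"
)


def run_recovery(execute, target="analytics.events_agg", rebuild_sql=REBUILD_SQL):
    """Stop merges, truncate, rebuild, and always resume merges.

    Returns the statements that completed, in order.
    """
    executed = []

    def run(sql):
        execute(sql)
        executed.append(sql)

    run(f"SYSTEM STOP MERGES {target}")
    try:
        run(f"TRUNCATE TABLE {target}")
        run(rebuild_sql)
    finally:
        # Resume merges even if the truncate or rebuild step raised.
        run(f"SYSTEM START MERGES {target}")
    return executed


# Dry run with a recording executor instead of a live connection.
statement_log = []
completed = run_recovery(statement_log.append)
```

The try/finally block is the important design choice here: a failed rebuild should never leave the target table with merges permanently stopped.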
Conclusion
Handling late-arriving data in ClickHouse materialized views demands explicit versioning, partition-aware routing, and proactive monitoring. By decoupling ingestion from aggregation, enforcing deterministic watermarking in Python ETL pipelines, and leveraging ReplacingMergeTree conflict resolution, analytics teams can avoid silent metric corruption. Combine these patterns with partition-scoped diagnostics and automated fallback chains to keep reported metrics within SLA even under sustained out-of-order ingestion.