Concurrent Update Handling
The Challenge
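Multiple pipelines can update the same DTX_ID concurrently: CRM (name/address), Billing (status/balance), CDR (traits), and Segments (memberships). Without coordination, one pipeline's write can overwrite another's (a lost update), and two sources can disagree about the same attribute.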
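A minimal sketch of the lost update, in Python, with a hypothetical in-memory dict standing in for the profile store. When CRM and Billing each read the whole profile, change their own field, and write the whole row back, the second write silently reverts the first. Writing only the changed attributes keeps both updates. Conflicts on the *same* attribute still need a merge policy.

```python
import copy

# Hypothetical starting profile for one DTX_ID.
profile = {"given_name": None, "account_status": "ACTIVE"}


def whole_row_write(store, read_snapshot, changes):
    """Naive read-modify-write: the writer replaces the entire row."""
    row = copy.deepcopy(read_snapshot)
    row.update(changes)
    store.clear()
    store.update(row)


def attribute_patch(store, changes):
    """Attribute-level write: only the fields this pipeline owns are touched."""
    store.update(changes)


# Interleaving: CRM and Billing both read the row before either one writes.
naive = dict(profile)
crm_read, billing_read = dict(naive), dict(naive)
whole_row_write(naive, billing_read, {"account_status": "SUSPENDED"})
whole_row_write(naive, crm_read, {"given_name": "Ahmed"})

patched = dict(profile)
attribute_patch(patched, {"account_status": "SUSPENDED"})
attribute_patch(patched, {"given_name": "Ahmed"})

print(naive)    # {'given_name': 'Ahmed', 'account_status': 'ACTIVE'}  (Billing's update lost)
print(patched)  # {'given_name': 'Ahmed', 'account_status': 'SUSPENDED'}
```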
Key Mechanisms
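- Kafka partitioning: events are keyed by DTX_ID, so every event for a given DTX_ID lands on the same partition and is consumed in order within its topic.
- Flink keyed processing: keying the stream by DTX_ID routes all events for that DTX_ID to a single parallel task, which processes them one at a time.
- Merge policies: conflicts are resolved per attribute rather than per record.
- Atomic upsert: INSERT ... ON CONFLICT ... DO UPDATE creates or updates the profile row in a single statement.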
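The ordering guarantee from the first two mechanisms can be sketched in Python as an in-memory simulation. This uses neither the Kafka nor the Flink API. A deterministic hash maps each DTX_ID to one partition, and one thread per partition applies that partition's events strictly in sequence. `zlib.crc32` is a stand-in chosen for illustration; Kafka's default partitioner hashes the key bytes with murmur2. `NUM_PARTITIONS`, the event tuples, and the `DTX-n` identifiers are hypothetical.

```python
import threading
import zlib
from typing import Dict, List, Tuple

NUM_PARTITIONS = 4  # hypothetical topic size

Event = Tuple[str, int, str]  # (dtx_id, per-key sequence number, pipeline)


def partition_for(dtx_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in hash (Kafka's default partitioner uses murmur2); any deterministic
    # hash shows the property that matters: same key -> same partition.
    return zlib.crc32(dtx_id.encode("utf-8")) % num_partitions


def consume(events: List[Event], applied: Dict[str, List[int]]) -> None:
    # One worker per partition applies events one at a time, in log order,
    # so two updates for the same DTX_ID never run concurrently.
    for dtx_id, seq, _pipeline in events:
        applied.setdefault(dtx_id, []).append(seq)


def run(events: List[Event]) -> Dict[str, List[int]]:
    partitions: List[List[Event]] = [[] for _ in range(NUM_PARTITIONS)]
    for ev in events:  # producer side: route each event by its key
        partitions[partition_for(ev[0])].append(ev)

    results: List[Dict[str, List[int]]] = [{} for _ in partitions]
    workers = [threading.Thread(target=consume, args=(p, r))
               for p, r in zip(partitions, results)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    merged: Dict[str, List[int]] = {}
    for r in results:
        merged.update(r)  # keys are disjoint: each DTX_ID lives in one partition
    return merged


events = [("DTX-1", 1, "CRM"), ("DTX-2", 1, "Billing"), ("DTX-1", 2, "Billing"),
          ("DTX-3", 1, "CDR"), ("DTX-1", 3, "Segments"), ("DTX-2", 2, "CRM")]
print(run(events))
```

Because each DTX_ID lives in exactly one partition, workers never share a key, so per-key state needs no locking; parallelism comes from spreading different keys across partitions.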
Merge Policies
| Policy | Logic | Use Case |
|---|---|---|
| MOST_RECENT | Latest timestamp wins | Dynamic attrs (status, balance) |
| SOURCE_PRIORITY | Highest-ranked source wins | Authoritative sources (CRM > Web) |
| HIGHEST_CONFIDENCE | Highest confidence score wins | Probabilistic data |
| COALESCE | First non-null value wins; nulls never overwrite | Gap filling |
| CUSTOM | User-defined merge function | Complex business rules |
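The policies in the table can be expressed as pairwise merge functions over a candidate value and its lineage. The sketch below is one possible Python shape, not a prescribed API. `Candidate`, `SOURCE_RANK`, `merge()`, and the timestamp format are hypothetical. The `>=` in `source_priority` lets a source correct its own earlier value, which is a design assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass(frozen=True)
class Candidate:
    """One attribute value plus the lineage the policies need."""
    value: Any
    source: str
    event_time: str          # fixed-width timestamp string, so string order == time order
    confidence: float = 1.0  # only read by HIGHEST_CONFIDENCE


# Hypothetical ranking for SOURCE_PRIORITY (higher = more authoritative).
SOURCE_RANK = {"CRM": 2, "Web": 1}

Merge = Callable[[Candidate, Candidate], Candidate]


def most_recent(old: Candidate, new: Candidate) -> Candidate:
    return new if new.event_time > old.event_time else old


def source_priority(old: Candidate, new: Candidate) -> Candidate:
    # ">=" lets a source correct its own earlier value; unknown sources rank lowest.
    return new if SOURCE_RANK.get(new.source, 0) >= SOURCE_RANK.get(old.source, 0) else old


def highest_confidence(old: Candidate, new: Candidate) -> Candidate:
    return new if new.confidence > old.confidence else old


def coalesce(old: Candidate, new: Candidate) -> Candidate:
    # Gap filling: keep an existing value; take the new one only if the old is null.
    return old if old.value is not None else new


POLICIES: Dict[str, Merge] = {
    "MOST_RECENT": most_recent,
    "SOURCE_PRIORITY": source_priority,
    "HIGHEST_CONFIDENCE": highest_confidence,
    "COALESCE": coalesce,
}


def merge(policy: str, old: Optional[Candidate], new: Candidate,
          custom: Optional[Merge] = None) -> Candidate:
    """Resolve one attribute; CUSTOM delegates to a caller-supplied function."""
    if old is None:  # first value seen for this attribute
        return new
    if policy == "CUSTOM":
        if custom is None:
            raise ValueError("CUSTOM policy needs a merge function")
        return custom(old, new)
    return POLICIES[policy](old, new)


name = merge("SOURCE_PRIORITY", Candidate("Ahmed", "CRM", "10:00"),
             Candidate("Ahmad", "Web", "10:05"))
print(name.value)  # Ahmed
```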
Scenarios
| Attribute | Conflict | Policy | Resolution |
|---|---|---|---|
| given_name | CRM: Ahmed; Web: Ahmad | SOURCE_PRIORITY | Keep CRM (Ahmed) |
| account.status | Billing at 10:00: ACTIVE; CRM at 10:05: SUSPENDED | MOST_RECENT | Keep SUSPENDED |
| | Old value exists; newer value from form | MOST_RECENT | Replace with form value |
| phone | CRM has a value; Web is null | COALESCE | Keep CRM value |
| home_location | Signaling: 85% confidence; DPI: 72% | HIGHEST_CONFIDENCE | Keep Signaling |
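The account.status row depends on comparing the timestamps carried by the events, not the order in which the events arrive. The Python sketch below assumes those timestamps are event times in a fixed-width format, and it replays both arrival orders. A naive last-arrival-wins store ends up with either value, while MOST_RECENT always yields SUSPENDED. Breaking equal timestamps by source name is an added assumption that the table does not specify.

```python
from itertools import permutations
from typing import List, Optional, Tuple

# account.status events from the scenario table: (event_time, source, value).
Event = Tuple[str, str, str]
EVENTS: List[Event] = [("10:00", "Billing", "ACTIVE"), ("10:05", "CRM", "SUSPENDED")]


def last_arrival_wins(events: List[Event]) -> str:
    """Whatever is applied last overwrites the stored value."""
    return events[-1][2]


def most_recent(events: List[Event]) -> str:
    """MOST_RECENT keyed on event time, not arrival time."""
    winner: Optional[Event] = None
    for ev in events:
        # Compare (event_time, source): the source name is an assumed tie-breaker so
        # equal timestamps still resolve the same way in every arrival order.
        if winner is None or (ev[0], ev[1]) > (winner[0], winner[1]):
            winner = ev
    if winner is None:
        raise ValueError("no events")
    return winner[2]


orders = [list(p) for p in permutations(EVENTS)]
print(sorted({last_arrival_wins(o) for o in orders}))  # ['ACTIVE', 'SUSPENDED']
print(sorted({most_recent(o) for o in orders}))        # ['SUSPENDED']
```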
Upsert SQL
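```sql
-- PostgreSQL. Per-column policies follow the scenarios above:
-- given_name = SOURCE_PRIORITY, account_status = MOST_RECENT.
-- Assumes lineage columns source_given_name and updated_at_status, plus a
-- user-defined function source_priority(text) returning a rank (higher wins).
INSERT INTO profiles (tenant_id, dtx_id, given_name, source_given_name,
                      account_status, updated_at_status, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (tenant_id, dtx_id) DO UPDATE SET
  -- Every right-hand side reads the pre-update row, so the name and its
  -- lineage switch together; >= lets a source correct its own earlier value.
  given_name = CASE WHEN profiles.source_given_name IS NULL
      OR source_priority(EXCLUDED.source_given_name) >= source_priority(profiles.source_given_name)
    THEN EXCLUDED.given_name ELSE profiles.given_name END,
  source_given_name = CASE WHEN profiles.source_given_name IS NULL
      OR source_priority(EXCLUDED.source_given_name) >= source_priority(profiles.source_given_name)
    THEN EXCLUDED.source_given_name ELSE profiles.source_given_name END,
  -- Once a status timestamp is stored, an incoming NULL timestamp leaves the status untouched.
  account_status = CASE WHEN profiles.updated_at_status IS NULL
      OR EXCLUDED.updated_at_status > profiles.updated_at_status
    THEN EXCLUDED.account_status ELSE profiles.account_status END,
  updated_at_status = GREATEST(profiles.updated_at_status, EXCLUDED.updated_at_status),
  updated_at = GREATEST(profiles.updated_at, EXCLUDED.updated_at);
```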
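You can test an upsert of this shape without a PostgreSQL server by running it through Python's `sqlite3` module. SQLite 3.24 and later also supports `ON CONFLICT ... DO UPDATE` with the `excluded.` qualifier. This sketch rests on several assumptions:

- The table layout, the `RANK` values, and the Python `source_priority()` registered as a SQL function are hypothetical.
- The version below tracks per-attribute lineage (`source_given_name`, `updated_at_status`) and leaves out the overall `updated_at` column.
- The status timestamp uses a `CASE` rather than `max()`. SQLite's multi-argument `max()`, unlike PostgreSQL's `GREATEST`, returns NULL when any argument is NULL.

```python
import sqlite3
from typing import Optional

# Hypothetical ranking (higher = more authoritative); only CRM > Web is given.
RANK = {"CRM": 2, "Web": 1}


def source_priority(source: Optional[str]) -> Optional[int]:
    # Registered as a SQL function; behaves like a STRICT function (NULL in, NULL out).
    return None if source is None else RANK.get(source, 0)


UPSERT = """
INSERT INTO profiles (tenant_id, dtx_id, given_name, source_given_name,
                      account_status, updated_at_status)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (tenant_id, dtx_id) DO UPDATE SET
  given_name = CASE WHEN profiles.source_given_name IS NULL
      OR source_priority(excluded.source_given_name) >= source_priority(profiles.source_given_name)
    THEN excluded.given_name ELSE profiles.given_name END,
  source_given_name = CASE WHEN profiles.source_given_name IS NULL
      OR source_priority(excluded.source_given_name) >= source_priority(profiles.source_given_name)
    THEN excluded.source_given_name ELSE profiles.source_given_name END,
  account_status = CASE WHEN profiles.updated_at_status IS NULL
      OR excluded.updated_at_status > profiles.updated_at_status
    THEN excluded.account_status ELSE profiles.account_status END,
  updated_at_status = CASE WHEN profiles.updated_at_status IS NULL
      OR excluded.updated_at_status > profiles.updated_at_status
    THEN excluded.updated_at_status ELSE profiles.updated_at_status END
"""


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.create_function("source_priority", 1, source_priority)
    conn.execute("""CREATE TABLE profiles (
        tenant_id TEXT, dtx_id TEXT,
        given_name TEXT, source_given_name TEXT,
        account_status TEXT, updated_at_status TEXT,
        PRIMARY KEY (tenant_id, dtx_id))""")
    return conn


def upsert(conn, name, name_source, status, status_time) -> None:
    # One pipeline event for hypothetical tenant "t1" / profile "DTX-1".
    conn.execute(UPSERT, ("t1", "DTX-1", name, name_source, status, status_time))


def profile(conn):
    return conn.execute(
        "SELECT given_name, source_given_name, account_status, updated_at_status "
        "FROM profiles WHERE tenant_id = 't1' AND dtx_id = 'DTX-1'").fetchone()


conn = make_db()
upsert(conn, "Ahmed", "CRM", "SUSPENDED", "10:05")  # CRM: name and status at 10:05
upsert(conn, "Ahmad", "Web", None, None)            # Web: lower-priority name, no status
upsert(conn, None, None, "ACTIVE", "10:00")         # Billing event from 10:00, arriving late
print(profile(conn))  # ('Ahmed', 'CRM', 'SUSPENDED', '10:05')
```

The late 10:00 Billing event does not revert the status, and the lower-priority Web name does not replace the CRM name.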