Concurrent Update Handling

The Challenge

Multiple pipelines can update the same DTX_ID at the same time, each writing a different part of the profile: CRM (name/address), Billing (status/balance), CDR (traits), and Segments (memberships).

Key Mechanisms

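  • Kafka Partitioning: records with the same DTX_ID → same partition, so per-profile ordering is preserved
  • Flink Keyed Processing: updates for a given DTX_ID are processed one at a time (single-threaded per key)
  • Merge Policies: attribute-level conflict resolution
  • Atomic Upsert: INSERT ... ON CONFLICT ... DO UPDATE in a single statement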
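
The first two mechanisms can be illustrated with a minimal sketch of key-based routing. It is not Kafka's actual partitioner: CRC32 is used here only as a stand-in stable hash, and `NUM_PARTITIONS`, `partition_for`, `route`, and the sample updates are hypothetical names chosen for illustration.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 8  # hypothetical partition count


def partition_for(dtx_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition with a stable hash (CRC32 as a stand-in)."""
    return zlib.crc32(dtx_id.encode("utf-8")) % num_partitions


def route(updates):
    """Group updates by partition, preserving arrival order within each one."""
    partitions = defaultdict(list)
    for update in updates:
        partitions[partition_for(update["dtx_id"])].append(update)
    return partitions


# Hypothetical updates from different pipelines, in arrival order.
updates = [
    {"dtx_id": "dtx-001", "source": "CRM", "field": "given_name"},
    {"dtx_id": "dtx-002", "source": "Billing", "field": "account.status"},
    {"dtx_id": "dtx-001", "source": "Billing", "field": "account.status"},
    {"dtx_id": "dtx-001", "source": "CDR", "field": "traits"},
]

partitions = route(updates)
# Every update for dtx-001 sits in one partition, in arrival order.
dtx_001_sources = [
    u["source"]
    for u in partitions[partition_for("dtx-001")]
    if u["dtx_id"] == "dtx-001"
]
```

Because all updates for one DTX_ID land in a single ordered queue, a single-threaded consumer per key never sees two of them at once, which is the property the keyed processing step relies on.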

Merge Policies

| Policy | Logic | Use Case |
|---|---|---|
| MOST_RECENT | Latest timestamp wins | Dynamic attrs (status, balance) |
| SOURCE_PRIORITY | Ordered source preference | Authoritative sources (CRM > Web) |
| HIGHEST_CONFIDENCE | Best confidence wins | Probabilistic data |
| COALESCE | First non-null wins | Gap filling |
| CUSTOM | Custom logic | Complex business rules |
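
As a rough sketch, each policy can be modeled as a function that picks a winner between the stored value and an incoming one. The `Candidate` type, the `SOURCE_RANK` numbers, and the tie-breaking rule (the stored value is kept on ties) are assumptions made for illustration, not the platform's actual implementation.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict


@dataclass
class Candidate:
    value: Any
    source: str
    timestamp: datetime
    confidence: float = 1.0


# Hypothetical ranking; a higher number means a more authoritative source.
SOURCE_RANK = {"CRM": 3, "Billing": 2, "Web": 1}


def most_recent(current: Candidate, incoming: Candidate) -> Candidate:
    """Latest timestamp wins; the stored value is kept on a tie."""
    return incoming if incoming.timestamp > current.timestamp else current


def source_priority(current: Candidate, incoming: Candidate) -> Candidate:
    """The higher-ranked source wins; unknown sources rank lowest."""
    def rank(c: Candidate) -> int:
        return SOURCE_RANK.get(c.source, 0)
    return incoming if rank(incoming) > rank(current) else current


def highest_confidence(current: Candidate, incoming: Candidate) -> Candidate:
    """The value with the higher confidence score wins."""
    return incoming if incoming.confidence > current.confidence else current


def coalesce(current: Candidate, incoming: Candidate) -> Candidate:
    """First non-null wins: only fill the attribute if it is empty."""
    return current if current.value is not None else incoming


# A CUSTOM policy would be any callable with the same signature.
POLICIES: Dict[str, Callable[[Candidate, Candidate], Candidate]] = {
    "MOST_RECENT": most_recent,
    "SOURCE_PRIORITY": source_priority,
    "HIGHEST_CONFIDENCE": highest_confidence,
    "COALESCE": coalesce,
}


def resolve(policy: str, current: Candidate, incoming: Candidate) -> Candidate:
    """Apply the named merge policy to one attribute."""
    return POLICIES[policy](current, incoming)


# Usage with an arbitrary timestamp: CRM outranks Web for the same attribute.
t = datetime(2024, 1, 1, 10, 0)
winner = resolve(
    "SOURCE_PRIORITY",
    Candidate("Ahmad", "Web", t),
    Candidate("Ahmed", "CRM", t),
)
```

Keeping every policy behind one signature means the per-attribute configuration only has to store a policy name, and new rules can be added without touching the merge loop.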

Scenarios

| Attribute | Conflict | Policy | Resolution |
|---|---|---|---|
| given_name | CRM: Ahmed, Web: Ahmad | SOURCE_PRIORITY | Keep CRM |
| account.status | Billing 10:00: ACTIVE, CRM 10:05: SUSPENDED | MOST_RECENT | Keep SUSPENDED |
| email | Old exists, new from form | MOST_RECENT | Replace |
| phone | CRM has value, Web null | COALESCE | Keep CRM |
| home_location | Signaling 85%, DPI 72% | HIGHEST_CONFIDENCE | Keep signaling |
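
One consequence of the account.status scenario is worth spelling out: if MOST_RECENT compares event timestamps, the result should not depend on which update arrives first. The sketch below checks this for the two updates in the table, assuming the times shown are event times assigned by the source systems; the date and the tuple layout are arbitrary choices for illustration.

```python
from datetime import datetime
from functools import reduce
from itertools import permutations

# (value, source, event_time); the date is arbitrary, the times follow the table.
billing = ("ACTIVE", "Billing", datetime(2024, 1, 1, 10, 0))
crm = ("SUSPENDED", "CRM", datetime(2024, 1, 1, 10, 5))


def most_recent(current, incoming):
    """Keep whichever update carries the later event timestamp."""
    return incoming if incoming[2] > current[2] else current


# Fold the updates in every possible arrival order and collect the winners.
results = {
    reduce(most_recent, order)[0]
    for order in permutations([billing, crm])
}
```

Both arrival orders resolve to SUSPENDED, so a late-arriving Billing update cannot overwrite the newer CRM value.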

Upsert SQL

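-- merge_policy() and source_priority() are user-defined functions.
-- source is included in the column list so that EXCLUDED.source is populated.
INSERT INTO profiles (tenant_id, dtx_id, given_name, account_status, source, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (tenant_id, dtx_id) DO UPDATE SET
    -- SOURCE_PRIORITY: overwrite only when the incoming source outranks the stored one.
    given_name = CASE
        WHEN merge_policy('given_name') = 'SOURCE_PRIORITY'
         AND source_priority(EXCLUDED.source) > source_priority(profiles.source_given_name)
        THEN EXCLUDED.given_name ELSE profiles.given_name END,
    -- MOST_RECENT: overwrite only when the incoming record is newer than the stored status.
    account_status = CASE
        WHEN merge_policy('account_status') = 'MOST_RECENT'
         AND EXCLUDED.updated_at > profiles.updated_at_status
        THEN EXCLUDED.account_status ELSE profiles.account_status END,
    -- A complete statement would also refresh source_given_name and updated_at_status
    -- whenever the corresponding value is replaced.
    updated_at = GREATEST(profiles.updated_at, EXCLUDED.updated_at);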
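
To see the upsert pattern run end to end, here is a small sketch using Python's built-in sqlite3 module as a stand-in for the production database. The table layout, the `source_priority` ranking, and the single `upsert` helper that carries both attributes in one update are assumptions for illustration. SQLite's two-argument `max()` replaces `GREATEST`, and timestamps are ISO-8601 strings so that string comparison matches time order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical ranking exposed to SQL as a user-defined function.
conn.create_function(
    "source_priority", 1, lambda s: {"CRM": 2, "Web": 1}.get(s, 0)
)
conn.execute("""
    CREATE TABLE profiles (
        tenant_id TEXT NOT NULL,
        dtx_id TEXT NOT NULL,
        given_name TEXT,
        source_given_name TEXT,
        account_status TEXT,
        updated_at_status TEXT,
        PRIMARY KEY (tenant_id, dtx_id)
    )
""")

# All right-hand sides are evaluated against the row as it was before the update.
UPSERT = """
    INSERT INTO profiles (tenant_id, dtx_id, given_name, source_given_name,
                          account_status, updated_at_status)
    VALUES (?, ?, ?, ?, ?, ?)
    ON CONFLICT (tenant_id, dtx_id) DO UPDATE SET
        given_name = CASE
            WHEN source_priority(excluded.source_given_name)
               > source_priority(profiles.source_given_name)
            THEN excluded.given_name ELSE profiles.given_name END,
        source_given_name = CASE
            WHEN source_priority(excluded.source_given_name)
               > source_priority(profiles.source_given_name)
            THEN excluded.source_given_name ELSE profiles.source_given_name END,
        account_status = CASE
            WHEN excluded.updated_at_status > profiles.updated_at_status
            THEN excluded.account_status ELSE profiles.account_status END,
        updated_at_status = max(profiles.updated_at_status,
                                excluded.updated_at_status)
"""


def upsert(name, source, status, ts):
    """Insert the profile or merge into the existing row in one statement."""
    conn.execute(UPSERT, ("t1", "dtx-001", name, source, status, ts))


upsert("Ahmad", "Web", "ACTIVE", "2024-01-01T10:00:00")     # creates the row
upsert("Ahmed", "CRM", "SUSPENDED", "2024-01-01T10:05:00")  # newer, higher priority
upsert("Ahmad", "Web", "ACTIVE", "2024-01-01T09:00:00")     # stale, lower priority

row = conn.execute(
    "SELECT given_name, source_given_name, account_status, updated_at_status "
    "FROM profiles"
).fetchone()
```

The third call is ignored on both attributes: the Web source does not outrank CRM, and its timestamp is older than the stored status timestamp. Because the comparison and the write happen in a single statement, there is no window in which another writer can slip in between a read and an update.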