# State Management

## Architecture Overview

## Zustand Stores

### usePipelineStore

**File:** `stores/usePipelineStore.ts`
**Persistence:** IndexedDB (`dtx-pipeline-autosave`, version 3)

Manages the full state of the pipeline designer canvas.
| State | Type | Purpose |
|---|---|---|
| `pipeline` | `ConnectorPipeline \| ProcessingPipeline` | Current draft pipeline |
| `nodeConfigurations` | `Map<string, OperatorNodeConfiguration>` | Per-node operator configs |
| `edgeConfigurations` | `Map<string, PipelineEdgeConfiguration>` | Per-edge configs |
| `syncedNodeIds` | `Set<string>` | Nodes synced to backend |
| `syncStatus` | `'idle' \| 'syncing' \| 'synced' \| 'error'` | Backend sync state |
| `viewport` | `{ x, y, zoom }` | Canvas viewport position |
**Key Actions:** `initializeConnectorPipeline()`, `initializeProcessingPipeline()`, `setNodes()`, `setEdges()`, `setNodeConfiguration()`, `updateNodeConfiguration()`, `loadPipeline()`, `clearPipeline()`, `markNodeAsSynced()`
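Because Zustand detects changes by reference, `Map`-valued fields like `nodeConfigurations` must be replaced rather than mutated in place, or subscribers will not re-render. A minimal sketch of the copy-on-write pattern an action like `setNodeConfiguration()` has to follow; the helper name and the trimmed-down config type are illustrative, not taken from the store:

```typescript
// Illustrative stub: the real OperatorNodeConfiguration has more fields.
type NodeConfig = { operatorTemplateId: string; operatorName: string };

// Replace the Map instead of mutating it, so the old reference stays intact
// and Zustand's shallow comparison sees the change.
function withNodeConfiguration(
  configs: Map<string, NodeConfig>,
  nodeId: string,
  config: NodeConfig,
): Map<string, NodeConfig> {
  const next = new Map(configs); // copy; callers keep the previous snapshot
  next.set(nodeId, config);
  return next;
}
```

Inside the store, an action would call `set({ nodeConfigurations: withNodeConfiguration(get().nodeConfigurations, id, config) })`.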
### useWorkflowDesignerStore

**File:** `stores/useWorkflowDesignerStore.ts`
**Persistence:** localStorage (`workflow-designer-storage`)
| State | Type | Purpose |
|---|---|---|
| `workflow` | `WorkflowDefinition` | Workflow metadata |
| `version` | `WorkflowVersion` | Active version |
| `nodes` | `Node[]` | XYFlow nodes |
| `edges` | `Edge[]` | XYFlow edges |
| `selectedNodeId` | `string \| null` | Currently selected node |
| `isDirty` | `boolean` | Unsaved changes |
| `validationErrors` | `ValidationError[]` | Current errors |
**Key Actions:** `addNode()`, `updateNode()`, `removeNode()`, `selectNode()`, `duplicateNode()`, `addEdgeConnection()`, `validate()`, `onNodesChange()`, `onEdgesChange()`, `onConnect()`

**Selectors:** `useSelectedNode()`, `useWorkflowValidation()`, `useWorkflowDirtyState()`
### useAccessManagementStore

**File:** `stores/useAccessManagementStore.ts`
**Persistence:** sessionStorage (`dtx-access-management`)
| State | Type | Purpose |
|---|---|---|
| `expandedGroups` | `Set<string>` | Expanded group tree nodes |
| `selectedGroupId` | `string \| null` | Selected group |
| `userFilters` | `UserFilters` | User list filters |
| `roleFilters` | `RoleFilters` | Role list filters |
| `currentUserPermissions` | `Permission[]` | Cached permissions |
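Expanding or collapsing a node in the group tree follows the same copy-on-write rule for the `Set`-valued `expandedGroups` field. A sketch; the function name is hypothetical:

```typescript
// Hypothetical action body: toggle a group id in expandedGroups without
// mutating the previous Set, so Zustand subscribers see a new reference.
function toggleGroup(expanded: Set<string>, groupId: string): Set<string> {
  const next = new Set(expanded);
  if (next.has(groupId)) {
    next.delete(groupId); // collapse
  } else {
    next.add(groupId); // expand
  }
  return next;
}
```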
## React Query (Server State)

```typescript
// App.tsx — QueryClient configuration
const queryClient = new QueryClient({
  defaultOptions: {
    queries: {
      staleTime: 0, // Data always refetched on mount
      retry: 1,     // Single retry on failure
    },
  },
});
```
### Query Key Patterns
| Hook | Query Key | Polling |
|---|---|---|
| `usePipelines()` | `['pipelines', params]` | — |
| `useSchemas()` | `['schemas', params]` | — |
| `useOperators()` | `['operator-templates']` | — |
| `useOperatorConfigs()` | `['operator-config', id]` | — |
| `useGenerators()` | `['generators', params]` | 5s |
| `useGenerator(id)` | `['generators', id]` | — |
| `useGeneratorMetrics(id)` | `['generators', id, 'metrics']` | — |
| `usePools()` | `['pools']` | — |
| `useUniverses()` | `['universes']` | — |
| `useKafkaTopics()` | `['kafka-topics']` | — |
| `useCacheManagement()` | `['cache-keys', pattern]` | — |
| `useApprovalWorkflow()` | `['workflows']` | — |
| `useAccessManagement()` | `['users']`, `['roles']`, `['groups']` | — |
| `useExpressions()` | `['expression-templates']` | — |
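These keys are hierarchical, so invalidating a short key such as `['generators']` also refetches `['generators', id]` and `['generators', id, 'metrics']`. A simplified sketch of the prefix rule React Query applies (the real matcher also does deep partial matching on object elements like `params`):

```typescript
type QueryKey = readonly unknown[];

// Simplified version of React Query's key matching: an invalidation key
// affects a query when it is an element-wise prefix of the query's key.
function matchesPrefix(queryKey: QueryKey, prefix: QueryKey): boolean {
  return (
    prefix.length <= queryKey.length &&
    prefix.every((part, i) => JSON.stringify(part) === JSON.stringify(queryKey[i]))
  );
}
```

This is why per-entity hooks nest their keys under the list key: one invalidation after a mutation refreshes the list, the detail view, and the metrics view together.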
### Mutation Pattern

```typescript
// Standard mutation pattern used across all services
const mutation = useMutation({
  mutationFn: (data) => service.create(data),
  onSuccess: () => {
    toast.success('Created successfully');
    queryClient.invalidateQueries(['resource-key']);
    navigate('/resource-list');
  },
  onError: (error) => {
    toast.error(`Failed: ${error.message}`);
  },
});
```
## Key Data Models

### Pipeline Types

```typescript
type Pipeline = ConnectorPipeline | ProcessingPipeline;

interface ConnectorPipeline {
  id: string;
  name: string;
  description: string;
  type: 'connector';
  connectorType: 'source' | 'sink';
  status: PipelineStatus;
  executionMode: 'streaming' | 'batch';
  nodes: PipelineNode[];
  edges: PipelineEdge[];
  createdBy: User;
  createdAt: string;
  updatedAt: string;
}

type PipelineStatus =
  | 'draft' | 'compiling' | 'compiled'
  | 'deploying' | 'deployed' | 'paused' | 'cancelled' | 'failed';
```
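The `type` field acts as the discriminant of the `Pipeline` union, so one guard is enough to recover the concrete type. A sketch with both interfaces stubbed to a few fields; the `'processing'` discriminant value for `ProcessingPipeline` is an assumption, since only the connector side is shown above:

```typescript
// Illustrative stubs; the real interfaces carry many more fields, and
// ProcessingPipeline's discriminant value is assumed to be 'processing'.
interface ConnectorPipeline { type: 'connector'; connectorType: 'source' | 'sink'; name: string }
interface ProcessingPipeline { type: 'processing'; name: string }
type Pipeline = ConnectorPipeline | ProcessingPipeline;

// Type guard: narrowing on the 'type' discriminant lets callers access
// connector-only fields such as connectorType safely.
function isConnectorPipeline(p: Pipeline): p is ConnectorPipeline {
  return p.type === 'connector';
}
```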
### Operator Types

```typescript
interface OperatorTemplate {
  id: string;
  name: string;
  description: string;
  type: 'STREAM' | 'BATCH' | 'REQUEST_RESPONSE';
  category: 'SOURCE' | 'SINK' | 'PROCESSOR' | 'TESTER';
  subtype: string;
  status: 'ACTIVE' | 'DEPRECATED';
  configSchema: object; // JSON Schema for FormRenderer
}

interface OperatorNodeConfiguration {
  operatorTemplateId: string;
  operatorName: string;
  inputSchemas: SchemaReference[];
  outputSchema?: SchemaReference;
  configStage: ConfigStageConfig;
  logicStage?: LogicStageConfig;
  testerConfig?: TesterConfig;
}
```
### Workflow Types

```typescript
interface WorkflowDefinition {
  id: string;
  name: string;
  description: string;
  status: 'draft' | 'published' | 'archived';
  versions: WorkflowVersion[];
}

type WorkflowNodeType =
  | 'start' | 'approval' | 'condition'
  | 'parallel-split' | 'parallel-join' | 'notification'
  | 'timer' | 'script' | 'reject' | 'loop' | 'end';

type TaskStatus =
  | 'pending' | 'claimed' | 'completed'
  | 'delegated' | 'escalated' | 'expired' | 'cancelled';
```
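Task UIs usually need to know whether a task can still change state. Which statuses count as terminal is an assumption in this sketch, not something the type above defines:

```typescript
type TaskStatus = 'pending' | 'claimed' | 'completed'
  | 'delegated' | 'escalated' | 'expired' | 'cancelled';

// Assumption: these three statuses end a task's lifecycle; the others can
// still transition (e.g. a delegated task may later be completed).
const TERMINAL_STATUSES: ReadonlySet<TaskStatus> = new Set(['completed', 'expired', 'cancelled']);

function isTerminal(status: TaskStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}
```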
### Schema Types

```typescript
interface SchemaResponse {
  id: string;
  name: string;
  groupId: string;
  description: string;
  format: string;
  type: string;
  state: string;
  latestVersion: number;
}

interface FieldResponse {
  name: string;
  type: string;
  nullable: boolean;
  pii: boolean;
  description?: string;
}
```
### Access Management Types

```typescript
interface User {
  id: string;
  username: string;
  email: string;
  firstName: string;
  lastName: string;
  roles: Role[];
  groups: UserGroup[];
  enabled: boolean;
}

interface Role {
  id: string;
  name: string;
  description: string;
  permissions: Permission[];
}

interface UserGroup {
  id: string;
  name: string;
  path: string;
  parentId?: string;
  children: UserGroup[]; // Recursive tree structure
}
```
### Standard API Response

```typescript
interface StandardApiResponse<T> {
  status: 'success' | 'error';
  data: T;
  message?: string;
  errors?: FieldError[];
}
```
## Custom Hooks
| Hook | Purpose |
|---|---|
| `useDebounce` | Generic value debounce with configurable delay |
| `useAutoSave` | Throttled local save (3s) + debounced server sync (10s) |
| `usePermissionCheck` | `hasPermission()`, `hasAnyPermission()`, `hasAllPermissions()`, `canAccessResource()` |
| `useKafkaTopics` | React Query hook for Kafka topic CRUD |
| `useSyntheticData` | React Query hooks for generators, pools, universes, metrics |
| `useCacheManagement` | React Query hook for cache key operations |
| `useMetricsHistory` | Time-series metrics history for charts |
| `useCompilationProgress` | Pipeline compilation progress with polling |
| `useExecutionMode` | Streaming vs. batch mode management |
| `usePipelineSave` | Pipeline serialization + conflict resolution |
| `usePipelineRecovery` | Recovers pipelines from IndexedDB after failures |
| `usePreviewPropagation` | Preview data propagation through the pipeline DAG |
| `useActiveSDGGenerators` | Active generators, polled for real-time status |
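The checks exposed by `usePermissionCheck` reduce to membership tests over the cached `Permission[]` from `useAccessManagementStore`. A dependency-free sketch of three of them; the hook wiring is omitted, and the bodies are assumptions that mirror the names, not the actual implementation:

```typescript
type Permission = string;

// Hypothetical bodies for the checks usePermissionCheck exposes; the real
// hook reads currentUserPermissions from the store instead of a parameter.
function hasPermission(owned: Permission[], required: Permission): boolean {
  return owned.includes(required);
}

function hasAnyPermission(owned: Permission[], required: Permission[]): boolean {
  return required.some((p) => owned.includes(p));
}

function hasAllPermissions(owned: Permission[], required: Permission[]): boolean {
  return required.every((p) => owned.includes(p));
}
```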