
State Management

Architecture Overview


Zustand Stores

usePipelineStore

File: stores/usePipelineStore.ts
Persistence: IndexedDB (dtx-pipeline-autosave, version 3)

Manages the full state of the pipeline designer canvas.

State               Type                                     Purpose
pipeline            ConnectorPipeline | ProcessingPipeline   Current draft pipeline
nodeConfigurations  Map<string, OperatorNodeConfiguration>   Per-node operator configs
edgeConfigurations  Map<string, PipelineEdgeConfiguration>   Per-edge configs
syncedNodeIds       Set<string>                              Nodes synced to backend
syncStatus          'idle' | 'syncing' | 'synced' | 'error'  Backend sync state
viewport            { x, y, zoom }                           Canvas viewport position

Key Actions: initializeConnectorPipeline(), initializeProcessingPipeline(), setNodes(), setEdges(), setNodeConfiguration(), updateNodeConfiguration(), loadPipeline(), clearPipeline(), markNodeAsSynced()
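
Because several fields above are `Map` and `Set` values, updates usually need to produce a new collection so that subscribers see a changed reference. The following is a minimal sketch of how `setNodeConfiguration()` and `markNodeAsSynced()` could be written as pure updaters. The reduced types (`NodeConfigSketch`, `PipelineStateSketch`) and the rule that editing a node clears its synced flag are assumptions for illustration, not the actual store code.

```typescript
// Hypothetical, reduced stand-ins for the real store types.
interface NodeConfigSketch {
  operatorTemplateId: string;
  operatorName: string;
}

interface PipelineStateSketch {
  nodeConfigurations: Map<string, NodeConfigSketch>;
  syncedNodeIds: Set<string>;
  syncStatus: 'idle' | 'syncing' | 'synced' | 'error';
}

// Returns a new state with the configuration for `nodeId` replaced.
// The Map and Set are copied, so the previous state is left untouched.
function setNodeConfiguration(
  state: PipelineStateSketch,
  nodeId: string,
  config: NodeConfigSketch,
): PipelineStateSketch {
  const nodeConfigurations = new Map(state.nodeConfigurations);
  nodeConfigurations.set(nodeId, config);
  // Assumption: an edited node is no longer in sync with the backend.
  const syncedNodeIds = new Set(state.syncedNodeIds);
  syncedNodeIds.delete(nodeId);
  return { ...state, nodeConfigurations, syncedNodeIds };
}

// Returns a new state in which `nodeId` is recorded as synced.
function markNodeAsSynced(
  state: PipelineStateSketch,
  nodeId: string,
): PipelineStateSketch {
  const syncedNodeIds = new Set(state.syncedNodeIds);
  syncedNodeIds.add(nodeId);
  return { ...state, syncedNodeIds };
}
```

Written this way, each updater can be passed to a store's `set` call or unit-tested on its own.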


useWorkflowDesignerStore

File: stores/useWorkflowDesignerStore.ts
Persistence: localStorage (workflow-designer-storage)

State             Type                Purpose
workflow          WorkflowDefinition  Workflow metadata
version           WorkflowVersion     Active version
nodes             Node[]              XYFlow nodes
edges             Edge[]              XYFlow edges
selectedNodeId    string | null       Currently selected node
isDirty           boolean             Unsaved changes
validationErrors  ValidationError[]   Current errors

Key Actions: addNode(), updateNode(), removeNode(), selectNode(), duplicateNode(), addEdgeConnection(), validate(), onNodesChange(), onEdgesChange(), onConnect()

Selectors: useSelectedNode(), useWorkflowValidation(), useWorkflowDirtyState()
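
To make the relationship between `selectedNodeId`, `nodes`, and `isDirty` concrete, here is a small sketch of the logic that a selector such as `useSelectedNode()` and an action such as `duplicateNode()` might wrap. The types `NodeSketch` and `DesignerStateSketch`, the caller-supplied `newId`, and the 40-pixel offset are all hypothetical; the real store may behave differently.

```typescript
// Hypothetical, reduced stand-ins for the XYFlow node and the store state.
interface NodeSketch {
  id: string;
  position: { x: number; y: number };
  data: Record<string, unknown>;
}

interface DesignerStateSketch {
  nodes: NodeSketch[];
  selectedNodeId: string | null;
  isDirty: boolean;
}

// Selector logic: resolve the selected id to a node, or null if none matches.
function selectSelectedNode(state: DesignerStateSketch): NodeSketch | null {
  return state.nodes.find((node) => node.id === state.selectedNodeId) ?? null;
}

// Action logic: copy a node under a new id, offset it so it does not sit
// exactly on top of the original, select the copy, and flag unsaved changes.
function duplicateNode(
  state: DesignerStateSketch,
  nodeId: string,
  newId: string,
): DesignerStateSketch {
  const source = state.nodes.find((node) => node.id === nodeId);
  if (!source) {
    return state; // Unknown id: nothing to duplicate.
  }
  const copy: NodeSketch = {
    id: newId,
    position: { x: source.position.x + 40, y: source.position.y + 40 },
    data: { ...source.data },
  };
  return { nodes: [...state.nodes, copy], selectedNodeId: newId, isDirty: true };
}
```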


useAccessManagementStore

File: stores/useAccessManagementStore.ts
Persistence: sessionStorage (dtx-access-management)

State                   Type           Purpose
expandedGroups          Set<string>    Expanded group tree nodes
selectedGroupId         string | null  Selected group
userFilters             UserFilters    User list filters
roleFilters             RoleFilters    Role list filters
currentUserPermissions  Permission[]   Cached permissions
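
One detail worth illustrating: `sessionStorage` holds strings, and `JSON.stringify` turns a `Set` into `{}`, so a field like `expandedGroups` has to be converted on the way in and out. The sketch below shows one way to do that with a reduced, hypothetical state shape (`AccessUiStateSketch`). How the actual store handles this is not described in this document.

```typescript
// Hypothetical, reduced view of the persisted part of the store.
interface AccessUiStateSketch {
  expandedGroups: Set<string>;
  selectedGroupId: string | null;
}

// Convert the Set to an array before stringifying; a raw Set would be
// written as "{}" and its members would be lost.
function serializeAccessUiState(state: AccessUiStateSketch): string {
  return JSON.stringify({
    expandedGroups: Array.from(state.expandedGroups),
    selectedGroupId: state.selectedGroupId,
  });
}

// Rebuild the Set when reading back, tolerating missing fields.
function deserializeAccessUiState(raw: string): AccessUiStateSketch {
  const parsed = JSON.parse(raw) as {
    expandedGroups?: string[];
    selectedGroupId?: string | null;
  };
  return {
    expandedGroups: new Set(parsed.expandedGroups ?? []),
    selectedGroupId: parsed.selectedGroupId ?? null,
  };
}
```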

React Query (Server State)

// App.tsx: QueryClient configuration
const queryClient = new QueryClient({
  defaultOptions: {
    queries: {
      staleTime: 0, // Data always refetched on mount
      retry: 1, // Single retry on failure
    },
  },
});

Query Key Patterns

Hook                     Query Key                         Polling
usePipelines()           ['pipelines', params]
useSchemas()             ['schemas', params]
useOperators()           ['operator-templates']
useOperatorConfigs()     ['operator-config', id]
useGenerators()          ['generators', params]            5s
useGenerator(id)         ['generators', id]
useGeneratorMetrics(id)  ['generators', id, 'metrics']
usePools()               ['pools']
useUniverses()           ['universes']
useKafkaTopics()         ['kafka-topics']
useCacheManagement()     ['cache-keys', pattern]
useApprovalWorkflow()    ['workflows']
useAccessManagement()    ['users'], ['roles'], ['groups']
useExpressions()         ['expression-templates']
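
The generator keys in the table share the `'generators'` prefix, which matters because React Query matches keys by prefix when invalidating. A key factory is one common way to keep such keys consistent. The sketch below is an assumption about how that could look here: `generatorKeys` and `hasKeyPrefix` are hypothetical names, and `hasKeyPrefix` is a simplified stand-in for the library's own matching (it compares parts by JSON text, so object property order matters).

```typescript
type GeneratorParamsSketch = Record<string, string | number | boolean>;

// Hypothetical key factory mirroring the generator rows of the table.
const generatorKeys = {
  all: ['generators'] as const,
  list: (params: GeneratorParamsSketch) => ['generators', params] as const,
  detail: (id: string) => ['generators', id] as const,
  metrics: (id: string) => ['generators', id, 'metrics'] as const,
};

// Simplified prefix check: true when every part of `prefix` equals the
// part of `key` at the same position.
function hasKeyPrefix(
  key: readonly unknown[],
  prefix: readonly unknown[],
): boolean {
  return (
    prefix.length <= key.length &&
    prefix.every((part, i) => JSON.stringify(part) === JSON.stringify(key[i]))
  );
}
```

Under this scheme, invalidating `generatorKeys.all` would cover the list, detail, and metrics queries, while invalidating `generatorKeys.detail(id)` would cover that generator and its metrics only.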

Mutation Pattern

// Standard mutation pattern used across all services
const mutation = useMutation({
  mutationFn: (data) => service.create(data),
  onSuccess: () => {
    toast.success('Created successfully');
    queryClient.invalidateQueries(['resource-key']);
    navigate('/resource-list');
  },
  onError: (error) => {
    toast.error(`Failed: ${error.message}`);
  },
});

Key Data Models

Pipeline Types

type Pipeline = ConnectorPipeline | ProcessingPipeline;

interface ConnectorPipeline {
  id: string;
  name: string;
  description: string;
  type: 'connector';
  connectorType: 'source' | 'sink';
  status: PipelineStatus;
  executionMode: 'streaming' | 'batch';
  nodes: PipelineNode[];
  edges: PipelineEdge[];
  createdBy: User;
  createdAt: string;
  updatedAt: string;
}

type PipelineStatus = 'draft' | 'compiling' | 'compiled'
  | 'deploying' | 'deployed' | 'paused' | 'cancelled' | 'failed';
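
Since `Pipeline` is a union discriminated by `type`, code that handles both variants can narrow on that field. The sketch below uses reduced stand-in types. `ProcessingPipeline` is not defined in this section, so its `type: 'processing'` discriminant is an assumption, as is the `describePipeline` helper.

```typescript
type PipelineStatus = 'draft' | 'compiling' | 'compiled'
  | 'deploying' | 'deployed' | 'paused' | 'cancelled' | 'failed';

// Reduced stand-ins; only the fields needed for narrowing are kept.
interface ConnectorPipelineSketch {
  id: string;
  type: 'connector';
  connectorType: 'source' | 'sink';
  status: PipelineStatus;
}

// Assumption: the processing variant uses 'processing' as its discriminant.
interface ProcessingPipelineSketch {
  id: string;
  type: 'processing';
  status: PipelineStatus;
}

type PipelineSketch = ConnectorPipelineSketch | ProcessingPipelineSketch;

// Narrowing on `type` gives access to variant-specific fields, and the
// `never` assignment makes the compiler flag any unhandled variant.
function describePipeline(pipeline: PipelineSketch): string {
  switch (pipeline.type) {
    case 'connector':
      return `${pipeline.connectorType} connector (${pipeline.status})`;
    case 'processing':
      return `processing pipeline (${pipeline.status})`;
    default: {
      const unreachable: never = pipeline;
      return unreachable;
    }
  }
}
```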

Operator Types

interface OperatorTemplate {
  id: string;
  name: string;
  description: string;
  type: 'STREAM' | 'BATCH' | 'REQUEST_RESPONSE';
  category: 'SOURCE' | 'SINK' | 'PROCESSOR' | 'TESTER';
  subtype: string;
  status: 'ACTIVE' | 'DEPRECATED';
  configSchema: object; // JSON Schema for FormRenderer
}

interface OperatorNodeConfiguration {
  operatorTemplateId: string;
  operatorName: string;
  inputSchemas: SchemaReference[];
  outputSchema?: SchemaReference;
  configStage: ConfigStageConfig;
  logicStage?: LogicStageConfig;
  testerConfig?: TesterConfig;
}

Workflow Types

interface WorkflowDefinition {
  id: string;
  name: string;
  description: string;
  status: 'draft' | 'published' | 'archived';
  versions: WorkflowVersion[];
}

type WorkflowNodeType = 'start' | 'approval' | 'condition'
  | 'parallel-split' | 'parallel-join' | 'notification'
  | 'timer' | 'script' | 'reject' | 'loop' | 'end';

type TaskStatus = 'pending' | 'claimed' | 'completed'
  | 'delegated' | 'escalated' | 'expired' | 'cancelled';

Schema Types

interface SchemaResponse {
  id: string;
  name: string;
  groupId: string;
  description: string;
  format: string;
  type: string;
  state: string;
  latestVersion: number;
}

interface FieldResponse {
  name: string;
  type: string;
  nullable: boolean;
  pii: boolean;
  description?: string;
}
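
As an illustration of how the `pii` and `nullable` flags on `FieldResponse` might be consumed, the sketch below lists PII fields and reports non-nullable fields that a record leaves empty. Both helper names are hypothetical and are not part of the documented API.

```typescript
interface FieldResponse {
  name: string;
  type: string;
  nullable: boolean;
  pii: boolean;
  description?: string;
}

// Names of all fields flagged as personally identifiable information.
function piiFieldNames(fields: FieldResponse[]): string[] {
  return fields.filter((field) => field.pii).map((field) => field.name);
}

// Names of non-nullable fields that are null or undefined in `record`.
function missingRequiredFields(
  record: Record<string, unknown>,
  fields: FieldResponse[],
): string[] {
  return fields
    .filter((field) => !field.nullable)
    .filter((field) => record[field.name] === undefined || record[field.name] === null)
    .map((field) => field.name);
}
```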

Access Management Types

interface User {
  id: string;
  username: string;
  email: string;
  firstName: string;
  lastName: string;
  roles: Role[];
  groups: UserGroup[];
  enabled: boolean;
}

interface Role {
  id: string;
  name: string;
  description: string;
  permissions: Permission[];
}

interface UserGroup {
  id: string;
  name: string;
  path: string;
  parentId?: string;
  children: UserGroup[]; // Recursive tree structure
}
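
Because `UserGroup` nests through `children`, looking up a group or listing all groups requires walking the tree. A minimal recursive sketch follows; `findGroupById` and `flattenGroups` are hypothetical helper names, not functions documented here.

```typescript
interface UserGroup {
  id: string;
  name: string;
  path: string;
  parentId?: string;
  children: UserGroup[];
}

// Depth-first search for a group anywhere in the tree.
function findGroupById(groups: UserGroup[], id: string): UserGroup | undefined {
  for (const group of groups) {
    if (group.id === id) {
      return group;
    }
    const match = findGroupById(group.children, id);
    if (match) {
      return match;
    }
  }
  return undefined;
}

// Pre-order flattening: each group is followed by its descendants.
function flattenGroups(groups: UserGroup[]): UserGroup[] {
  const result: UserGroup[] = [];
  for (const group of groups) {
    result.push(group, ...flattenGroups(group.children));
  }
  return result;
}
```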

Standard API Response

interface StandardApiResponse<T> {
  status: 'success' | 'error';
  data: T;
  message?: string;
  errors?: FieldError[];
}
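
A service layer would typically unwrap this envelope once so that callers work with `T` directly. The sketch below shows one possible approach. The `FieldError` shape, the `ApiResponseError` class, and the `unwrapResponse` helper are assumptions for illustration; only `StandardApiResponse<T>` comes from this document.

```typescript
// Assumed shape; FieldError is referenced above but not defined in this section.
interface FieldError {
  field: string;
  message: string;
}

interface StandardApiResponse<T> {
  status: 'success' | 'error';
  data: T;
  message?: string;
  errors?: FieldError[];
}

// Hypothetical error type that carries the field-level errors along.
class ApiResponseError extends Error {
  fieldErrors: FieldError[];

  constructor(message: string, fieldErrors: FieldError[] = []) {
    super(message);
    this.name = 'ApiResponseError';
    this.fieldErrors = fieldErrors;
  }
}

// Returns `data` for a successful response and throws otherwise.
function unwrapResponse<T>(response: StandardApiResponse<T>): T {
  if (response.status === 'error') {
    throw new ApiResponseError(
      response.message ?? 'Request failed',
      response.errors ?? [],
    );
  }
  return response.data;
}
```

Throwing on `status: 'error'` would let an `onError` handler like the one in the mutation pattern read `error.message` without inspecting the envelope itself.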

Custom Hooks

Hook                    Purpose
useDebounce             Generic value debounce with configurable delay
useAutoSave             Throttled local save (3s) + debounced server sync (10s)
usePermissionCheck      hasPermission(), hasAnyPermission(), hasAllPermissions(), canAccessResource()
useKafkaTopics          React Query hook for Kafka topic CRUD
useSyntheticData        React Query hooks for generators, pools, universes, metrics
useCacheManagement      React Query hook for cache key operations
useMetricsHistory       Time-series metrics history for charts
useCompilationProgress  Pipeline compilation progress with polling
useExecutionMode        Streaming vs batch mode management
usePipelineSave         Pipeline serialization + conflict resolution
usePipelineRecovery     Recover pipelines from IndexedDB after failures
usePreviewPropagation   Preview data propagation through pipeline DAG
useActiveSDGGenerators  Active generators with polling for real-time status
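
The three checks exposed by `usePermissionCheck` differ only in how they combine individual lookups. The sketch below expresses that logic as plain functions. It assumes permissions are plain strings such as `'pipeline:read'` (the real `Permission` type is not shown in this document) and leaves out `canAccessResource()`, whose rules are not described here.

```typescript
// Assumption: a permission is identified by a plain string.
type PermissionSketch = string;

// True when the single required permission has been granted.
function hasPermission(
  granted: ReadonlySet<PermissionSketch>,
  required: PermissionSketch,
): boolean {
  return granted.has(required);
}

// True when at least one of the required permissions has been granted.
// An empty `required` list yields false.
function hasAnyPermission(
  granted: ReadonlySet<PermissionSketch>,
  required: PermissionSketch[],
): boolean {
  return required.some((permission) => granted.has(permission));
}

// True when every required permission has been granted.
// An empty `required` list yields true.
function hasAllPermissions(
  granted: ReadonlySet<PermissionSketch>,
  required: PermissionSketch[],
): boolean {
  return required.every((permission) => granted.has(permission));
}
```

The empty-list behavior follows from `Array.prototype.some` and `every`, and callers that treat an empty requirement as "deny" would need an explicit check.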