// agent/event-driven-graph.ts
import { StateGraph, START, END } from '@langchain/langgraph';
import { RunnableLambda } from '@langchain/core/runnables';
import { z } from 'zod';
// Event types
const EventSchema = z.discriminatedUnion('type', [
z.object({
type: z.literal('user_message'),
content: z.string(),
userId: z.string(),
}),
z.object({
type: z.literal('system_alert'),
severity: z.enum(['low', 'medium', 'high']),
message: z.string(),
}),
z.object({
type: z.literal('data_update'),
table: z.string(),
recordId: z.string(),
changes: z.record(z.any()),
}),
]);
// State with event queue
const EventDrivenState = z.object({
events: z.array(EventSchema),
processed_events: z.array(z.string()), // event IDs
current_event: EventSchema.optional(),
responses: z.array(z.any()),
context: z.record(z.any()),
});
const eventDrivenWorkflow = new StateGraph(EventDrivenState)
.addNode('event_router', async (state) => {
if (state.events.length === 0) {
return state; // No events to process
}
const event = state.events[0];
const remainingEvents = state.events.slice(1);
return {
...state,
events: remainingEvents,
current_event: event,
};
})
.addNode('process_user_message', async (state) => {
const event = state.current_event;
if (event?.type !== 'user_message') return state;
// Process user message
const response = await handleUserMessage(event, state.context);
return {
...state,
responses: [...state.responses, response],
processed_events: [...state.processed_events, generateEventId(event)],
};
})
.addNode('process_system_alert', async (state) => {
const event = state.current_event;
if (event?.type !== 'system_alert') return state;
// Process system alert
const response = await handleSystemAlert(event, state.context);
return {
...state,
responses: [...state.responses, response],
processed_events: [...state.processed_events, generateEventId(event)],
};
})
.addNode('process_data_update', async (state) => {
const event = state.current_event;
if (event?.type !== 'data_update') return state;
// Process data update
const updatedContext = await handleDataUpdate(event, state.context);
return {
...state,
context: { ...state.context, ...updatedContext },
processed_events: [...state.processed_events, generateEventId(event)],
};
})
.addConditionalEdges('event_router', (state) => {
const event = state.current_event;
if (!event) return END;
switch (event.type) {
case 'user_message':
return 'process_user_message';
case 'system_alert':
return 'process_system_alert';
case 'data_update':
return 'process_data_update';
default:
return 'event_router'; // Skip unknown events
}
})
.addEdge('process_user_message', 'event_router')
.addEdge('process_system_alert', 'event_router')
.addEdge('process_data_update', 'event_router')
.addEdge(START, 'event_router');
async function handleUserMessage(event: any, context: any) {
// Implement user message handling
return {
type: 'response',
content: `Processed message: ${event.content}`,
userId: event.userId,
};
}
async function handleSystemAlert(event: any, context: any) {
// Implement alert handling based on severity
const priority = event.severity === 'high' ? 'urgent' : 'normal';
return {
type: 'alert_acknowledgment',
message: `Alert processed with ${priority} priority: ${event.message}`,
severity: event.severity,
};
}
async function handleDataUpdate(event: any, context: any) {
// Update context based on data changes
return {
[event.table]: {
...context[event.table],
[event.recordId]: {
...context[event.table]?.[event.recordId],
...event.changes,
last_updated: new Date().toISOString(),
},
},
};
}
function generateEventId(event: any): string {
return `${event.type}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
export const eventDrivenGraph = eventDrivenWorkflow.compile();