Skip to content

Commit bf9b3fd

Browse files
icecrasher321Vikhyath Mondreti
andauthored
fix(conn pool separation): conn pool separation test (#568)
* add logs to see freestyle payload * fix lint * db connection config changes * fix lint * db client fix + workspace switching * fix lint * remove unecessary logging * revert route file --------- Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
1 parent e452396 commit bf9b3fd

7 files changed

Lines changed: 109 additions & 64 deletions

File tree

apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/workspace-header/workspace-header.tsx

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ export const WorkspaceHeader = React.memo<WorkspaceHeaderProps>(
333333
}, [sessionData?.user?.id, fetchSubscriptionStatus, fetchWorkspaces])
334334

335335
const switchWorkspace = useCallback(
336-
(workspace: Workspace) => {
336+
async (workspace: Workspace) => {
337337
// If already on this workspace, close dropdown and do nothing else
338338
if (activeWorkspace?.id === workspace.id) {
339339
setWorkspaceDropdownOpen(false)
@@ -344,9 +344,9 @@ export const WorkspaceHeader = React.memo<WorkspaceHeaderProps>(
344344
setWorkspaceDropdownOpen(false)
345345

346346
// Use full workspace switch which now handles localStorage automatically
347-
switchToWorkspace(workspace.id)
347+
await switchToWorkspace(workspace.id)
348348

349-
// Update URL to include workspace ID
349+
// Update URL to include workspace ID - only after workspace switch completes
350350
router.push(`/workspace/${workspace.id}/w`)
351351
},
352352
[activeWorkspace?.id, switchToWorkspace, router, setWorkspaceDropdownOpen]
@@ -374,9 +374,9 @@ export const WorkspaceHeader = React.memo<WorkspaceHeaderProps>(
374374

375375
// Use switchToWorkspace to properly load workflows for the new workspace
376376
// This will clear existing workflows, set loading state, and fetch workflows from DB
377-
switchToWorkspace(newWorkspace.id)
377+
await switchToWorkspace(newWorkspace.id)
378378

379-
// Update URL to include new workspace ID
379+
// Update URL to include new workspace ID - only after workspace switch completes
380380
router.push(`/workspace/${newWorkspace.id}/w`)
381381
}
382382
} catch (err) {
@@ -464,11 +464,15 @@ export const WorkspaceHeader = React.memo<WorkspaceHeaderProps>(
464464
setWorkspaces(updatedWorkspaces)
465465

466466
// If deleted workspace was active, switch to another workspace
467-
if (activeWorkspace?.id === id && updatedWorkspaces.length > 0) {
468-
// Use the specialized method for handling workspace deletion
469-
const newWorkspaceId = updatedWorkspaces[0].id
470-
useWorkflowRegistry.getState().handleWorkspaceDeletion(newWorkspaceId)
471-
setActiveWorkspace(updatedWorkspaces[0])
467+
if (activeWorkspace?.id === id) {
468+
const newWorkspace = updatedWorkspaces[0]
469+
setActiveWorkspace(newWorkspace)
470+
471+
// Switch to the new workspace (this handles all workflow state management)
472+
await switchToWorkspace(newWorkspace.id)
473+
474+
// Navigate to the new workspace - only after workspace switch completes
475+
router.push(`/workspace/${newWorkspace.id}/w`)
472476
}
473477

474478
setWorkspaceDropdownOpen(false)

apps/sim/contexts/socket-context.tsx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -427,15 +427,15 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
427427

428428
// Check if we already have a pending timeout for this block
429429
if (!positionUpdateTimeouts.current.has(blockId)) {
430-
// Schedule emission with light throttling (120fps = ~8ms)
430+
// Schedule emission with optimized throttling (30fps = ~33ms) to reduce DB load
431431
const timeoutId = window.setTimeout(() => {
432432
const latestUpdate = pendingPositionUpdates.current.get(blockId)
433433
if (latestUpdate) {
434434
socket.emit('workflow-operation', latestUpdate)
435435
pendingPositionUpdates.current.delete(blockId)
436436
}
437437
positionUpdateTimeouts.current.delete(blockId)
438-
}, 8) // 120fps for smooth movement
438+
}, 33) // 30fps - good balance between smoothness and DB performance
439439

440440
positionUpdateTimeouts.current.set(blockId, timeoutId)
441441
}
@@ -475,14 +475,14 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
475475
[socket, currentWorkflowId]
476476
)
477477

478-
// Minimal cursor throttling (reduced from 30fps to 120fps)
478+
// Cursor throttling optimized for database connection health
479479
const lastCursorEmit = useRef(0)
480480
const emitCursorUpdate = useCallback(
481481
(cursor: { x: number; y: number }) => {
482482
if (socket && currentWorkflowId) {
483483
const now = performance.now()
484-
// Very light throttling at 120fps (8ms) to prevent excessive spam
485-
if (now - lastCursorEmit.current >= 8) {
484+
// Reduced to 30fps (33ms) to reduce database load while maintaining smooth UX
485+
if (now - lastCursorEmit.current >= 33) {
486486
socket.emit('cursor-update', { cursor })
487487
lastCursorEmit.current = now
488488
}

apps/sim/db/index.ts

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,31 @@ import * as schema from './schema'
77
// In development, use the direct DATABASE_URL
88
const connectionString = env.POSTGRES_URL ?? env.DATABASE_URL
99

10-
const drizzleClient = drizzle(
11-
postgres(connectionString, {
12-
prepare: false, // Disable prefetch as it is not supported for "Transaction" pool mode
13-
idle_timeout: 30, // Keep connections alive for 30 seconds when idle
14-
connect_timeout: 30, // Timeout after 30 seconds when connecting
15-
}),
16-
{ schema }
17-
)
10+
/**
11+
* Connection Pool Allocation Strategy
12+
*
13+
* Main App (this file): 3 connections per instance
14+
* Socket Server Operations: 2 connections
15+
* Socket Server Room Manager: 1 connection
16+
*
17+
* With ~3-4 Vercel serverless instances typically active:
18+
* - Main app: 3 × 4 = 12 connections
19+
* - Socket server: 2 + 1 = 3 connections
20+
* - Buffer: 5 connections for spikes/other services
21+
* - Total: ~20 connections (at capacity limit)
22+
*
23+
* This conservative allocation prevents pool exhaustion while maintaining performance.
24+
*/
25+
26+
const postgresClient = postgres(connectionString, {
27+
prepare: false, // Disable prefetch as it is not supported for "Transaction" pool mode
28+
idle_timeout: 20, // Reduce idle timeout to 20 seconds to free up connections faster
29+
connect_timeout: 10, // Reduce connect timeout to 10 seconds
30+
max: 3, // Conservative limit - with multiple serverless functions, this prevents pool exhaustion
31+
onnotice: () => {}, // Disable notices to reduce noise
32+
})
33+
34+
const drizzleClient = drizzle(postgresClient, { schema })
1835

1936
declare global {
2037
var database: PostgresJsDatabase<typeof schema> | undefined

apps/sim/socket-server/database/operations.ts

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,31 @@
11
import { and, eq, or } from 'drizzle-orm'
2-
import { db } from '../../db'
2+
import { drizzle } from 'drizzle-orm/postgres-js'
3+
import postgres from 'postgres'
4+
import * as schema from '../../db/schema'
35
import { workflow, workflowBlocks, workflowEdges, workflowSubflows } from '../../db/schema'
6+
import { env } from '../../lib/env'
47
import { createLogger } from '../../lib/logs/console-logger'
58
import { loadWorkflowFromNormalizedTables } from '../../lib/workflows/db-helpers'
69

710
const logger = createLogger('SocketDatabase')
811

12+
// Create dedicated database connection for socket server with optimized settings
13+
const connectionString = env.POSTGRES_URL ?? env.DATABASE_URL
14+
const socketDb = drizzle(
15+
postgres(connectionString, {
16+
prepare: false,
17+
idle_timeout: 10, // Shorter idle timeout for socket operations
18+
connect_timeout: 5, // Faster connection timeout
19+
max: 2, // Very small pool for socket server to avoid exhausting Supabase limit
20+
onnotice: () => {}, // Disable notices
21+
debug: false, // Disable debug for socket operations
22+
}),
23+
{ schema }
24+
)
25+
26+
// Use dedicated connection for socket operations, fallback to shared db for compatibility
27+
const db = socketDb
28+
929
// Constants
1030
const DEFAULT_LOOP_ITERATIONS = 5
1131

@@ -115,9 +135,20 @@ export async function getWorkflowState(workflowId: string) {
115135

116136
// Persist workflow operation
117137
export async function persistWorkflowOperation(workflowId: string, operation: any) {
138+
const startTime = Date.now()
118139
try {
119140
const { operation: op, target, payload, timestamp, userId } = operation
120141

142+
// Log high-frequency operations for monitoring
143+
if (op === 'update-position' && Math.random() < 0.01) {
144+
// Log 1% of position updates
145+
logger.debug('Socket DB operation sample:', {
146+
operation: op,
147+
target,
148+
workflowId: `${workflowId.substring(0, 8)}...`,
149+
})
150+
}
151+
121152
await db.transaction(async (tx) => {
122153
// Update the workflow's last modified timestamp first
123154
await tx
@@ -140,9 +171,22 @@ export async function persistWorkflowOperation(workflowId: string, operation: an
140171
throw new Error(`Unknown operation target: ${target}`)
141172
}
142173
})
174+
175+
// Log slow operations for monitoring
176+
const duration = Date.now() - startTime
177+
if (duration > 100) {
178+
// Log operations taking more than 100ms
179+
logger.warn('Slow socket DB operation:', {
180+
operation: operation.operation,
181+
target: operation.target,
182+
duration: `${duration}ms`,
183+
workflowId: `${workflowId.substring(0, 8)}...`,
184+
})
185+
}
143186
} catch (error) {
187+
const duration = Date.now() - startTime
144188
logger.error(
145-
`❌ Error persisting workflow operation (${operation.operation} on ${operation.target}):`,
189+
`❌ Error persisting workflow operation (${operation.operation} on ${operation.target}) after ${duration}ms:`,
146190
error
147191
)
148192
throw error

apps/sim/socket-server/rooms/manager.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,25 @@
11
import { and, eq, isNull } from 'drizzle-orm'
2+
import { drizzle } from 'drizzle-orm/postgres-js'
3+
import postgres from 'postgres'
24
import type { Server } from 'socket.io'
3-
import { db } from '../../db'
5+
import * as schema from '../../db/schema'
46
import { workflowBlocks, workflowEdges } from '../../db/schema'
7+
import { env } from '../../lib/env'
58
import { createLogger } from '../../lib/logs/console-logger'
69

10+
// Create dedicated database connection for room manager
11+
const connectionString = env.POSTGRES_URL ?? env.DATABASE_URL
12+
const db = drizzle(
13+
postgres(connectionString, {
14+
prepare: false,
15+
idle_timeout: 15,
16+
connect_timeout: 5,
17+
max: 1, // Minimal pool for room operations to conserve connections
18+
onnotice: () => {},
19+
}),
20+
{ schema }
21+
)
22+
723
const logger = createLogger('RoomManager')
824

925
export interface UserPresence {
@@ -80,7 +96,7 @@ export class RoomManager {
8096
})
8197

8298
const socketsToDisconnect: string[] = []
83-
room.users.forEach((presence, socketId) => {
99+
room.users.forEach((_presence, socketId) => {
84100
socketsToDisconnect.push(socketId)
85101
})
86102

apps/sim/stores/workflows/registry/store.ts

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -267,41 +267,6 @@ export const useWorkflowRegistry = create<WorkflowRegistry>()(
267267
await fetchWorkflowsFromDB(workspaceId)
268268
},
269269

270-
// Handle cleanup on workspace deletion
271-
handleWorkspaceDeletion: async (newWorkspaceId: string) => {
272-
// Set transition state
273-
setWorkspaceTransitioning(true)
274-
275-
try {
276-
logger.info(`Switching to new workspace after deletion: ${newWorkspaceId}`)
277-
278-
// Reset all workflow state
279-
resetWorkflowStores()
280-
281-
// Set loading state while we fetch workflows
282-
set({
283-
isLoading: true,
284-
workflows: {},
285-
activeWorkflowId: null,
286-
})
287-
288-
// Properly await workflow fetching to prevent race conditions
289-
await fetchWorkflowsFromDB(newWorkspaceId)
290-
291-
set({ isLoading: false })
292-
logger.info(`Successfully switched to workspace after deletion: ${newWorkspaceId}`)
293-
} catch (error) {
294-
logger.error('Error fetching workflows after workspace deletion:', {
295-
error,
296-
workspaceId: newWorkspaceId,
297-
})
298-
set({ isLoading: false, error: 'Failed to load workspace data' })
299-
} finally {
300-
// End transition state
301-
setWorkspaceTransitioning(false)
302-
}
303-
},
304-
305270
// Switch to workspace with comprehensive error handling and loading states
306271
switchToWorkspace: async (workspaceId: string) => {
307272
// Prevent multiple simultaneous transitions

apps/sim/stores/workflows/registry/types.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ export interface WorkflowRegistryState {
3232
export interface WorkflowRegistryActions {
3333
setLoading: (loading: boolean) => void
3434
setActiveWorkflow: (id: string) => Promise<void>
35-
switchToWorkspace: (id: string) => void
35+
switchToWorkspace: (id: string) => Promise<void>
3636
loadWorkflows: (workspaceId?: string) => Promise<void>
37-
handleWorkspaceDeletion: (newWorkspaceId: string) => void
3837
removeWorkflow: (id: string) => Promise<void>
3938
updateWorkflow: (id: string, metadata: Partial<WorkflowMetadata>) => Promise<void>
4039
createWorkflow: (options?: {

0 commit comments

Comments
 (0)