Skip to content

Commit be59d2b

Browse files
feat(jobs): Add data retention jobs
Add three cron-triggered cleanup jobs dispatched via Trigger.dev (or an inline fallback):

- cleanup-soft-deletes: hard-deletes soft-deleted workspace resources past retention
- cleanup-logs: deletes expired workflow execution logs and their S3 files
- cleanup-tasks: deletes expired copilot chats, runs, feedback, and inbox tasks

Enterprise admins can configure per-workspace retention via Settings > Data Retention.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

# Conflicts:
#   packages/db/migrations/meta/0192_snapshot.json
#   packages/db/migrations/meta/_journal.json
#   packages/db/schema.ts
1 parent d9209f9 commit be59d2b

File tree

23 files changed

+17850
-202
lines changed

23 files changed

+17850
-202
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { createLogger } from '@sim/logger'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { verifyCronAuth } from '@/lib/auth/internal'
4+
import { dispatchCleanupJobs } from '@/lib/billing/cleanup-dispatcher'
5+
6+
export const dynamic = 'force-dynamic'
7+
8+
const logger = createLogger('SoftDeleteCleanupAPI')
9+
10+
export async function GET(request: NextRequest) {
11+
try {
12+
const authError = verifyCronAuth(request, 'soft-delete cleanup')
13+
if (authError) return authError
14+
15+
const result = await dispatchCleanupJobs('cleanup-soft-deletes')
16+
17+
logger.info('Soft-delete cleanup jobs dispatched', result)
18+
19+
return NextResponse.json({ triggered: true, ...result })
20+
} catch (error) {
21+
logger.error('Failed to dispatch soft-delete cleanup jobs:', { error })
22+
return NextResponse.json({ error: 'Failed to dispatch soft-delete cleanup' }, { status: 500 })
23+
}
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { createLogger } from '@sim/logger'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { verifyCronAuth } from '@/lib/auth/internal'
4+
import { dispatchCleanupJobs } from '@/lib/billing/cleanup-dispatcher'
5+
6+
export const dynamic = 'force-dynamic'
7+
8+
const logger = createLogger('TaskCleanupAPI')
9+
10+
export async function GET(request: NextRequest) {
11+
try {
12+
const authError = verifyCronAuth(request, 'task cleanup')
13+
if (authError) return authError
14+
15+
const result = await dispatchCleanupJobs('cleanup-tasks')
16+
17+
logger.info('Task cleanup jobs dispatched', result)
18+
19+
return NextResponse.json({ triggered: true, ...result })
20+
} catch (error) {
21+
logger.error('Failed to dispatch task cleanup jobs:', { error })
22+
return NextResponse.json({ error: 'Failed to dispatch task cleanup' }, { status: 500 })
23+
}
24+
}
Lines changed: 7 additions & 184 deletions
Original file line numberDiff line numberDiff line change
@@ -1,201 +1,24 @@
1-
import { db } from '@sim/db'
2-
import { subscription, workflowExecutionLogs, workspace } from '@sim/db/schema'
31
import { createLogger } from '@sim/logger'
4-
import { and, eq, inArray, isNull, lt } from 'drizzle-orm'
52
import { type NextRequest, NextResponse } from 'next/server'
63
import { verifyCronAuth } from '@/lib/auth/internal'
7-
import { sqlIsPaid } from '@/lib/billing/plan-helpers'
8-
import { ENTITLED_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils'
9-
import { env } from '@/lib/core/config/env'
10-
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
11-
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
4+
import { dispatchCleanupJobs } from '@/lib/billing/cleanup-dispatcher'
125

136
export const dynamic = 'force-dynamic'
147

158
const logger = createLogger('LogsCleanupAPI')
169

17-
const BATCH_SIZE = 2000
18-
1910
export async function GET(request: NextRequest) {
2011
try {
2112
const authError = verifyCronAuth(request, 'logs cleanup')
22-
if (authError) {
23-
return authError
24-
}
25-
26-
const retentionDate = new Date()
27-
retentionDate.setDate(retentionDate.getDate() - Number(env.FREE_PLAN_LOG_RETENTION_DAYS || '7'))
28-
29-
const freeWorkspacesSubquery = db
30-
.select({ id: workspace.id })
31-
.from(workspace)
32-
.leftJoin(
33-
subscription,
34-
and(
35-
eq(subscription.referenceId, workspace.billedAccountUserId),
36-
inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES),
37-
sqlIsPaid(subscription.plan)
38-
)
39-
)
40-
.where(isNull(subscription.id))
41-
42-
const results = {
43-
enhancedLogs: {
44-
total: 0,
45-
archived: 0,
46-
archiveFailed: 0,
47-
deleted: 0,
48-
deleteFailed: 0,
49-
},
50-
files: {
51-
total: 0,
52-
deleted: 0,
53-
deleteFailed: 0,
54-
},
55-
snapshots: {
56-
cleaned: 0,
57-
cleanupFailed: 0,
58-
},
59-
}
60-
61-
const startTime = Date.now()
62-
const MAX_BATCHES = 10
63-
64-
let batchesProcessed = 0
65-
let hasMoreLogs = true
66-
67-
logger.info('Starting enhanced logs cleanup for free-plan workspaces')
68-
69-
while (hasMoreLogs && batchesProcessed < MAX_BATCHES) {
70-
const oldEnhancedLogs = await db
71-
.select({
72-
id: workflowExecutionLogs.id,
73-
workflowId: workflowExecutionLogs.workflowId,
74-
executionId: workflowExecutionLogs.executionId,
75-
stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
76-
level: workflowExecutionLogs.level,
77-
trigger: workflowExecutionLogs.trigger,
78-
startedAt: workflowExecutionLogs.startedAt,
79-
endedAt: workflowExecutionLogs.endedAt,
80-
totalDurationMs: workflowExecutionLogs.totalDurationMs,
81-
executionData: workflowExecutionLogs.executionData,
82-
cost: workflowExecutionLogs.cost,
83-
files: workflowExecutionLogs.files,
84-
createdAt: workflowExecutionLogs.createdAt,
85-
})
86-
.from(workflowExecutionLogs)
87-
.where(
88-
and(
89-
inArray(workflowExecutionLogs.workspaceId, freeWorkspacesSubquery),
90-
lt(workflowExecutionLogs.startedAt, retentionDate)
91-
)
92-
)
93-
.limit(BATCH_SIZE)
94-
95-
results.enhancedLogs.total += oldEnhancedLogs.length
96-
97-
for (const log of oldEnhancedLogs) {
98-
const today = new Date().toISOString().split('T')[0]
99-
100-
const enhancedLogKey = `logs/archived/${today}/${log.id}.json`
101-
const enhancedLogData = JSON.stringify({
102-
...log,
103-
archivedAt: new Date().toISOString(),
104-
logType: 'enhanced',
105-
})
106-
107-
try {
108-
await StorageService.uploadFile({
109-
file: Buffer.from(enhancedLogData),
110-
fileName: enhancedLogKey,
111-
contentType: 'application/json',
112-
context: 'logs',
113-
preserveKey: true,
114-
customKey: enhancedLogKey,
115-
metadata: {
116-
logId: String(log.id),
117-
workflowId: String(log.workflowId ?? ''),
118-
executionId: String(log.executionId),
119-
logType: 'enhanced',
120-
archivedAt: new Date().toISOString(),
121-
},
122-
})
123-
124-
results.enhancedLogs.archived++
125-
126-
if (isUsingCloudStorage() && log.files && Array.isArray(log.files)) {
127-
for (const file of log.files) {
128-
if (file && typeof file === 'object' && file.key) {
129-
results.files.total++
130-
try {
131-
await StorageService.deleteFile({
132-
key: file.key,
133-
context: 'execution',
134-
})
135-
results.files.deleted++
136-
137-
// Also delete from workspace_files table
138-
const { deleteFileMetadata } = await import('@/lib/uploads/server/metadata')
139-
await deleteFileMetadata(file.key)
140-
141-
logger.info(`Deleted execution file: ${file.key}`)
142-
} catch (fileError) {
143-
results.files.deleteFailed++
144-
logger.error(`Failed to delete file ${file.key}:`, { fileError })
145-
}
146-
}
147-
}
148-
}
149-
150-
try {
151-
const deleteResult = await db
152-
.delete(workflowExecutionLogs)
153-
.where(eq(workflowExecutionLogs.id, log.id))
154-
.returning({ id: workflowExecutionLogs.id })
155-
156-
if (deleteResult.length > 0) {
157-
results.enhancedLogs.deleted++
158-
} else {
159-
results.enhancedLogs.deleteFailed++
160-
logger.warn(`Failed to delete log ${log.id} after archiving: No rows deleted`)
161-
}
162-
} catch (deleteError) {
163-
results.enhancedLogs.deleteFailed++
164-
logger.error(`Error deleting log ${log.id} after archiving:`, { deleteError })
165-
}
166-
} catch (archiveError) {
167-
results.enhancedLogs.archiveFailed++
168-
logger.error(`Failed to archive log ${log.id}:`, { archiveError })
169-
}
170-
}
171-
172-
batchesProcessed++
173-
hasMoreLogs = oldEnhancedLogs.length === BATCH_SIZE
174-
175-
logger.info(`Processed logs batch ${batchesProcessed}: ${oldEnhancedLogs.length} logs`)
176-
}
13+
if (authError) return authError
17714

178-
try {
179-
const snapshotRetentionDays = Number(env.FREE_PLAN_LOG_RETENTION_DAYS || '7') + 1 // Keep snapshots 1 day longer
180-
const cleanedSnapshots = await snapshotService.cleanupOrphanedSnapshots(snapshotRetentionDays)
181-
results.snapshots.cleaned = cleanedSnapshots
182-
logger.info(`Cleaned up ${cleanedSnapshots} orphaned snapshots`)
183-
} catch (snapshotError) {
184-
results.snapshots.cleanupFailed = 1
185-
logger.error('Error cleaning up orphaned snapshots:', { snapshotError })
186-
}
15+
const result = await dispatchCleanupJobs('cleanup-logs')
18716

188-
const timeElapsed = (Date.now() - startTime) / 1000
189-
const reachedLimit = batchesProcessed >= MAX_BATCHES && hasMoreLogs
17+
logger.info('Log cleanup jobs dispatched', result)
19018

191-
return NextResponse.json({
192-
message: `Processed ${batchesProcessed} enhanced log batches (${results.enhancedLogs.total} logs, ${results.files.total} files) in ${timeElapsed.toFixed(2)}s${reachedLimit ? ' (batch limit reached)' : ''}`,
193-
results,
194-
complete: !hasMoreLogs,
195-
batchLimitReached: reachedLimit,
196-
})
19+
return NextResponse.json({ triggered: true, ...result })
19720
} catch (error) {
198-
logger.error('Error in log cleanup process:', { error })
199-
return NextResponse.json({ error: 'Failed to process log cleanup' }, { status: 500 })
21+
logger.error('Failed to dispatch log cleanup jobs:', { error })
22+
return NextResponse.json({ error: 'Failed to dispatch log cleanup' }, { status: 500 })
20023
}
20124
}

0 commit comments

Comments
 (0)