Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/lib/bridge/bridge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ interface BridgeManagerState {
adapters: Map<string, BaseChannelAdapter>;
adapterMeta: Map<string, AdapterMeta>;
running: boolean;
/** Guard against concurrent start() calls — set immediately on entry, cleared on completion or error */
starting: boolean;
startedAt: string | null;
loopAborts: Map<string, AbortController>;
activeTasks: Map<string, AbortController>;
Expand All @@ -218,6 +220,7 @@ function getState(): BridgeManagerState {
adapters: new Map(),
adapterMeta: new Map(),
running: false,
starting: false,
startedAt: null,
loopAborts: new Map(),
activeTasks: new Map(),
Expand Down Expand Up @@ -263,6 +266,10 @@ export interface StartResult {
export async function start(): Promise<StartResult> {
const state = getState();
if (state.running) return { started: true };
if (state.starting) return { started: false, reason: 'starting_in_progress' };

// Set starting flag immediately to prevent concurrent start() calls
state.starting = true;

const bridgeEnabled = getSetting('remote_bridge_enabled') === 'true';
if (!bridgeEnabled) {
Expand Down Expand Up @@ -311,6 +318,7 @@ export async function start(): Promise<StartResult> {
console.warn('[bridge-manager] No adapters started successfully, bridge not activated');
state.adapters.clear();
state.adapterMeta.clear();
state.starting = false;
const reason = configErrors.length > 0
? `adapter_config_invalid: ${configErrors.join('; ')}`
: 'no_adapters_started';
Expand All @@ -320,6 +328,7 @@ export async function start(): Promise<StartResult> {
// Mark running BEFORE starting consumer loops — runAdapterLoop checks
// state.running in its while-condition, so it must be true first.
state.running = true;
state.starting = false;
state.startedAt = new Date().toISOString();

// Suppress notification bot polling to avoid conflicts
Expand All @@ -344,6 +353,7 @@ export async function stop(): Promise<void> {
if (!state.running) return;

state.running = false;
state.starting = false;

// Abort all active tool/stream tasks so in-flight Claude sessions stop
// writing to DB and release session locks cleanly. Without this, `/stop`
Expand Down
50 changes: 49 additions & 1 deletion src/lib/channels/feishu/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ interface CardActionEvent {
open_message_id?: string;
}
import { loadFeishuConfig, validateFeishuConfig } from './config';

/** Max number of message IDs to keep for dedup. */
const DEDUP_MAX = 1000;

/**
* Max age for inbound messages in milliseconds.
* Messages older than this are dropped as stale — they are likely historical
* events replayed by the Lark SDK after a WSClient reconnect or bridge restart.
* Default: 5 minutes.
*/
const STALE_MESSAGE_MAX_AGE_MS = 5 * 60 * 1000;
import { FeishuGateway } from './gateway';
import { parseMessageWithResources } from './inbound';
import { getBotInfo } from './identity';
Expand Down Expand Up @@ -60,6 +71,8 @@ export class FeishuChannelPlugin implements ChannelPlugin<FeishuConfig> {
* scheduling new timers.
*/
private identityGeneration = 0;
/** Dedup: set of recently seen message IDs to prevent duplicate processing. */
private seenMessageIds = new Set<string>();

loadConfig(): FeishuConfig | null {
this.config = loadFeishuConfig();
Expand Down Expand Up @@ -347,6 +360,11 @@ export class FeishuChannelPlugin implements ChannelPlugin<FeishuConfig> {
this.waitResolve(null);
this.waitResolve = null;
}
// NOTE: Do NOT clear seenMessageIds here.
// After gateway.stop() closes the WSClient, in-flight events from the old
// connection may still arrive briefly. Keeping the dedup set prevents them
// from being re-processed if a new gateway starts immediately after.
// The set is bounded by DEDUP_MAX and will naturally age out.
}

isRunning(): boolean {
Expand Down Expand Up @@ -381,10 +399,40 @@ export class FeishuChannelPlugin implements ChannelPlugin<FeishuConfig> {
}

private enqueueMessage(msg: InboundMessage): void {
// Track messageId for reaction acknowledgment (skip callback messages)
// Stale message filter: drop messages older than STALE_MESSAGE_MAX_AGE_MS.
// The Lark SDK replays unprocessed historical events on WSClient reconnect,
// which can cause the bot to respond to messages from hours or days ago.
if (msg.timestamp && !msg.callbackData) {
const age = Date.now() - msg.timestamp;
if (age > STALE_MESSAGE_MAX_AGE_MS) {
console.log('[feishu/plugin]', `Stale message dropped (${Math.round(age / 1000)}s old):`, msg.messageId);
return;
}
}

// Dedup: skip messages already seen (prevents duplicates from WS reconnect/retry)
if (msg.messageId && !msg.callbackData) {
if (this.seenMessageIds.has(msg.messageId)) {
console.log('[feishu/plugin]', 'Duplicate message skipped:', msg.messageId);
return;
}
this.seenMessageIds.add(msg.messageId);

// Track messageId for reaction acknowledgment
this.lastMessageIdByChat.set(msg.address.chatId, msg.messageId);

// Prune dedup set when it exceeds capacity
if (this.seenMessageIds.size > DEDUP_MAX) {
const excess = this.seenMessageIds.size - DEDUP_MAX;
let removed = 0;
for (const id of this.seenMessageIds) {
if (removed >= excess) break;
this.seenMessageIds.delete(id);
removed++;
}
}
}

if (this.waitResolve) {
const resolve = this.waitResolve;
this.waitResolve = null;
Expand Down
Loading