feat: add Telegram Topics (forum mode) support
Some checks failed
Bump version / bump-version (push) Has been cancelled
Merge-forward skill branches / merge-forward (push) Has been cancelled
Update token count / update-tokens (push) Has been cancelled

- buildJid() constructs tg:{chatId}:{threadId} for topic messages
- parseJid() extracts chatId + threadId from JID for outbound routing
- /chatid command shows thread ID in forum topics
- sendMessage and setTyping pass message_thread_id when present
- All message handlers (text, photo, voice, media) use thread-aware JIDs

Allows each forum topic to be registered as an independent Nanoclaw group.
This commit is contained in:
Andy
2026-03-14 12:04:06 +00:00
parent c0902877fa
commit 8e24a31bd4
26 changed files with 5190 additions and 717 deletions

File diff suppressed because it is too large Load Diff

366
src/channels/telegram.ts Normal file
View File

@@ -0,0 +1,366 @@
import fs from 'fs';
import path from 'path';
import { Bot } from 'grammy';
import { ASSISTANT_NAME, TRIGGER_PATTERN } from '../config.js';
import { resolveGroupIpcPath } from '../group-folder.js';
import { logger } from '../logger.js';
import { transcribeBuffer } from '../transcription.js';
import {
Channel,
OnChatMetadata,
OnInboundMessage,
RegisteredGroup,
} from '../types.js';
export interface TelegramChannelOpts {
onMessage: OnInboundMessage;
onChatMetadata: OnChatMetadata;
registeredGroups: () => Record<string, RegisteredGroup>;
clearSession: (chatJid: string) => void;
}
export class TelegramChannel implements Channel {
name = 'telegram';
private bot: Bot | null = null;
private opts: TelegramChannelOpts;
private botToken: string;
constructor(botToken: string, opts: TelegramChannelOpts) {
this.botToken = botToken;
this.opts = opts;
}
/**
* Build a JID from a Telegram chat ID and optional topic thread ID.
* Format: "tg:{chatId}" or "tg:{chatId}:{threadId}" for forum topics.
*/
private buildJid(chatId: number, threadId?: number): string {
return threadId ? `tg:${chatId}:${threadId}` : `tg:${chatId}`;
}
/**
* Parse a JID back into chatId (string) and optional threadId (number).
* Handles negative chat IDs (groups/supergroups start with -100...).
*/
private parseJid(jid: string): { chatId: string; threadId?: number } {
const withoutPrefix = jid.replace(/^tg:/, '');
const colonIdx = withoutPrefix.indexOf(':');
if (colonIdx === -1) {
return { chatId: withoutPrefix };
}
return {
chatId: withoutPrefix.slice(0, colonIdx),
threadId: parseInt(withoutPrefix.slice(colonIdx + 1), 10),
};
}
async connect(): Promise<void> {
this.bot = new Bot(this.botToken);
// Command to get chat ID (useful for registration)
// In forum topics, also shows the thread ID so the full JID can be used.
this.bot.command('chatid', (ctx) => {
const chatId = ctx.chat.id;
const chatType = ctx.chat.type;
const threadId = (ctx.message as any)?.message_thread_id as
| number
| undefined;
const chatJid = this.buildJid(chatId, threadId);
const chatName =
chatType === 'private'
? ctx.from?.first_name || 'Private'
: (ctx.chat as any).title || 'Unknown';
const topicLine = threadId ? `\nThread ID: \`${threadId}\`` : '';
ctx.reply(
`Chat ID: \`${chatJid}\`${topicLine}\nName: ${chatName}\nType: ${chatType}`,
{ parse_mode: 'Markdown' },
);
});
// Command to check bot status
this.bot.command('ping', (ctx) => {
ctx.reply(`${ASSISTANT_NAME} is online.`);
});
// Command to clear conversation context (start a new session)
this.bot.command('reset', (ctx) => {
const threadId = (ctx.message as any)?.message_thread_id as
| number
| undefined;
const chatJid = this.buildJid(ctx.chat.id, threadId);
this.opts.clearSession(chatJid);
ctx.reply('Session cleared. Next message starts a fresh conversation.');
});
this.bot.on('message:text', async (ctx) => {
// Skip commands
if (ctx.message.text.startsWith('/')) return;
const threadId = ctx.message.message_thread_id;
const chatJid = this.buildJid(ctx.chat.id, threadId);
let content = ctx.message.text;
const timestamp = new Date(ctx.message.date * 1000).toISOString();
const senderName =
ctx.from?.first_name ||
ctx.from?.username ||
ctx.from?.id.toString() ||
'Unknown';
const sender = ctx.from?.id.toString() || '';
const msgId = ctx.message.message_id.toString();
// Determine chat name
const chatName =
ctx.chat.type === 'private'
? senderName
: (ctx.chat as any).title || chatJid;
// In private DMs every message is implicitly addressed to the bot —
// no @mention entity exists. Prepend the trigger so TRIGGER_PATTERN matches.
if (ctx.chat.type === 'private' && !TRIGGER_PATTERN.test(content)) {
content = `@${ASSISTANT_NAME} ${content}`;
}
// Translate Telegram @bot_username mentions into TRIGGER_PATTERN format.
// Telegram @mentions (e.g., @andy_ai_bot) won't match TRIGGER_PATTERN
// (e.g., ^@Andy\b), so we prepend the trigger when the bot is @mentioned.
const botUsername = ctx.me?.username?.toLowerCase();
if (botUsername) {
const entities = ctx.message.entities || [];
const isBotMentioned = entities.some((entity) => {
if (entity.type === 'mention') {
const mentionText = content
.substring(entity.offset, entity.offset + entity.length)
.toLowerCase();
return mentionText === `@${botUsername}`;
}
return false;
});
if (isBotMentioned && !TRIGGER_PATTERN.test(content)) {
content = `@${ASSISTANT_NAME} ${content}`;
}
}
// Store chat metadata for discovery
this.opts.onChatMetadata(chatJid, timestamp, chatName);
// Only deliver full message for registered groups
const group = this.opts.registeredGroups()[chatJid];
if (!group) {
logger.debug(
{ chatJid, chatName },
'Message from unregistered Telegram chat',
);
return;
}
// Deliver message — startMessageLoop() will pick it up
this.opts.onMessage(chatJid, {
id: msgId,
chat_jid: chatJid,
sender,
sender_name: senderName,
content,
timestamp,
is_from_me: false,
});
logger.info(
{ chatJid, chatName, sender: senderName },
'Telegram message stored',
);
});
// Handle non-text messages with placeholders so the agent knows something was sent
const storeNonText = (ctx: any, placeholder: string) => {
const threadId = ctx.message?.message_thread_id as number | undefined;
const chatJid = this.buildJid(ctx.chat.id, threadId);
const group = this.opts.registeredGroups()[chatJid];
if (!group) return;
const timestamp = new Date(ctx.message.date * 1000).toISOString();
const senderName =
ctx.from?.first_name ||
ctx.from?.username ||
ctx.from?.id?.toString() ||
'Unknown';
const caption = ctx.message.caption ? ` ${ctx.message.caption}` : '';
this.opts.onChatMetadata(chatJid, timestamp);
this.opts.onMessage(chatJid, {
id: ctx.message.message_id.toString(),
chat_jid: chatJid,
sender: ctx.from?.id?.toString() || '',
sender_name: senderName,
content: `${placeholder}${caption}`,
timestamp,
is_from_me: false,
});
};
this.bot.on('message:photo', async (ctx) => {
const threadId = ctx.message.message_thread_id;
const chatJid = this.buildJid(ctx.chat.id, threadId);
const group = this.opts.registeredGroups()[chatJid];
if (!group) return;
let placeholder = '[Photo]';
try {
// Highest resolution is last in the array
const photos = ctx.message.photo;
const largest = photos[photos.length - 1];
const file = await ctx.api.getFile(largest.file_id);
const url = `https://api.telegram.org/file/bot${this.botToken}/${file.file_path}`;
const res = await fetch(url);
if (res.ok) {
const buffer = Buffer.from(await res.arrayBuffer());
const ext = path.extname(file.file_path || '') || '.jpg';
const filename = `photo_${ctx.message.message_id}${ext}`;
const inputDir = path.join(
resolveGroupIpcPath(group.folder),
'input',
);
fs.mkdirSync(inputDir, { recursive: true });
fs.writeFileSync(path.join(inputDir, filename), buffer);
placeholder = `[Photo: /workspace/ipc/input/${filename} — use the Read tool to view this image]`;
logger.info(
{ chatJid, bytes: buffer.length },
'Downloaded Telegram photo',
);
} else {
logger.error(
{ chatJid, status: res.status },
'Failed to download Telegram photo',
);
}
} catch (err) {
logger.error({ err }, 'Telegram photo download error');
}
storeNonText(ctx, placeholder);
});
this.bot.on('message:video', (ctx) => storeNonText(ctx, '[Video]'));
this.bot.on('message:voice', async (ctx) => {
const threadId = ctx.message.message_thread_id;
const chatJid = this.buildJid(ctx.chat.id, threadId);
const group = this.opts.registeredGroups()[chatJid];
if (!group) return;
let placeholder = '[Voice Message - transcription unavailable]';
try {
const file = await ctx.getFile();
const url = `https://api.telegram.org/file/bot${this.botToken}/${file.file_path}`;
const res = await fetch(url);
if (res.ok) {
const buffer = Buffer.from(await res.arrayBuffer());
logger.info(
{ chatJid, bytes: buffer.length },
'Downloaded Telegram voice file',
);
const transcript = await transcribeBuffer(buffer);
if (transcript) {
placeholder = `[Voice: ${transcript}]`;
logger.info(
{ chatJid, length: transcript.length },
'Transcribed voice message',
);
} else {
logger.warn({ chatJid }, 'Voice transcription returned null');
}
} else {
logger.error(
{ chatJid, status: res.status },
'Failed to download Telegram voice file',
);
}
} catch (err) {
logger.error({ err }, 'Telegram voice transcription error');
placeholder = '[Voice Message - transcription failed]';
}
storeNonText(ctx, placeholder);
});
this.bot.on('message:audio', (ctx) => storeNonText(ctx, '[Audio]'));
this.bot.on('message:document', (ctx) => {
const name = ctx.message.document?.file_name || 'file';
storeNonText(ctx, `[Document: ${name}]`);
});
this.bot.on('message:sticker', (ctx) => {
const emoji = ctx.message.sticker?.emoji || '';
storeNonText(ctx, `[Sticker ${emoji}]`);
});
this.bot.on('message:location', (ctx) => storeNonText(ctx, '[Location]'));
this.bot.on('message:contact', (ctx) => storeNonText(ctx, '[Contact]'));
// Handle errors gracefully
this.bot.catch((err) => {
logger.error({ err: err.message }, 'Telegram bot error');
});
// Start polling — returns a Promise that resolves when started
return new Promise<void>((resolve) => {
this.bot!.start({
onStart: (botInfo) => {
logger.info(
{ username: botInfo.username, id: botInfo.id },
'Telegram bot connected',
);
console.log(`\n Telegram bot: @${botInfo.username}`);
console.log(
` Send /chatid to the bot to get a chat's registration ID\n`,
);
resolve();
},
});
});
}
async sendMessage(jid: string, text: string): Promise<void> {
if (!this.bot) {
logger.warn('Telegram bot not initialized');
return;
}
try {
const { chatId, threadId } = this.parseJid(jid);
// Telegram has a 4096 character limit per message — split if needed
const MAX_LENGTH = 4096;
const opts = threadId ? { message_thread_id: threadId } : {};
for (let i = 0; i < text.length; i += MAX_LENGTH) {
await this.bot.api.sendMessage(chatId, text.slice(i, i + MAX_LENGTH), opts);
}
logger.info({ jid, length: text.length }, 'Telegram message sent');
} catch (err) {
logger.error({ jid, err }, 'Failed to send Telegram message');
}
}
isConnected(): boolean {
return this.bot !== null;
}
ownsJid(jid: string): boolean {
return jid.startsWith('tg:');
}
async disconnect(): Promise<void> {
if (this.bot) {
this.bot.stop();
this.bot = null;
logger.info('Telegram bot stopped');
}
}
async setTyping(jid: string, isTyping: boolean): Promise<void> {
if (!this.bot || !isTyping) return;
try {
const { chatId, threadId } = this.parseJid(jid);
const opts = threadId ? { message_thread_id: threadId } : {};
await this.bot.api.sendChatAction(chatId, 'typing', opts);
} catch (err) {
logger.debug({ jid, err }, 'Failed to send Telegram typing indicator');
}
}
}

File diff suppressed because it is too large Load Diff

400
src/channels/whatsapp.ts Normal file
View File

@@ -0,0 +1,400 @@
import { exec } from 'child_process';
import fs from 'fs';
import path from 'path';
import makeWASocket, {
Browsers,
DisconnectReason,
WASocket,
fetchLatestWaWebVersion,
makeCacheableSignalKeyStore,
useMultiFileAuthState,
} from '@whiskeysockets/baileys';
import {
ASSISTANT_HAS_OWN_NUMBER,
ASSISTANT_NAME,
STORE_DIR,
} from '../config.js';
import { getLastGroupSync, setLastGroupSync, updateChatName } from '../db.js';
import { logger } from '../logger.js';
import { isVoiceMessage, transcribeAudioMessage } from '../transcription.js';
import {
Channel,
OnInboundMessage,
OnChatMetadata,
RegisteredGroup,
} from '../types.js';
const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours
export interface WhatsAppChannelOpts {
onMessage: OnInboundMessage;
onChatMetadata: OnChatMetadata;
registeredGroups: () => Record<string, RegisteredGroup>;
}
export class WhatsAppChannel implements Channel {
name = 'whatsapp';
private sock!: WASocket;
private connected = false;
private lidToPhoneMap: Record<string, string> = {};
private outgoingQueue: Array<{ jid: string; text: string }> = [];
private flushing = false;
private groupSyncTimerStarted = false;
private opts: WhatsAppChannelOpts;
constructor(opts: WhatsAppChannelOpts) {
this.opts = opts;
}
async connect(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.connectInternal(resolve).catch(reject);
});
}
private async connectInternal(onFirstOpen?: () => void): Promise<void> {
const authDir = path.join(STORE_DIR, 'auth');
fs.mkdirSync(authDir, { recursive: true });
const { state, saveCreds } = await useMultiFileAuthState(authDir);
const { version } = await fetchLatestWaWebVersion({}).catch((err) => {
logger.warn(
{ err },
'Failed to fetch latest WA Web version, using default',
);
return { version: undefined };
});
this.sock = makeWASocket({
version,
auth: {
creds: state.creds,
keys: makeCacheableSignalKeyStore(state.keys, logger),
},
printQRInTerminal: false,
logger,
browser: Browsers.macOS('Chrome'),
});
this.sock.ev.on('connection.update', (update) => {
const { connection, lastDisconnect, qr } = update;
if (qr) {
const msg =
'WhatsApp authentication required. Run /setup in Claude Code.';
logger.error(msg);
exec(
`osascript -e 'display notification "${msg}" with title "NanoClaw" sound name "Basso"'`,
);
setTimeout(() => process.exit(1), 1000);
}
if (connection === 'close') {
this.connected = false;
const reason = (
lastDisconnect?.error as { output?: { statusCode?: number } }
)?.output?.statusCode;
const shouldReconnect = reason !== DisconnectReason.loggedOut;
logger.info(
{
reason,
shouldReconnect,
queuedMessages: this.outgoingQueue.length,
},
'Connection closed',
);
if (shouldReconnect) {
logger.info('Reconnecting...');
this.connectInternal().catch((err) => {
logger.error({ err }, 'Failed to reconnect, retrying in 5s');
setTimeout(() => {
this.connectInternal().catch((err2) => {
logger.error({ err: err2 }, 'Reconnection retry failed');
});
}, 5000);
});
} else {
logger.info('Logged out. Run /setup to re-authenticate.');
process.exit(0);
}
} else if (connection === 'open') {
this.connected = true;
logger.info('Connected to WhatsApp');
// Announce availability so WhatsApp relays subsequent presence updates (typing indicators)
this.sock.sendPresenceUpdate('available').catch((err) => {
logger.warn({ err }, 'Failed to send presence update');
});
// Build LID to phone mapping from auth state for self-chat translation
if (this.sock.user) {
const phoneUser = this.sock.user.id.split(':')[0];
const lidUser = this.sock.user.lid?.split(':')[0];
if (lidUser && phoneUser) {
this.lidToPhoneMap[lidUser] = `${phoneUser}@s.whatsapp.net`;
logger.debug({ lidUser, phoneUser }, 'LID to phone mapping set');
}
}
// Flush any messages queued while disconnected
this.flushOutgoingQueue().catch((err) =>
logger.error({ err }, 'Failed to flush outgoing queue'),
);
// Sync group metadata on startup (respects 24h cache)
this.syncGroupMetadata().catch((err) =>
logger.error({ err }, 'Initial group sync failed'),
);
// Set up daily sync timer (only once)
if (!this.groupSyncTimerStarted) {
this.groupSyncTimerStarted = true;
setInterval(() => {
this.syncGroupMetadata().catch((err) =>
logger.error({ err }, 'Periodic group sync failed'),
);
}, GROUP_SYNC_INTERVAL_MS);
}
// Signal first connection to caller
if (onFirstOpen) {
onFirstOpen();
onFirstOpen = undefined;
}
}
});
this.sock.ev.on('creds.update', saveCreds);
this.sock.ev.on('messages.upsert', async ({ messages }) => {
for (const msg of messages) {
if (!msg.message) continue;
const rawJid = msg.key.remoteJid;
if (!rawJid || rawJid === 'status@broadcast') continue;
// Translate LID JID to phone JID if applicable
const chatJid = await this.translateJid(rawJid);
const timestamp = new Date(
Number(msg.messageTimestamp) * 1000,
).toISOString();
// Always notify about chat metadata for group discovery
const isGroup = chatJid.endsWith('@g.us');
this.opts.onChatMetadata(
chatJid,
timestamp,
undefined,
'whatsapp',
isGroup,
);
// Only deliver full message for registered groups
const groups = this.opts.registeredGroups();
if (groups[chatJid]) {
const content =
msg.message?.conversation ||
msg.message?.extendedTextMessage?.text ||
msg.message?.imageMessage?.caption ||
msg.message?.videoMessage?.caption ||
'';
// Skip protocol messages with no text content (encryption keys, read receipts, etc.)
// but allow voice messages through for transcription
if (!content && !isVoiceMessage(msg)) continue;
const sender = msg.key.participant || msg.key.remoteJid || '';
const senderName = msg.pushName || sender.split('@')[0];
const fromMe = msg.key.fromMe || false;
// Detect bot messages: with own number, fromMe is reliable
// since only the bot sends from that number.
// With shared number, bot messages carry the assistant name prefix
// (even in DMs/self-chat) so we check for that.
const isBotMessage = ASSISTANT_HAS_OWN_NUMBER
? fromMe
: content.startsWith(`${ASSISTANT_NAME}:`);
// Transcribe voice messages before storing
let finalContent = content;
if (isVoiceMessage(msg)) {
try {
const transcript = await transcribeAudioMessage(msg, this.sock);
if (transcript) {
finalContent = `[Voice: ${transcript}]`;
logger.info(
{ chatJid, length: transcript.length },
'Transcribed voice message',
);
} else {
finalContent = '[Voice Message - transcription unavailable]';
}
} catch (err) {
logger.error({ err }, 'Voice transcription error');
finalContent = '[Voice Message - transcription failed]';
}
}
this.opts.onMessage(chatJid, {
id: msg.key.id || '',
chat_jid: chatJid,
sender,
sender_name: senderName,
content: finalContent,
timestamp,
is_from_me: fromMe,
is_bot_message: isBotMessage,
});
}
}
});
}
async sendMessage(jid: string, text: string): Promise<void> {
// Prefix bot messages with assistant name so users know who's speaking.
// On a shared number, prefix is also needed in DMs (including self-chat)
// to distinguish bot output from user messages.
// Skip only when the assistant has its own dedicated phone number.
const prefixed = ASSISTANT_HAS_OWN_NUMBER
? text
: `${ASSISTANT_NAME}: ${text}`;
if (!this.connected) {
this.outgoingQueue.push({ jid, text: prefixed });
logger.info(
{ jid, length: prefixed.length, queueSize: this.outgoingQueue.length },
'WA disconnected, message queued',
);
return;
}
try {
await this.sock.sendMessage(jid, { text: prefixed });
logger.info({ jid, length: prefixed.length }, 'Message sent');
} catch (err) {
// If send fails, queue it for retry on reconnect
this.outgoingQueue.push({ jid, text: prefixed });
logger.warn(
{ jid, err, queueSize: this.outgoingQueue.length },
'Failed to send, message queued',
);
}
}
isConnected(): boolean {
return this.connected;
}
ownsJid(jid: string): boolean {
return jid.endsWith('@g.us') || jid.endsWith('@s.whatsapp.net');
}
async disconnect(): Promise<void> {
this.connected = false;
this.sock?.end(undefined);
}
async setTyping(jid: string, isTyping: boolean): Promise<void> {
try {
const status = isTyping ? 'composing' : 'paused';
logger.debug({ jid, status }, 'Sending presence update');
await this.sock.sendPresenceUpdate(status, jid);
} catch (err) {
logger.debug({ jid, err }, 'Failed to update typing status');
}
}
/**
* Sync group metadata from WhatsApp.
* Fetches all participating groups and stores their names in the database.
* Called on startup, daily, and on-demand via IPC.
*/
async syncGroupMetadata(force = false): Promise<void> {
if (!force) {
const lastSync = getLastGroupSync();
if (lastSync) {
const lastSyncTime = new Date(lastSync).getTime();
if (Date.now() - lastSyncTime < GROUP_SYNC_INTERVAL_MS) {
logger.debug({ lastSync }, 'Skipping group sync - synced recently');
return;
}
}
}
try {
logger.info('Syncing group metadata from WhatsApp...');
const groups = await this.sock.groupFetchAllParticipating();
let count = 0;
for (const [jid, metadata] of Object.entries(groups)) {
if (metadata.subject) {
updateChatName(jid, metadata.subject);
count++;
}
}
setLastGroupSync();
logger.info({ count }, 'Group metadata synced');
} catch (err) {
logger.error({ err }, 'Failed to sync group metadata');
}
}
private async translateJid(jid: string): Promise<string> {
if (!jid.endsWith('@lid')) return jid;
const lidUser = jid.split('@')[0].split(':')[0];
// Check local cache first
const cached = this.lidToPhoneMap[lidUser];
if (cached) {
logger.debug(
{ lidJid: jid, phoneJid: cached },
'Translated LID to phone JID (cached)',
);
return cached;
}
// Query Baileys' signal repository for the mapping
try {
const pn = await this.sock.signalRepository?.lidMapping?.getPNForLID(jid);
if (pn) {
const phoneJid = `${pn.split('@')[0].split(':')[0]}@s.whatsapp.net`;
this.lidToPhoneMap[lidUser] = phoneJid;
logger.info(
{ lidJid: jid, phoneJid },
'Translated LID to phone JID (signalRepository)',
);
return phoneJid;
}
} catch (err) {
logger.debug({ err, jid }, 'Failed to resolve LID via signalRepository');
}
return jid;
}
private async flushOutgoingQueue(): Promise<void> {
if (this.flushing || this.outgoingQueue.length === 0) return;
this.flushing = true;
try {
logger.info(
{ count: this.outgoingQueue.length },
'Flushing outgoing message queue',
);
while (this.outgoingQueue.length > 0) {
const item = this.outgoingQueue.shift()!;
// Send directly — queued items are already prefixed by sendMessage
await this.sock.sendMessage(item.jid, { text: item.text });
logger.info(
{ jid: item.jid, length: item.text.length },
'Queued message sent',
);
}
} finally {
this.flushing = false;
}
}
}

View File

@@ -1,12 +1,16 @@
import os from 'os';
import path from 'path';
import { readEnvFile } from './env.js';
// Read config values from .env (falls back to process.env).
// Secrets (API keys, tokens) are NOT read here — they are loaded only
// by the credential proxy (credential-proxy.ts), never exposed to containers.
const envConfig = readEnvFile(['ASSISTANT_NAME', 'ASSISTANT_HAS_OWN_NUMBER']);
// Secrets are NOT read here — they stay on disk and are loaded only
// where needed (container-runner.ts) to avoid leaking to child processes.
const envConfig = readEnvFile([
'ASSISTANT_NAME',
'ASSISTANT_HAS_OWN_NUMBER',
'TELEGRAM_BOT_TOKEN',
'TELEGRAM_ONLY',
]);
export const ASSISTANT_NAME =
process.env.ASSISTANT_NAME || envConfig.ASSISTANT_NAME || 'Andy';
@@ -18,7 +22,7 @@ export const SCHEDULER_POLL_INTERVAL = 60000;
// Absolute paths needed for container mounts
const PROJECT_ROOT = process.cwd();
const HOME_DIR = process.env.HOME || os.homedir();
const HOME_DIR = process.env.HOME || '/Users/user';
// Mount security: allowlist stored OUTSIDE project root, never mounted into containers
export const MOUNT_ALLOWLIST_PATH = path.join(
@@ -27,15 +31,10 @@ export const MOUNT_ALLOWLIST_PATH = path.join(
'nanoclaw',
'mount-allowlist.json',
);
export const SENDER_ALLOWLIST_PATH = path.join(
HOME_DIR,
'.config',
'nanoclaw',
'sender-allowlist.json',
);
export const STORE_DIR = path.resolve(PROJECT_ROOT, 'store');
export const GROUPS_DIR = path.resolve(PROJECT_ROOT, 'groups');
export const DATA_DIR = path.resolve(PROJECT_ROOT, 'data');
export const MAIN_GROUP_FOLDER = 'main';
export const CONTAINER_IMAGE =
process.env.CONTAINER_IMAGE || 'nanoclaw-agent:latest';
@@ -47,10 +46,6 @@ export const CONTAINER_MAX_OUTPUT_SIZE = parseInt(
process.env.CONTAINER_MAX_OUTPUT_SIZE || '10485760',
10,
); // 10MB default
export const CREDENTIAL_PROXY_PORT = parseInt(
process.env.CREDENTIAL_PROXY_PORT || '3001',
10,
);
export const IPC_POLL_INTERVAL = 1000;
export const IDLE_TIMEOUT = parseInt(process.env.IDLE_TIMEOUT || '1800000', 10); // 30min default — how long to keep container alive after last result
export const MAX_CONCURRENT_CONTAINERS = Math.max(
@@ -71,3 +66,9 @@ export const TRIGGER_PATTERN = new RegExp(
// Uses system timezone by default
export const TIMEZONE =
process.env.TZ || Intl.DateTimeFormat().resolvedOptions().timeZone;
// Telegram configuration
export const TELEGRAM_BOT_TOKEN =
process.env.TELEGRAM_BOT_TOKEN || envConfig.TELEGRAM_BOT_TOKEN || '';
export const TELEGRAM_ONLY =
(process.env.TELEGRAM_ONLY || envConfig.TELEGRAM_ONLY) === 'true';

View File

@@ -11,7 +11,6 @@ vi.mock('./config.js', () => ({
CONTAINER_IMAGE: 'nanoclaw-agent:latest',
CONTAINER_MAX_OUTPUT_SIZE: 10485760,
CONTAINER_TIMEOUT: 1800000, // 30min
CREDENTIAL_PROXY_PORT: 3001,
DATA_DIR: '/tmp/nanoclaw-test-data',
GROUPS_DIR: '/tmp/nanoclaw-test-groups',
IDLE_TIMEOUT: 1800000, // 30min

View File

@@ -2,7 +2,7 @@
* Container Runner for NanoClaw
* Spawns agent execution in containers and handles IPC
*/
import { ChildProcess, exec, spawn } from 'child_process';
import { ChildProcess, exec, execFileSync, spawn } from 'child_process';
import fs from 'fs';
import path from 'path';
@@ -10,22 +10,20 @@ import {
CONTAINER_IMAGE,
CONTAINER_MAX_OUTPUT_SIZE,
CONTAINER_TIMEOUT,
CREDENTIAL_PROXY_PORT,
DATA_DIR,
GROUPS_DIR,
IDLE_TIMEOUT,
TIMEZONE,
} from './config.js';
import { readEnvFile } from './env.js';
import { resolveGroupFolderPath, resolveGroupIpcPath } from './group-folder.js';
import { logger } from './logger.js';
import {
CONTAINER_HOST_GATEWAY,
CONTAINER_RUNTIME_BIN,
hostGatewayArgs,
isRootlessDocker,
readonlyMountArgs,
stopContainer,
} from './container-runtime.js';
import { detectAuthMode } from './credential-proxy.js';
import { validateAdditionalMounts } from './mount-security.js';
import { RegisteredGroup } from './types.js';
@@ -40,7 +38,9 @@ export interface ContainerInput {
chatJid: string;
isMain: boolean;
isScheduledTask?: boolean;
ipcSuffix?: string;
assistantName?: string;
secrets?: Record<string, string>;
}
export interface ContainerOutput {
@@ -59,6 +59,7 @@ interface VolumeMount {
function buildVolumeMounts(
group: RegisteredGroup,
isMain: boolean,
ipcSuffix?: string,
): VolumeMount[] {
const mounts: VolumeMount[] = [];
const projectRoot = process.cwd();
@@ -76,17 +77,6 @@ function buildVolumeMounts(
readonly: true,
});
// Shadow .env so the agent cannot read secrets from the mounted project root.
// Credentials are injected by the credential proxy, never exposed to containers.
const envFile = path.join(projectRoot, '.env');
if (fs.existsSync(envFile)) {
mounts.push({
hostPath: '/dev/null',
containerPath: '/workspace/project/.env',
readonly: true,
});
}
// Main also gets its group folder as the working directory
mounts.push({
hostPath: groupDir,
@@ -163,9 +153,13 @@ function buildVolumeMounts(
readonly: false,
});
// Per-group IPC namespace: each group gets its own IPC directory
// This prevents cross-group privilege escalation via IPC
const groupIpcDir = resolveGroupIpcPath(group.folder);
// Per-group IPC namespace: each group gets its own IPC directory.
// Task containers use a separate IPC dir (ipcSuffix='-task') to avoid
// cross-contamination with concurrent user message containers.
const ipcFolderName = ipcSuffix
? `${group.folder}${ipcSuffix}`
: group.folder;
const groupIpcDir = path.join(DATA_DIR, 'ipc', ipcFolderName);
fs.mkdirSync(path.join(groupIpcDir, 'messages'), { recursive: true });
fs.mkdirSync(path.join(groupIpcDir, 'tasks'), { recursive: true });
fs.mkdirSync(path.join(groupIpcDir, 'input'), { recursive: true });
@@ -212,6 +206,25 @@ function buildVolumeMounts(
return mounts;
}
/**
* Read allowed secrets from .env for passing to the container via stdin.
* Secrets are never written to disk or mounted as files.
*/
function readSecrets(): Record<string, string> {
return readEnvFile([
'CLAUDE_CODE_OAUTH_TOKEN',
'ANTHROPIC_API_KEY',
'PLANE_API_KEY',
'PLANE_BASE_URL',
'PLANE_WORKSPACE',
'EMAIL_ADDRESS',
'EMAIL_IMAP_PASSWORD',
'EMAIL_CALDAV_PASSWORD',
'CALDAV_URL',
'COOLIFY_API_KEY',
]);
}
function buildContainerArgs(
mounts: VolumeMount[],
containerName: string,
@@ -221,26 +234,6 @@ function buildContainerArgs(
// Pass host timezone so container's local time matches the user's
args.push('-e', `TZ=${TIMEZONE}`);
// Route API traffic through the credential proxy (containers never see real secrets)
args.push(
'-e',
`ANTHROPIC_BASE_URL=http://${CONTAINER_HOST_GATEWAY}:${CREDENTIAL_PROXY_PORT}`,
);
// Mirror the host's auth method with a placeholder value.
// API key mode: SDK sends x-api-key, proxy replaces with real key.
// OAuth mode: SDK exchanges placeholder token for temp API key,
// proxy injects real OAuth token on that exchange request.
const authMode = detectAuthMode();
if (authMode === 'api-key') {
args.push('-e', 'ANTHROPIC_API_KEY=placeholder');
} else {
args.push('-e', 'CLAUDE_CODE_OAUTH_TOKEN=placeholder');
}
// Runtime-specific args for host gateway resolution
args.push(...hostGatewayArgs());
// Run as host user so bind-mounted files are accessible.
// Skip when running as root (uid 0), as the container's node user (uid 1000),
// or when getuid is unavailable (native Windows without WSL).
@@ -275,7 +268,25 @@ export async function runContainerAgent(
const groupDir = resolveGroupFolderPath(group.folder);
fs.mkdirSync(groupDir, { recursive: true });
const mounts = buildVolumeMounts(group, input.isMain);
const mounts = buildVolumeMounts(group, input.isMain, input.ipcSuffix);
// In rootless Docker, the container's node user (uid 1000) maps to a
// subordinate uid that can't write to host-owned directories.
// Make writable mounts world-accessible so the container user can write.
if (isRootlessDocker()) {
for (const mount of mounts) {
if (!mount.readonly) {
try {
execFileSync('chmod', ['-R', 'a+rwX', mount.hostPath], {
stdio: 'pipe',
});
} catch {
/* best effort */
}
}
}
}
const safeName = group.folder.replace(/[^a-zA-Z0-9-]/g, '-');
const containerName = `nanoclaw-${safeName}-${Date.now()}`;
const containerArgs = buildContainerArgs(mounts, containerName);
@@ -318,8 +329,12 @@ export async function runContainerAgent(
let stdoutTruncated = false;
let stderrTruncated = false;
// Pass secrets via stdin (never written to disk or mounted as files)
input.secrets = readSecrets();
container.stdin.write(JSON.stringify(input));
container.stdin.end();
// Remove secrets from input so they don't appear in logs
delete input.secrets;
// Streaming output: parse OUTPUT_START/END marker pairs as they arrive
let parseBuffer = '';
@@ -369,10 +384,18 @@ export async function runContainerAgent(
// so idle timers start even for "silent" query completions.
outputChain = outputChain.then(() => onOutput(parsed));
} catch (err) {
logger.warn(
{ group: group.name, error: err },
'Failed to parse streamed output chunk',
);
parseErrorCount++;
if (parseErrorCount <= 3) {
logger.warn(
{ group: group.name, error: err },
'Failed to parse streamed output chunk',
);
} else if (parseErrorCount === 4) {
logger.warn(
{ group: group.name },
'Suppressing further parse error warnings for this container run',
);
}
}
}
}
@@ -402,6 +425,7 @@ export async function runContainerAgent(
let timedOut = false;
let hadStreamingOutput = false;
let parseErrorCount = 0;
const configTimeout = group.containerConfig?.timeout || CONTAINER_TIMEOUT;
// Grace period: hard timeout must be at least IDLE_TIMEOUT + 30s so the
// graceful _close sentinel has time to trigger before the hard kill fires.

View File

@@ -2,51 +2,33 @@
* Container runtime abstraction for NanoClaw.
* All runtime-specific logic lives here so swapping runtimes means changing one file.
*/
import { execSync } from 'child_process';
import fs from 'fs';
import os from 'os';
import { execFileSync, execSync } from 'child_process';
import { logger } from './logger.js';
/** The container runtime binary name. */
export const CONTAINER_RUNTIME_BIN = 'docker';
/** Hostname containers use to reach the host machine. */
export const CONTAINER_HOST_GATEWAY = 'host.docker.internal';
/**
* Address the credential proxy binds to.
* Docker Desktop (macOS): 127.0.0.1 — the VM routes host.docker.internal to loopback.
* Docker (Linux): bind to the docker0 bridge IP so only containers can reach it,
* falling back to 0.0.0.0 if the interface isn't found.
*/
export const PROXY_BIND_HOST =
process.env.CREDENTIAL_PROXY_HOST || detectProxyBindHost();
function detectProxyBindHost(): string {
if (os.platform() === 'darwin') return '127.0.0.1';
// WSL uses Docker Desktop (same VM routing as macOS) — loopback is correct.
// Check /proc filesystem, not env vars — WSL_DISTRO_NAME isn't set under systemd.
if (fs.existsSync('/proc/sys/fs/binfmt_misc/WSLInterop')) return '127.0.0.1';
// Bare-metal Linux: bind to the docker0 bridge IP instead of 0.0.0.0
const ifaces = os.networkInterfaces();
const docker0 = ifaces['docker0'];
if (docker0) {
const ipv4 = docker0.find((a) => a.family === 'IPv4');
if (ipv4) return ipv4.address;
/** Detect rootless Docker (container root maps to host user). */
let _rootlessDocker: boolean | undefined;
export function isRootlessDocker(): boolean {
if (_rootlessDocker === undefined) {
try {
const info = execFileSync(
CONTAINER_RUNTIME_BIN,
['info', '--format', '{{.SecurityOptions}}'],
{
stdio: 'pipe',
encoding: 'utf-8',
timeout: 5000,
},
);
_rootlessDocker = info.includes('rootless');
} catch {
_rootlessDocker = false;
}
}
return '0.0.0.0';
}
/** CLI args needed for the container to resolve the host gateway. */
export function hostGatewayArgs(): string[] {
// On Linux, host.docker.internal isn't built-in — add it explicitly
if (os.platform() === 'linux') {
return ['--add-host=host.docker.internal:host-gateway'];
}
return [];
return _rootlessDocker;
}
/** Returns CLI args for a readonly bind mount. */

View File

@@ -2,14 +2,14 @@ import { describe, it, expect, beforeEach } from 'vitest';
import {
_initTestDatabase,
closeDatabase,
createTask,
deleteTask,
getAllChats,
getAllRegisteredGroups,
getMessagesSince,
getNewMessages,
getTaskById,
setRegisteredGroup,
logTaskRun,
storeChatMetadata,
storeMessage,
updateTask,
@@ -391,94 +391,37 @@ describe('task CRUD', () => {
});
});
// --- LIMIT behavior ---
describe('message query LIMIT', () => {
beforeEach(() => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z');
for (let i = 1; i <= 10; i++) {
store({
id: `lim-${i}`,
chat_jid: 'group@g.us',
sender: 'user@s.whatsapp.net',
sender_name: 'User',
content: `message ${i}`,
timestamp: `2024-01-01T00:00:${String(i).padStart(2, '0')}.000Z`,
});
}
});
it('getNewMessages caps to limit and returns most recent in chronological order', () => {
const { messages, newTimestamp } = getNewMessages(
['group@g.us'],
'2024-01-01T00:00:00.000Z',
'Andy',
3,
);
expect(messages).toHaveLength(3);
expect(messages[0].content).toBe('message 8');
expect(messages[2].content).toBe('message 10');
// Chronological order preserved
expect(messages[1].timestamp > messages[0].timestamp).toBe(true);
// newTimestamp reflects latest returned row
expect(newTimestamp).toBe('2024-01-01T00:00:10.000Z');
});
it('getMessagesSince caps to limit and returns most recent in chronological order', () => {
const messages = getMessagesSince(
'group@g.us',
'2024-01-01T00:00:00.000Z',
'Andy',
3,
);
expect(messages).toHaveLength(3);
expect(messages[0].content).toBe('message 8');
expect(messages[2].content).toBe('message 10');
expect(messages[1].timestamp > messages[0].timestamp).toBe(true);
});
it('returns all messages when count is under the limit', () => {
const { messages } = getNewMessages(
['group@g.us'],
'2024-01-01T00:00:00.000Z',
'Andy',
50,
);
expect(messages).toHaveLength(10);
describe('closeDatabase', () => {
it('can be called without throwing', () => {
expect(() => closeDatabase()).not.toThrow();
});
});
// --- RegisteredGroup isMain round-trip ---
describe('registered group isMain', () => {
it('persists isMain=true through set/get round-trip', () => {
setRegisteredGroup('main@s.whatsapp.net', {
name: 'Main Chat',
folder: 'whatsapp_main',
trigger: '@Andy',
added_at: '2024-01-01T00:00:00.000Z',
isMain: true,
describe('deleteTask atomicity', () => {
it('deletes task and its logs', () => {
createTask({
id: 'task-del-1',
group_folder: 'main',
chat_jid: 'jid@g.us',
prompt: 'test',
schedule_type: 'once',
schedule_value: '2026-01-01T00:00:00Z',
context_mode: 'isolated',
next_run: '2026-01-01T00:00:00Z',
status: 'active',
created_at: new Date().toISOString(),
});
logTaskRun({
task_id: 'task-del-1',
run_at: new Date().toISOString(),
duration_ms: 100,
status: 'success',
result: 'ok',
error: null,
});
const groups = getAllRegisteredGroups();
const group = groups['main@s.whatsapp.net'];
expect(group).toBeDefined();
expect(group.isMain).toBe(true);
expect(group.folder).toBe('whatsapp_main');
});
deleteTask('task-del-1');
it('omits isMain for non-main groups', () => {
setRegisteredGroup('group@g.us', {
name: 'Family Chat',
folder: 'whatsapp_family-chat',
trigger: '@Andy',
added_at: '2024-01-01T00:00:00.000Z',
});
const groups = getAllRegisteredGroups();
const group = groups['group@g.us'];
expect(group).toBeDefined();
expect(group.isMain).toBeUndefined();
expect(getTaskById('task-del-1')).toBeUndefined();
});
});

View File

@@ -106,19 +106,6 @@ function createSchema(database: Database.Database): void {
/* column already exists */
}
// Add is_main column if it doesn't exist (migration for existing DBs)
try {
database.exec(
`ALTER TABLE registered_groups ADD COLUMN is_main INTEGER DEFAULT 0`,
);
// Backfill: existing rows with folder = 'main' are the main group
database.exec(
`UPDATE registered_groups SET is_main = 1 WHERE folder = 'main'`,
);
} catch {
/* column already exists */
}
// Add channel and is_group columns if they don't exist (migration for existing DBs)
try {
database.exec(`ALTER TABLE chats ADD COLUMN channel TEXT`);
@@ -158,6 +145,10 @@ export function _initTestDatabase(): void {
createSchema(db);
}
export function closeDatabase(): void {
db?.close();
}
/**
* Store chat metadata only (no message content).
* Used for all chats to enable group discovery without storing sensitive content.
@@ -276,7 +267,7 @@ export function storeMessage(msg: NewMessage): void {
}
/**
* Store a message directly.
* Store a message directly (for non-WhatsApp channels that don't use Baileys proto).
*/
export function storeMessageDirect(msg: {
id: string;
@@ -306,29 +297,24 @@ export function getNewMessages(
jids: string[],
lastTimestamp: string,
botPrefix: string,
limit: number = 200,
): { messages: NewMessage[]; newTimestamp: string } {
if (jids.length === 0) return { messages: [], newTimestamp: lastTimestamp };
const placeholders = jids.map(() => '?').join(',');
// Filter bot messages using both the is_bot_message flag AND the content
// prefix as a backstop for messages written before the migration ran.
// Subquery takes the N most recent, outer query re-sorts chronologically.
const sql = `
SELECT * FROM (
SELECT id, chat_jid, sender, sender_name, content, timestamp, is_from_me
FROM messages
WHERE timestamp > ? AND chat_jid IN (${placeholders})
AND is_bot_message = 0 AND content NOT LIKE ?
AND content != '' AND content IS NOT NULL
ORDER BY timestamp DESC
LIMIT ?
) ORDER BY timestamp
SELECT id, chat_jid, sender, sender_name, content, timestamp
FROM messages
WHERE timestamp > ? AND chat_jid IN (${placeholders})
AND is_bot_message = 0 AND content NOT LIKE ?
AND content != '' AND content IS NOT NULL
ORDER BY timestamp
`;
const rows = db
.prepare(sql)
.all(lastTimestamp, ...jids, `${botPrefix}:%`, limit) as NewMessage[];
.all(lastTimestamp, ...jids, `${botPrefix}:%`) as NewMessage[];
let newTimestamp = lastTimestamp;
for (const row of rows) {
@@ -342,25 +328,20 @@ export function getMessagesSince(
chatJid: string,
sinceTimestamp: string,
botPrefix: string,
limit: number = 200,
): NewMessage[] {
// Filter bot messages using both the is_bot_message flag AND the content
// prefix as a backstop for messages written before the migration ran.
// Subquery takes the N most recent, outer query re-sorts chronologically.
const sql = `
SELECT * FROM (
SELECT id, chat_jid, sender, sender_name, content, timestamp, is_from_me
FROM messages
WHERE chat_jid = ? AND timestamp > ?
AND is_bot_message = 0 AND content NOT LIKE ?
AND content != '' AND content IS NOT NULL
ORDER BY timestamp DESC
LIMIT ?
) ORDER BY timestamp
SELECT id, chat_jid, sender, sender_name, content, timestamp
FROM messages
WHERE chat_jid = ? AND timestamp > ?
AND is_bot_message = 0 AND content NOT LIKE ?
AND content != '' AND content IS NOT NULL
ORDER BY timestamp
`;
return db
.prepare(sql)
.all(chatJid, sinceTimestamp, `${botPrefix}:%`, limit) as NewMessage[];
.all(chatJid, sinceTimestamp, `${botPrefix}:%`) as NewMessage[];
}
export function createTask(
@@ -447,9 +428,10 @@ export function updateTask(
}
export function deleteTask(id: string): void {
// Delete child records first (FK constraint)
db.prepare('DELETE FROM task_run_logs WHERE task_id = ?').run(id);
db.prepare('DELETE FROM scheduled_tasks WHERE id = ?').run(id);
db.transaction(() => {
db.prepare('DELETE FROM task_run_logs WHERE task_id = ?').run(id);
db.prepare('DELETE FROM scheduled_tasks WHERE id = ?').run(id);
})();
}
export function getDueTasks(): ScheduledTask[] {
@@ -526,6 +508,10 @@ export function setSession(groupFolder: string, sessionId: string): void {
).run(groupFolder, sessionId);
}
export function deleteSession(groupFolder: string): void {
db.prepare('DELETE FROM sessions WHERE group_folder = ?').run(groupFolder);
}
export function getAllSessions(): Record<string, string> {
const rows = db
.prepare('SELECT group_folder, session_id FROM sessions')
@@ -553,7 +539,6 @@ export function getRegisteredGroup(
added_at: string;
container_config: string | null;
requires_trigger: number | null;
is_main: number | null;
}
| undefined;
if (!row) return undefined;
@@ -575,7 +560,6 @@ export function getRegisteredGroup(
: undefined,
requiresTrigger:
row.requires_trigger === null ? undefined : row.requires_trigger === 1,
isMain: row.is_main === 1 ? true : undefined,
};
}
@@ -584,8 +568,8 @@ export function setRegisteredGroup(jid: string, group: RegisteredGroup): void {
throw new Error(`Invalid group folder "${group.folder}" for JID ${jid}`);
}
db.prepare(
`INSERT OR REPLACE INTO registered_groups (jid, name, folder, trigger_pattern, added_at, container_config, requires_trigger, is_main)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
`INSERT OR REPLACE INTO registered_groups (jid, name, folder, trigger_pattern, added_at, container_config, requires_trigger)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
).run(
jid,
group.name,
@@ -594,7 +578,6 @@ export function setRegisteredGroup(jid: string, group: RegisteredGroup): void {
group.added_at,
group.containerConfig ? JSON.stringify(group.containerConfig) : null,
group.requiresTrigger === undefined ? 1 : group.requiresTrigger ? 1 : 0,
group.isMain ? 1 : 0,
);
}
@@ -607,7 +590,6 @@ export function getAllRegisteredGroups(): Record<string, RegisteredGroup> {
added_at: string;
container_config: string | null;
requires_trigger: number | null;
is_main: number | null;
}>;
const result: Record<string, RegisteredGroup> = {};
for (const row of rows) {
@@ -628,7 +610,6 @@ export function getAllRegisteredGroups(): Record<string, RegisteredGroup> {
: undefined,
requiresTrigger:
row.requires_trigger === null ? undefined : row.requires_trigger === 1,
isMain: row.is_main === 1 ? true : undefined,
};
}
return result;

View File

@@ -58,14 +58,13 @@ describe('escapeXml', () => {
// --- formatMessages ---
describe('formatMessages', () => {
const TZ = 'UTC';
it('formats a single message as XML with context header', () => {
const result = formatMessages([makeMsg()], TZ);
expect(result).toContain('<context timezone="UTC" />');
expect(result).toContain('<message sender="Alice"');
expect(result).toContain('>hello</message>');
expect(result).toContain('Jan 1, 2024');
it('formats a single message as XML', () => {
const result = formatMessages([makeMsg()]);
expect(result).toBe(
'<messages>\n' +
'<message sender="Alice" time="2024-01-01T00:00:00.000Z">hello</message>\n' +
'</messages>',
);
});
it('formats multiple messages', () => {
@@ -74,16 +73,11 @@ describe('formatMessages', () => {
id: '1',
sender_name: 'Alice',
content: 'hi',
timestamp: '2024-01-01T00:00:00.000Z',
}),
makeMsg({
id: '2',
sender_name: 'Bob',
content: 'hey',
timestamp: '2024-01-01T01:00:00.000Z',
timestamp: 't1',
}),
makeMsg({ id: '2', sender_name: 'Bob', content: 'hey', timestamp: 't2' }),
];
const result = formatMessages(msgs, TZ);
const result = formatMessages(msgs);
expect(result).toContain('sender="Alice"');
expect(result).toContain('sender="Bob"');
expect(result).toContain('>hi</message>');
@@ -91,35 +85,22 @@ describe('formatMessages', () => {
});
it('escapes special characters in sender names', () => {
const result = formatMessages([makeMsg({ sender_name: 'A & B <Co>' })], TZ);
const result = formatMessages([makeMsg({ sender_name: 'A & B <Co>' })]);
expect(result).toContain('sender="A &amp; B &lt;Co&gt;"');
});
it('escapes special characters in content', () => {
const result = formatMessages(
[makeMsg({ content: '<script>alert("xss")</script>' })],
TZ,
);
const result = formatMessages([
makeMsg({ content: '<script>alert("xss")</script>' }),
]);
expect(result).toContain(
'&lt;script&gt;alert(&quot;xss&quot;)&lt;/script&gt;',
);
});
it('handles empty array', () => {
const result = formatMessages([], TZ);
expect(result).toContain('<context timezone="UTC" />');
expect(result).toContain('<messages>\n\n</messages>');
});
it('converts timestamps to local time for given timezone', () => {
// 2024-01-01T18:30:00Z in America/New_York (EST) = 1:30 PM
const result = formatMessages(
[makeMsg({ timestamp: '2024-01-01T18:30:00.000Z' })],
'America/New_York',
);
expect(result).toContain('1:30');
expect(result).toContain('PM');
expect(result).toContain('<context timezone="America/New_York" />');
const result = formatMessages([]);
expect(result).toBe('<messages>\n\n</messages>');
});
});

View File

@@ -243,41 +243,6 @@ describe('GroupQueue', () => {
expect(processed).toContain('group3@g.us');
});
// --- Running task dedup (Issue #138) ---
it('rejects duplicate enqueue of a currently-running task', async () => {
let resolveTask: () => void;
let taskCallCount = 0;
const taskFn = vi.fn(async () => {
taskCallCount++;
await new Promise<void>((resolve) => {
resolveTask = resolve;
});
});
// Start the task (runs immediately — slot available)
queue.enqueueTask('group1@g.us', 'task-1', taskFn);
await vi.advanceTimersByTimeAsync(10);
expect(taskCallCount).toBe(1);
// Scheduler poll re-discovers the same task while it's running —
// this must be silently dropped
const dupFn = vi.fn(async () => {});
queue.enqueueTask('group1@g.us', 'task-1', dupFn);
await vi.advanceTimersByTimeAsync(10);
// Duplicate was NOT queued
expect(dupFn).not.toHaveBeenCalled();
// Complete the original task
resolveTask!();
await vi.advanceTimersByTimeAsync(10);
// Only one execution total
expect(taskCallCount).toBe(1);
});
// --- Idle preemption ---
it('does NOT preempt active container when not idle', async () => {

View File

@@ -18,12 +18,13 @@ interface GroupState {
active: boolean;
idleWaiting: boolean;
isTaskContainer: boolean;
runningTaskId: string | null;
pendingMessages: boolean;
pendingTasks: QueuedTask[];
process: ChildProcess | null;
containerName: string | null;
groupFolder: string | null;
/** IPC dir name — may differ from groupFolder for task containers (e.g. 'main-task'). */
ipcFolder: string | null;
retryCount: number;
}
@@ -42,12 +43,12 @@ export class GroupQueue {
active: false,
idleWaiting: false,
isTaskContainer: false,
runningTaskId: null,
pendingMessages: false,
pendingTasks: [],
process: null,
containerName: null,
groupFolder: null,
ipcFolder: null,
retryCount: 0,
};
this.groups.set(groupJid, state);
@@ -92,11 +93,7 @@ export class GroupQueue {
const state = this.getGroup(groupJid);
// Prevent double-queuing: check both pending and currently-running task
if (state.runningTaskId === taskId) {
logger.debug({ groupJid, taskId }, 'Task already running, skipping');
return;
}
// Prevent double-queuing of the same task
if (state.pendingTasks.some((t) => t.id === taskId)) {
logger.debug({ groupJid, taskId }, 'Task already queued, skipping');
return;
@@ -134,11 +131,13 @@ export class GroupQueue {
proc: ChildProcess,
containerName: string,
groupFolder?: string,
ipcFolder?: string,
): void {
const state = this.getGroup(groupJid);
state.process = proc;
state.containerName = containerName;
if (groupFolder) state.groupFolder = groupFolder;
state.ipcFolder = ipcFolder ?? groupFolder ?? null;
}
/**
@@ -159,11 +158,11 @@ export class GroupQueue {
*/
sendMessage(groupJid: string, text: string): boolean {
const state = this.getGroup(groupJid);
if (!state.active || !state.groupFolder || state.isTaskContainer)
if (!state.active || !state.ipcFolder || state.isTaskContainer)
return false;
state.idleWaiting = false; // Agent is about to receive work, no longer idle
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
const inputDir = path.join(DATA_DIR, 'ipc', state.ipcFolder, 'input');
try {
fs.mkdirSync(inputDir, { recursive: true });
const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 6)}.json`;
@@ -182,9 +181,9 @@ export class GroupQueue {
*/
closeStdin(groupJid: string): void {
const state = this.getGroup(groupJid);
if (!state.active || !state.groupFolder) return;
if (!state.active || !state.ipcFolder) return;
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
const inputDir = path.join(DATA_DIR, 'ipc', state.ipcFolder, 'input');
try {
fs.mkdirSync(inputDir, { recursive: true });
fs.writeFileSync(path.join(inputDir, '_close'), '');
@@ -226,6 +225,7 @@ export class GroupQueue {
state.process = null;
state.containerName = null;
state.groupFolder = null;
state.ipcFolder = null;
this.activeCount--;
this.drainGroup(groupJid);
}
@@ -236,7 +236,6 @@ export class GroupQueue {
state.active = true;
state.idleWaiting = false;
state.isTaskContainer = true;
state.runningTaskId = task.id;
this.activeCount++;
logger.debug(
@@ -251,10 +250,10 @@ export class GroupQueue {
} finally {
state.active = false;
state.isTaskContainer = false;
state.runningTaskId = null;
state.process = null;
state.containerName = null;
state.groupFolder = null;
state.ipcFolder = null;
this.activeCount--;
this.drainGroup(groupJid);
}

View File

@@ -3,18 +3,16 @@ import path from 'path';
import {
ASSISTANT_NAME,
CREDENTIAL_PROXY_PORT,
DATA_DIR,
IDLE_TIMEOUT,
MAIN_GROUP_FOLDER,
POLL_INTERVAL,
TIMEZONE,
TELEGRAM_BOT_TOKEN,
TELEGRAM_ONLY,
TRIGGER_PATTERN,
} from './config.js';
import { startCredentialProxy } from './credential-proxy.js';
import './channels/index.js';
import {
getChannelFactory,
getRegisteredChannelNames,
} from './channels/registry.js';
import { WhatsAppChannel } from './channels/whatsapp.js';
import { TelegramChannel } from './channels/telegram.js';
import {
ContainerOutput,
runContainerAgent,
@@ -22,18 +20,14 @@ import {
writeTasksSnapshot,
} from './container-runner.js';
import {
cleanupOrphans,
ensureContainerRuntimeRunning,
PROXY_BIND_HOST,
} from './container-runtime.js';
import {
closeDatabase,
deleteSession,
getAllChats,
getAllRegisteredGroups,
getAllSessions,
getAllTasks,
getMessagesSince,
getNewMessages,
getRegisteredGroup,
getRouterState,
initDatabase,
setRegisteredGroup,
@@ -43,18 +37,16 @@ import {
storeMessage,
} from './db.js';
import { GroupQueue } from './group-queue.js';
import { resolveGroupFolderPath } from './group-folder.js';
import { startIpcWatcher } from './ipc.js';
import { findChannel, formatMessages, formatOutbound } from './router.js';
import {
isSenderAllowed,
isTriggerAllowed,
loadSenderAllowlist,
shouldDropMessage,
} from './sender-allowlist.js';
import { startSchedulerLoop } from './task-scheduler.js';
import { Channel, NewMessage, RegisteredGroup } from './types.js';
import { logger } from './logger.js';
import {
ensureContainerRuntimeRunning,
cleanupOrphans,
} from './container-runtime.js';
import { resolveGroupFolderPath } from './group-folder.js';
// Re-export for backwards compatibility during refactor
export { escapeXml, formatMessages } from './router.js';
@@ -65,6 +57,7 @@ let registeredGroups: Record<string, RegisteredGroup> = {};
let lastAgentTimestamp: Record<string, string> = {};
let messageLoopRunning = false;
let whatsapp: WhatsAppChannel;
const channels: Channel[] = [];
const queue = new GroupQueue();
@@ -91,22 +84,24 @@ function saveState(): void {
}
function registerGroup(jid: string, group: RegisteredGroup): void {
let groupDir: string;
try {
groupDir = resolveGroupFolderPath(group.folder);
} catch (err) {
logger.warn(
{ jid, folder: group.folder, err },
'Rejecting group registration with invalid folder',
);
return;
}
registeredGroups[jid] = group;
// Validate + persist first (setRegisteredGroup throws on invalid folder)
setRegisteredGroup(jid, group);
// Create group folder
fs.mkdirSync(path.join(groupDir, 'logs'), { recursive: true });
// Create group folder after DB write so a mkdir failure doesn't leave
// a DB record with no matching directory
const groupDir = resolveGroupFolderPath(group.folder);
try {
fs.mkdirSync(path.join(groupDir, 'logs'), { recursive: true });
} catch (err) {
logger.error(
{ jid, folder: group.folder, err },
'Failed to create group directory',
);
// Don't throw — DB record is written; directory can be created on next restart
}
// Update in-memory state after I/O succeeds
registeredGroups[jid] = group;
logger.info(
{ jid, name: group.name, folder: group.folder },
@@ -153,7 +148,7 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
return true;
}
const isMainGroup = group.isMain === true;
const isMainGroup = group.folder === MAIN_GROUP_FOLDER;
const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
const missedMessages = getMessagesSince(
@@ -166,16 +161,13 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
// For non-main groups, check if trigger is required and present
if (!isMainGroup && group.requiresTrigger !== false) {
const allowlistCfg = loadSenderAllowlist();
const hasTrigger = missedMessages.some(
(m) =>
TRIGGER_PATTERN.test(m.content.trim()) &&
(m.is_from_me || isTriggerAllowed(chatJid, m.sender, allowlistCfg)),
const hasTrigger = missedMessages.some((m) =>
TRIGGER_PATTERN.test(m.content.trim()),
);
if (!hasTrigger) return true;
}
const prompt = formatMessages(missedMessages, TIMEZONE);
const prompt = formatMessages(missedMessages);
// Advance cursor so the piping path in startMessageLoop won't re-fetch
// these messages. Save the old cursor so we can roll back on error.
@@ -225,10 +217,6 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
resetIdleTimer();
}
if (result.status === 'success') {
queue.notifyIdle(chatJid);
}
if (result.status === 'error') {
hadError = true;
}
@@ -266,7 +254,7 @@ async function runAgent(
chatJid: string,
onOutput?: (output: ContainerOutput) => Promise<void>,
): Promise<'success' | 'error'> {
const isMain = group.isMain === true;
const isMain = group.folder === MAIN_GROUP_FOLDER;
const sessionId = sessions[group.folder];
// Update tasks snapshot for container to read (filtered by group)
@@ -314,7 +302,6 @@ async function runAgent(
groupFolder: group.folder,
chatJid,
isMain,
assistantName: ASSISTANT_NAME,
},
(proc, containerName) =>
queue.registerProcess(chatJid, proc, containerName, group.folder),
@@ -350,6 +337,7 @@ async function startMessageLoop(): Promise<void> {
logger.info(`NanoClaw running (trigger: @${ASSISTANT_NAME})`);
let consecutiveErrors = 0;
while (true) {
try {
const jids = Object.keys(registeredGroups);
@@ -387,19 +375,15 @@ async function startMessageLoop(): Promise<void> {
continue;
}
const isMainGroup = group.isMain === true;
const isMainGroup = group.folder === MAIN_GROUP_FOLDER;
const needsTrigger = !isMainGroup && group.requiresTrigger !== false;
// For non-main groups, only act on trigger messages.
// Non-trigger messages accumulate in DB and get pulled as
// context when a trigger eventually arrives.
if (needsTrigger) {
const allowlistCfg = loadSenderAllowlist();
const hasTrigger = groupMessages.some(
(m) =>
TRIGGER_PATTERN.test(m.content.trim()) &&
(m.is_from_me ||
isTriggerAllowed(chatJid, m.sender, allowlistCfg)),
const hasTrigger = groupMessages.some((m) =>
TRIGGER_PATTERN.test(m.content.trim()),
);
if (!hasTrigger) continue;
}
@@ -413,7 +397,7 @@ async function startMessageLoop(): Promise<void> {
);
const messagesToSend =
allPending.length > 0 ? allPending : groupMessages;
const formatted = formatMessages(messagesToSend, TIMEZONE);
const formatted = formatMessages(messagesToSend);
if (queue.sendMessage(chatJid, formatted)) {
logger.debug(
@@ -435,8 +419,17 @@ async function startMessageLoop(): Promise<void> {
}
}
}
consecutiveErrors = 0;
} catch (err) {
logger.error({ err }, 'Error in message loop');
consecutiveErrors++;
if (consecutiveErrors === 1) {
logger.error({ err }, 'Error in message loop');
} else if (consecutiveErrors % 10 === 0) {
logger.error(
{ err, consecutiveErrors },
'Message loop error persisting — check DB and disk',
);
}
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL));
}
@@ -460,29 +453,19 @@ function recoverPendingMessages(): void {
}
}
function ensureContainerSystemRunning(): void {
async function main(): Promise<void> {
ensureContainerRuntimeRunning();
cleanupOrphans();
}
async function main(): Promise<void> {
ensureContainerSystemRunning();
initDatabase();
logger.info('Database initialized');
loadState();
// Start credential proxy (containers route API calls through this)
const proxyServer = await startCredentialProxy(
CREDENTIAL_PROXY_PORT,
PROXY_BIND_HOST,
);
// Graceful shutdown handlers
const shutdown = async (signal: string) => {
logger.info({ signal }, 'Shutdown signal received');
proxyServer.close();
await queue.shutdown(10000);
for (const ch of channels) await ch.disconnect();
closeDatabase();
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
@@ -490,25 +473,7 @@ async function main(): Promise<void> {
// Channel callbacks (shared by all channels)
const channelOpts = {
onMessage: (chatJid: string, msg: NewMessage) => {
// Sender allowlist drop mode: discard messages from denied senders before storing
if (!msg.is_from_me && !msg.is_bot_message && registeredGroups[chatJid]) {
const cfg = loadSenderAllowlist();
if (
shouldDropMessage(chatJid, cfg) &&
!isSenderAllowed(chatJid, msg.sender, cfg)
) {
if (cfg.logDenied) {
logger.debug(
{ chatJid, sender: msg.sender },
'sender-allowlist: dropping message (drop mode)',
);
}
return;
}
}
storeMessage(msg);
},
onMessage: (_chatJid: string, msg: NewMessage) => storeMessage(msg),
onChatMetadata: (
chatJid: string,
timestamp: string,
@@ -517,27 +482,30 @@ async function main(): Promise<void> {
isGroup?: boolean,
) => storeChatMetadata(chatJid, timestamp, name, channel, isGroup),
registeredGroups: () => registeredGroups,
clearSession: (chatJid: string) => {
const group = registeredGroups[chatJid];
if (group) {
delete sessions[group.folder];
deleteSession(group.folder);
logger.info(
{ chatJid, folder: group.folder },
'Session cleared via /reset',
);
}
},
};
// Create and connect all registered channels.
// Each channel self-registers via the barrel import above.
// Factories return null when credentials are missing, so unconfigured channels are skipped.
for (const channelName of getRegisteredChannelNames()) {
const factory = getChannelFactory(channelName)!;
const channel = factory(channelOpts);
if (!channel) {
logger.warn(
{ channel: channelName },
'Channel installed but credentials missing — skipping. Check .env or re-run the channel skill.',
);
continue;
}
channels.push(channel);
await channel.connect();
// Create and connect channels
if (!TELEGRAM_ONLY) {
whatsapp = new WhatsAppChannel(channelOpts);
channels.push(whatsapp);
await whatsapp.connect();
}
if (channels.length === 0) {
logger.fatal('No channels connected');
process.exit(1);
if (TELEGRAM_BOT_TOKEN) {
const telegram = new TelegramChannel(TELEGRAM_BOT_TOKEN, channelOpts);
channels.push(telegram);
await telegram.connect();
}
// Start subsystems (independently of connection handler)
@@ -545,8 +513,14 @@ async function main(): Promise<void> {
registeredGroups: () => registeredGroups,
getSessions: () => sessions,
queue,
onProcess: (groupJid, proc, containerName, groupFolder) =>
queue.registerProcess(groupJid, proc, containerName, groupFolder),
onProcess: (queueKey, proc, containerName, groupFolder, ipcFolder) =>
queue.registerProcess(
queueKey,
proc,
containerName,
groupFolder,
ipcFolder,
),
sendMessage: async (jid, rawText) => {
const channel = findChannel(channels, jid);
if (!channel) {
@@ -565,23 +539,15 @@ async function main(): Promise<void> {
},
registeredGroups: () => registeredGroups,
registerGroup,
syncGroups: async (force: boolean) => {
await Promise.all(
channels
.filter((ch) => ch.syncGroups)
.map((ch) => ch.syncGroups!(force)),
);
},
syncGroupMetadata: (force) =>
whatsapp?.syncGroupMetadata(force) ?? Promise.resolve(),
getAvailableGroups,
writeGroupsSnapshot: (gf, im, ag, rj) =>
writeGroupsSnapshot(gf, im, ag, rj),
});
queue.setProcessMessagesFn(processGroupMessages);
recoverPendingMessages();
startMessageLoop().catch((err) => {
logger.fatal({ err }, 'Message loop crashed unexpectedly');
process.exit(1);
});
startMessageLoop();
}
// Guard: only run when executed directly, not when imported by tests

View File

@@ -14,10 +14,9 @@ import { RegisteredGroup } from './types.js';
// Set up registered groups used across tests
const MAIN_GROUP: RegisteredGroup = {
name: 'Main',
folder: 'whatsapp_main',
folder: 'main',
trigger: 'always',
added_at: '2024-01-01T00:00:00.000Z',
isMain: true,
};
const OTHER_GROUP: RegisteredGroup = {
@@ -59,7 +58,7 @@ beforeEach(() => {
setRegisteredGroup(jid, group);
// Mock the fs.mkdirSync that registerGroup does
},
syncGroups: async () => {},
syncGroupMetadata: async () => {},
getAvailableGroups: () => [],
writeGroupsSnapshot: () => {},
};
@@ -74,10 +73,10 @@ describe('schedule_task authorization', () => {
type: 'schedule_task',
prompt: 'do something',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -94,7 +93,7 @@ describe('schedule_task authorization', () => {
type: 'schedule_task',
prompt: 'self task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'other@g.us',
},
'other-group',
@@ -113,7 +112,7 @@ describe('schedule_task authorization', () => {
type: 'schedule_task',
prompt: 'unauthorized',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'main@g.us',
},
'other-group',
@@ -131,10 +130,10 @@ describe('schedule_task authorization', () => {
type: 'schedule_task',
prompt: 'no target',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'unknown@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -150,11 +149,11 @@ describe('pause_task authorization', () => {
beforeEach(() => {
createTask({
id: 'task-main',
group_folder: 'whatsapp_main',
group_folder: 'main',
chat_jid: 'main@g.us',
prompt: 'main task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: '2025-06-01T00:00:00.000Z',
status: 'active',
@@ -166,7 +165,7 @@ describe('pause_task authorization', () => {
chat_jid: 'other@g.us',
prompt: 'other task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: '2025-06-01T00:00:00.000Z',
status: 'active',
@@ -177,7 +176,7 @@ describe('pause_task authorization', () => {
it('main group can pause any task', async () => {
await processTaskIpc(
{ type: 'pause_task', taskId: 'task-other' },
'whatsapp_main',
'main',
true,
deps,
);
@@ -215,7 +214,7 @@ describe('resume_task authorization', () => {
chat_jid: 'other@g.us',
prompt: 'paused task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: '2025-06-01T00:00:00.000Z',
status: 'paused',
@@ -226,7 +225,7 @@ describe('resume_task authorization', () => {
it('main group can resume any task', async () => {
await processTaskIpc(
{ type: 'resume_task', taskId: 'task-paused' },
'whatsapp_main',
'main',
true,
deps,
);
@@ -264,7 +263,7 @@ describe('cancel_task authorization', () => {
chat_jid: 'other@g.us',
prompt: 'cancel me',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: null,
status: 'active',
@@ -273,7 +272,7 @@ describe('cancel_task authorization', () => {
await processTaskIpc(
{ type: 'cancel_task', taskId: 'task-to-cancel' },
'whatsapp_main',
'main',
true,
deps,
);
@@ -287,7 +286,7 @@ describe('cancel_task authorization', () => {
chat_jid: 'other@g.us',
prompt: 'my task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: null,
status: 'active',
@@ -306,11 +305,11 @@ describe('cancel_task authorization', () => {
it('non-main group cannot cancel another groups task', async () => {
createTask({
id: 'task-foreign',
group_folder: 'whatsapp_main',
group_folder: 'main',
chat_jid: 'main@g.us',
prompt: 'not yours',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: null,
status: 'active',
@@ -357,7 +356,7 @@ describe('register_group authorization', () => {
folder: '../../outside',
trigger: '@Andy',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -398,12 +397,8 @@ describe('IPC message authorization', () => {
}
it('main group can send to any group', () => {
expect(
isMessageAuthorized('whatsapp_main', true, 'other@g.us', groups),
).toBe(true);
expect(
isMessageAuthorized('whatsapp_main', true, 'third@g.us', groups),
).toBe(true);
expect(isMessageAuthorized('main', true, 'other@g.us', groups)).toBe(true);
expect(isMessageAuthorized('main', true, 'third@g.us', groups)).toBe(true);
});
it('non-main group can send to its own chat', () => {
@@ -429,9 +424,9 @@ describe('IPC message authorization', () => {
it('main group can send to unregistered JID', () => {
// Main is always authorized regardless of target
expect(
isMessageAuthorized('whatsapp_main', true, 'unknown@g.us', groups),
).toBe(true);
expect(isMessageAuthorized('main', true, 'unknown@g.us', groups)).toBe(
true,
);
});
});
@@ -447,7 +442,7 @@ describe('schedule_task schedule types', () => {
schedule_value: '0 9 * * *', // every day at 9am
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -471,7 +466,7 @@ describe('schedule_task schedule types', () => {
schedule_value: 'not a cron',
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -490,7 +485,7 @@ describe('schedule_task schedule types', () => {
schedule_value: '3600000', // 1 hour
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -513,7 +508,7 @@ describe('schedule_task schedule types', () => {
schedule_value: 'abc',
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -530,7 +525,7 @@ describe('schedule_task schedule types', () => {
schedule_value: '0',
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -547,7 +542,7 @@ describe('schedule_task schedule types', () => {
schedule_value: 'not-a-date',
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -565,11 +560,11 @@ describe('schedule_task context_mode', () => {
type: 'schedule_task',
prompt: 'group context',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'group',
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -584,11 +579,11 @@ describe('schedule_task context_mode', () => {
type: 'schedule_task',
prompt: 'isolated context',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -603,11 +598,11 @@ describe('schedule_task context_mode', () => {
type: 'schedule_task',
prompt: 'bad context',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'bogus' as any,
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -622,10 +617,10 @@ describe('schedule_task context_mode', () => {
type: 'schedule_task',
prompt: 'no context mode',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'other@g.us',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -647,7 +642,7 @@ describe('register_group success', () => {
folder: 'new-group',
trigger: '@Andy',
},
'whatsapp_main',
'main',
true,
deps,
);
@@ -668,7 +663,7 @@ describe('register_group success', () => {
name: 'Partial',
// missing folder and trigger
},
'whatsapp_main',
'main',
true,
deps,
);

View File

@@ -3,7 +3,12 @@ import path from 'path';
import { CronExpressionParser } from 'cron-parser';
import { DATA_DIR, IPC_POLL_INTERVAL, TIMEZONE } from './config.js';
import {
DATA_DIR,
IPC_POLL_INTERVAL,
MAIN_GROUP_FOLDER,
TIMEZONE,
} from './config.js';
import { AvailableGroup } from './container-runner.js';
import { createTask, deleteTask, getTaskById, updateTask } from './db.js';
import { isValidGroupFolder } from './group-folder.js';
@@ -14,7 +19,7 @@ export interface IpcDeps {
sendMessage: (jid: string, text: string) => Promise<void>;
registeredGroups: () => Record<string, RegisteredGroup>;
registerGroup: (jid: string, group: RegisteredGroup) => void;
syncGroups: (force: boolean) => Promise<void>;
syncGroupMetadata: (force: boolean) => Promise<void>;
getAvailableGroups: () => AvailableGroup[];
writeGroupsSnapshot: (
groupFolder: string,
@@ -52,14 +57,13 @@ export function startIpcWatcher(deps: IpcDeps): void {
const registeredGroups = deps.registeredGroups();
// Build folder→isMain lookup from registered groups
const folderIsMain = new Map<string, boolean>();
for (const group of Object.values(registeredGroups)) {
if (group.isMain) folderIsMain.set(group.folder, true);
}
for (const sourceGroup of groupFolders) {
const isMain = folderIsMain.get(sourceGroup) === true;
// Strip '-task' suffix so task containers (e.g. 'main-task') are authorized
// as their base group ('main') for all IPC operations.
const baseFolder = sourceGroup.endsWith('-task')
? sourceGroup.slice(0, -5)
: sourceGroup;
const isMain = baseFolder === MAIN_GROUP_FOLDER;
const messagesDir = path.join(ipcBaseDir, sourceGroup, 'messages');
const tasksDir = path.join(ipcBaseDir, sourceGroup, 'tasks');
@@ -78,7 +82,9 @@ export function startIpcWatcher(deps: IpcDeps): void {
const targetGroup = registeredGroups[data.chatJid];
if (
isMain ||
(targetGroup && targetGroup.folder === sourceGroup)
(targetGroup &&
(targetGroup.folder === sourceGroup ||
targetGroup.folder === baseFolder))
) {
await deps.sendMessage(data.chatJid, data.text);
logger.info(
@@ -177,6 +183,9 @@ export async function processTaskIpc(
deps: IpcDeps,
): Promise<void> {
const registeredGroups = deps.registeredGroups();
const baseFolder = sourceGroup.endsWith('-task')
? sourceGroup.slice(0, -5)
: sourceGroup;
switch (data.type) {
case 'schedule_task':
@@ -201,7 +210,11 @@ export async function processTaskIpc(
const targetFolder = targetGroupEntry.folder;
// Authorization: non-main groups can only schedule for themselves
if (!isMain && targetFolder !== sourceGroup) {
if (
!isMain &&
targetFolder !== sourceGroup &&
targetFolder !== baseFolder
) {
logger.warn(
{ sourceGroup, targetFolder },
'Unauthorized schedule_task attempt blocked',
@@ -236,20 +249,18 @@ export async function processTaskIpc(
}
nextRun = new Date(Date.now() + ms).toISOString();
} else if (scheduleType === 'once') {
const date = new Date(data.schedule_value);
if (isNaN(date.getTime())) {
const scheduled = new Date(data.schedule_value);
if (isNaN(scheduled.getTime())) {
logger.warn(
{ scheduleValue: data.schedule_value },
'Invalid timestamp',
);
break;
}
nextRun = date.toISOString();
nextRun = scheduled.toISOString();
}
const taskId =
data.taskId ||
`task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const taskId = `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const contextMode =
data.context_mode === 'group' || data.context_mode === 'isolated'
? data.context_mode
@@ -276,7 +287,12 @@ export async function processTaskIpc(
case 'pause_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) {
if (
task &&
(isMain ||
task.group_folder === sourceGroup ||
task.group_folder === baseFolder)
) {
updateTask(data.taskId, { status: 'paused' });
logger.info(
{ taskId: data.taskId, sourceGroup },
@@ -294,7 +310,12 @@ export async function processTaskIpc(
case 'resume_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) {
if (
task &&
(isMain ||
task.group_folder === sourceGroup ||
task.group_folder === baseFolder)
) {
updateTask(data.taskId, { status: 'active' });
logger.info(
{ taskId: data.taskId, sourceGroup },
@@ -312,7 +333,12 @@ export async function processTaskIpc(
case 'cancel_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) {
if (
task &&
(isMain ||
task.group_folder === sourceGroup ||
task.group_folder === baseFolder)
) {
deleteTask(data.taskId);
logger.info(
{ taskId: data.taskId, sourceGroup },
@@ -327,70 +353,6 @@ export async function processTaskIpc(
}
break;
case 'update_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (!task) {
logger.warn(
{ taskId: data.taskId, sourceGroup },
'Task not found for update',
);
break;
}
if (!isMain && task.group_folder !== sourceGroup) {
logger.warn(
{ taskId: data.taskId, sourceGroup },
'Unauthorized task update attempt',
);
break;
}
const updates: Parameters<typeof updateTask>[1] = {};
if (data.prompt !== undefined) updates.prompt = data.prompt;
if (data.schedule_type !== undefined)
updates.schedule_type = data.schedule_type as
| 'cron'
| 'interval'
| 'once';
if (data.schedule_value !== undefined)
updates.schedule_value = data.schedule_value;
// Recompute next_run if schedule changed
if (data.schedule_type || data.schedule_value) {
const updatedTask = {
...task,
...updates,
};
if (updatedTask.schedule_type === 'cron') {
try {
const interval = CronExpressionParser.parse(
updatedTask.schedule_value,
{ tz: TIMEZONE },
);
updates.next_run = interval.next().toISOString();
} catch {
logger.warn(
{ taskId: data.taskId, value: updatedTask.schedule_value },
'Invalid cron in task update',
);
break;
}
} else if (updatedTask.schedule_type === 'interval') {
const ms = parseInt(updatedTask.schedule_value, 10);
if (!isNaN(ms) && ms > 0) {
updates.next_run = new Date(Date.now() + ms).toISOString();
}
}
}
updateTask(data.taskId, updates);
logger.info(
{ taskId: data.taskId, sourceGroup, updates },
'Task updated via IPC',
);
}
break;
case 'refresh_groups':
// Only main group can request a refresh
if (isMain) {
@@ -398,7 +360,7 @@ export async function processTaskIpc(
{ sourceGroup },
'Group metadata refresh requested via IPC',
);
await deps.syncGroups(true);
await deps.syncGroupMetadata(true);
// Write updated snapshot immediately
const availableGroups = deps.getAvailableGroups();
deps.writeGroupsSnapshot(
@@ -432,7 +394,6 @@ export async function processTaskIpc(
);
break;
}
// Defense in depth: agent cannot set isMain via IPC
deps.registerGroup(data.jid, {
name: data.name,
folder: data.folder,

View File

@@ -1,5 +1,4 @@
import { Channel, NewMessage } from './types.js';
import { formatLocalTime } from './timezone.js';
export function escapeXml(s: string): string {
if (!s) return '';
@@ -10,18 +9,12 @@ export function escapeXml(s: string): string {
.replace(/"/g, '&quot;');
}
export function formatMessages(
messages: NewMessage[],
timezone: string,
): string {
const lines = messages.map((m) => {
const displayTime = formatLocalTime(m.timestamp, timezone);
return `<message sender="${escapeXml(m.sender_name)}" time="${escapeXml(displayTime)}">${escapeXml(m.content)}</message>`;
});
const header = `<context timezone="${escapeXml(timezone)}" />\n`;
return `${header}<messages>\n${lines.join('\n')}\n</messages>`;
export function formatMessages(messages: NewMessage[]): string {
const lines = messages.map(
(m) =>
`<message sender="${escapeXml(m.sender_name)}" time="${m.timestamp}">${escapeXml(m.content)}</message>`,
);
return `<messages>\n${lines.join('\n')}\n</messages>`;
}
export function stripInternalTags(text: string): string {

View File

@@ -22,6 +22,16 @@ describe('JID ownership patterns', () => {
const jid = '12345678@s.whatsapp.net';
expect(jid.endsWith('@s.whatsapp.net')).toBe(true);
});
it('Telegram JID: starts with tg:', () => {
const jid = 'tg:123456789';
expect(jid.startsWith('tg:')).toBe(true);
});
it('Telegram group JID: starts with tg: and has negative ID', () => {
const jid = 'tg:-1001234567890';
expect(jid.startsWith('tg:')).toBe(true);
});
});
// --- getAvailableGroups ---
@@ -167,4 +177,103 @@ describe('getAvailableGroups', () => {
const groups = getAvailableGroups();
expect(groups).toHaveLength(0);
});
it('includes Telegram chat JIDs', () => {
storeChatMetadata(
'tg:100200300',
'2024-01-01T00:00:01.000Z',
'Telegram Chat',
'telegram',
true,
);
storeChatMetadata(
'user@s.whatsapp.net',
'2024-01-01T00:00:02.000Z',
'User DM',
'whatsapp',
false,
);
const groups = getAvailableGroups();
expect(groups).toHaveLength(1);
expect(groups[0].jid).toBe('tg:100200300');
});
it('returns Telegram group JIDs with negative IDs', () => {
storeChatMetadata(
'tg:-1001234567890',
'2024-01-01T00:00:01.000Z',
'TG Group',
'telegram',
true,
);
const groups = getAvailableGroups();
expect(groups).toHaveLength(1);
expect(groups[0].jid).toBe('tg:-1001234567890');
expect(groups[0].name).toBe('TG Group');
});
it('marks registered Telegram chats correctly', () => {
storeChatMetadata(
'tg:100200300',
'2024-01-01T00:00:01.000Z',
'TG Registered',
'telegram',
true,
);
storeChatMetadata(
'tg:999999',
'2024-01-01T00:00:02.000Z',
'TG Unregistered',
'telegram',
true,
);
_setRegisteredGroups({
'tg:100200300': {
name: 'TG Registered',
folder: 'tg-registered',
trigger: '@Andy',
added_at: '2024-01-01T00:00:00.000Z',
},
});
const groups = getAvailableGroups();
const tgReg = groups.find((g) => g.jid === 'tg:100200300');
const tgUnreg = groups.find((g) => g.jid === 'tg:999999');
expect(tgReg?.isRegistered).toBe(true);
expect(tgUnreg?.isRegistered).toBe(false);
});
it('mixes WhatsApp and Telegram chats ordered by activity', () => {
storeChatMetadata(
'wa@g.us',
'2024-01-01T00:00:01.000Z',
'WhatsApp',
'whatsapp',
true,
);
storeChatMetadata(
'tg:100',
'2024-01-01T00:00:03.000Z',
'Telegram',
'telegram',
true,
);
storeChatMetadata(
'wa2@g.us',
'2024-01-01T00:00:02.000Z',
'WhatsApp 2',
'whatsapp',
true,
);
const groups = getAvailableGroups();
expect(groups).toHaveLength(3);
expect(groups[0].jid).toBe('tg:100');
expect(groups[1].jid).toBe('wa2@g.us');
expect(groups[2].jid).toBe('wa@g.us');
});
});

View File

@@ -3,7 +3,6 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { _initTestDatabase, createTask, getTaskById } from './db.js';
import {
_resetSchedulerLoopForTests,
computeNextRun,
startSchedulerLoop,
} from './task-scheduler.js';
@@ -51,79 +50,4 @@ describe('task scheduler', () => {
const task = getTaskById('task-invalid-folder');
expect(task?.status).toBe('paused');
});
it('computeNextRun anchors interval tasks to scheduled time to prevent drift', () => {
const scheduledTime = new Date(Date.now() - 2000).toISOString(); // 2s ago
const task = {
id: 'drift-test',
group_folder: 'test',
chat_jid: 'test@g.us',
prompt: 'test',
schedule_type: 'interval' as const,
schedule_value: '60000', // 1 minute
context_mode: 'isolated' as const,
next_run: scheduledTime,
last_run: null,
last_result: null,
status: 'active' as const,
created_at: '2026-01-01T00:00:00.000Z',
};
const nextRun = computeNextRun(task);
expect(nextRun).not.toBeNull();
// Should be anchored to scheduledTime + 60s, NOT Date.now() + 60s
const expected = new Date(scheduledTime).getTime() + 60000;
expect(new Date(nextRun!).getTime()).toBe(expected);
});
it('computeNextRun returns null for once-tasks', () => {
const task = {
id: 'once-test',
group_folder: 'test',
chat_jid: 'test@g.us',
prompt: 'test',
schedule_type: 'once' as const,
schedule_value: '2026-01-01T00:00:00.000Z',
context_mode: 'isolated' as const,
next_run: new Date(Date.now() - 1000).toISOString(),
last_run: null,
last_result: null,
status: 'active' as const,
created_at: '2026-01-01T00:00:00.000Z',
};
expect(computeNextRun(task)).toBeNull();
});
it('computeNextRun skips missed intervals without infinite loop', () => {
// Task was due 10 intervals ago (missed)
const ms = 60000;
const missedBy = ms * 10;
const scheduledTime = new Date(Date.now() - missedBy).toISOString();
const task = {
id: 'skip-test',
group_folder: 'test',
chat_jid: 'test@g.us',
prompt: 'test',
schedule_type: 'interval' as const,
schedule_value: String(ms),
context_mode: 'isolated' as const,
next_run: scheduledTime,
last_run: null,
last_result: null,
status: 'active' as const,
created_at: '2026-01-01T00:00:00.000Z',
};
const nextRun = computeNextRun(task);
expect(nextRun).not.toBeNull();
// Must be in the future
expect(new Date(nextRun!).getTime()).toBeGreaterThan(Date.now());
// Must be aligned to the original schedule grid
const offset =
(new Date(nextRun!).getTime() - new Date(scheduledTime).getTime()) % ms;
expect(offset).toBe(0);
});
});

View File

@@ -2,7 +2,12 @@ import { ChildProcess } from 'child_process';
import { CronExpressionParser } from 'cron-parser';
import fs from 'fs';
import { ASSISTANT_NAME, SCHEDULER_POLL_INTERVAL, TIMEZONE } from './config.js';
import {
ASSISTANT_NAME,
MAIN_GROUP_FOLDER,
SCHEDULER_POLL_INTERVAL,
TIMEZONE,
} from './config.js';
import {
ContainerOutput,
runContainerAgent,
@@ -21,56 +26,16 @@ import { resolveGroupFolderPath } from './group-folder.js';
import { logger } from './logger.js';
import { RegisteredGroup, ScheduledTask } from './types.js';
/**
* Compute the next run time for a recurring task, anchored to the
* task's scheduled time rather than Date.now() to prevent cumulative
* drift on interval-based tasks.
*
* Co-authored-by: @community-pr-601
*/
export function computeNextRun(task: ScheduledTask): string | null {
if (task.schedule_type === 'once') return null;
const now = Date.now();
if (task.schedule_type === 'cron') {
const interval = CronExpressionParser.parse(task.schedule_value, {
tz: TIMEZONE,
});
return interval.next().toISOString();
}
if (task.schedule_type === 'interval') {
const ms = parseInt(task.schedule_value, 10);
if (!ms || ms <= 0) {
// Guard against malformed interval that would cause an infinite loop
logger.warn(
{ taskId: task.id, value: task.schedule_value },
'Invalid interval value',
);
return new Date(now + 60_000).toISOString();
}
// Anchor to the scheduled time, not now, to prevent drift.
// Skip past any missed intervals so we always land in the future.
let next = new Date(task.next_run!).getTime() + ms;
while (next <= now) {
next += ms;
}
return new Date(next).toISOString();
}
return null;
}
export interface SchedulerDependencies {
registeredGroups: () => Record<string, RegisteredGroup>;
getSessions: () => Record<string, string>;
queue: GroupQueue;
onProcess: (
groupJid: string,
queueKey: string,
proc: ChildProcess,
containerName: string,
groupFolder: string,
ipcFolder?: string,
) => void;
sendMessage: (jid: string, text: string) => Promise<void>;
}
@@ -130,7 +95,7 @@ async function runTask(
}
// Update tasks snapshot for container to read (filtered by group)
const isMain = group.isMain === true;
const isMain = task.group_folder === MAIN_GROUP_FOLDER;
const tasks = getAllTasks();
writeTasksSnapshot(
task.group_folder,
@@ -160,11 +125,15 @@ async function runTask(
const TASK_CLOSE_DELAY_MS = 10000;
let closeTimer: ReturnType<typeof setTimeout> | null = null;
// Tasks run in a separate queue slot (task:chatJid) and separate IPC dir
// (groupFolder-task/) so they don't block or contaminate user message containers.
const taskQueueKey = `task:${task.chat_jid}`;
const scheduleClose = () => {
if (closeTimer) return; // already scheduled
closeTimer = setTimeout(() => {
logger.debug({ taskId: task.id }, 'Closing task container after result');
deps.queue.closeStdin(task.chat_jid);
deps.queue.closeStdin(taskQueueKey);
}, TASK_CLOSE_DELAY_MS);
};
@@ -178,20 +147,29 @@ async function runTask(
chatJid: task.chat_jid,
isMain,
isScheduledTask: true,
ipcSuffix: '-task',
assistantName: ASSISTANT_NAME,
},
(proc, containerName) =>
deps.onProcess(task.chat_jid, proc, containerName, task.group_folder),
deps.onProcess(
taskQueueKey,
proc,
containerName,
task.group_folder,
`${task.group_folder}-task`,
),
async (streamedOutput: ContainerOutput) => {
if (streamedOutput.result) {
result = streamedOutput.result;
// Forward result to user (sendMessage handles formatting)
await deps.sendMessage(task.chat_jid, streamedOutput.result);
scheduleClose();
}
if (streamedOutput.status === 'success') {
deps.queue.notifyIdle(task.chat_jid);
scheduleClose(); // Close promptly even when result is null (e.g. IPC-only tasks)
// Close task containers on any successful output, not just when
// result has text. Tasks that send output via MCP return result=null
// but are still done — without this they'd hang until hard timeout.
scheduleClose();
deps.queue.notifyIdle(taskQueueKey);
}
if (streamedOutput.status === 'error') {
error = streamedOutput.error || 'Unknown error';
@@ -204,7 +182,7 @@ async function runTask(
if (output.status === 'error') {
error = output.error || 'Unknown error';
} else if (output.result) {
// Result was already forwarded to the user via the streaming callback above
// Messages are sent via MCP tool (IPC), result text is just logged
result = output.result;
}
@@ -229,7 +207,18 @@ async function runTask(
error,
});
const nextRun = computeNextRun(task);
let nextRun: string | null = null;
if (task.schedule_type === 'cron') {
const interval = CronExpressionParser.parse(task.schedule_value, {
tz: TIMEZONE,
});
nextRun = interval.next().toISOString();
} else if (task.schedule_type === 'interval') {
const ms = parseInt(task.schedule_value, 10);
nextRun = new Date(Date.now() + ms).toISOString();
}
// 'once' tasks have no next run
const resultSummary = error
? `Error: ${error}`
: result
@@ -262,7 +251,23 @@ export function startSchedulerLoop(deps: SchedulerDependencies): void {
continue;
}
deps.queue.enqueueTask(currentTask.chat_jid, currentTask.id, () =>
// Advance next_run immediately to prevent re-queuing the same task
// while it's still running (task takes longer than the poll interval).
if (currentTask.schedule_type === 'cron') {
const interval = CronExpressionParser.parse(
currentTask.schedule_value,
{ tz: TIMEZONE },
);
const nextRun = interval.next().toISOString();
updateTaskAfterRun(currentTask.id, nextRun, '');
logger.debug(
{ taskId: currentTask.id, nextRun },
'Advanced next_run before task start',
);
}
const taskQueueKey = `task:${currentTask.chat_jid}`;
deps.queue.enqueueTask(taskQueueKey, currentTask.id, () =>
runTask(currentTask, deps),
);
}

112
src/transcription.ts Normal file
View File

@@ -0,0 +1,112 @@
import { downloadMediaMessage } from '@whiskeysockets/baileys';
import { WAMessage, WASocket } from '@whiskeysockets/baileys';
import { readEnvFile } from './env.js';
import { logger } from './logger.js';
interface TranscriptionConfig {
model: string;
enabled: boolean;
fallbackMessage: string;
}
const DEFAULT_CONFIG: TranscriptionConfig = {
model: 'gpt-4o-mini-transcribe',
enabled: true,
fallbackMessage: '[Voice Message - transcription unavailable]',
};
async function transcribeWithOpenAI(
audioBuffer: Buffer,
config: TranscriptionConfig,
): Promise<string | null> {
const env = readEnvFile(['OPENAI_API_KEY']);
const apiKey = env.OPENAI_API_KEY;
if (!apiKey) {
logger.warn('OPENAI_API_KEY not set in .env');
return null;
}
try {
const openaiModule = await import('openai');
const OpenAI = openaiModule.default;
const toFile = openaiModule.toFile;
const openai = new OpenAI({ apiKey });
const file = await toFile(audioBuffer, 'voice.ogg', {
type: 'audio/ogg',
});
const transcription = await openai.audio.transcriptions.create({
file: file,
model: config.model,
response_format: 'text',
});
// When response_format is 'text', the API returns a plain string
return transcription as unknown as string;
} catch (err) {
logger.error({ err }, 'OpenAI transcription failed');
return null;
}
}
export async function transcribeBuffer(
audioBuffer: Buffer,
): Promise<string | null> {
const config = DEFAULT_CONFIG;
if (!config.enabled) {
return null;
}
const transcript = await transcribeWithOpenAI(audioBuffer, config);
return transcript ? transcript.trim() : null;
}
export async function transcribeAudioMessage(
msg: WAMessage,
sock: WASocket,
): Promise<string | null> {
const config = DEFAULT_CONFIG;
if (!config.enabled) {
return config.fallbackMessage;
}
try {
const buffer = (await downloadMediaMessage(
msg,
'buffer',
{},
{
logger: console as any,
reuploadRequest: sock.updateMediaMessage,
},
)) as Buffer;
if (!buffer || buffer.length === 0) {
logger.error('Failed to download audio message');
return config.fallbackMessage;
}
logger.info({ bytes: buffer.length }, 'Downloaded audio message');
const transcript = await transcribeWithOpenAI(buffer, config);
if (!transcript) {
return config.fallbackMessage;
}
return transcript.trim();
} catch (err) {
logger.error({ err }, 'Transcription error');
return config.fallbackMessage;
}
}
export function isVoiceMessage(msg: WAMessage): boolean {
return msg.message?.audioMessage?.ptt === true;
}

View File

@@ -39,7 +39,6 @@ export interface RegisteredGroup {
added_at: string;
containerConfig?: ContainerConfig;
requiresTrigger?: boolean; // Default: true for groups, false for solo chats
isMain?: boolean; // True for the main control group (no trigger, elevated privileges)
}
export interface NewMessage {
@@ -88,8 +87,6 @@ export interface Channel {
disconnect(): Promise<void>;
// Optional: typing indicator. Channels that support it implement it.
setTyping?(jid: string, isTyping: boolean): Promise<void>;
// Optional: sync group/chat names from the platform.
syncGroups?(force: boolean): Promise<void>;
}
// Callback type that channels use to deliver inbound messages
@@ -97,7 +94,7 @@ export type OnInboundMessage = (chatJid: string, message: NewMessage) => void;
// Callback for chat metadata discovery.
// name is optional — channels that deliver names inline (Telegram) pass it here;
// channels that sync names separately (via syncGroups) omit it.
// channels that sync names separately (WhatsApp syncGroupMetadata) omit it.
export type OnChatMetadata = (
chatJid: string,
timestamp: string,

180
src/whatsapp-auth.ts Normal file
View File

@@ -0,0 +1,180 @@
/**
* WhatsApp Authentication Script
*
* Run this during setup to authenticate with WhatsApp.
* Displays QR code, waits for scan, saves credentials, then exits.
*
* Usage: npx tsx src/whatsapp-auth.ts
*/
import fs from 'fs';
import path from 'path';
import pino from 'pino';
import qrcode from 'qrcode-terminal';
import readline from 'readline';
import makeWASocket, {
Browsers,
DisconnectReason,
fetchLatestWaWebVersion,
makeCacheableSignalKeyStore,
useMultiFileAuthState,
} from '@whiskeysockets/baileys';
const AUTH_DIR = './store/auth';
const QR_FILE = './store/qr-data.txt';
const STATUS_FILE = './store/auth-status.txt';
const logger = pino({
level: 'warn', // Quiet logging - only show errors
});
// Check for --pairing-code flag and phone number
const usePairingCode = process.argv.includes('--pairing-code');
const phoneArg = process.argv.find((_, i, arr) => arr[i - 1] === '--phone');
function askQuestion(prompt: string): Promise<string> {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
return new Promise((resolve) => {
rl.question(prompt, (answer) => {
rl.close();
resolve(answer.trim());
});
});
}
async function connectSocket(
phoneNumber?: string,
isReconnect = false,
): Promise<void> {
const { state, saveCreds } = await useMultiFileAuthState(AUTH_DIR);
if (state.creds.registered && !isReconnect) {
fs.writeFileSync(STATUS_FILE, 'already_authenticated');
console.log('✓ Already authenticated with WhatsApp');
console.log(
' To re-authenticate, delete the store/auth folder and run again.',
);
process.exit(0);
}
const { version } = await fetchLatestWaWebVersion({}).catch((err) => {
logger.warn(
{ err },
'Failed to fetch latest WA Web version, using default',
);
return { version: undefined };
});
const sock = makeWASocket({
version,
auth: {
creds: state.creds,
keys: makeCacheableSignalKeyStore(state.keys, logger),
},
printQRInTerminal: false,
logger,
browser: Browsers.macOS('Chrome'),
});
if (usePairingCode && phoneNumber && !state.creds.me) {
// Request pairing code after a short delay for connection to initialize
// Only on first connect (not reconnect after 515)
setTimeout(async () => {
try {
const code = await sock.requestPairingCode(phoneNumber!);
console.log(`\n🔗 Your pairing code: ${code}\n`);
console.log(' 1. Open WhatsApp on your phone');
console.log(' 2. Tap Settings → Linked Devices → Link a Device');
console.log(' 3. Tap "Link with phone number instead"');
console.log(` 4. Enter this code: ${code}\n`);
fs.writeFileSync(STATUS_FILE, `pairing_code:${code}`);
} catch (err: any) {
console.error('Failed to request pairing code:', err.message);
process.exit(1);
}
}, 3000);
}
sock.ev.on('connection.update', (update) => {
const { connection, lastDisconnect, qr } = update;
if (qr) {
// Write raw QR data to file so the setup skill can render it
fs.writeFileSync(QR_FILE, qr);
console.log('Scan this QR code with WhatsApp:\n');
console.log(' 1. Open WhatsApp on your phone');
console.log(' 2. Tap Settings → Linked Devices → Link a Device');
console.log(' 3. Point your camera at the QR code below\n');
qrcode.generate(qr, { small: true });
}
if (connection === 'close') {
const reason = (lastDisconnect?.error as any)?.output?.statusCode;
if (reason === DisconnectReason.loggedOut) {
fs.writeFileSync(STATUS_FILE, 'failed:logged_out');
console.log('\n✗ Logged out. Delete store/auth and try again.');
process.exit(1);
} else if (reason === DisconnectReason.timedOut) {
fs.writeFileSync(STATUS_FILE, 'failed:qr_timeout');
console.log('\n✗ QR code timed out. Please try again.');
process.exit(1);
} else if (reason === 515) {
// 515 = stream error, often happens after pairing succeeds but before
// registration completes. Reconnect to finish the handshake.
console.log('\n⟳ Stream error (515) after pairing — reconnecting...');
connectSocket(phoneNumber, true);
} else {
fs.writeFileSync(STATUS_FILE, `failed:${reason || 'unknown'}`);
console.log('\n✗ Connection failed. Please try again.');
process.exit(1);
}
}
if (connection === 'open') {
fs.writeFileSync(STATUS_FILE, 'authenticated');
// Clean up QR file now that we're connected
try {
fs.unlinkSync(QR_FILE);
} catch {}
console.log('\n✓ Successfully authenticated with WhatsApp!');
console.log(' Credentials saved to store/auth/');
console.log(' You can now start the NanoClaw service.\n');
// Give it a moment to save credentials, then exit
setTimeout(() => process.exit(0), 1000);
}
});
sock.ev.on('creds.update', saveCreds);
}
async function authenticate(): Promise<void> {
fs.mkdirSync(AUTH_DIR, { recursive: true });
// Clean up any stale QR/status files from previous runs
try {
fs.unlinkSync(QR_FILE);
} catch {}
try {
fs.unlinkSync(STATUS_FILE);
} catch {}
let phoneNumber = phoneArg;
if (usePairingCode && !phoneNumber) {
phoneNumber = await askQuestion(
'Enter your phone number (with country code, no + or spaces, e.g. 14155551234): ',
);
}
console.log('Starting WhatsApp authentication...\n');
await connectSocket(phoneNumber);
}
authenticate().catch((err) => {
console.error('Authentication failed:', err.message);
process.exit(1);
});