File size: 4,277 Bytes
a59f28c 25374b3 a59f28c 25374b3 a59f28c bef7a3d a59f28c d978795 a59f28c df0edd7 1dec751 a59f28c df0edd7 1dec751 a59f28c 3c6fc2a df0edd7 1dec751 df0edd7 1dec751 df0edd7 1dec751 3c6fc2a 1dec751 74e06ff | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | import { Queue } from 'bullmq';
import Redis from 'ioredis';
const connection = process.env.REDIS_URL
? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
: new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || 'default',
password: process.env.REDIS_PASSWORD || undefined,
tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
maxRetriesPerRequest: null
});
export const whatsappQueue = new Queue('whatsapp-queue', { connection: connection as any });
// βββ Time-Travel Context (Redis overlay for historical lesson replay) ββββββββ
export const redis = connection; // Shared connection for time-travel ops
const TT_TTL = 1800; // 30 minutes
const ttKey = (userId: string) => `time_travel:${userId}`;
export async function setTimeTravelContext(userId: string, replayDay: number): Promise<void> {
await connection.set(ttKey(userId), String(replayDay), 'EX', TT_TTL);
console.log(`[TIME-TRAVEL] π°οΈ SET User ${userId} β Day ${replayDay} (TTL: ${TT_TTL}s)`);
}
export async function getTimeTravelContext(userId: string): Promise<number | null> {
const val = await connection.get(ttKey(userId));
if (!val) return null;
const day = parseFloat(val);
return isNaN(day) ? null : day;
}
export async function clearTimeTravelContext(userId: string): Promise<void> {
const n = await connection.del(ttKey(userId));
if (n > 0) console.log(`[TIME-TRAVEL] ποΈ CLEARED User ${userId}`);
}
export async function scheduleMessage(userId: string, text: string, delayMs: number = 0) {
if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-message' for user ${userId}`);
return;
}
await whatsappQueue.add('send-message', { userId, text }, { delay: delayMs });
}
export async function scheduleTrackDay(userId: string, trackId: string, dayNumber: number, delayMs: number = 0) {
if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-content' for user ${userId}`);
return;
}
await whatsappQueue.add('send-content', { userId, trackId, dayNumber }, { delay: delayMs });
}
export async function enrollUser(userId: string, trackId: string) {
if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'enroll-user' for user ${userId}`);
return;
}
await whatsappQueue.add('enroll-user', { userId, trackId });
}
/** Send a WhatsApp interactive BUTTON message (max 3 buttons). */
export async function scheduleInteractiveButtons(
userId: string,
bodyText: string,
buttons: Array<{ id: string; title: string }>
) {
if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-buttons' for user ${userId}`);
return;
}
await whatsappQueue.add('send-interactive-buttons', { userId, bodyText, buttons });
}
/** Send a WhatsApp interactive LIST message (up to 10 rows, grouped in sections). */
export async function scheduleInteractiveList(
userId: string,
headerText: string,
bodyText: string,
buttonLabel: string,
sections: Array<{ title: string; rows: Array<{ id: string; title: string; description?: string }> }>
) {
if (process.env.DISABLE_WHATSAPP_SEND === 'true') {
console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-list' for user ${userId}`);
return;
}
await whatsappQueue.add('send-interactive-list', { userId, headerText, bodyText, buttonLabel, sections });
}
/** π¨ ASYNC HANDOVER: Send inbound message for background processing in the worker. */
export async function scheduleInboundMessage(payload: { phone: string, text: string, audioUrl?: string, imageUrl?: string, messageId?: string }) {
await whatsappQueue.add('handle-inbound', payload, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: true,
removeOnFail: false
});
}
|