edtech / apps /api /src /services /queue.ts
CognxSafeTrack
feat: Time-Travel complet via Redis — effectiveDay override, coupe-circuits SUITE/INSCRIPTION, skip COMPLETED en mode replay
d978795
import { Queue } from 'bullmq';
import Redis from 'ioredis';
/**
 * Module-wide ioredis client.
 *
 * When `REDIS_URL` is set it is used as-is; otherwise the client is assembled
 * from `REDIS_HOST` / `REDIS_PORT` / `REDIS_USERNAME` / `REDIS_PASSWORD` /
 * `REDIS_TLS`, falling back to `localhost:6379` with username `default`.
 * Both paths set `maxRetriesPerRequest: null` (BullMQ's connection docs ask
 * for this on connections used for blocking commands — confirm against the
 * BullMQ version in use).
 */
const connection = (() => {
    const url = process.env.REDIS_URL;
    if (url) {
        return new Redis(url, { maxRetriesPerRequest: null });
    }

    // TLS is only switched on by the exact string 'true'; an empty options
    // object enables it with the library defaults.
    const tlsOptions = process.env.REDIS_TLS === 'true' ? {} : undefined;

    return new Redis({
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379'),
        username: process.env.REDIS_USERNAME || 'default',
        // An empty password is normalised to "no password".
        password: process.env.REDIS_PASSWORD || undefined,
        tls: tlsOptions,
        maxRetriesPerRequest: null
    });
})();
/**
 * BullMQ queue named 'whatsapp-queue'. Every helper in this module enqueues
 * its jobs here; the consumer (worker) is not defined in this file.
 *
 * NOTE(review): the `as any` cast hides a type mismatch between the ioredis
 * client created above and the connection type BullMQ expects — presumably
 * two different ioredis versions in the dependency tree; confirm before
 * removing the cast.
 */
export const whatsappQueue = new Queue('whatsapp-queue', { connection: connection as any });
// ─── Time-Travel Context (Redis overlay for historical lesson replay) ────────
/**
 * Re-export of the module's ioredis client (the same instance handed to
 * `whatsappQueue`), so other modules can run time-travel Redis operations
 * without opening a second connection.
 */
export const redis = connection; // Shared connection for time-travel ops
/** Lifetime of a time-travel entry, in seconds (30 minutes). */
const TT_TTL = 30 * 60;

/**
 * Builds the Redis key under which a user's time-travel day is stored.
 *
 * @param userId - Identifier of the user the entry belongs to.
 * @returns The key `time_travel:<userId>`.
 */
function ttKey(userId: string): string {
    return 'time_travel:'.concat(userId);
}
/**
 * Stores a time-travel (replay) day for a user in Redis.
 *
 * The value is written as a string under the user's time-travel key with an
 * expiry of `TT_TTL` seconds, so the entry disappears on its own if it is
 * never cleared. Any existing entry for the user is overwritten and its
 * expiry restarted.
 *
 * @param userId - User whose replay day is being set.
 * @param replayDay - Day number to store.
 */
export async function setTimeTravelContext(userId: string, replayDay: number): Promise<void> {
    await connection.set(ttKey(userId), String(replayDay), 'EX', TT_TTL);
    // The emoji and arrow were previously mis-encoded (mojibake) in this message.
    console.log(`[TIME-TRAVEL] 🕰️ SET User ${userId} → Day ${replayDay} (TTL: ${TT_TTL}s)`);
}
/**
 * Reads the time-travel (replay) day currently stored for a user.
 *
 * @param userId - User whose entry is looked up.
 * @returns The stored day as a number, or `null` when no entry exists, the
 *          stored value is empty, or it does not parse as a number.
 */
export async function getTimeTravelContext(userId: string): Promise<number | null> {
    const stored = await connection.get(ttKey(userId));
    if (stored) {
        const parsedDay = Number.parseFloat(stored);
        if (!Number.isNaN(parsedDay)) {
            return parsedDay;
        }
    }
    return null;
}
/**
 * Removes a user's time-travel (replay) entry from Redis.
 *
 * Logs only when a key was actually deleted, so calling this for a user
 * without an active entry is silent.
 *
 * @param userId - User whose entry is removed.
 */
export async function clearTimeTravelContext(userId: string): Promise<void> {
    const n = await connection.del(ttKey(userId));
    // The emoji was previously mis-encoded (mojibake) in this message.
    if (n > 0) console.log(`[TIME-TRAVEL] 🗑️ CLEARED User ${userId}`);
}
/**
 * Enqueues a 'send-message' job carrying a text for a user.
 *
 * When the environment variable `DISABLE_WHATSAPP_SEND` is exactly 'true',
 * nothing is enqueued and a warning is logged instead.
 *
 * @param userId - Recipient user.
 * @param text - Message text placed in the job payload.
 * @param delayMs - Value passed as the job's `delay` option (defaults to 0).
 */
export async function scheduleMessage(userId: string, text: string, delayMs: number = 0) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (!sendingDisabled) {
        const jobData = { userId, text };
        await whatsappQueue.add('send-message', jobData, { delay: delayMs });
        return;
    }
    console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-message' for user ${userId}`);
}
/**
 * Enqueues a 'send-content' job for one day of a track.
 *
 * When the environment variable `DISABLE_WHATSAPP_SEND` is exactly 'true',
 * nothing is enqueued and a warning is logged instead.
 *
 * @param userId - Recipient user.
 * @param trackId - Track the content belongs to.
 * @param dayNumber - Day of the track placed in the job payload.
 * @param delayMs - Value passed as the job's `delay` option (defaults to 0).
 */
export async function scheduleTrackDay(userId: string, trackId: string, dayNumber: number, delayMs: number = 0) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (!sendingDisabled) {
        const jobData = { userId, trackId, dayNumber };
        await whatsappQueue.add('send-content', jobData, { delay: delayMs });
        return;
    }
    console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-content' for user ${userId}`);
}
/**
 * Enqueues an 'enroll-user' job pairing a user with a track.
 *
 * When the environment variable `DISABLE_WHATSAPP_SEND` is exactly 'true',
 * nothing is enqueued and a warning is logged instead.
 *
 * @param userId - User to enroll.
 * @param trackId - Track placed in the job payload.
 */
export async function enrollUser(userId: string, trackId: string) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (!sendingDisabled) {
        const jobData = { userId, trackId };
        await whatsappQueue.add('enroll-user', jobData);
        return;
    }
    console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'enroll-user' for user ${userId}`);
}
/**
 * Enqueues a 'send-interactive-buttons' job: a WhatsApp interactive BUTTON
 * message (max 3 buttons — the limit is not checked here).
 *
 * When the environment variable `DISABLE_WHATSAPP_SEND` is exactly 'true',
 * nothing is enqueued and a warning is logged instead.
 *
 * @param userId - Recipient user.
 * @param bodyText - Message body placed in the job payload.
 * @param buttons - Buttons to offer, each with an `id` and a `title`.
 */
export async function scheduleInteractiveButtons(
    userId: string,
    bodyText: string,
    buttons: Array<{ id: string; title: string }>
) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (!sendingDisabled) {
        const jobData = { userId, bodyText, buttons };
        await whatsappQueue.add('send-interactive-buttons', jobData);
        return;
    }
    console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-buttons' for user ${userId}`);
}
/**
 * Enqueues a 'send-interactive-list' job: a WhatsApp interactive LIST message
 * (up to 10 rows, grouped in sections — the limit is not checked here).
 *
 * When the environment variable `DISABLE_WHATSAPP_SEND` is exactly 'true',
 * nothing is enqueued and a warning is logged instead.
 *
 * @param userId - Recipient user.
 * @param headerText - Header placed in the job payload.
 * @param bodyText - Message body placed in the job payload.
 * @param buttonLabel - Label placed in the job payload for the list button.
 * @param sections - Titled groups of rows; each row has an `id`, a `title`
 *                   and an optional `description`.
 */
export async function scheduleInteractiveList(
    userId: string,
    headerText: string,
    bodyText: string,
    buttonLabel: string,
    sections: Array<{ title: string; rows: Array<{ id: string; title: string; description?: string }> }>
) {
    const sendingDisabled = process.env.DISABLE_WHATSAPP_SEND === 'true';
    if (!sendingDisabled) {
        const jobData = { userId, headerText, bodyText, buttonLabel, sections };
        await whatsappQueue.add('send-interactive-list', jobData);
        return;
    }
    console.warn(`[QUEUE] DISABLE_WHATSAPP_SEND is true. Skipping 'send-interactive-list' for user ${userId}`);
}
/**
 * 🚨 ASYNC HANDOVER: enqueues an inbound message as a 'handle-inbound' job so
 * it is processed in the background by the worker.
 *
 * Unlike the outbound helpers in this module, this one does not consult
 * `DISABLE_WHATSAPP_SEND`. The job is added with 3 attempts, exponential
 * backoff starting from a 1000 delay, removal on completion, and retention
 * on failure.
 *
 * @param payload - Inbound message data placed in the job unchanged.
 */
export async function scheduleInboundMessage(payload: { phone: string, text: string, audioUrl?: string, imageUrl?: string, messageId?: string }) {
    const retryPolicy = { type: 'exponential', delay: 1000 };
    const jobOptions = {
        attempts: 3,
        backoff: retryPolicy,
        // Completed jobs are dropped; failed ones are kept in the queue's failed set.
        removeOnComplete: true,
        removeOnFail: false
    };
    await whatsappQueue.add('handle-inbound', payload, jobOptions);
}