CognxSafeTrack commited on
Commit
74e06ff
·
1 Parent(s): c1cf9c6

feat: Async-First Webhook Architecture & Idempotency Hardening

Browse files
apps/api/src/routes/whatsapp.ts CHANGED
@@ -1,5 +1,4 @@
1
  import { FastifyInstance } from 'fastify';
2
- import { WhatsAppService } from '../services/whatsapp';
3
  import crypto from 'crypto';
4
  import { z } from 'zod';
5
 
@@ -160,25 +159,26 @@ export async function whatsappRoutes(fastify: FastifyInstance) {
160
  return reply.code(200).send('EVENT_RECEIVED');
161
  } catch (error: unknown) {
162
  request.log.error(`[WEBHOOK] Forward throwing error: ${(error instanceof Error ? error.message : String(error))}`);
163
- return reply.code(500).send({ error: 'Gateway forwarding failed' });
 
164
  }
165
  }
166
  }
167
 
168
- // ── 3. Return 200 IMMEDIATELY to satisfy Meta's < 20s timeout ───────
169
- if (reply.sent) return; // safety check
170
- reply.code(200).send('EVENT_RECEIVED');
 
 
 
171
 
172
- // ── 3. Validate payload with Zod (async, after reply sent) ──────────
173
  setImmediate(async () => {
174
  try {
175
  const parsed = WebhookPayloadSchema.safeParse(request.body);
 
176
 
177
- if (!parsed.success) {
178
- fastify.log.warn(`[WEBHOOK] Invalid payload schema: ${JSON.stringify(parsed.error.flatten())}`);
179
- return;
180
- }
181
-
182
  const payload = parsed.data;
183
 
184
  for (const entry of payload.entry) {
@@ -187,25 +187,22 @@ export async function whatsappRoutes(fastify: FastifyInstance) {
187
 
188
  for (const message of messages) {
189
  const phone = message.from;
190
- let text = '';
191
 
192
  if (message.type === 'text' && message.text) {
193
- text = message.text.body;
194
 
195
  } else if (message.type === 'interactive' && message.interactive) {
196
- // Handle both button_reply and list_reply
197
  if (message.interactive.type === 'button_reply' && message.interactive.button_reply) {
198
  text = message.interactive.button_reply.id;
199
- fastify.log.info(`[WEBHOOK] Button reply: ${text}`);
200
  } else if (message.interactive.type === 'list_reply' && message.interactive.list_reply) {
201
  text = message.interactive.list_reply.id;
202
- fastify.log.info(`[WEBHOOK] List reply: ${text}`);
203
  }
 
204
 
205
  } else if (message.type === 'audio' && message.audio) {
206
- // ─── Audio inbound: delegate download to Railway worker ────────────
207
- // HF cannot reach graph.facebook.com — enqueue a high-priority job.
208
- // Media IDs expire in ~5 minutes so the job is marked priority: 1.
209
  const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || undefined;
210
  const { Queue } = await import('bullmq');
211
  const Redis = (await import('ioredis')).default;
@@ -213,6 +210,7 @@ export async function whatsappRoutes(fastify: FastifyInstance) {
213
  ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
214
  : new Redis({ host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), maxRetriesPerRequest: null });
215
  const q = new Queue('whatsapp-queue', { connection: conn as any });
 
216
  await q.add('download-media', {
217
  mediaId: message.audio.id,
218
  mimeType: message.audio.mime_type || 'audio/ogg',
@@ -224,20 +222,12 @@ export async function whatsappRoutes(fastify: FastifyInstance) {
224
  phone,
225
  text: "⏳ J'analyse ton audio..."
226
  });
227
-
228
- fastify.log.info(`[WEBHOOK] Audio ${message.audio.id} enqueued for Railway download`);
229
- // Don't process further — Railway will call handleIncomingMessage after download
230
- continue;
231
- }
232
-
233
- if (phone && text) {
234
- await WhatsAppService.handleIncomingMessage(phone, text);
235
  }
236
  }
237
  }
238
  }
239
  } catch (error) {
240
- fastify.log.error(`[WEBHOOK] Async processing error: ${String(error)}`);
241
  }
242
  });
243
  });
 
1
  import { FastifyInstance } from 'fastify';
 
2
  import crypto from 'crypto';
3
  import { z } from 'zod';
4
 
 
159
  return reply.code(200).send('EVENT_RECEIVED');
160
  } catch (error: unknown) {
161
  request.log.error(`[WEBHOOK] Forward throwing error: ${(error instanceof Error ? error.message : String(error))}`);
162
+ // Still return 200 to Meta to avoid retries, even if gateway forward failed internally
163
+ return reply.code(200).send('EVENT_RECEIVED_FW_ERR');
164
  }
165
  }
166
  }
167
 
168
+ // ── 3. DETACH IMMEDIATELY ──
169
+ // Respond 200 OK right now to release the HF Gateway connection.
170
+ // We do this BEFORE any parsing or work.
171
+ if (!reply.sent) {
172
+ reply.code(200).send('EVENT_RECEIVED');
173
+ }
174
 
175
+ // ── 4. Background Processing (enqueue to Worker) ──
176
  setImmediate(async () => {
177
  try {
178
  const parsed = WebhookPayloadSchema.safeParse(request.body);
179
+ if (!parsed.success) return;
180
 
181
+ const { scheduleInboundMessage } = await import('../services/queue');
 
 
 
 
182
  const payload = parsed.data;
183
 
184
  for (const entry of payload.entry) {
 
187
 
188
  for (const message of messages) {
189
  const phone = message.from;
190
+ const messageId = message.id;
191
 
192
  if (message.type === 'text' && message.text) {
193
+ await scheduleInboundMessage({ phone, text: message.text.body, messageId });
194
 
195
  } else if (message.type === 'interactive' && message.interactive) {
196
+ let text = '';
197
  if (message.interactive.type === 'button_reply' && message.interactive.button_reply) {
198
  text = message.interactive.button_reply.id;
 
199
  } else if (message.interactive.type === 'list_reply' && message.interactive.list_reply) {
200
  text = message.interactive.list_reply.id;
 
201
  }
202
+ if (text) await scheduleInboundMessage({ phone, text, messageId });
203
 
204
  } else if (message.type === 'audio' && message.audio) {
205
+ // Existing audio logic (queues download-media)
 
 
206
  const accessToken = process.env.WHATSAPP_ACCESS_TOKEN || undefined;
207
  const { Queue } = await import('bullmq');
208
  const Redis = (await import('ioredis')).default;
 
210
  ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
211
  : new Redis({ host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), maxRetriesPerRequest: null });
212
  const q = new Queue('whatsapp-queue', { connection: conn as any });
213
+
214
  await q.add('download-media', {
215
  mediaId: message.audio.id,
216
  mimeType: message.audio.mime_type || 'audio/ogg',
 
222
  phone,
223
  text: "⏳ J'analyse ton audio..."
224
  });
 
 
 
 
 
 
 
 
225
  }
226
  }
227
  }
228
  }
229
  } catch (error) {
230
+ fastify.log.error(`[WEBHOOK] Detached processing error: ${String(error)}`);
231
  }
232
  });
233
  });
apps/api/src/services/queue.ts CHANGED
@@ -66,3 +66,13 @@ export async function scheduleInteractiveList(
66
  await whatsappQueue.add('send-interactive-list', { userId, headerText, bodyText, buttonLabel, sections });
67
  }
68
 
 
 
 
 
 
 
 
 
 
 
 
66
  await whatsappQueue.add('send-interactive-list', { userId, headerText, bodyText, buttonLabel, sections });
67
  }
68
 
69
+ /** 🚨 ASYNC HANDOVER: Send inbound message for background processing in the worker. */
70
+ export async function scheduleInboundMessage(payload: { phone: string, text: string, audioUrl?: string, imageUrl?: string, messageId?: string }) {
71
+ await whatsappQueue.add('handle-inbound', payload, {
72
+ attempts: 3,
73
+ backoff: { type: 'exponential', delay: 1000 },
74
+ removeOnComplete: true,
75
+ removeOnFail: false
76
+ });
77
+ }
78
+
apps/whatsapp-worker/src/index.ts CHANGED
@@ -9,6 +9,7 @@ import { sendLessonDay } from './pedagogy';
9
  import { updateBehavioralScore } from './scoring';
10
  import { normalizeWolof } from './normalizeWolof';
11
  import { getApiUrl, getAdminApiKey, validateEnvironment, isFeatureEnabled } from './config';
 
12
 
13
  dotenv.config();
14
 
@@ -47,6 +48,21 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
47
  const { phone, text } = job.data;
48
  await sendTextMessage(phone, text);
49
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  else if (job.name === 'generate-feedback') {
51
  const { userId, text, trackId, exercisePrompt, lessonText, exerciseCriteria, currentDay, totalDays, language, userActivity, userRegion, previousResponses, isDeepDive, iterationCount, imageUrl } = job.data;
52
  const user = await prisma.user.findUnique({
@@ -66,8 +82,8 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
66
  const textHash = text ? text.substring(0, 10).replace(/[^a-z0-9]/gi, '') : '';
67
  const lockKey = `lock:feedback:${userId}:${currentDay}:${textHash}`;
68
 
69
- // EX 60 : Le verrou expire dans 60 secondes. S'il existe déjà, NX retourne null (ou 0).
70
- const isLocked = await redis.set(lockKey, "1", "EX", 60, "NX");
71
  if (!isLocked) {
72
  console.log(`[WORKER] 🔒 Lock activé : ignorer ce job de feedback en double (User ${userId}, Day ${currentDay})`);
73
  return;
@@ -671,37 +687,12 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
671
  }
672
 
673
  // Process the transcribed text as a normal incoming message via API
 
674
  if (transcribedText) {
675
  const traceId = `[STT-FLOW-${phone.slice(-4)}]`;
676
- console.log(`${traceId} Forwarding to API handle-message...`);
677
-
678
- // ─── Port Hardening: Handle HF Space 7860 vs Local 8080 ─────────
679
- let finalApiUrl = AI_API_BASE_URL.replace(/\/$/, "");
680
- if (finalApiUrl.includes('localhost:8080')) {
681
- try {
682
- // Quick check if 8080 is up, if not assume 7860 (Hugging Face)
683
- const pingRes = await fetch('http://localhost:8080/health').catch(() => null);
684
- if (!pingRes || !pingRes.ok) {
685
- console.log(`${traceId} Local port 8080 not responding, trying 7860 (HF Default)...`);
686
- finalApiUrl = finalApiUrl.replace('8080', '7860');
687
- }
688
- } catch (e) {
689
- finalApiUrl = finalApiUrl.replace('8080', '7860');
690
- }
691
- }
692
-
693
- const handleRes = await fetch(`${finalApiUrl}/v1/internal/handle-message`, {
694
- method: 'POST',
695
- headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` },
696
- body: JSON.stringify({ phone, text: transcribedText, audioUrl })
697
- });
698
-
699
- if (!handleRes.ok) {
700
- const errText = await handleRes.text();
701
- console.error(`${traceId} API handle-message failed (HTTP ${handleRes.status}): ${errText}`);
702
- throw new Error(`API handle-message failed: ${handleRes.status}`);
703
- }
704
- console.log(`${traceId} API handle-message success.`);
705
  }
706
  } else if (transcribeRes.status === 429) {
707
  // OpenAI quota exceeded — send fallback and do NOT requeue
@@ -721,34 +712,11 @@ const worker = new Worker('whatsapp-queue', async (job: Job) => {
721
  }
722
  } else if (mimeType.startsWith('image/')) {
723
  // 📸 VISION FLOW (Lead AI Architect Requirement)
724
- console.log(`${traceId} Image detected. Forwarding to API handle-message...`);
725
-
726
- // ─── Port Hardening: Handle HF Space 7860 vs Local 8080 ─────────
727
- let finalApiUrl = AI_API_BASE_URL.replace(/\/$/, "");
728
- if (finalApiUrl.includes('localhost:8080')) {
729
- try {
730
- const pingRes = await fetch('http://localhost:8080/health').catch(() => null);
731
- if (!pingRes || !pingRes.ok) {
732
- console.log(`${traceId} Local port 8080 not responding, trying 7860 (HF Default)...`);
733
- finalApiUrl = finalApiUrl.replace('8080', '7860');
734
- }
735
- } catch (e) {
736
- finalApiUrl = finalApiUrl.replace('8080', '7860');
737
- }
738
- }
739
-
740
- const handleRes = await fetch(`${finalApiUrl}/v1/internal/handle-message`, {
741
- method: 'POST',
742
- headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${apiKey}` },
743
- body: JSON.stringify({ phone, text: 'Image reçue', imageUrl: audioUrl }) // text: 'Image reçue' to pass gibberish guardrail
744
- });
745
-
746
- if (!handleRes.ok) {
747
- const errText = await handleRes.text();
748
- console.error(`${traceId} API handle-message (IMAGE) failed: ${errText}`);
749
- throw new Error(`API handle-message failed: ${handleRes.status}`);
750
- }
751
- console.log(`${traceId} API handle-message (IMAGE) success.`);
752
  }
753
  } catch (err: unknown) {
754
  console.error(`[WORKER] download-media failed:`, err);
 
9
  import { updateBehavioralScore } from './scoring';
10
  import { normalizeWolof } from './normalizeWolof';
11
  import { getApiUrl, getAdminApiKey, validateEnvironment, isFeatureEnabled } from './config';
12
+ import { WhatsAppLogic } from './services/whatsapp-logic';
13
 
14
  dotenv.config();
15
 
 
48
  const { phone, text } = job.data;
49
  await sendTextMessage(phone, text);
50
  }
51
+ else if (job.name === 'handle-inbound') {
52
+ const { phone, text, audioUrl, imageUrl, messageId } = job.data;
53
+
54
+ // 🚨 Idempotence Lock for Inbound Messages
55
+ if (messageId) {
56
+ const lockKey = `lock:inbound:${messageId}`;
57
+ const isLocked = await connection.set(lockKey, "1", "EX", 300, "NX");
58
+ if (!isLocked) {
59
+ console.log(`[WORKER] 🔒 Lock inbound activé : message ${messageId} déjà traité.`);
60
+ return;
61
+ }
62
+ }
63
+
64
+ await WhatsAppLogic.handleIncomingMessage(phone, text, audioUrl, imageUrl);
65
+ }
66
  else if (job.name === 'generate-feedback') {
67
  const { userId, text, trackId, exercisePrompt, lessonText, exerciseCriteria, currentDay, totalDays, language, userActivity, userRegion, previousResponses, isDeepDive, iterationCount, imageUrl } = job.data;
68
  const user = await prisma.user.findUnique({
 
82
  const textHash = text ? text.substring(0, 10).replace(/[^a-z0-9]/gi, '') : '';
83
  const lockKey = `lock:feedback:${userId}:${currentDay}:${textHash}`;
84
 
85
+ // 🚨 HARDENING: Le verrou expire dans 300 secondes (5 minutes).
86
+ const isLocked = await redis.set(lockKey, "1", "EX", 300, "NX");
87
  if (!isLocked) {
88
  console.log(`[WORKER] 🔒 Lock activé : ignorer ce job de feedback en double (User ${userId}, Day ${currentDay})`);
89
  return;
 
687
  }
688
 
689
  // Process the transcribed text as a normal incoming message via API
690
+ // ─── Routing: Process transcribed text ─────────
691
  if (transcribedText) {
692
  const traceId = `[STT-FLOW-${phone.slice(-4)}]`;
693
+ console.log(`${traceId} Processing transcribed text via WhatsAppLogic...`);
694
+ await WhatsAppLogic.handleIncomingMessage(phone, transcribedText, audioUrl);
695
+ console.log(`${traceId} Inbound audio processing complete.`);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
696
  }
697
  } else if (transcribeRes.status === 429) {
698
  // OpenAI quota exceeded — send fallback and do NOT requeue
 
712
  }
713
  } else if (mimeType.startsWith('image/')) {
714
  // 📸 VISION FLOW (Lead AI Architect Requirement)
715
+ console.log(`${traceId} Image detected. Processing via WhatsAppLogic...`);
716
+ // Provide a placeholder text or attempt vision analysis if desired.
717
+ // For now, consistent with older logic: text = 'Image reçue'
718
+ await WhatsAppLogic.handleIncomingMessage(phone, 'Image reçue', undefined, audioUrl);
719
+ console.log(`${traceId} Inbound image processing complete.`);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
720
  }
721
  } catch (err: unknown) {
722
  console.error(`[WORKER] download-media failed:`, err);
apps/whatsapp-worker/src/services/whatsapp-logic.ts ADDED
@@ -0,0 +1,312 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import { PrismaClient } from '@repo/database';
2
+ import { Queue } from 'bullmq';
3
+ import Redis from 'ioredis';
4
+
5
+ const prisma = new PrismaClient();
6
+
7
+ // Setup local queue access for the worker-side service
8
+ const connection = process.env.REDIS_URL
9
+ ? new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null })
10
+ : new Redis({
11
+ host: process.env.REDIS_HOST || 'localhost',
12
+ port: parseInt(process.env.REDIS_PORT || '6379'),
13
+ maxRetriesPerRequest: null
14
+ });
15
+
16
+ const whatsappQueue = new Queue('whatsapp-queue', { connection: connection as any });
17
+
18
+ async function scheduleMessage(userId: string, text: string, delayMs: number = 0) {
19
+ await whatsappQueue.add('send-message', { userId, text }, { delay: delayMs });
20
+ }
21
+
22
+ async function enrollUser(userId: string, trackId: string) {
23
+ await whatsappQueue.add('enroll-user', { userId, trackId });
24
+ }
25
+
26
+ export class WhatsAppLogic {
27
+ private static normalizeCommand(text: string): string {
28
+ return text
29
+ .trim()
30
+ .toLowerCase()
31
+ .replace(/[.,!?;:]+$/, "")
32
+ .toUpperCase();
33
+ }
34
+
35
+ private static detectIntent(text: string): 'YES' | 'NO' | 'UNKNOWN' {
36
+ const normalized = text.trim().toLowerCase().replace(/[.,!?;:]+$/, "");
37
+ const yesWords = ['oui', 'ouais', 'wi', 'waaw', 'yes', 'yep', 'ok', 'd’accord', 'daccord', 'da’accord'];
38
+ const noWords = ['non', 'déet', 'deet', 'no', 'nah', 'nein'];
39
+
40
+ if (yesWords.some(w => normalized.includes(w))) return 'YES';
41
+ if (noWords.some(w => normalized.includes(w))) return 'NO';
42
+ return 'UNKNOWN';
43
+ }
44
+
45
+ private static levenshteinDistance(a: string, b: string): number {
46
+ const matrix: number[][] = [];
47
+ for (let i = 0; i <= b.length; i++) matrix[i] = [i];
48
+ for (let j = 0; j <= a.length; j++) matrix[0][j] = j;
49
+
50
+ for (let i = 1; i <= b.length; i++) {
51
+ for (let j = 1; j <= a.length; j++) {
52
+ if (b.charAt(i - 1) === a.charAt(j - 1)) {
53
+ matrix[i][j] = matrix[i - 1][j - 1];
54
+ } else {
55
+ matrix[i][j] = Math.min(
56
+ matrix[i - 1][j - 1] + 1,
57
+ matrix[i][j - 1] + 1,
58
+ matrix[i - 1][j] + 1
59
+ );
60
+ }
61
+ }
62
+ }
63
+ return matrix[b.length][a.length];
64
+ }
65
+
66
+ private static isFuzzyMatch(text: string, target: string, threshold = 0.8): boolean {
67
+ const normalized = text.trim().toUpperCase();
68
+ const tar = target.toUpperCase();
69
+ if (normalized === tar) return true;
70
+ if (normalized.includes(tar) || tar.includes(normalized)) return true;
71
+
72
+ const distance = this.levenshteinDistance(normalized, tar);
73
+ const maxLength = Math.max(normalized.length, tar.length);
74
+ const similarity = 1 - distance / maxLength;
75
+ return similarity >= threshold;
76
+ }
77
+
78
+ static async handleIncomingMessage(phone: string, text: string, audioUrl?: string, imageUrl?: string) {
79
+ const traceId = audioUrl ? `[STT-FLOW-${phone.slice(-4)}]` : imageUrl ? `[IMG-FLOW-${phone.slice(-4)}]` : `[TXT-FLOW-${phone.slice(-4)}]`;
80
+ const normalizedText = this.normalizeCommand(text);
81
+ console.log(`${traceId} Processing Inbound (Async): ${normalizedText} (Audio: ${audioUrl || 'N/A'})`);
82
+
83
+ // 1. Find or Create User
84
+ let user = await prisma.user.findUnique({ where: { phone } });
85
+
86
+ if (!user) {
87
+ const isInscription = this.isFuzzyMatch(normalizedText, 'INSCRIPTION') || normalizedText.includes('INSCRI');
88
+
89
+ if (isInscription) {
90
+ user = await prisma.user.create({ data: { phone } });
91
+ await whatsappQueue.add('send-interactive-buttons', {
92
+ userId: user.id,
93
+ bodyText: "Dalal jàmm! Xamle ngay tàmbali. ⏳ 30s.\n(FR) Ton cours se prépare (30s).",
94
+ buttons: [
95
+ { id: 'LANG_FR', title: 'Français 🇫🇷' },
96
+ { id: 'LANG_WO', title: 'Wolof 🇸🇳' }
97
+ ]
98
+ });
99
+ return;
100
+ } else {
101
+ await whatsappQueue.add('send-message-direct', {
102
+ phone,
103
+ text: "🎓 Bienvenue chez XAMLÉ !\nPour commencer ta formation gratuite, envoie le mot : *INSCRIPTION*\n\n(WO) Dalal jàmm ! Ngir tàmbali sa njàng mburu, bindal : *INSCRIPTION*"
104
+ });
105
+ return;
106
+ }
107
+ }
108
+
109
+ // 1.2 Log message
110
+ try {
111
+ await prisma.message.create({
112
+ data: {
113
+ content: text,
114
+ mediaUrl: audioUrl || imageUrl,
115
+ direction: 'INBOUND',
116
+ userId: user.id
117
+ }
118
+ });
119
+ } catch (err) {}
120
+
121
+ // 1.5. Testing / Cheat Codes
122
+ if (this.isFuzzyMatch(normalizedText, 'INSCRIPTION')) {
123
+ await prisma.enrollment.deleteMany({ where: { userId: user.id } });
124
+ await prisma.userProgress.deleteMany({ where: { userId: user.id } });
125
+ await prisma.response.deleteMany({ where: { userId: user.id } });
126
+ await prisma.message.deleteMany({ where: { userId: user.id } });
127
+ await (prisma as any).businessProfile.deleteMany({ where: { userId: user.id } });
128
+ user = await prisma.user.update({
129
+ where: { id: user.id },
130
+ data: { city: null, activity: null }
131
+ });
132
+ await whatsappQueue.add('send-interactive-buttons', {
133
+ userId: user.id,
134
+ bodyText: "Réinitialisation réussie – choisissez votre langue / Tànnal sa làkk :",
135
+ buttons: [
136
+ { id: 'LANG_FR', title: 'Français 🇫🇷' },
137
+ { id: 'LANG_WO', title: 'Wolof 🇸🇳' }
138
+ ]
139
+ });
140
+ return;
141
+ }
142
+
143
+ const systemCommands = ['1', '2', 'SUITE', 'APPROFONDIR', 'INSCRIPTION', 'SEED'];
144
+ const isSystemCommand = systemCommands.some(cmd => this.isFuzzyMatch(normalizedText, cmd)) || normalizedText.includes('INSCRI');
145
+
146
+ if (text.length < 2 && !isSystemCommand) {
147
+ await scheduleMessage(user.id, user.language === 'WOLOF'
148
+ ? "Dama lay xaar nga wax ma lu gën a yaatu ci sa mbir. Waxtaanal ak man !"
149
+ : "Je n'ai pas bien compris. Peux-tu me réexpliquer en quelques mots ?");
150
+ return;
151
+ }
152
+
153
+ if (this.isFuzzyMatch(normalizedText, 'SEED')) {
154
+ try {
155
+ // @ts-ignore
156
+ const { seedDatabase } = await import('@repo/database/seed');
157
+ const result = await seedDatabase(prisma);
158
+ await (prisma as any).businessProfile.deleteMany({ where: { userId: user!.id } });
159
+ await prisma.user.update({ where: { id: user!.id }, data: { activity: null } });
160
+
161
+ await scheduleMessage(user.id, result.seeded
162
+ ? "✅ Seeding terminé ! Le Cache Cognitif a été réinitialisé.\nEnvoie INSCRIPTION pour commencer."
163
+ : "ℹ️ Les données existent déjà. Cache Cognitif purgé. Envoie INSCRIPTION."
164
+ );
165
+ } catch (err) {
166
+ await scheduleMessage(user.id, `❌ Erreur seed`);
167
+ }
168
+ return;
169
+ }
170
+
171
+ // Handle Interactive LIST menu (REPLAY, EXERCISE, etc.)
172
+ const dayActionMatch = normalizedText.match(/^DAY(\d+)_(EXERCISE|REPLAY|CONTINUE|PROMPT)$/);
173
+ if (dayActionMatch) {
174
+ const action = dayActionMatch[2];
175
+ const enrollment = await prisma.enrollment.findFirst({ where: { userId: user.id, status: 'ACTIVE' } });
176
+
177
+ if (action === 'REPLAY' && enrollment) {
178
+ await whatsappQueue.add('send-content', { userId: user.id, trackId: enrollment.trackId, dayNumber: enrollment.currentDay });
179
+ return;
180
+ } else if (action === 'EXERCISE') {
181
+ await scheduleMessage(user.id, user.language === 'WOLOF' ? "🎙️ Yónnee sa tontu :" : "🎙️ Envoie ta réponse :");
182
+ return;
183
+ }
184
+ }
185
+
186
+ // Language selection
187
+ if (normalizedText === 'LANG_FR' || normalizedText === 'LANG_WO') {
188
+ const newLang = normalizedText === 'LANG_FR' ? 'FR' : 'WOLOF';
189
+ user = await prisma.user.update({ where: { id: user.id }, data: { language: newLang } });
190
+ const promptText = newLang === 'FR' ? "Parfait ! Dans quel domaine te trouves-tu ?" : "Baax na ! Ci ban mbir ngay yëngu ?";
191
+
192
+ await whatsappQueue.add('send-interactive-list', {
193
+ userId: user.id,
194
+ headerText: newLang === 'FR' ? "Ton secteur" : "Sa Mbir",
195
+ bodyText: promptText,
196
+ buttonLabel: newLang === 'FR' ? "Secteurs" : "Tànn",
197
+ sections: [{
198
+ title: newLang === 'FR' ? 'Liste' : 'Mbir',
199
+ rows: [
200
+ { id: 'SEC_COMMERCE', title: newLang === 'FR' ? 'Commerce / Vente' : 'Njaay' },
201
+ { id: 'SEC_AGRI', title: newLang === 'FR' ? 'Agri / Élevage' : 'Mbay / Samm' },
202
+ { id: 'SEC_FOOD', title: newLang === 'FR' ? 'Alimentation / Rest.' : 'Lekk / Restauration' },
203
+ { id: 'SEC_COUTURE', title: newLang === 'FR' ? 'Couture / Mode' : 'Couture' },
204
+ { id: 'SEC_BEAUTE', title: newLang === 'FR' ? 'Beauté / Bien-être' : 'Rafet' },
205
+ { id: 'SEC_TRANSPORT', title: newLang === 'FR' ? 'Transport / Livr.' : 'Transport / Yëgël' },
206
+ { id: 'SEC_TECH', title: newLang === 'FR' ? 'Tech / Digital' : 'Tech / Digital' },
207
+ { id: 'SEC_AUTRE', title: newLang === 'FR' ? 'Autre secteur' : 'Beneen mbir' }
208
+ ]
209
+ }]
210
+ });
211
+ return;
212
+ }
213
+
214
+ const SECTOR_LABELS: Record<string, { fr: string; wo: string }> = {
215
+ SEC_COMMERCE: { fr: 'Commerce / Vente', wo: 'Njaay' },
216
+ SEC_AGRI: { fr: 'Agriculture / Élevage', wo: 'Mbay' },
217
+ SEC_FOOD: { fr: 'Alimentation / Restauration', wo: 'Lekk / Restauration' },
218
+ SEC_TECH: { fr: 'Tech / Digital', wo: 'Tech / Digital' },
219
+ SEC_BEAUTE: { fr: 'Beauté / Bien-être', wo: 'Rafet' },
220
+ SEC_COUTURE: { fr: 'Couture / Mode', wo: 'Couture' },
221
+ SEC_TRANSPORT: { fr: 'Transport / Livraison', wo: 'Transport / Yëgël' },
222
+ };
223
+
224
+ const sectorLabel = SECTOR_LABELS[normalizedText];
225
+ const activeEnrollment = await prisma.enrollment.findFirst({
226
+ where: { userId: user.id, status: 'ACTIVE' },
227
+ include: { track: true }
228
+ });
229
+
230
+ if (activeEnrollment && (sectorLabel || normalizedText.startsWith('SEC_'))) return;
231
+
232
+ if (!activeEnrollment && (sectorLabel || (!user.activity && text.length > 2))) {
233
+ const activity = sectorLabel ? (user.language === 'WOLOF' ? sectorLabel.wo : sectorLabel.fr) : text.trim();
234
+ user = await prisma.user.update({ where: { id: user.id }, data: { activity } });
235
+ await scheduleMessage(user.id, user.language === 'FR' ? `Secteur noté : *${activity}*` : `Bind nanu la ci: *${activity}*`);
236
+ const trackId = user.language === 'FR' ? "T1-FR" : "T1-WO";
237
+ await enrollUser(user.id, trackId);
238
+ return;
239
+ }
240
+
241
+ if (activeEnrollment) {
242
+ this.detectIntent(text); // Scan for intent (log purposes or future use)
243
+ const isSuite = this.isFuzzyMatch(normalizedText, 'SUITE') || normalizedText === '2';
244
+ const isApprofondir = this.isFuzzyMatch(normalizedText, 'APPROFONDIR') || normalizedText === '1';
245
+
246
+ if (isSuite) {
247
+ const userProgress = await prisma.userProgress.findUnique({ where: { userId_trackId: { userId: user.id, trackId: activeEnrollment.trackId } } });
248
+ const lastResponse = await prisma.response.findFirst({ where: { userId: user.id, dayNumber: activeEnrollment.currentDay }, orderBy: { createdAt: 'desc' } });
249
+
250
+ if (userProgress?.exerciseStatus !== 'COMPLETED' && userProgress?.exerciseStatus !== 'PENDING_DEEPDIVE' && !lastResponse) {
251
+ await scheduleMessage(user.id, user.language === 'WOLOF' ? "Dafa laaj nga tontu !" : "Tu dois d'abord répondre !");
252
+ return;
253
+ }
254
+ const nextDay = activeEnrollment.currentDay % 1 !== 0 ? Math.floor(activeEnrollment.currentDay) + 1 : activeEnrollment.currentDay + 1;
255
+ await prisma.enrollment.update({ where: { id: activeEnrollment.id }, data: { currentDay: nextDay } });
256
+ await prisma.userProgress.update({ where: { userId_trackId: { userId: user.id, trackId: activeEnrollment.trackId } }, data: { exerciseStatus: 'PENDING', iterationCount: 0 } });
257
+ await whatsappQueue.add('send-content', { userId: user.id, trackId: activeEnrollment.trackId, dayNumber: nextDay });
258
+ return;
259
+ }
260
+
261
+ if (isApprofondir) {
262
+ const userProgress = await prisma.userProgress.findUnique({ where: { userId_trackId: { userId: user.id, trackId: activeEnrollment.trackId } } });
263
+ const lastResponse = await prisma.response.findFirst({ where: { userId: user.id, dayNumber: activeEnrollment.currentDay }, orderBy: { createdAt: 'desc' } });
264
+
265
+ if (userProgress?.exerciseStatus === 'COMPLETED' || (userProgress?.exerciseStatus === 'PENDING' && lastResponse)) {
266
+ await prisma.userProgress.update({ where: { id: userProgress!.id }, data: { exerciseStatus: 'PENDING_DEEPDIVE' } });
267
+ await scheduleMessage(user.id, user.language === 'WOLOF' ? "Wax ma ndox mi..." : "Très bien ! Quelle info ?");
268
+ return;
269
+ }
270
+ }
271
+
272
+ const pendingProgress = await prisma.userProgress.findFirst({
273
+ where: { userId: user.id, exerciseStatus: { in: ['PENDING', 'PENDING_REMEDIATION', 'PENDING_DEEPDIVE'] }, trackId: activeEnrollment.trackId },
274
+ });
275
+
276
+ if (pendingProgress) {
277
+ const trackDay = await prisma.trackDay.findFirst({ where: { trackId: activeEnrollment.trackId, dayNumber: activeEnrollment.currentDay } });
278
+ if (trackDay) {
279
+ const isDeepDiveAction = pendingProgress.exerciseStatus === 'PENDING_DEEPDIVE';
280
+ const wordCount = (text || '').trim().split(/\s+/).length;
281
+ if (wordCount < 3) {
282
+ await scheduleMessage(user.id, user.language === 'WOLOF' ? "Tontu bi gatt na..." : "Ta réponse est un peu courte.");
283
+ return;
284
+ }
285
+
286
+ await scheduleMessage(user.id, user.language === 'WOLOF' ? "⏳ Defar ak sa tontu..." : "⏳ Analyse de votre réponse...");
287
+
288
+ let currentIterationCount = pendingProgress.iterationCount || 0;
289
+ if (isDeepDiveAction) {
290
+ currentIterationCount += 1;
291
+ await prisma.userProgress.update({ where: { id: pendingProgress.id }, data: { iterationCount: currentIterationCount } });
292
+ }
293
+
294
+ await prisma.response.create({ data: { enrollmentId: activeEnrollment.id, userId: user.id, dayNumber: activeEnrollment.currentDay, content: text } });
295
+
296
+ const previousResponsesData = await prisma.response.findMany({ where: { userId: user.id, enrollmentId: activeEnrollment.id }, orderBy: { dayNumber: 'asc' }, take: 5 });
297
+ const previousResponses = previousResponsesData.map(r => ({ day: r.dayNumber, response: r.content }));
298
+
299
+ await whatsappQueue.add('generate-feedback', {
300
+ userId: user.id, text, trackId: activeEnrollment.trackId, trackDayId: trackDay.id,
301
+ exercisePrompt: trackDay.exercisePrompt || '', lessonText: trackDay.lessonText || '',
302
+ exerciseCriteria: trackDay.exerciseCriteria, pendingProgressId: pendingProgress.id,
303
+ dayNumber: activeEnrollment.currentDay, totalDays: activeEnrollment.track.duration, language: user.language,
304
+ userActivity: user.activity, userRegion: user.city, previousResponses,
305
+ isDeepDive: isDeepDiveAction, iterationCount: currentIterationCount, imageUrl: imageUrl
306
+ }, { attempts: 3, backoff: { type: 'exponential', delay: 2000 } });
307
+ return;
308
+ }
309
+ }
310
+ }
311
+ }
312
+ }