Arslan1997 commited on
Commit
aa8b834
·
1 Parent(s): 8ece370

added latest models

Browse files
app/routes/forms.py CHANGED
@@ -703,13 +703,16 @@ async def chat_stream(
703
  except Exception as e:
704
  logger.warning(f"Could not load responses: {e}")
705
 
 
 
 
706
  # Determine chat type based on whether this is likely a response analysis
707
  chat_type = "form_editing" # Default
708
 
709
  # Save user message to database
710
  user_chat_message = ChatMessageModel(
711
  form_id=form_id,
712
- user_id=current_user.id,
713
  chat_type=chat_type,
714
  role="user",
715
  content=chat_data.message
@@ -929,45 +932,42 @@ async def chat_stream(
929
  logger.error(f"Stream error: {e}", exc_info=True)
930
  collected_response['content'] = f"Error: {str(e)}"
931
  yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
932
- finally:
933
- if conn:
934
- conn.close()
935
 
936
- async def save_assistant_message_background():
937
- """Save assistant message after stream completes"""
938
- # Create a new db session for background task
939
  from ..core.db import SessionLocal
 
940
  try:
941
- background_db = SessionLocal()
942
- if collected_response.get('content'):
943
- # Update user message with correct chat_type if it was response analysis
944
- if collected_response.get('route') == 'analyze_responses':
945
- user_chat_message.chat_type = "response_analysis"
946
- background_db.merge(user_chat_message)
947
-
948
- assistant_message = ChatMessageModel(
949
- form_id=form_id,
950
- user_id=current_user.id,
951
- chat_type="response_analysis" if collected_response.get('route') == 'analyze_responses' else "form_editing",
952
  role="assistant",
953
- content=collected_response.get('content', ''),
954
- query_type=collected_response.get('query_type'),
955
- sql_query=collected_response.get('sql_query'),
956
- result_data=collected_response.get('result_data')
957
  )
958
- background_db.add(assistant_message)
959
- background_db.commit()
 
960
  except Exception as e:
961
  logger.error(f"Failed to save chat message: {e}")
 
 
962
  finally:
963
- background_db.close()
 
964
 
965
  async def streaming_with_save():
966
  """Wrap generator to save message after completion"""
967
  async for event in event_generator():
968
  yield event
969
- # Save after streaming completes
970
- await save_assistant_message_background()
971
 
972
  return StreamingResponse(
973
  streaming_with_save(),
@@ -1109,8 +1109,7 @@ async def chat_edit_form(
1109
  if status_breakdown:
1110
  assistant_response += f"**Status:** {', '.join([f'{k}: {v}' for k, v in status_breakdown.items()])}"
1111
 
1112
- if conn:
1113
- conn.close()
1114
 
1115
  return ChatResponse(
1116
  route="analyze_responses",
@@ -1123,17 +1122,14 @@ async def chat_edit_form(
1123
 
1124
  except Exception as e:
1125
  logger.error(f"Response analysis failed: {e}", exc_info=True)
1126
- if conn:
1127
- conn.close()
1128
  return ChatResponse(
1129
  route="analyze_responses",
1130
  response={"message": f"Error analyzing responses: {str(e)}"},
1131
  changes_made=None
1132
  )
1133
 
1134
- # Close conn if opened but not used
1135
- if conn:
1136
- conn.close()
1137
 
1138
  # === Handle No Responses ===
1139
  if route == 'no_responses':
 
703
  except Exception as e:
704
  logger.warning(f"Could not load responses: {e}")
705
 
706
+ # Capture user ID as plain int to avoid session detachment issues
707
+ user_id_for_save = current_user.id
708
+
709
  # Determine chat type based on whether this is likely a response analysis
710
  chat_type = "form_editing" # Default
711
 
712
  # Save user message to database
713
  user_chat_message = ChatMessageModel(
714
  form_id=form_id,
715
+ user_id=user_id_for_save,
716
  chat_type=chat_type,
717
  role="user",
718
  content=chat_data.message
 
932
  logger.error(f"Stream error: {e}", exc_info=True)
933
  collected_response['content'] = f"Error: {str(e)}"
934
  yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
935
+ # Note: Don't close conn - it's cached for session reuse
 
 
936
 
937
+ def save_assistant_message_sync(form_id_val: int, user_id_val: int, response_dict: dict):
938
+ """Save assistant message in a completely isolated session"""
 
939
  from ..core.db import SessionLocal
940
+ session = None
941
  try:
942
+ session = SessionLocal()
943
+ if response_dict.get('content'):
944
+ msg = ChatMessageModel(
945
+ form_id=form_id_val,
946
+ user_id=user_id_val,
947
+ chat_type="response_analysis" if response_dict.get('route') == 'analyze_responses' else "form_editing",
 
 
 
 
 
948
  role="assistant",
949
+ content=response_dict.get('content', ''),
950
+ query_type=response_dict.get('query_type'),
951
+ sql_query=response_dict.get('sql_query'),
952
+ result_data=response_dict.get('result_data')
953
  )
954
+ session.add(msg)
955
+ session.commit()
956
+ logger.debug(f"Saved assistant message for form {form_id_val}")
957
  except Exception as e:
958
  logger.error(f"Failed to save chat message: {e}")
959
+ if session:
960
+ session.rollback()
961
  finally:
962
+ if session:
963
+ session.close()
964
 
965
  async def streaming_with_save():
966
  """Wrap generator to save message after completion"""
967
  async for event in event_generator():
968
  yield event
969
+ # Save after streaming completes - pass values explicitly to avoid closure issues
970
+ save_assistant_message_sync(form_id, user_id_for_save, collected_response.copy())
971
 
972
  return StreamingResponse(
973
  streaming_with_save(),
 
1109
  if status_breakdown:
1110
  assistant_response += f"**Status:** {', '.join([f'{k}: {v}' for k, v in status_breakdown.items()])}"
1111
 
1112
+ # Note: Don't close conn - it's cached for session reuse
 
1113
 
1114
  return ChatResponse(
1115
  route="analyze_responses",
 
1122
 
1123
  except Exception as e:
1124
  logger.error(f"Response analysis failed: {e}", exc_info=True)
1125
+ # Note: Don't close conn - it's cached for session reuse
 
1126
  return ChatResponse(
1127
  route="analyze_responses",
1128
  response={"message": f"Error analyzing responses: {str(e)}"},
1129
  changes_made=None
1130
  )
1131
 
1132
+ # Note: Don't close conn - it's cached for session reuse
 
 
1133
 
1134
  # === Handle No Responses ===
1135
  if route == 'no_responses':
app/routes/responses.py CHANGED
@@ -27,6 +27,7 @@ from ..services.validation_service import validation_service
27
  from ..services.analytics_service import analytics_service
28
  from ..services.webhook_service import webhook_service
29
  from ..services.s3_service import s3_service
 
30
  from ..middleware.rate_limiter import rate_limiter
31
 
32
  import logging
@@ -270,6 +271,9 @@ async def submit_form(
270
  db.commit()
271
  db.refresh(form_response)
272
 
 
 
 
273
  return SubmissionResponse(
274
  id=form_response.id,
275
  form_id=form.id,
@@ -408,6 +412,9 @@ async def autosave_submission(
408
  db.commit()
409
  db.refresh(form_response)
410
 
 
 
 
411
  # Validate answers (non-blocking)
412
  validation_result = validation_service.validate_submission(
413
  db=db,
 
27
  from ..services.analytics_service import analytics_service
28
  from ..services.webhook_service import webhook_service
29
  from ..services.s3_service import s3_service
30
+ from ..services.response_chat_service import invalidate_duckdb_cache
31
  from ..middleware.rate_limiter import rate_limiter
32
 
33
  import logging
 
271
  db.commit()
272
  db.refresh(form_response)
273
 
274
+ # Invalidate DuckDB cache so new data is reflected in analytics
275
+ invalidate_duckdb_cache(form.id)
276
+
277
  return SubmissionResponse(
278
  id=form_response.id,
279
  form_id=form.id,
 
412
  db.commit()
413
  db.refresh(form_response)
414
 
415
+ # Invalidate DuckDB cache so new data is reflected in analytics
416
+ invalidate_duckdb_cache(form.id)
417
+
418
  # Validate answers (non-blocking)
419
  validation_result = validation_service.validate_submission(
420
  db=db,
app/services/response_chat_service.py CHANGED
@@ -11,6 +11,7 @@ import json
11
  import logging
12
  import os
13
  import re
 
14
  from typing import Dict, Any, List, Optional, Tuple
15
  from datetime import datetime
16
  from sqlalchemy.orm import Session
@@ -20,6 +21,76 @@ from ..models import Form, FormQuestion, FormResponse, ResponseAnswer, ChatMessa
20
  logger = logging.getLogger(__name__)
21
 
22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  class ResponseChatService:
24
  """Service for analyzing form responses via chat interface using DuckDB."""
25
 
@@ -37,6 +108,25 @@ class ResponseChatService:
37
  Returns:
38
  Tuple of (DuckDB connection, column names, question_id to column name mapping)
39
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
  # Get form and questions
41
  form = db.query(Form).filter(Form.id == form_id).first()
42
  if not form:
@@ -46,11 +136,14 @@ class ResponseChatService:
46
  FormQuestion.form_id == form_id
47
  ).order_by(FormQuestion.question_order).all()
48
 
49
- # Get responses
50
  responses = db.query(FormResponse).filter(
51
- FormResponse.form_id == form_id
 
52
  ).all()
53
 
 
 
54
  # Create column mapping from questions
55
  # Column names: response_id, submitted_at, status, q1_<sanitized_text>, q2_<sanitized_text>, ...
56
  question_id_to_col = {}
@@ -102,6 +195,9 @@ class ResponseChatService:
102
  col_defs = ", ".join([f'"{c}" VARCHAR' for c in columns])
103
  conn.execute(f"CREATE TABLE responses ({col_defs})")
104
 
 
 
 
105
  return conn, columns, question_id_to_col
106
 
107
  def _extract_answer_text(self, answer_value: Any, question_type: str) -> str:
@@ -109,7 +205,15 @@ class ResponseChatService:
109
  if answer_value is None:
110
  return ""
111
 
 
 
 
 
112
  if isinstance(answer_value, dict):
 
 
 
 
113
  # Handle different answer formats
114
  if "text" in answer_value:
115
  val = answer_value["text"]
@@ -117,6 +221,14 @@ class ResponseChatService:
117
  if "number" in answer_value:
118
  val = answer_value["number"]
119
  return str(val) if val is not None else ""
 
 
 
 
 
 
 
 
120
  if "choices" in answer_value:
121
  choices = answer_value["choices"]
122
  if choices is None:
@@ -145,7 +257,8 @@ class ResponseChatService:
145
  if isinstance(items, list):
146
  return " > ".join(str(i) for i in items)
147
  return str(items)
148
- # Fallback for other dict types
 
149
  return json.dumps(answer_value)
150
 
151
  return str(answer_value)
@@ -166,6 +279,72 @@ class ResponseChatService:
166
  value_rows.append(f"({', '.join(values)})")
167
  return ", ".join(value_rows)
168
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
169
  def execute_query(
170
  self,
171
  conn: duckdb.DuckDBPyConnection,
 
11
  import logging
12
  import os
13
  import re
14
+ import time
15
  from typing import Dict, Any, List, Optional, Tuple
16
  from datetime import datetime
17
  from sqlalchemy.orm import Session
 
21
  logger = logging.getLogger(__name__)
22
 
23
 
24
+ # Simple session cache for DuckDB connections (per form)
25
+ # Structure: {form_id: {'conn': connection, 'columns': [...], 'question_map': {...}, 'timestamp': float, 'count': int}}
26
+ _duckdb_sessions: Dict[int, Dict[str, Any]] = {}
27
+ _SESSION_TTL = 1200 # 20 minutes
28
+
29
+
30
+ def get_cached_duckdb(form_id: int) -> Optional[Dict[str, Any]]:
31
+ """Get cached DuckDB session if still valid (< 5 mins old)."""
32
+ if form_id not in _duckdb_sessions:
33
+ return None
34
+
35
+ session = _duckdb_sessions[form_id]
36
+ age = time.time() - session['timestamp']
37
+
38
+ if age > _SESSION_TTL:
39
+ # Expired - remove it
40
+ try:
41
+ session['conn'].close()
42
+ except:
43
+ pass
44
+ del _duckdb_sessions[form_id]
45
+ logger.debug(f"DuckDB session expired for form {form_id} (age: {age:.0f}s)")
46
+ return None
47
+
48
+ # Verify connection is still valid
49
+ try:
50
+ session['conn'].execute("SELECT 1").fetchone()
51
+ logger.debug(f"Using cached DuckDB session for form {form_id} (age: {age:.0f}s)")
52
+ return session
53
+ except Exception as e:
54
+ logger.warning(f"DuckDB connection invalid for form {form_id}: {e}")
55
+ try:
56
+ session['conn'].close()
57
+ except:
58
+ pass
59
+ del _duckdb_sessions[form_id]
60
+ return None
61
+
62
+
63
+ def set_cached_duckdb(form_id: int, conn: duckdb.DuckDBPyConnection,
64
+ columns: List[str], question_map: Dict[int, str], count: int) -> None:
65
+ """Cache a DuckDB session for a form."""
66
+ # Close old connection if exists
67
+ if form_id in _duckdb_sessions:
68
+ try:
69
+ _duckdb_sessions[form_id]['conn'].close()
70
+ except:
71
+ pass
72
+
73
+ _duckdb_sessions[form_id] = {
74
+ 'conn': conn,
75
+ 'columns': columns,
76
+ 'question_map': question_map,
77
+ 'count': count,
78
+ 'timestamp': time.time()
79
+ }
80
+ logger.debug(f"Cached DuckDB session for form {form_id} with {count} responses")
81
+
82
+
83
+ def invalidate_duckdb_cache(form_id: int) -> None:
84
+ """Invalidate cached DuckDB session for a form."""
85
+ if form_id in _duckdb_sessions:
86
+ try:
87
+ _duckdb_sessions[form_id]['conn'].close()
88
+ except:
89
+ pass
90
+ del _duckdb_sessions[form_id]
91
+ logger.debug(f"Invalidated DuckDB cache for form {form_id}")
92
+
93
+
94
  class ResponseChatService:
95
  """Service for analyzing form responses via chat interface using DuckDB."""
96
 
 
108
  Returns:
109
  Tuple of (DuckDB connection, column names, question_id to column name mapping)
110
  """
111
+ from sqlalchemy import func
112
+
113
+ # Get current response count to check if cache is valid
114
+ current_count = db.query(func.count(FormResponse.id)).filter(
115
+ FormResponse.form_id == form_id,
116
+ FormResponse.status.in_(["complete", "partial"])
117
+ ).scalar() or 0
118
+
119
+ # Check cache first
120
+ cached = get_cached_duckdb(form_id)
121
+ if cached is not None and cached['count'] == current_count:
122
+ # Cache is valid and count matches
123
+ return cached['conn'], cached['columns'], cached['question_map']
124
+
125
+ # Cache miss or count changed - need to reload
126
+ if cached is not None:
127
+ logger.info(f"Response count changed for form {form_id}: {cached['count']} -> {current_count}, reloading")
128
+ invalidate_duckdb_cache(form_id)
129
+
130
  # Get form and questions
131
  form = db.query(Form).filter(Form.id == form_id).first()
132
  if not form:
 
136
  FormQuestion.form_id == form_id
137
  ).order_by(FormQuestion.question_order).all()
138
 
139
+ # Get responses - only complete and partial
140
  responses = db.query(FormResponse).filter(
141
+ FormResponse.form_id == form_id,
142
+ FormResponse.status.in_(["complete", "partial"])
143
  ).all()
144
 
145
+ logger.info(f"Loading {len(responses)} responses into DuckDB for form {form_id}")
146
+
147
  # Create column mapping from questions
148
  # Column names: response_id, submitted_at, status, q1_<sanitized_text>, q2_<sanitized_text>, ...
149
  question_id_to_col = {}
 
195
  col_defs = ", ".join([f'"{c}" VARCHAR' for c in columns])
196
  conn.execute(f"CREATE TABLE responses ({col_defs})")
197
 
198
+ # Cache for reuse within session (5 min TTL)
199
+ set_cached_duckdb(form_id, conn, columns, question_id_to_col, len(responses))
200
+
201
  return conn, columns, question_id_to_col
202
 
203
  def _extract_answer_text(self, answer_value: Any, question_type: str) -> str:
 
205
  if answer_value is None:
206
  return ""
207
 
208
+ # Handle plain numeric types directly (int, float)
209
+ if isinstance(answer_value, (int, float)):
210
+ return str(answer_value)
211
+
212
  if isinstance(answer_value, dict):
213
+ # Empty dict - return empty string
214
+ if not answer_value:
215
+ return ""
216
+
217
  # Handle different answer formats
218
  if "text" in answer_value:
219
  val = answer_value["text"]
 
221
  if "number" in answer_value:
222
  val = answer_value["number"]
223
  return str(val) if val is not None else ""
224
+ # Also check for 'value' key (common for sliders/scales)
225
+ if "value" in answer_value:
226
+ val = answer_value["value"]
227
+ return str(val) if val is not None else ""
228
+ # Check for 'scale' key
229
+ if "scale" in answer_value:
230
+ val = answer_value["scale"]
231
+ return str(val) if val is not None else ""
232
  if "choices" in answer_value:
233
  choices = answer_value["choices"]
234
  if choices is None:
 
257
  if isinstance(items, list):
258
  return " > ".join(str(i) for i in items)
259
  return str(items)
260
+ # Fallback for other dict types - log and dump
261
+ logger.debug(f"Unknown answer format for {question_type}: {answer_value}")
262
  return json.dumps(answer_value)
263
 
264
  return str(answer_value)
 
279
  value_rows.append(f"({', '.join(values)})")
280
  return ", ".join(value_rows)
281
 
282
+ def get_responses_direct(
283
+ self,
284
+ db: Session,
285
+ form_id: int,
286
+ limit: int = 100
287
+ ) -> Tuple[List[Dict], List[str], Dict[str, Any]]:
288
+ """
289
+ Get responses directly from database (fallback when DuckDB fails).
290
+
291
+ Returns:
292
+ Tuple of (rows, columns, summary_dict)
293
+ """
294
+ # Get form questions for column names
295
+ questions = db.query(FormQuestion).filter(
296
+ FormQuestion.form_id == form_id
297
+ ).order_by(FormQuestion.question_order).all()
298
+
299
+ # Get responses with status filter
300
+ responses = db.query(FormResponse).filter(
301
+ FormResponse.form_id == form_id,
302
+ FormResponse.status.in_(["complete", "partial"])
303
+ ).order_by(FormResponse.submitted_at.desc()).limit(limit).all()
304
+
305
+ if not responses:
306
+ return [], ["response_id", "submitted_at", "status"], {"total_responses": 0, "status_breakdown": {}}
307
+
308
+ # Build column names
309
+ columns = ["response_id", "submitted_at", "status"]
310
+ question_id_to_col = {}
311
+ for i, q in enumerate(questions):
312
+ sanitized = re.sub(r'[^a-zA-Z0-9]', '_', q.question_text[:30]).lower()
313
+ col_name = f"q{i+1}_{sanitized}"
314
+ columns.append(col_name)
315
+ question_id_to_col[q.id] = col_name
316
+
317
+ # Build rows
318
+ rows = []
319
+ status_counts = {}
320
+ for response in responses:
321
+ # Get answers for this response
322
+ answers = db.query(ResponseAnswer).filter(
323
+ ResponseAnswer.form_response_id == response.id
324
+ ).all()
325
+
326
+ row = {
327
+ "response_id": str(response.id),
328
+ "submitted_at": response.submitted_at.isoformat() if response.submitted_at else None,
329
+ "status": response.status
330
+ }
331
+
332
+ status_counts[response.status] = status_counts.get(response.status, 0) + 1
333
+
334
+ answer_map = {a.form_question_id: a.answer_value for a in answers}
335
+ for q in questions:
336
+ col_name = question_id_to_col[q.id]
337
+ answer_value = answer_map.get(q.id)
338
+ row[col_name] = self._extract_answer_text(answer_value, q.question_type.value)
339
+ rows.append(row)
340
+
341
+ summary = {
342
+ "total_responses": len(responses),
343
+ "status_breakdown": status_counts
344
+ }
345
+
346
+ return rows, columns, summary
347
+
348
  def execute_query(
349
  self,
350
  conn: duckdb.DuckDBPyConnection,
app/stripe_routes.py CHANGED
@@ -83,7 +83,7 @@ def create_checkout_session(
83
 
84
  # Known promotion codes mapping (code -> Stripe promotion code ID)
85
  KNOWN_PROMO_CODES = {
86
- "FIRST100": "promo_1SrJbKBACqQSnujJriPjszSv",
87
  }
88
 
89
  # Apply promo code if provided
 
83
 
84
  # Known promotion codes mapping (code -> Stripe promotion code ID)
85
  KNOWN_PROMO_CODES = {
86
+ "NEWYEARS": "promo_1SrJE0BACqQSnujJY6Q9o3X2",
87
  }
88
 
89
  # Apply promo code if provided