File size: 14,806 Bytes
75f48fa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f8a3c0e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""

Advanced Recognition Module for NAVADA

Handles face recognition, custom object detection, and RAG-enhanced identification

"""

import cv2
import numpy as np
from typing import List, Dict, Tuple, Optional
import logging
from .database import db
from .face_detection import face_detector
import time
import uuid

# Configure logging
logger = logging.getLogger(__name__)

class NAVADARecognition:
    """Advanced recognition system with database integration"""
    
    def __init__(self):
        """Initialize recognition system"""
        self.face_threshold = 0.6  # Face recognition threshold
        self.object_threshold = 0.5  # Object recognition threshold
        self.session_id = str(uuid.uuid4())
        
    def extract_face_encoding(self, face_image: np.ndarray) -> Optional[np.ndarray]:
        """

        Extract face encoding for recognition

        This is a simplified version - in production, use face_recognition library

        """
        try:
            # Convert to grayscale and resize
            gray = cv2.cvtColor(face_image, cv2.COLOR_RGB2GRAY)
            resized = cv2.resize(gray, (128, 128))
            
            # Flatten and normalize as simple encoding
            encoding = resized.flatten().astype(np.float64)
            encoding = encoding / np.linalg.norm(encoding)  # Normalize
            
            return encoding
            
        except Exception as e:
            logger.error(f"Face encoding extraction failed: {e}")
            return None
    
    def compare_face_encodings(self, encoding1: np.ndarray, encoding2: np.ndarray) -> float:
        """Compare two face encodings and return similarity score"""
        try:
            # Calculate cosine similarity
            similarity = np.dot(encoding1, encoding2) / (
                np.linalg.norm(encoding1) * np.linalg.norm(encoding2)
            )
            return float(similarity)
            
        except Exception as e:
            logger.error(f"Face comparison failed: {e}")
            return 0.0
    
    def recognize_faces(self, image: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """

        Recognize faces in image against database

        

        Returns:

            Annotated image and list of recognition results

        """
        try:
            if not db:
                return image, []
            
            # Detect faces first
            annotated_img, face_stats = face_detector.detect_faces(image)
            
            # Get known faces from database
            known_faces = db.get_faces()
            
            recognition_results = []
            
            if face_stats and face_stats['faces']:
                for face_info in face_stats['faces']:
                    # Extract face region
                    pos = face_info['position']
                    x, y, w, h = pos['x'], pos['y'], pos['width'], pos['height']
                    face_region = image[y:y+h, x:x+w]
                    
                    if face_region.size > 0:
                        # Extract face encoding
                        face_encoding = self.extract_face_encoding(face_region)
                        
                        if face_encoding is not None:
                            # Compare with known faces
                            best_match = None
                            best_similarity = 0.0
                            
                            for known_face in known_faces:
                                similarity = self.compare_face_encodings(
                                    face_encoding, known_face['encoding']
                                )
                                
                                if similarity > best_similarity and similarity > self.face_threshold:
                                    best_similarity = similarity
                                    best_match = known_face
                            
                            # Add recognition result
                            if best_match:
                                # Draw name on image
                                name = best_match['name']
                                cv2.putText(annotated_img, f"{name} ({best_similarity:.2f})", 
                                          (x, y-30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, 
                                          (0, 255, 0), 2)
                                
                                recognition_results.append({
                                    'face_id': face_info['face_id'],
                                    'name': name,
                                    'similarity': best_similarity,
                                    'position': pos,
                                    'database_id': best_match['id']
                                })
                            else:
                                # Unknown face
                                cv2.putText(annotated_img, "Unknown", 
                                          (x, y-30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, 
                                          (0, 0, 255), 2)
                                
                                recognition_results.append({
                                    'face_id': face_info['face_id'],
                                    'name': 'Unknown',
                                    'similarity': 0.0,
                                    'position': pos,
                                    'database_id': None
                                })
            
            return annotated_img, recognition_results
            
        except Exception as e:
            logger.error(f"Face recognition failed: {e}")
            return image, []
    
    def add_new_face(self, image: np.ndarray, name: str, face_region: Tuple = None) -> bool:
        """

        Add a new face to the database

        

        Args:

            image: Full image containing the face

            name: Person's name

            face_region: Optional (x, y, w, h) region, if None will detect automatically

            

        Returns:

            Success status

        """
        try:
            if not db:
                logger.error("Database not available")
                return False
            
            if face_region:
                # Use provided region
                x, y, w, h = face_region
                face_img = image[y:y+h, x:x+w]
            else:
                # Detect face automatically
                _, face_stats = face_detector.detect_faces(image)
                
                if not face_stats or not face_stats['faces']:
                    logger.error("No face detected in image")
                    return False
                
                # Use first detected face
                pos = face_stats['faces'][0]['position']
                x, y, w, h = pos['x'], pos['y'], pos['width'], pos['height']
                face_img = image[y:y+h, x:x+w]
            
            # Extract encoding
            encoding = self.extract_face_encoding(face_img)
            if encoding is None:
                logger.error("Failed to extract face encoding")
                return False
            
            # Add to database
            face_id = db.add_face(
                name=name,
                face_encoding=encoding,
                image=face_img,
                confidence=0.9,
                metadata={
                    'source': 'user_added',
                    'session_id': self.session_id,
                    'timestamp': time.time()
                }
            )
            
            logger.info(f"Added new face '{name}' with ID {face_id}")
            return True
            
        except Exception as e:
            logger.error(f"Failed to add new face: {e}")
            return False
    
    def add_custom_object(self, image: np.ndarray, label: str, category: str, 

                         bbox: Tuple = None) -> bool:
        """

        Add a custom object to the database

        

        Args:

            image: Full image containing the object

            label: Object label/name

            category: Object category

            bbox: Optional (x, y, w, h) bounding box

            

        Returns:

            Success status

        """
        try:
            if not db:
                logger.error("Database not available")
                return False
            
            if bbox:
                # Use provided bounding box
                x, y, w, h = bbox
                object_img = image[y:y+h, x:x+w]
            else:
                # Use entire image as object
                object_img = image
                bbox = (0, 0, image.shape[1], image.shape[0])
            
            # Extract simple features (can be enhanced with deep learning)
            features = self.extract_object_features(object_img)
            
            # Add to database
            object_id = db.add_object(
                label=label,
                category=category,
                features=features,
                image=object_img,
                bounding_box=bbox,
                confidence=0.8,
                metadata={
                    'source': 'user_added',
                    'session_id': self.session_id,
                    'timestamp': time.time()
                }
            )
            
            logger.info(f"Added custom object '{label}' with ID {object_id}")
            return True
            
        except Exception as e:
            logger.error(f"Failed to add custom object: {e}")
            return False
    
    def extract_object_features(self, object_img: np.ndarray) -> np.ndarray:
        """Extract features from object image (simplified implementation)"""
        try:
            # Convert to grayscale and resize
            gray = cv2.cvtColor(object_img, cv2.COLOR_RGB2GRAY)
            resized = cv2.resize(gray, (64, 64))
            
            # Extract histogram features
            hist = cv2.calcHist([resized], [0], None, [256], [0, 256])
            hist_normalized = hist.flatten() / hist.sum()
            
            # Extract edge features
            edges = cv2.Canny(resized, 50, 150)
            edge_density = edges.sum() / edges.size
            
            # Combine features
            features = np.concatenate([hist_normalized, [edge_density]])
            
            return features.astype(np.float64)
            
        except Exception as e:
            logger.error(f"Feature extraction failed: {e}")
            return np.array([])
    
    def enhance_with_rag(self, detections: List, face_matches: List = None) -> str:
        """

        Use RAG to enhance detection results with context

        

        Args:

            detections: List of detected objects

            face_matches: List of face recognition results

            

        Returns:

            Enhanced description with context

        """
        try:
            if not db:
                return "Enhanced analysis not available (database offline)"
            
            # Build search queries from detections
            queries = []
            
            # Add object queries
            for detection in detections:
                queries.append(detection)
            
            # Add face queries
            if face_matches:
                for match in face_matches:
                    if match['name'] != 'Unknown':
                        queries.append(match['name'])
            
            # Search knowledge base
            knowledge_results = []
            for query in queries:
                results = db.search_knowledge(query)
                knowledge_results.extend(results)
            
            # Build enhanced description
            if knowledge_results:
                enhanced_desc = "🧠 **Enhanced Analysis with Context:**\n\n"
                
                # Group by entity type
                face_context = [r for r in knowledge_results if r['entity_type'] == 'face']
                object_context = [r for r in knowledge_results if r['entity_type'] == 'object']
                
                if face_context:
                    enhanced_desc += "πŸ‘₯ **Known Individuals:**\n"
                    for ctx in face_context[:3]:  # Limit to 3 results
                        enhanced_desc += f"  β€’ {ctx['content']}\n"
                    enhanced_desc += "\n"
                
                if object_context:
                    enhanced_desc += "🏷️ **Recognized Objects:**\n"
                    for ctx in object_context[:3]:  # Limit to 3 results
                        enhanced_desc += f"  β€’ {ctx['content']}\n"
                    enhanced_desc += "\n"
                
                enhanced_desc += "πŸ“Š **Context Insights:**\n"
                enhanced_desc += f"  β€’ Found {len(knowledge_results)} relevant knowledge entries\n"
                enhanced_desc += f"  β€’ Analysis includes both detected and learned objects\n"
                
                return enhanced_desc
            else:
                return "πŸ” **Context Analysis:** No additional context found in knowledge base."
                
        except Exception as e:
            logger.error(f"RAG enhancement failed: {e}")
            return "❌ Enhanced analysis unavailable due to processing error."
    
    def save_session_data(self, image: np.ndarray, detections: List, 

                         face_matches: List = None, processing_time: float = 0.0):
        """Save current session data to database"""
        try:
            if db:
                db.save_detection_history(
                    session_id=self.session_id,
                    image=image,
                    detections=detections,
                    face_matches=face_matches,
                    processing_time=processing_time,
                    metadata={
                        'timestamp': time.time(),
                        'version': '2.0'
                    }
                )
        except Exception as e:
            logger.error(f"Failed to save session data: {e}")

# Global recognition instance
try:
    recognition_system = NAVADARecognition()
    logger.info("Recognition system initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize recognition system: {e}")
    recognition_system = None