| import os
|
| import sys
|
| import zipfile
|
| import pandas as pd
|
| import numpy as np
|
| from flask import Flask, request, redirect, url_for, send_from_directory, flash, render_template
|
| from werkzeug.utils import secure_filename
|
| from tqdm import tqdm
|
| from sklearn.metrics import classification_report, precision_recall_fscore_support
|
| from inference_utils import DiamondInference
|
| from dotenv import load_dotenv
|
|
|
|
|
# Load environment variables from a local .env file (HF_REPO_ID, HF_TOKEN, ...).
load_dotenv()

app = Flask(__name__)
# Session-signing key. Previously hard-coded; now overridable via the
# environment. The fallback keeps backward-compatible local-dev behavior,
# but a real deployment should set FLASK_SECRET_KEY.
app.secret_key = os.getenv("FLASK_SECRET_KEY", "supersecretkey")

# Hugging Face Hub repository that hosts the model and encoder artifacts.
HF_REPO_ID = os.getenv("HF_REPO_ID", "WebashalarForML/Diamcol")
HF_TOKEN = os.getenv("HF_TOKEN")  # may be None for public repos

# Identifier tying the model weights to their matching encoder/scaler files.
MODEL_ID = "322c4f4d"
MODEL_NAME = f"model_vit_robust_{MODEL_ID}.keras"
|
|
|
def download_model_from_hf():
    """Fetch the model file and encoder artifacts from the Hugging Face Hub.

    Files already present on disk are left untouched; only missing
    artifacts are downloaded into the current directory / ``encoder``.
    """
    from huggingface_hub import hf_hub_download

    print("[INFO] Checking model files from Hugging Face...")

    if not os.path.exists(MODEL_NAME):
        print(f"[INFO] Downloading {MODEL_NAME}...")
        hf_hub_download(repo_id=HF_REPO_ID, filename=MODEL_NAME, token=HF_TOKEN, local_dir=".")

    # Pickled preprocessing artifacts that must match MODEL_ID.
    artifact_names = [
        f"{prefix}_{MODEL_ID}.pkl"
        for prefix in ("hyperparameters", "cat_encoders", "num_scaler", "target_encoder", "norm_stats")
    ]
    os.makedirs("encoder", exist_ok=True)
    for artifact in artifact_names:
        if not os.path.exists(os.path.join("encoder", artifact)):
            print(f"[INFO] Downloading {artifact}...")
            hf_hub_download(repo_id=HF_REPO_ID, filename=f"encoder/{artifact}", token=HF_TOKEN, local_dir=".")
|
|
|
# Directory layout: raw uploads, generated reports, and unzipped images.
UPLOAD_FOLDER = 'uploads'
RESULTS_FOLDER = 'results'
EXTRACT_FOLDER = os.path.join(UPLOAD_FOLDER, 'extracted_images')

# exist_ok avoids the racy check-then-create pattern of the original code.
for folder in (UPLOAD_FOLDER, RESULTS_FOLDER, EXTRACT_FOLDER):
    os.makedirs(folder, exist_ok=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024  # 500 MB upload cap

# Lazily-initialized process-wide engine; see get_inference_engine().
model_path = MODEL_NAME
encoder_dir = "encoder"
infer_engine = None
|
|
|
def get_inference_engine():
    """Return the process-wide DiamondInference engine, creating it on first use.

    On the first call this best-effort downloads the model artifacts,
    constructs the engine, and runs a dummy warmup prediction so the first
    real request does not pay model-initialization latency.

    NOTE(review): not thread-safe — two concurrent first requests could each
    build an engine; confirm the server runs single-threaded or add a lock.
    """
    global infer_engine
    if infer_engine is None:
        try:
            download_model_from_hf()
        except Exception as e:
            # Best-effort download: fall back to whatever files are on disk.
            print(f"[WARNING] Could not download from HF: {e}. Expecting local files.")

        infer_engine = DiamondInference(model_path, encoder_dir, MODEL_ID)

        print("[INFO] Warming up Inference Engine...")
        try:
            # Minimal tabular row shaped like a real stone record; values are
            # placeholders — only the schema matters for warmup.
            dummy_row = {"StoneType": "NATURAL", "Color": "D", "Brown": "N", "BlueUv": "N", "GrdType": "GIA", "Carat": 1.0, "Result": "D"}

            # Temporarily stub out image loading so warmup needs no real file;
            # the original method is always restored in the finally block.
            orig_process = infer_engine.process_image
            try:
                # Assumes hp["flat_patches_shape"] is the patch-array shape the
                # model expects — TODO confirm against DiamondInference.
                infer_engine.process_image = lambda path, tta_transform=None: np.zeros(infer_engine.hp["flat_patches_shape"], dtype=np.float32)
                infer_engine.predict(dummy_row, "warmup.jpg", use_tta=False)
            finally:
                infer_engine.process_image = orig_process
            print("[INFO] Warmup complete.")
        except Exception as e:
            # Warmup is an optimization only; failure must not block serving.
            print(f"[WARNING] Warmup failed: {e}")

    return infer_engine
|
|
|
def _clear_directory(directory):
    """Best-effort delete of every file, symlink, and subdirectory in *directory*."""
    import shutil
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            # Keep going: one stuck file should not abort the whole flush.
            print(f'Failed to delete {file_path}. Reason: {e}')


@app.route('/flush', methods=['POST'])
def flush_data():
    """Remove all uploaded and generated files, then redirect to the index page."""
    try:
        _clear_directory(UPLOAD_FOLDER)
        # Clearing the upload folder also removed the extraction subfolder;
        # recreate it so later uploads have somewhere to unzip into.
        os.makedirs(EXTRACT_FOLDER, exist_ok=True)
        _clear_directory(RESULTS_FOLDER)
        flash('All data flushed successfully.')
    except Exception as e:
        flash(f'Error during flushing: {e}')

    return redirect(url_for('index'))
|
|
|
@app.route('/')
def index():
    """Serve the landing page containing the upload form."""
    return render_template('index.html')
|
|
|
def _find_matching_image(all_files, l_code, sr_no, stone_id):
    """Return the first image path whose filename matches the stone, else None.

    Primary match: both L_Code and SrNo appear in the filename.
    Fallback: Stone_Id appears in the filename (skipped when Stone_Id is
    empty or the string 'nan').
    """
    for full_path in all_files:
        fname = os.path.basename(full_path)
        if l_code in fname and sr_no in fname:
            return full_path
    if stone_id and stone_id != 'nan':
        for full_path in all_files:
            # BUG FIX: original called os.basename (no such attribute,
            # AttributeError at runtime); os.path.basename is correct.
            if stone_id in os.path.basename(full_path):
                return full_path
    return None


def _build_metrics(y_true, y_pred):
    """Build the metrics dict (per-class, averages, confusion matrix) for the report."""
    from sklearn.metrics import confusion_matrix
    report_dict = classification_report(y_true, y_pred, output_dict=True, zero_division=0)
    labels = sorted(set(y_true) | set(y_pred))
    cm = confusion_matrix(y_true, y_pred, labels=labels)

    class_metrics = [
        {
            'label': label,
            'precision': round(scores['precision'], 4),
            'recall': round(scores['recall'], 4),
            'f1': round(scores['f1-score'], 4),
            'support': scores['support'],
        }
        for label, scores in report_dict.items()
        if label not in ('accuracy', 'macro avg', 'weighted avg')
    ]

    return {
        'accuracy': round(report_dict['accuracy'], 4),
        'class_metrics': class_metrics,
        'weighted_avg': report_dict['weighted avg'],
        'macro_avg': report_dict['macro avg'],
        'precision': round(report_dict['weighted avg']['precision'], 4),
        'recall': round(report_dict['weighted avg']['recall'], 4),
        'f1': round(report_dict['weighted avg']['f1-score'], 4),
        'macro_f1': round(report_dict['macro avg']['f1-score'], 4),
        'macro_precision': round(report_dict['macro avg']['precision'], 4),
        'macro_recall': round(report_dict['macro avg']['recall'], 4),
        'confusion_matrix': {
            'labels': labels,
            'matrix': cm.tolist(),
        },
    }


def _detect_out_of_box_cols(df):
    """Return the optional columns that contain at least one real (non-empty) value."""
    potential_oob = ['FancyYellow', 'Type2A', 'YellowUv']
    out_of_box_cols = []
    for col in potential_oob:
        if col in df.columns:
            # Treat NaN, 'nan', 'None' and blanks as missing.
            cleaned = df[col].dropna().astype(str).str.strip().replace(['nan', 'None', ''], pd.NA)
            if cleaned.notna().any():
                out_of_box_cols.append(col)
    return out_of_box_cols


@app.route('/upload', methods=['POST'])
def upload_files():
    """Run inference for every stone in the uploaded Excel against the zipped images.

    Expects two multipart files: 'zip_file' (images) and 'excel_file' (stone
    metadata). Writes an Excel report to RESULTS_FOLDER and renders
    report.html with per-stone predictions plus aggregate metrics when
    ground truth ('FGrdCol') is present.
    """
    if 'zip_file' not in request.files or 'excel_file' not in request.files:
        flash('Both Zip and Excel files are required.')
        return redirect(request.url)

    zip_file = request.files['zip_file']
    excel_file = request.files['excel_file']

    if zip_file.filename == '' or excel_file.filename == '':
        flash('No selected file')
        return redirect(request.url)

    zip_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(zip_file.filename))
    zip_file.save(zip_path)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(EXTRACT_FOLDER)

    excel_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(excel_file.filename))
    excel_file.save(excel_path)
    df = pd.read_excel(excel_path)

    engine = get_inference_engine()

    # Index every extracted image once so per-row matching is in-memory only.
    all_extracted_files = []
    for root, dirs, files in os.walk(EXTRACT_FOLDER):
        for f in files:
            if f.lower().endswith(('.jpg', '.jpeg', '.png')):
                all_extracted_files.append(os.path.join(root, f))
    print(f"[INFO] Found {len(all_extracted_files)} images in extraction folder.")

    y_true = []
    y_pred = []

    print(f"[INFO] Initializing Inference Pipeline for {len(df)} stones...")
    sys.stdout.flush()

    pbar = tqdm(df.iterrows(), total=len(df), desc="Inference Progress", file=sys.stdout)
    for index, row in pbar:
        # split('.')[0] strips a trailing '.0' when pandas read the id as float.
        l_code = str(row.get('L_Code', '')).split('.')[0]
        sr_no = str(row.get('SrNo', '')).split('.')[0]
        stone_id = str(row.get('Stone_Id', ''))

        if index % 5 == 0:
            print(f"[PROC] Stone {index+1}/{len(df)}: {l_code}")
            sys.stdout.flush()

        img_path = _find_matching_image(all_extracted_files, l_code, sr_no, stone_id)

        if img_path:
            prediction = engine.predict(row, img_path)
            # Store a path relative to EXTRACT_FOLDER for the /image route.
            web_path = os.path.relpath(img_path, start=EXTRACT_FOLDER)
            df.at[index, 'Predicted_FGrdCol'] = prediction
            df.at[index, 'Image_Path'] = web_path

            # Collect ground truth only where the label is actually present.
            if 'FGrdCol' in row and pd.notna(row['FGrdCol']):
                y_true.append(str(row['FGrdCol']))
                y_pred.append(str(prediction))
        else:
            df.at[index, 'Predicted_FGrdCol'] = "Image Not Found"
            df.at[index, 'Image_Path'] = "N/A"

    metrics = _build_metrics(y_true, y_pred) if y_true else None

    # Tabular features the model consumes (displayed in the report).
    model_features = ["StoneType", "Color", "Brown", "BlueUv", "GrdType", "Carat", "Result"]
    out_of_box_cols = _detect_out_of_box_cols(df)

    output_filename = f"report_{secure_filename(excel_file.filename)}"
    output_path = os.path.join(RESULTS_FOLDER, output_filename)
    df.to_excel(output_path, index=False)

    return render_template('report.html',
                           report_data=df.to_dict(orient='records'),
                           report_file=output_filename,
                           out_of_box_cols=out_of_box_cols,
                           model_features=model_features,
                           metrics=metrics)
|
|
|
@app.route('/download/<filename>')
def download_file(filename):
    """Serve a previously generated report file from the results folder."""
    return send_from_directory(RESULTS_FOLDER, filename)
|
|
|
@app.route('/image/<path:filename>')
def serve_image(filename):
    """Serve an extracted stone image; <path:...> allows nested zip subfolders."""
    return send_from_directory(EXTRACT_FOLDER, filename)
|
|
|
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug reloader and interactive
    # debugger — fine for local development, must not be used in production.
    app.run(debug=True)
|
|
|