| import os |
| import io |
| import wave |
| import struct |
| import logging |
| import hashlib |
| from flask import Flask, render_template, request, send_file, jsonify |
| from werkzeug.utils import secure_filename |
| from pydub import AudioSegment |
| import magic |
|
|
# Application object that the route decorators below register against.
app = Flask(__name__)

# Configure the root logger at DEBUG and log through Flask's app logger.
logging.basicConfig(level=logging.DEBUG)
logger = app.logger

# Directory where uploads are staged while a request is being processed.
UPLOAD_FOLDER = '/tmp'

# File extensions (lower-case, without the dot) accepted by allowed_file().
ALLOWED_EXTENSIONS = {
    'png', 'jpg', 'jpeg', 'gif',
    'mp3', 'wav',
    'txt', 'pdf', 'docx',
}

app.config.update(UPLOAD_FOLDER=UPLOAD_FOLDER)
|
|
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot and is compared
    case-insensitively; a name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
|
|
@app.route('/')
def index():
    """Render and return the ``index.html`` template for the site root."""
    page = render_template('index.html')
    return page
|
|
@app.route('/process', methods=['POST'])
def process_file():
    """Handle an upload: decrypt ``.mp3`` files, encrypt any other allowed type.

    Expects a multipart form with a ``file`` part and an integer ``factor``
    field. The upload is staged in ``app.config['UPLOAD_FOLDER']`` for the
    duration of the request and removed afterwards, whether or not
    processing succeeded.

    Returns:
        The processed bytes as a file attachment, or a JSON
        ``{'error': ...}`` body with status 400 (invalid request data) or
        500 (processing failed).
    """
    logger.info("Processing file request received")
    if 'file' not in request.files:
        logger.error("No file part in the request")
        return jsonify({'error': 'No file part'}), 400

    file = request.files['file']
    if file.filename == '':
        logger.error("No selected file")
        return jsonify({'error': 'No selected file'}), 400

    if not file or not allowed_file(file.filename):
        logger.error("Invalid file type")
        return jsonify({'error': 'Invalid file type'}), 400

    # Validate the factor before anything is written to disk so that a bad
    # request cannot leave a stray upload behind.
    try:
        factor = int(request.form.get('factor', ''))
    except ValueError:
        logger.error("Missing or non-integer factor")
        return jsonify({'error': 'Factor must be an integer'}), 400
    if factor == 0:
        logger.error("Factor of zero rejected")
        return jsonify({'error': 'Factor must be non-zero'}), 400

    filename = secure_filename(file.filename)
    if not filename:
        # secure_filename() can reduce a name to an empty string.
        logger.error("Filename is empty after sanitising")
        return jsonify({'error': 'Invalid file name'}), 400
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)

    # Decide the direction from the extension of the name as uploaded:
    # secure_filename() may strip the stem and the dot (a non-ASCII stem with
    # ".mp3" becomes "mp3"), which would hide the extension from endswith().
    is_decrypting = file.filename.rsplit('.', 1)[-1].lower() == 'mp3'

    try:
        file.save(filepath)
        logger.info(f"File saved: {filepath}")
        logger.info(f"Processing file: {'decrypting' if is_decrypting else 'encrypting'} with factor {factor}")

        if is_decrypting:
            output, output_filename, mimetype = decrypt_file(filepath, factor)
        else:
            output, output_filename, mimetype = encrypt_file(filepath, factor)

        logger.info(f"File processed successfully. Output size: {len(output)} bytes")
        return send_file(
            io.BytesIO(output),
            mimetype=mimetype,
            as_attachment=True,
            download_name=output_filename,
        )
    except Exception as e:
        # Request boundary: report the failure to the client instead of
        # letting it propagate, and keep the traceback in the log.
        logger.exception(f"Error processing file: {str(e)}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Single cleanup path for both success and failure.
        try:
            if os.path.exists(filepath):
                os.remove(filepath)
                logger.info(f"Temporary file removed: {filepath}")
        except OSError:
            logger.warning(f"Could not remove temporary file: {filepath}")
|
|
def calculate_checksum(data):
    """Return the MD5 digest of *data* as a 32-character hexadecimal string."""
    digest = hashlib.md5()
    digest.update(data)
    return digest.hexdigest()
|
|
def encrypt_file(filepath, factor):
    """Encode a file's bytes as scaled 16-bit audio samples and return MP3 data.

    The MD5 hex digest of the input (32 ASCII characters) is written ahead of
    the payload, one sample per character. Every sample, digest and payload
    alike, is the source value multiplied by ``factor``, matching the
    division ``decrypt_file`` applies to both parts when reading back.

    Args:
        filepath: Path of the file to read.
        factor: Non-zero multiplier applied to each byte value.

    Returns:
        A ``(data, filename, mimetype)`` tuple: the MP3 bytes,
        ``'encrypted.mp3'`` and ``'audio/mpeg'``.

    Raises:
        ValueError: If ``factor`` is zero, or if it scales any value outside
            the signed 16-bit sample range.
        OSError: If ``filepath`` cannot be read.
    """
    logger.info(f"Encrypting file: {filepath}")
    if factor == 0:
        # A zero factor would flatten every sample to 0 and lose the data.
        raise ValueError("Factor must be non-zero.")

    with open(filepath, 'rb') as file:
        data = file.read()

    checksum = calculate_checksum(data)

    # Iterating over a bytes object already yields ints in Python 3, so the
    # digest characters and the payload bytes can be scaled in one pass.
    header = checksum.encode('ascii')
    samples = [int(value * factor) for value in header + data]

    # struct's 'h' format only holds signed 16-bit values; fail with a clear
    # message rather than an opaque struct.error.
    if min(samples) < -32768 or max(samples) > 32767:
        raise ValueError(
            "Factor is too large: scaled samples do not fit in 16-bit audio."
        )

    wav_output = io.BytesIO()
    with wave.open(wav_output, 'wb') as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(44100)
        wav_file.writeframes(struct.pack(f'{len(samples)}h', *samples))

    # NOTE(review): MP3 is a lossy codec, so the samples decoded later are not
    # guaranteed to equal the ones written here - confirm that round trips
    # actually survive this export.
    wav_output.seek(0)
    audio = AudioSegment.from_wav(wav_output)
    mp3_output = io.BytesIO()
    audio.export(mp3_output, format="mp3")

    logger.info(f"Encryption complete. Output size: {mp3_output.getbuffer().nbytes} bytes")
    return mp3_output.getvalue(), 'encrypted.mp3', 'audio/mpeg'
|
|
def decrypt_file(filepath, factor):
    """Decode an MP3 back into the bytes it was built from.

    The MP3 is converted to WAV in memory and read as signed 16-bit samples.
    The first 32 samples are taken as the characters of an MD5 hex digest and
    the rest as payload bytes; each sample is divided by ``factor`` and
    rounded. The rebuilt payload must hash to the embedded digest.

    Args:
        filepath: Path of the MP3 file to decode.
        factor: Non-zero divisor; must equal the factor used when encoding.

    Returns:
        A ``(data, filename, mimetype)`` tuple: the recovered bytes, a
        ``decrypted_file.<ext>`` name and the MIME type detected from the
        recovered bytes.

    Raises:
        ValueError: If ``factor`` is zero, the decoded audio is not 16-bit,
            or the checksum does not match.
    """
    import mimetypes

    logger.info(f"Decrypting file: {filepath}")
    if factor == 0:
        raise ValueError("Factor must be non-zero.")

    audio = AudioSegment.from_mp3(filepath)
    wav_output = io.BytesIO()
    audio.export(wav_output, format="wav")
    wav_output.seek(0)

    with wave.open(wav_output, 'rb') as wav_file:
        sample_width = wav_file.getsampwidth()
        frames = wav_file.readframes(wav_file.getnframes())

    # The 'h' unpack format below is only valid for 2-byte samples.
    if sample_width != 2:
        raise ValueError(
            f"Unsupported sample width: {sample_width} bytes (expected 2)."
        )
    samples = struct.unpack(f'{len(frames) // 2}h', frames)

    def _unscale(sample):
        # Same rounding for digest and payload so both tolerate the same
        # amount of drift in the decoded samples.
        return int(round(sample / factor))

    header, payload = samples[:32], samples[32:]

    # A value outside ASCII cannot be a hex digit. Map it to '?' so a damaged
    # header is reported as a checksum failure instead of a chr() range error.
    checksum = ''.join(
        chr(value) if 0 <= value < 128 else '?'
        for value in map(_unscale, header)
    )

    # Clamp to the byte range; samples that drifted out of it cannot be
    # represented and will be caught by the checksum comparison.
    decrypted_data = bytes(
        max(0, min(_unscale(sample), 255)) for sample in payload
    )

    if calculate_checksum(decrypted_data) != checksum:
        raise ValueError("Checksum verification failed. The file may be corrupted or the wrong factor was used.")

    file_type = magic.Magic(mime=True).from_buffer(decrypted_data)

    # Prefer a conventional extension (e.g. "txt" for text/plain); fall back
    # to the MIME subtype when the type is unknown to mimetypes.
    guessed = mimetypes.guess_extension(file_type)
    file_extension = guessed.lstrip('.') if guessed else file_type.split('/')[-1]

    logger.info(f"Decryption complete. Output size: {len(decrypted_data)} bytes")
    return decrypted_data, f'decrypted_file.{file_extension}', file_type
|
|
if __name__ == '__main__':
    # Debug mode turns on the Werkzeug interactive debugger, which lets
    # anyone who can reach the server run arbitrary code. Keep it opt-in via
    # the FLASK_DEBUG environment variable instead of always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled)