| import gradio as gr |
| import librosa |
| import numpy as np |
| from pydub import AudioSegment, effects |
| import uuid, os, json |
|
|
| |
| |
| |
|
|
| STATE_PATH = "monx_dj_state.json" |
|
|
| UPLOAD_DIR = "uploads" |
| OUTPUT_DIR = "outputs" |
|
|
| os.makedirs(UPLOAD_DIR, exist_ok=True) |
| os.makedirs(OUTPUT_DIR, exist_ok=True) |
|
|
| |
| |
| |
|
|
| def to_python(obj): |
| if isinstance(obj, dict): |
| return {k: to_python(v) for k, v in obj.items()} |
| if isinstance(obj, list): |
| return [to_python(v) for v in obj] |
| if isinstance(obj, np.generic): |
| return obj.item() |
| return obj |
|
|
| |
| |
| |
|
|
| DEFAULT_STATE = { |
| "weights": { |
| "bpm": 0.35, |
| "energy": 0.30, |
| "smooth": 0.20, |
| "drop_penalty": 0.15 |
| }, |
| "avg_score": 0.5, |
| "runs": 0 |
| } |
|
|
| def load_state(): |
| if os.path.exists(STATE_PATH): |
| try: |
| with open(STATE_PATH, "r") as f: |
| return json.load(f) |
| except: |
| pass |
| return DEFAULT_STATE.copy() |
|
|
| def save_state(state): |
| with open(STATE_PATH, "w") as f: |
| json.dump(to_python(state), f, indent=2) |
|
|
| state = load_state() |
| weights = state["weights"] |
|
|
| |
| |
| |
|
|
| def analyze_audio(path): |
| y, sr = librosa.load(path, mono=True) |
|
|
| |
| onset_env = librosa.onset.onset_strength(y=y, sr=sr) |
|
|
| try: |
| tempo = float(librosa.beat.tempo( |
| onset_envelope=onset_env, sr=sr |
| )[0]) |
| except Exception: |
| tempo = float(np.mean(onset_env) * 60) |
|
|
| if tempo <= 0 or np.isnan(tempo): |
| tempo = 120.0 |
|
|
| |
| rms = librosa.feature.rms(y=y)[0] |
| rms = librosa.util.normalize(rms) |
|
|
| |
| drops = [ |
| i for i in range(10, len(rms) - 10) |
| if rms[i] > np.mean(rms[i-10:i-1]) * 1.4 |
| ] |
|
|
| return { |
| "tempo": tempo, |
| "energy": rms, |
| "drops": drops |
| } |
|
|
| |
| |
| |
|
|
| def score_transition(a, b, t): |
| bpm_sim = max(0, 1 - abs(a["tempo"] - b["tempo"]) / 35) |
| energy_sim = max(0, 1 - abs(a["energy"][t] - b["energy"][0])) |
| drop_penalty = 1 if t in a["drops"] else 0 |
|
|
| return ( |
| weights["bpm"] * bpm_sim + |
| weights["energy"] * energy_sim + |
| weights["smooth"] * ((bpm_sim + energy_sim) / 2) - |
| weights["drop_penalty"] * drop_penalty |
| ) |
|
|
| |
| |
| |
|
|
| def bpm_adjust(segment, from_bpm, to_bpm): |
| if from_bpm <= 0 or to_bpm <= 0: |
| return segment |
|
|
| ratio = from_bpm / to_bpm |
| if abs(1 - ratio) > 0.20: |
| return segment |
|
|
| new_rate = int(segment.frame_rate * ratio) |
| return segment._spawn( |
| segment.raw_data, |
| overrides={"frame_rate": new_rate} |
| ).set_frame_rate(segment.frame_rate) |
|
|
| |
| |
| |
|
|
| def auto_dj_mix(files, durations, crossfade_sec): |
| log = "🎧 Iniciando MONX DJ (IA)\n" |
| yield log, None |
|
|
| durs = [float(x.strip()) for x in durations.split(",")] |
| crossfade_ms = int(crossfade_sec * 1000) |
|
|
| tracks, analyses, scores = [], [], [] |
|
|
| for i, f in enumerate(files): |
| log += f"\n🔍 Analizando canción {i+1}\n" |
| yield log, None |
|
|
| ext = f.name.split(".")[-1] |
| path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4().hex}.{ext}") |
|
|
| with open(f.name, "rb") as src, open(path, "wb") as dst: |
| dst.write(src.read()) |
|
|
| audio = effects.normalize(AudioSegment.from_file(path)) |
| tracks.append(audio) |
| analyses.append(analyze_audio(path)) |
|
|
| mix = AudioSegment.silent(0) |
|
|
| for i in range(len(tracks)): |
| play_ms = min(int(durs[i] * 1000), len(tracks[i])) |
| segment = tracks[i][:play_ms] |
|
|
| if i == 0: |
| mix = segment |
| continue |
|
|
| prev, curr = analyses[i - 1], analyses[i] |
|
|
| candidates = range(5, min(len(prev["energy"]) - 1, 50)) |
| best_t = max(candidates, key=lambda t: score_transition(prev, curr, t)) |
| scores.append(score_transition(prev, curr, best_t)) |
|
|
| target_bpm = curr["tempo"] |
|
|
| mix_adj = bpm_adjust(mix, prev["tempo"], target_bpm) |
| seg_adj = bpm_adjust(segment, curr["tempo"], target_bpm) |
|
|
| safe_cf = min(crossfade_ms, len(mix_adj), len(seg_adj)) |
| mix = bpm_adjust( |
| mix_adj.append(seg_adj, crossfade=safe_cf), |
| target_bpm, curr["tempo"] |
| ) |
|
|
| out_path = os.path.join( |
| OUTPUT_DIR, f"monx_dj_mix_{uuid.uuid4().hex}.m4a" |
| ) |
| mix.export(out_path, format="ipod", codec="aac", bitrate="192k") |
|
|
| |
| if scores: |
| new_avg = sum(scores) / len(scores) |
| reward = 1 if new_avg > state["avg_score"] else -1 |
| lr = 0.05 |
|
|
| for k in weights: |
| weights[k] += reward * lr * abs(weights[k]) |
|
|
| total = sum(abs(v) for v in weights.values()) |
| for k in weights: |
| weights[k] /= total |
|
|
| state["avg_score"] = new_avg |
| state["runs"] += 1 |
| state["weights"] = weights |
| save_state(state) |
|
|
| log += "\n✅ Mix listo. Da feedback para que MONX DJ aprenda." |
| yield log, out_path |
|
|
| |
| |
| |
|
|
| def feedback(reward): |
| lr = 0.08 |
| for k in weights: |
| weights[k] += reward * lr * abs(weights[k]) |
|
|
| total = sum(abs(v) for v in weights.values()) |
| for k in weights: |
| weights[k] /= total |
|
|
| state["weights"] = weights |
| save_state(state) |
| return "🧠 Feedback recibido. MONX DJ ha aprendido." |
|
|
| |
| |
| |
|
|
| with gr.Blocks(title="MONX DJ") as demo: |
| gr.Markdown( |
| "<h1 style='text-align:center'>🎚️ MONX DJ</h1>" |
| "<p style='text-align:center'>IA DJ con aprendizaje real</p>" |
| ) |
|
|
| files = gr.File( |
| label="Sube 2 a 4 canciones", |
| file_count="multiple", |
| file_types=[".mp3", ".wav", ".flac", ".m4a"] |
| ) |
|
|
| durations = gr.Textbox(label="Duración por canción (seg)", value="90,90") |
| crossfade = gr.Slider(6, 20, value=12, step=1, label="Crossfade (seg)") |
|
|
| btn = gr.Button("🔥 Auto Mix") |
| status = gr.Markdown() |
| output = gr.Audio(type="filepath") |
|
|
| btn.click(auto_dj_mix, [files, durations, crossfade], [status, output]) |
|
|
| gr.Markdown("### ¿Te gustó el mix?") |
| like = gr.Button("👍 Sí") |
| dislike = gr.Button("👎 No") |
| fb_status = gr.Markdown() |
|
|
| like.click(lambda: feedback(1), None, fb_status) |
| dislike.click(lambda: feedback(-1), None, fb_status) |
|
|
| demo.launch() |