import re
from collections import defaultdict

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from Bio import pairwise2


# Drug-resistance loci that every detected mutation is checked against.
# 'range' is an inclusive (start, end) pair of genome coordinates.
# NOTE(review): the values look like 1-based H37Rv (NC_000962.3) positions --
# confirm against the annotation release that matches the uploaded reference.
IMPORTANT_GENES = {
    'rpoB': {
        'range': (759807, 763325),
        'description': 'RNA polymerase β subunit (Rifampicin resistance)',
    },
    'katG': {
        'range': (2153889, 2156111),
        'description': 'Catalase-peroxidase (Isoniazid resistance)',
    },
    'inhA': {
        'range': (1674202, 1675011),
        'description': 'Enoyl-ACP reductase (Isoniazid resistance)',
    },
    'gyrA': {
        'range': (7302, 9818),
        'description': 'DNA gyrase subunit A (Fluoroquinolone resistance)',
    },
}


def read_fasta_from_upload(uploaded_file):
    """Read the first sequence record from an uploaded FASTA file.

    Args:
        uploaded_file: Object whose ``getvalue()`` returns the raw file
            bytes (for example a Streamlit upload or ``io.BytesIO``).

    Returns:
        The sequence as one upper-case string with all whitespace removed.

    Raises:
        ValueError: If the upload contains no sequence data.
        UnicodeDecodeError: If the bytes are not valid UTF-8.
    """
    # 'utf-8-sig' also strips a byte-order mark some editors prepend.
    text = uploaded_file.getvalue().decode('utf-8-sig')

    residues = []
    # splitlines() handles \n, \r\n and \r, so no carriage returns leak
    # into the sequence.
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.startswith('>'):
            if residues:
                # A second record starts here; only the first one is used
                # so header text is never mixed into the bases.
                break
            continue
        # Drop spaces/tabs that some tools insert inside sequence lines.
        residues.append(''.join(line.split()))

    sequence = ''.join(residues).upper()
    if not sequence:
        raise ValueError("No sequence data found in FASTA file")
    return sequence


def split_genome_into_chunks(sequence, chunk_size=10000, overlap=100):
    """Split a sequence into fixed-size, overlapping windows.

    Args:
        sequence: The sequence to split (any sliceable with ``len``).
        chunk_size: Maximum length of each chunk; must be positive.
        overlap: Number of positions shared by consecutive chunks; must
            satisfy ``0 <= overlap < chunk_size``.

    Returns:
        A ``(chunks, positions)`` tuple of equal-length lists, where
        ``positions[i]`` is the 0-based offset of ``chunks[i]`` in
        ``sequence``. An empty sequence yields ``([], [])``.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        # A zero or negative step would crash range() or yield no chunks.
        raise ValueError("overlap must be non-negative and smaller than chunk_size")

    step = chunk_size - overlap
    total = len(sequence)
    chunks = []
    positions = []
    for start in range(0, total, step):
        chunks.append(sequence[start:start + chunk_size])
        positions.append(start)
        if start + chunk_size >= total:
            # This chunk already reaches the end; a further chunk would lie
            # entirely inside it and only duplicate results.
            break
    return chunks, positions


def find_mutations_in_chunk(ref_chunk, query_chunk, chunk_start):
    """Globally align two chunks and list every column where they differ.

    Args:
        ref_chunk: Reference sequence segment.
        query_chunk: Query sequence segment compared against ``ref_chunk``.
        chunk_start: 0-based offset of ``ref_chunk`` in the full reference.

    Returns:
        A list of dicts, one per differing alignment column, with keys
        ``position``, ``ref_base``, ``query_base``, ``type`` (``'SNP'`` or
        ``'INDEL'``) and ``context``. ``position`` is the 1-based reference
        coordinate of the affected base; an inserted base (gap in the
        reference) is anchored to the reference base before it. When the
        position falls inside an ``IMPORTANT_GENES`` range the dict also
        carries ``gene``, ``gene_position`` and ``gene_description``.
        Returns an empty list when no alignment is produced.
    """
    mutations = []

    # Scores are passed positionally (match, mismatch, gap open, gap extend)
    # because older Biopython releases reject them as keywords.
    # one_alignment_only=True stops pairwise2 from enumerating many
    # co-optimal alignments when only the first one is used.
    alignments = pairwise2.align.globalms(
        ref_chunk, query_chunk, 2, -3, -10, -0.5, one_alignment_only=True
    )
    if not alignments:
        return mutations

    best = alignments[0]
    ref_aligned, query_aligned = best[0], best[1]

    ref_bases_seen = 0  # reference bases consumed so far in this chunk
    for col, (ref_base, query_base) in enumerate(zip(ref_aligned, query_aligned)):
        if ref_base != '-':
            ref_bases_seen += 1
        if ref_base == query_base:
            continue

        # chunk_start is 0-based, so adding the 1-based in-chunk index gives
        # a 1-based coordinate. The gene ranges are compared inclusively and
        # gene_position below adds 1, both of which assume 1-based input.
        abs_pos = chunk_start + ref_bases_seen
        left = max(0, col - 5)
        mut = {
            'position': abs_pos,
            'ref_base': ref_base,
            'query_base': query_base if query_base != '-' else 'None',
            'type': 'SNP' if ref_base != '-' and query_base != '-' else 'INDEL',
            'context': {
                'ref': ref_aligned[left:col] + '[' + ref_base + ']' + ref_aligned[col + 1:col + 6],
                'query': query_aligned[left:col] + '[' + query_base + ']' + query_aligned[col + 1:col + 6],
            },
        }

        for gene, info in IMPORTANT_GENES.items():
            start, end = info['range']
            if start <= abs_pos <= end:
                mut['gene'] = gene
                mut['gene_position'] = abs_pos - start + 1
                mut['gene_description'] = info['description']

        mutations.append(mut)

    return mutations


def visualize_mutations(mutations, genome_length):
    """Build a Plotly figure of gene regions and mutation positions.

    Args:
        mutations: List of mutation dicts with at least ``position``,
            ``type``, ``ref_base`` and ``query_base`` keys; may be empty.
        genome_length: Length of the reference genome. When positive it
            fixes the x-axis to ``[0, genome_length]`` so the plot spans
            the whole genome rather than only the plotted data.

    Returns:
        A ``plotly.graph_objects.Figure`` with one line trace per entry in
        ``IMPORTANT_GENES`` and, if any mutations exist, one marker trace.
    """
    fig = go.Figure()

    # One thick horizontal bar per gene of interest, all on the same row.
    gene_track_y = 1
    for gene, info in IMPORTANT_GENES.items():
        start, end = info['range']
        fig.add_trace(go.Scatter(
            x=[start, end],
            y=[gene_track_y, gene_track_y],
            mode='lines',
            name=gene,
            line=dict(width=10),
            hoverinfo='text',
            hovertext=f"{gene}: {start}-{end}",
        ))

    mutation_data = pd.DataFrame(mutations)
    if not mutation_data.empty:
        # Markers sit just above the gene row; SNPs red, everything else blue.
        fig.add_trace(go.Scatter(
            x=mutation_data['position'],
            y=[1.1] * len(mutation_data),
            mode='markers',
            name='Mutations',
            marker=dict(
                color=['red' if t == 'SNP' else 'blue' for t in mutation_data['type']],
                size=8,
            ),
            hoverinfo='text',
            hovertext=mutation_data.apply(
                lambda x: f"Position: {x['position']}<br>"
                          f"Type: {x['type']}<br>"
                          f"Change: {x['ref_base']}->{x['query_base']}",
                axis=1,
            ),
        ))

    fig.update_layout(
        title="Genome-wide Mutation Distribution",
        xaxis_title="Genome Position",
        yaxis_visible=False,
        showlegend=True,
        height=400,
    )
    if genome_length and genome_length > 0:
        fig.update_xaxes(range=[0, genome_length])

    return fig


def analyze_mutations(mutations):
    """Summarise a collection of mutation dicts.

    Args:
        mutations: Iterable of dicts, each with a ``type`` key (``'SNP'``
            or ``'INDEL'``) and optionally a ``gene`` key.

    Returns:
        A dict with ``total_mutations``, ``snps`` and ``indels`` counts,
        ``by_gene`` (a ``defaultdict(int)`` of mutation counts per gene) and
        ``important_mutations`` (the mutations carrying a ``gene`` key, in
        input order).
    """
    total = 0
    snps = 0
    indels = 0
    by_gene = defaultdict(int)
    important = []

    # Single pass so generators work and the input is walked only once.
    for mut in mutations:
        total += 1
        if mut['type'] == 'SNP':
            snps += 1
        elif mut['type'] == 'INDEL':
            indels += 1
        if 'gene' in mut:
            by_gene[mut['gene']] += 1
            important.append(mut)

    return {
        'total_mutations': total,
        'snps': snps,
        'indels': indels,
        'by_gene': by_gene,
        'important_mutations': important,
    }


def main():
    """Render the Streamlit page and run the genome comparison on demand.

    Collects the two FASTA uploads and the chunking parameters, and, when
    the user presses "Run Analysis", splits both genomes into chunks, aligns
    the chunks pairwise, and shows summary metrics, the mutation plot, the
    per-gene table and a CSV download. Any exception raised during the
    analysis is reported on the page instead of propagating.
    """
    st.title("M. tuberculosis Full Genome Comparison")

    st.markdown("""
    This tool performs whole-genome comparison of M. tuberculosis strains, identifying mutations
    and analyzing resistance-associated genes.

    **Instructions:**
    1. Upload your reference genome (typically H37Rv)
    2. Upload your query genome (clinical isolate)
    3. Configure analysis parameters if needed
    4. Run the analysis
    """)

    ref_col, query_col = st.columns(2)
    with ref_col:
        reference_file = st.file_uploader("Reference Genome (FASTA)", type=['fasta', 'fa'])
    with query_col:
        query_file = st.file_uploader("Query Genome (FASTA)", type=['fasta', 'fa'])

    with st.expander("Advanced Settings"):
        chunk_size = st.slider("Analysis chunk size (bp)", 5000, 20000, 10000, 1000)
        overlap = st.slider("Chunk overlap (bp)", 50, 200, 100, 10)

    # Guard clauses: nothing to do until both files are present and the
    # user explicitly starts the run.
    if not (reference_file and query_file):
        return
    if not st.button("Run Analysis"):
        return

    with st.spinner("Analyzing genomes..."):
        # Top-level UI boundary: report any failure to the user.
        try:
            ref_genome = read_fasta_from_upload(reference_file)
            query_genome = read_fasta_from_upload(query_file)

            progress_bar = st.progress(0)
            status = st.empty()

            status.text("Splitting genomes into chunks...")
            ref_chunks, chunk_positions = split_genome_into_chunks(ref_genome, chunk_size, overlap)
            query_chunks, _ = split_genome_into_chunks(query_genome, chunk_size, overlap)

            # NOTE: chunks are paired by offset, not by homology, so an
            # indel shifts every later query chunk relative to the reference.
            chunk_pairs = list(zip(ref_chunks, query_chunks, chunk_positions))
            if len(ref_genome) != len(query_genome):
                st.warning(
                    f"Reference and query genomes differ in length "
                    f"({len(ref_genome)} vs {len(query_genome)} bp). Chunks are paired by "
                    f"offset, so calls downstream of large indels may be unreliable and "
                    f"any unpaired trailing chunks are skipped."
                )

            status.text("Analyzing mutations...")
            all_mutations = []
            total_pairs = len(chunk_pairs)
            # Calls seen in the previous chunk, used to drop the duplicates
            # that consecutive chunks report for their shared overlap.
            previous_calls = {}

            for done, (ref_chunk, query_chunk, chunk_start) in enumerate(chunk_pairs, start=1):
                current_calls = defaultdict(int)
                for mut in find_mutations_in_chunk(ref_chunk, query_chunk, chunk_start):
                    key = (mut['position'], mut['ref_base'], mut['query_base'])
                    current_calls[key] += 1
                    if previous_calls.get(key, 0) > 0:
                        # Already reported by the preceding chunk; counted
                        # as a multiset so repeated inserted bases at one
                        # anchor position are matched one-for-one.
                        previous_calls[key] -= 1
                        continue
                    all_mutations.append(mut)
                previous_calls = current_calls
                progress_bar.progress(done / total_pairs)

            progress_bar.empty()
            status.empty()

            stats = analyze_mutations(all_mutations)

            st.success("Analysis complete!")

            st.header("Results Summary")
            total_col, snp_col, indel_col = st.columns(3)
            total_col.metric("Total Mutations", stats['total_mutations'])
            snp_col.metric("SNPs", stats['snps'])
            indel_col.metric("INDELs", stats['indels'])

            st.plotly_chart(visualize_mutations(all_mutations, len(ref_genome)))

            st.header("Resistance-Associated Genes")
            gene_mutations = pd.DataFrame([
                {"Gene": gene, "Mutations": count, "Description": IMPORTANT_GENES[gene]['description']}
                for gene, count in stats['by_gene'].items()
            ])

            if not gene_mutations.empty:
                st.dataframe(gene_mutations)
            else:
                st.info("No mutations were found in the monitored resistance genes.")

            if stats['important_mutations']:
                st.header("Detailed Mutation Analysis")
                mutations_df = pd.DataFrame(stats['important_mutations'])
                st.dataframe(mutations_df)

                csv = mutations_df.to_csv(index=False)
                st.download_button(
                    "Download Results (CSV)",
                    csv,
                    "mtb_mutations.csv",
                    "text/csv",
                    key='download-csv'
                )

        except Exception as e:
            st.error(f"Analysis error: {str(e)}")


# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()