| """
|
| FASTQ Processing Pipeline
|
| Quality control and preprocessing of sequencing data
|
| """
|
|
|
| from pathlib import Path
|
| from typing import Dict, List, Optional
|
| import yaml
|
| import logging
|
| from Bio import SeqIO
|
| from Bio.SeqIO.QualityIO import FastqGeneralIterator
|
|
|
| logging.basicConfig(level=logging.INFO)
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
class FASTQProcessor:
    """Process FASTQ sequencing files.

    Provides quality filtering, 3' adapter trimming, summary statistics and
    FASTQ -> FASTA conversion. Settings come from the ``pipeline.fastq``
    section of a YAML configuration file.
    """

    def __init__(self, config_path: str = "config.yml"):
        """Load configuration and create the output directory.

        Args:
            config_path: Path to a YAML file whose ``pipeline.fastq`` section
                defines ``quality_threshold`` (minimum mean Phred score),
                ``min_length`` (minimum read length) and ``output_dir``.
        """
        # safe_load: config files cannot execute arbitrary code.
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)['pipeline']['fastq']

        self.quality_threshold = self.config['quality_threshold']
        self.min_length = self.config['min_length']
        self.output_dir = Path(self.config['output_dir'])
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def quality_filter(
        self,
        input_file: Path,
        output_file: Optional[Path] = None
    ) -> Dict:
        """Filter FASTQ reads by mean quality score and length.

        A read passes when its mean Phred+33 quality is at least
        ``self.quality_threshold`` and its length is at least
        ``self.min_length``.

        Args:
            input_file: Input FASTQ file
            output_file: Output filtered FASTQ file; defaults to
                ``<output_dir>/<stem>_filtered.fastq``

        Returns:
            Statistics dictionary (read/base counts and ``pass_rate``).
            On error, partial counts gathered so far are returned.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}_filtered.fastq"

        stats = {
            'total_reads': 0,
            'passed_reads': 0,
            'failed_reads': 0,
            'total_bases': 0,
            'passed_bases': 0,
            # Pre-populate so the key exists even on the error path
            # (previously it was missing when an exception occurred).
            'pass_rate': 0,
        }

        try:
            with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f:
                for title, sequence, quality in FastqGeneralIterator(in_f):
                    stats['total_reads'] += 1
                    stats['total_bases'] += len(sequence)

                    # Mean Phred+33 score. Guard zero-length reads, which
                    # previously raised ZeroDivisionError and aborted the
                    # whole file via the broad except below.
                    if quality:
                        avg_quality = sum(ord(q) - 33 for q in quality) / len(quality)
                    else:
                        avg_quality = 0.0

                    if avg_quality >= self.quality_threshold and len(sequence) >= self.min_length:
                        out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n")
                        stats['passed_reads'] += 1
                        stats['passed_bases'] += len(sequence)
                    else:
                        stats['failed_reads'] += 1

            if stats['total_reads'] > 0:
                stats['pass_rate'] = stats['passed_reads'] / stats['total_reads']

            logger.info(
                "Filtered %s: %d/%d reads passed",
                input_file.name, stats['passed_reads'], stats['total_reads']
            )
            return stats

        except Exception as e:
            # Best-effort: log and return whatever counts were collected.
            logger.error("Error filtering FASTQ: %s", e)
            return stats

    def trim_adapters(
        self,
        input_file: Path,
        adapter_sequence: str,
        output_file: Optional[Path] = None
    ) -> Path:
        """Trim an adapter sequence from reads.

        Everything from the first occurrence of ``adapter_sequence`` onwards
        is removed; reads shorter than ``self.min_length`` after trimming are
        dropped from the output.

        Args:
            input_file: Input FASTQ file
            adapter_sequence: Adapter sequence to trim
            output_file: Output trimmed file; defaults to
                ``<output_dir>/<stem>_trimmed.fastq``

        Returns:
            Path to the trimmed file, or ``input_file`` on error.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}_trimmed.fastq"

        trimmed_count = 0

        try:
            with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f:
                for title, sequence, quality in FastqGeneralIterator(in_f):
                    adapter_pos = sequence.find(adapter_sequence)

                    if adapter_pos != -1:
                        # Keep only the bases (and matching quality chars)
                        # upstream of the adapter match.
                        sequence = sequence[:adapter_pos]
                        quality = quality[:adapter_pos]
                        trimmed_count += 1

                    if len(sequence) >= self.min_length:
                        out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n")

            logger.info("Trimmed adapters from %d reads", trimmed_count)
            return output_file

        except Exception as e:
            logger.error("Error trimming adapters: %s", e)
            return input_file

    def calculate_statistics(self, fastq_file: Path) -> Dict:
        """Calculate summary statistics for a FASTQ file.

        Returns:
            Dictionary with read count, base count, length range and average,
            mean Phred+33 quality, and GC content (percent). For an empty
            file, ``min_length`` is 0 (previously ``inf``). On error, the
            partially-filled dictionary is returned.
        """
        stats = {
            'total_reads': 0,
            'total_bases': 0,
            'min_length': 0,
            'max_length': 0,
            'avg_length': 0,
            'avg_quality': 0,
            'gc_content': 0
        }

        # Running accumulators: constant memory instead of the previous
        # per-read/per-base lists (lengths list was only ever summed;
        # qualities list grew with total_bases).
        min_length = float('inf')
        quality_sum = 0
        quality_count = 0
        gc_count = 0

        try:
            with open(fastq_file, 'r') as f:
                for title, sequence, quality in FastqGeneralIterator(f):
                    stats['total_reads'] += 1
                    seq_len = len(sequence)
                    stats['total_bases'] += seq_len

                    min_length = min(min_length, seq_len)
                    stats['max_length'] = max(stats['max_length'], seq_len)

                    quality_sum += sum(ord(q) - 33 for q in quality)
                    quality_count += len(quality)

                    # Counts upper-case G/C only; soft-masked (lower-case)
                    # bases are not counted, matching original behaviour.
                    gc_count += sequence.count('G') + sequence.count('C')

            if stats['total_reads'] > 0:
                stats['min_length'] = min_length
                stats['avg_length'] = stats['total_bases'] / stats['total_reads']
                # Guard divisions: all-empty reads give quality_count == 0
                # and total_bases == 0 (previously ZeroDivisionError).
                if quality_count > 0:
                    stats['avg_quality'] = quality_sum / quality_count
                if stats['total_bases'] > 0:
                    stats['gc_content'] = (gc_count / stats['total_bases']) * 100

            return stats

        except Exception as e:
            logger.error("Error calculating statistics: %s", e)
            return stats

    def convert_to_fasta(
        self,
        input_file: Path,
        output_file: Optional[Path] = None
    ) -> Path:
        """Convert FASTQ to FASTA format.

        Args:
            input_file: Input FASTQ file
            output_file: Output FASTA file; defaults to
                ``<output_dir>/<stem>.fasta``

        Returns:
            Path to the FASTA file, or ``input_file`` on error.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}.fasta"

        try:
            count = SeqIO.convert(str(input_file), "fastq", str(output_file), "fasta")
            logger.info("Converted %d sequences to FASTA", count)
            return output_file

        except Exception as e:
            logger.error("Error converting to FASTA: %s", e)
            return input_file
|
|
|
|
|
class FASTQQualityControl:
    """Quality control analysis for FASTQ files"""

    def __init__(self):
        # All per-file computation is delegated to the shared processor.
        self.processor = FASTQProcessor()

    def run_qc(self, fastq_file: Path) -> Dict:
        """Run comprehensive QC on a single FASTQ file.

        Checks average quality (< 20 warns), average read length (< 50
        warns) and GC content (outside 30-70% is noted but does not
        downgrade the overall status).

        Returns:
            QC report dictionary: file name, statistics, overall
            ``quality_check`` status ('PASS' or 'WARN') and warning list.
        """
        stats = self.processor.calculate_statistics(fastq_file)

        warnings = []
        status = 'PASS'

        if stats['avg_quality'] < 20:
            warnings.append('Low average quality score')
            status = 'WARN'

        if stats['avg_length'] < 50:
            warnings.append('Short average read length')
            status = 'WARN'

        # GC outliers are informational only — status is left unchanged.
        if stats['gc_content'] < 30 or stats['gc_content'] > 70:
            warnings.append(f'Unusual GC content: {stats["gc_content"]:.1f}%')

        return {
            'file': str(fastq_file),
            'statistics': stats,
            'quality_check': status,
            'warnings': warnings,
        }

    def generate_qc_report(self, fastq_files: List[Path]) -> Dict:
        """Generate QC report for multiple FASTQ files.

        Returns:
            Dictionary with a per-status ``summary`` and the individual
            ``file_reports`` keyed by file name.
        """
        reports = {f.name: self.run_qc(f) for f in fastq_files}

        statuses = [r['quality_check'] for r in reports.values()]
        summary = {
            'total_files': len(fastq_files),
            'passed': statuses.count('PASS'),
            'warnings': statuses.count('WARN'),
            'failed': statuses.count('FAIL'),
        }

        return {
            'summary': summary,
            'file_reports': reports,
        }
|
|
|