| | """
|
| | GDC Data Portal Client
|
| | Download and parse cancer genomics data from GDC
|
| | """
|
| |
|
| | import os
|
| | import json
|
| | import requests
|
| | from typing import Dict, List, Optional, Any
|
| | from pathlib import Path
|
| | import yaml
|
| | from dataclasses import dataclass
|
| | import logging
|
| |
|
# Configure root logging once at import time so client activity is visible
# by default; the module-level logger follows the standard __name__ convention.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
| |
|
| |
|
@dataclass
class GDCFile:
    """A single file record returned by the GDC 'files' endpoint.

    ``case_id`` and ``project_id`` are Optional because the API can return
    a hit with no associated cases, in which case GDCClient.search_files
    fills them with None.
    """
    file_id: str                         # GDC file UUID
    file_name: str
    file_size: int                       # size in bytes (0 when not reported)
    data_type: str                       # e.g. "Gene Expression Quantification"
    data_format: str
    experimental_strategy: str
    case_id: Optional[str]               # first associated case, if any
    project_id: Optional[str]            # project of that case, if any
|
| |
|
| |
|
class GDCClient:
    """Client for interacting with the GDC Data Portal REST API.

    Configuration is read from a YAML file with a top-level ``gdc`` mapping
    providing ``api_url`` and ``download_dir`` (and optionally ``timeout``,
    in seconds).
    """

    def __init__(self, config_path: str = "config.yml"):
        """Load configuration and prepare an HTTP session.

        Args:
            config_path: Path to the YAML configuration file.

        Raises:
            OSError: If the config file cannot be read.
            KeyError: If the 'gdc' section or a required key is missing.
        """
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)['gdc']

        self.api_url = self.config['api_url']
        self.download_dir = Path(self.config['download_dir'])
        self.download_dir.mkdir(parents=True, exist_ok=True)

        # Per-request timeout in seconds. Without one, a hung server would
        # block every call indefinitely. Config-driven, with a default, so
        # existing config files keep working.
        self.timeout = self.config.get('timeout', 60)

        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json'
        })

    @staticmethod
    def _hit_to_file(hit: Dict) -> GDCFile:
        """Convert a single 'files' endpoint hit into a GDCFile record.

        The API nests case/project info under ``cases``; only the first
        case (if any) is kept, matching the flat GDCFile shape.
        """
        first_case = (hit.get('cases') or [{}])[0]
        return GDCFile(
            file_id=hit.get('file_id'),
            file_name=hit.get('file_name'),
            file_size=hit.get('file_size', 0),
            data_type=hit.get('data_type'),
            data_format=hit.get('data_format'),
            experimental_strategy=hit.get('experimental_strategy'),
            case_id=first_case.get('case_id'),
            project_id=first_case.get('project', {}).get('project_id'),
        )

    def search_files(
        self,
        filters: Optional[Dict] = None,
        size: int = 100,
        fields: Optional[List[str]] = None
    ) -> List[GDCFile]:
        """
        Search for files in GDC.

        Args:
            filters: GDC filter query (serialized to JSON for the API)
            size: Number of results to return
            fields: Fields to include in response; defaults to the set
                needed to populate GDCFile

        Returns:
            List of GDCFile records; empty list on request or parse error.
        """
        endpoint = f"{self.api_url}/files"

        if fields is None:
            fields = [
                'file_id', 'file_name', 'file_size', 'data_type',
                'data_format', 'experimental_strategy', 'cases.case_id',
                'cases.project.project_id'
            ]

        params = {
            'size': size,
            'fields': ','.join(fields)
        }

        if filters:
            params['filters'] = json.dumps(filters)

        try:
            response = self.session.get(endpoint, params=params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON response body from response.json().
            logger.error("Error searching files: %s", e)
            return []

        files = [self._hit_to_file(hit)
                 for hit in data.get('data', {}).get('hits', [])]
        logger.info("Found %d files", len(files))
        return files

    @staticmethod
    def _filename_from_headers(headers: Dict, file_id: str) -> str:
        """Extract a filename from the Content-Disposition header.

        Handles trailing parameters (e.g. 'attachment; filename="x.maf"; size=1')
        which a naive split on 'filename=' would keep attached to the name.
        Falls back to the file UUID when no filename is advertised.
        """
        content_disposition = headers.get('content-disposition', '')
        if 'filename=' in content_disposition:
            # NOTE(review): the name comes from the server; consider
            # sanitizing path separators before use.
            return content_disposition.split('filename=')[1].split(';')[0].strip().strip('"')
        return file_id

    def download_file(
        self,
        file_id: str,
        output_dir: Optional[Path] = None
    ) -> Optional[Path]:
        """
        Download a file from GDC.

        Args:
            file_id: GDC file UUID
            output_dir: Directory to save file (defaults to config download_dir)

        Returns:
            Path to downloaded file or None if failed
        """
        if output_dir is None:
            output_dir = self.download_dir

        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        endpoint = f"{self.api_url}/data/{file_id}"

        try:
            logger.info("Downloading file %s", file_id)
            # Stream so large genomics files are never held fully in memory;
            # the context manager releases the connection back to the pool
            # (the original never closed the streamed response).
            with self.session.get(endpoint, stream=True, timeout=self.timeout) as response:
                response.raise_for_status()

                filename = self._filename_from_headers(response.headers, file_id)
                output_path = output_dir / filename

                with open(output_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            logger.info("Downloaded to %s", output_path)
            return output_path

        except (requests.RequestException, OSError) as e:
            logger.error("Error downloading file %s: %s", file_id, e)
            return None

    def get_project_files(
        self,
        project_id: str,
        data_type: Optional[str] = None,
        limit: int = 100
    ) -> List[GDCFile]:
        """
        Get files for a specific project.

        Args:
            project_id: GDC project ID (e.g., TCGA-BRCA)
            data_type: Filter by data type (AND-ed with the project filter)
            limit: Maximum number of files
        """
        filters = {
            "op": "and",
            "content": [
                {
                    "op": "in",
                    "content": {
                        "field": "cases.project.project_id",
                        "value": [project_id]
                    }
                }
            ]
        }

        if data_type:
            filters["content"].append({
                "op": "in",
                "content": {
                    "field": "data_type",
                    "value": [data_type]
                }
            })

        return self.search_files(filters=filters, size=limit)

    def get_mutation_data(self, project_id: str, limit: int = 100) -> List[GDCFile]:
        """Get mutation/variant calling files for a project."""
        return self.get_project_files(
            project_id=project_id,
            data_type="Simple Nucleotide Variation",
            limit=limit
        )

    def get_gene_expression_data(self, project_id: str, limit: int = 100) -> List[GDCFile]:
        """Get gene expression data for a project."""
        return self.get_project_files(
            project_id=project_id,
            data_type="Gene Expression Quantification",
            limit=limit
        )

    def search_cases(
        self,
        project_id: str,
        filters: Optional[Dict] = None,
        size: int = 100
    ) -> List[Dict]:
        """
        Search for cases (patients) in GDC.

        Args:
            project_id: GDC project ID
            filters: Additional filter criteria (AND-ed with the project filter)
            size: Number of results

        Returns:
            Raw case hit dicts from the API; empty list on error.
        """
        endpoint = f"{self.api_url}/cases"

        base_filters = {
            "op": "in",
            "content": {
                "field": "project.project_id",
                "value": [project_id]
            }
        }

        if filters:
            filter_query = {
                "op": "and",
                "content": [base_filters, filters]
            }
        else:
            filter_query = base_filters

        params = {
            'size': size,
            'filters': json.dumps(filter_query),
            'fields': 'case_id,project.project_id,demographic,diagnoses'
        }

        try:
            response = self.session.get(endpoint, params=params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.error("Error searching cases: %s", e)
            return []

        cases = data.get('data', {}).get('hits', [])
        logger.info("Found %d cases", len(cases))
        return cases
|
| |
|
| |
|
class GDCDataParser:
    """Parse downloaded GDC data files (MAF, VCF) and clinical case dicts."""

    @staticmethod
    def parse_maf(file_path: Path) -> List[Dict]:
        """
        Parse a MAF (Mutation Annotation Format) file.

        The first non-comment, non-blank line is taken as the tab-separated
        header; each following data row becomes a dict keyed by those headers.
        Rows whose column count does not match the header are skipped rather
        than mis-aligned.

        Returns:
            List of mutation records; empty list on I/O error or empty file.
        """
        mutations: List[Dict] = []

        try:
            with open(file_path, 'r') as f:
                headers = None
                for line in f:
                    line = line.rstrip('\r\n')
                    # Skip '#version ...' comments AND blank lines: a blank
                    # line before the header previously became a bogus
                    # single-column header.
                    if not line or line.startswith('#'):
                        continue
                    if headers is None:
                        headers = line.split('\t')
                        continue
                    values = line.split('\t')
                    if len(values) == len(headers):
                        mutations.append(dict(zip(headers, values)))

            logger.info("Parsed %d mutations from %s", len(mutations), file_path)
            return mutations

        except (OSError, UnicodeDecodeError) as e:
            logger.error("Error parsing MAF file: %s", e)
            return []

    @staticmethod
    def parse_vcf(file_path: Path) -> List[Dict]:
        """
        Parse a VCF (Variant Call Format) file.

        Meta lines ('##...'), the '#CHROM' header and blank lines are skipped;
        each record yields the 8 fixed VCF columns. Records with fewer than 8
        columns are ignored (previously they raised IndexError, aborting the
        whole parse and discarding every variant already read).

        Returns:
            List of variant records; empty list on I/O error.
        """
        variants: List[Dict] = []

        try:
            with open(file_path, 'r') as f:
                for line in f:
                    # '#' covers both '##' meta lines and the '#CHROM' header.
                    if line.startswith('#') or not line.strip():
                        continue

                    values = line.rstrip('\r\n').split('\t')
                    if len(values) < 8:
                        continue  # malformed/truncated record
                    variants.append({
                        'chrom': values[0],
                        'pos': values[1],
                        'id': values[2],
                        'ref': values[3],
                        'alt': values[4],
                        'qual': values[5],
                        'filter': values[6],
                        'info': values[7]
                    })

            logger.info("Parsed %d variants from %s", len(variants), file_path)
            return variants

        except (OSError, UnicodeDecodeError) as e:
            logger.error("Error parsing VCF file: %s", e)
            return []

    @staticmethod
    def parse_clinical_data(data: Dict) -> Dict:
        """Flatten a GDC case dict into case/project/demographic/diagnoses.

        Missing fields come back as None rather than raising, so partially
        populated cases are handled.
        """
        demo = data.get('demographic', {})

        return {
            'case_id': data.get('case_id'),
            'project_id': data.get('project', {}).get('project_id'),
            'demographic': {
                'age_at_index': demo.get('age_at_index'),
                'gender': demo.get('gender'),
                'race': demo.get('race'),
                'ethnicity': demo.get('ethnicity')
            },
            'diagnoses': [
                {
                    'diagnosis_id': diag.get('diagnosis_id'),
                    'primary_diagnosis': diag.get('primary_diagnosis'),
                    'tumor_stage': diag.get('tumor_stage'),
                    'age_at_diagnosis': diag.get('age_at_diagnosis'),
                    'vital_status': diag.get('vital_status')
                }
                for diag in data.get('diagnoses', [])
            ]
        }
|
| |
|