# Source: CodeMode / scripts / core / ingestion / repo_metadata.py
# CodeMode Agent
# Deploy CodeMode via Agent
# Commit: 463fc7e
"""
Repository Metadata Extractor - Advanced metadata extraction for Git repositories.
This module extracts comprehensive metadata from Git repositories with a
special focus on agentic framework detection. It analyzes repository structure,
dependencies, git history, and patterns to identify agentic code patterns.
ARCHITECTURE POSITION:
- Repository Analyzer: Deep analysis of Git repositories
- Agentic Detector: Identifies agentic framework usage
- Dependency Mapper: Extracts dependency information
KEY FEATURES:
1. Agentic framework detection across multiple frameworks
2. Comprehensive dependency extraction (Python, Node.js, Docker)
3. Git metadata extraction (commits, branches, tags)
4. Repository structure analysis
5. Entry point and configuration file discovery
"""
import json
import re
import subprocess
from collections import Counter
from datetime import datetime
from itertools import islice
from pathlib import Path
from typing import Dict, Iterator, List, Optional
class RepoMetadataExtractor:
    """Collect metadata about a local Git checkout, with agentic-framework detection.

    All filesystem access is rooted at ``repo_path``. Git information is
    obtained by running the ``git`` executable; when a command fails the
    corresponding values degrade to empty strings/lists instead of raising.
    """

    # Framework name -> lowercase keywords. A keyword is matched as a plain
    # substring of dependency names and of import/class statements.
    # NOTE(review): short or generic keywords ("sk", "lc", "tool", ...) also
    # match unrelated names such as "flask" or "setuptools"; tighten these
    # lists if false positives matter to callers.
    AGENTIC_FRAMEWORKS = {
        "langchain": ["langchain", "langsmith", "lc", "chain", "agent"],
        "autogen": ["autogen", "agent", "groupchat"],
        "crewai": ["crewai", "crew", "task", "agent"],
        "haystack": ["haystack", "pipeline", "node"],
        "llamaindex": ["llama_index", "query_engine", "index"],
        "semantic_kernel": ["semantic_kernel", "sk"],
        "transformers_agents": ["transformers_agents", "huggingface"],
        "camel": ["camel", "role_playing"],
        "agents": ["agent", "tool", "workflow", "orchestrator"],
    }

    # Directory names skipped when searching for entry points and config files.
    _IGNORED_DIRS = frozenset({".git", "__pycache__", "node_modules"})

    # Folder-name suffixes stripped when no remote URL is available.
    _FOLDER_SUFFIXES = ("_test", "_copy", "_backup", "_temp", "_local")

    # Distribution name at the start of a requirement specifier, followed by
    # end-of-line or the start of extras / version / marker / URL / comment.
    # Lines such as "git+https://..." or "-r other.txt" deliberately fail.
    _REQUIREMENT_NAME = re.compile(
        r"([A-Za-z0-9][A-Za-z0-9._-]*)\s*(?:$|[\[<>=!~;@(#])"
    )

    # Source patterns that suggest a hand-written agent implementation.
    _AGENT_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r"class.*Agent",
            r"def.*agent",
            r"class.*Tool",
            r"def.*tool",
            r"class.*Workflow",
            r"def.*workflow",
            r"class.*Orchestrator",
            r"def.*orchestrator",
            r"@tool",
            r"@agent",
            r"@workflow",
        )
    )

    def __init__(self, repo_path: Path):
        """Remember the repository root.

        Args:
            repo_path: Root directory of the checkout. A ``str`` is accepted
                as well and converted to ``Path``.
        """
        self.repo_path = Path(repo_path)

    # ---------------------------------------------------------------------
    # Public API
    # ---------------------------------------------------------------------
    def extract_comprehensive_metadata(self) -> Dict:
        """Run every extractor and return the results keyed by section name."""
        # Parse dependency files once and share the result with the detector,
        # so parse warnings are not printed twice.
        dependencies = self.extract_dependency_info()
        return {
            "basic": self.extract_basic_metadata(),
            "git": self.extract_git_metadata(),
            "dependencies": dependencies,
            "structure": self.extract_structure_info(),
            "agentic_detection": self.detect_agentic_frameworks(dependencies),
            "entry_points": self.find_entry_points(),
            "config_files": self.find_config_files(),
        }

    def extract_basic_metadata(self) -> Dict:
        """Return repo name, local path, size in MB, file count and a timestamp."""
        return {
            "repo_name": self._get_actual_repo_name(),
            "local_path": str(self.repo_path),
            "size_mb": self._get_repo_size_mb(),
            "file_count": self._count_files(),
            "extracted_at": datetime.now().isoformat(),
        }

    def _get_actual_repo_name(self) -> str:
        """Return the repository name, e.g. 'crewAI' rather than 'crewai_test'.

        The name is taken from the ``origin`` remote URL when one is
        configured; otherwise the folder name is used with a common
        working-copy suffix (``_test``, ``_copy``, ...) removed.
        """
        remote_url = self._run_git_command(["config", "--get", "remote.origin.url"])
        if remote_url:
            name = self._repo_name_from_url(remote_url)
            if name:
                return name

        # Path(".").name is empty, so fall back to the resolved directory name.
        folder_name = self.repo_path.name or self.repo_path.resolve().name
        lowered = folder_name.lower()
        for suffix in self._FOLDER_SUFFIXES:
            # Never strip the suffix if that would leave an empty name.
            if lowered.endswith(suffix) and len(folder_name) > len(suffix):
                return folder_name[: -len(suffix)]
        return folder_name

    @staticmethod
    def _repo_name_from_url(remote_url: str) -> str:
        """Return the last path component of a remote URL without ``.git``.

        Handles trailing slashes, ``https://host/owner/repo.git`` and the
        scp-like ``git@host:repo.git`` form. May return an empty string.
        """
        cleaned = remote_url.strip().rstrip("/")
        tail = re.split(r"[/\\:]", cleaned)[-1]
        if tail.endswith(".git"):
            tail = tail[:-4]
        return tail

    def extract_git_metadata(self) -> Dict:
        """Return remote URL, current branch, latest commit, branches and tags.

        Branch and tag lists are capped at 10 entries; the full counts are
        reported separately. On an unexpected error the result is
        ``{"error": <message>}``.
        """
        try:
            remote_url = self._run_git_command(
                ["config", "--get", "remote.origin.url"]
            )
            # One field per line: names and subjects may contain "|" but
            # never a newline, so the fields cannot shift.
            latest_commit = self._run_git_command(
                ["log", "-1", "--pretty=format:%H%n%an%n%ae%n%ad%n%s"]
            )
            fields = latest_commit.split("\n") if latest_commit else []
            fields += [""] * (5 - len(fields))
            commit_hash, author, email, date, message = fields[:5]

            branch_list = [
                name[2:] if name.startswith("* ") else name
                for name in self._non_empty_lines(
                    self._run_git_command(["branch", "-a"])
                )
            ]
            tag_list = self._non_empty_lines(self._run_git_command(["tag", "-l"]))
            current_branch = self._run_git_command(["branch", "--show-current"])

            return {
                "remote_url": remote_url or "",
                "branch": current_branch or "",
                "latest_commit": {
                    "hash": commit_hash,
                    "author": author,
                    "email": email,
                    "date": date,
                    "message": message,
                },
                "branch_count": len(branch_list),
                "branches": branch_list[:10],
                "tag_count": len(tag_list),
                "tags": tag_list[:10],
            }
        except Exception as e:
            return {"error": str(e)}

    @staticmethod
    def _non_empty_lines(text: Optional[str]) -> List[str]:
        """Split *text* on newlines and return the stripped, non-blank lines."""
        if not text:
            return []
        return [line.strip() for line in text.split("\n") if line.strip()]

    # ---------------------------------------------------------------------
    # Agentic detection
    # ---------------------------------------------------------------------
    def detect_agentic_frameworks(self, deps: Optional[Dict] = None) -> Dict:
        """Map each detected framework to how it was detected.

        Args:
            deps: Result of ``extract_dependency_info()``; computed here when
                omitted.

        Returns:
            ``{framework: "dependency" | "usage"}`` plus
            ``"custom_agents": "implementation"`` when agent-like source
            patterns are found.
        """
        if deps is None:
            deps = self.extract_dependency_info()
        package_names = [pkg.lower() for pkg in deps.get("python_packages", [])]

        detected: Dict[str, str] = {}
        for framework, keywords in self.AGENTIC_FRAMEWORKS.items():
            if any(k in name for name in package_names for k in keywords):
                detected[framework] = "dependency"
            elif self._scan_for_framework(keywords):
                # Only scan source files when no dependency matched.
                detected[framework] = "usage"

        if self._has_agent_patterns():
            detected["custom_agents"] = "implementation"
        return detected

    def _read_python_sources(self, limit: int) -> Iterator[str]:
        """Yield the text of at most *limit* ``*.py`` files, skipping unreadable ones."""
        # islice stops the directory walk as soon as enough files were seen.
        for py_file in islice(self.repo_path.rglob("*.py"), limit):
            try:
                yield py_file.read_text(encoding="utf-8", errors="ignore")
            except OSError:
                continue

    def _scan_for_framework(self, keywords: List[str]) -> bool:
        """Return True if any of the first 50 Python files imports or subclasses a keyword."""
        class_patterns = [re.compile(rf"class.*{re.escape(k)}") for k in keywords]
        for source in self._read_python_sources(50):
            lowered = source.lower()
            if any(
                f"import {k}" in lowered or f"from {k}" in lowered for k in keywords
            ):
                return True
            if any(pattern.search(lowered) for pattern in class_patterns):
                return True
        return False

    def _has_agent_patterns(self) -> bool:
        """Return True if any of the first 20 Python files matches an agent pattern."""
        return any(
            pattern.search(source)
            for source in self._read_python_sources(20)
            for pattern in self._AGENT_PATTERNS
        )

    # ---------------------------------------------------------------------
    # Dependencies
    # ---------------------------------------------------------------------
    def extract_dependency_info(self) -> Dict:
        """Collect Python and Node.js package names and detect Docker files.

        Only files in the repository root are inspected. Parse failures are
        reported with a printed warning and otherwise ignored.
        """
        deps = {
            "python_packages": [],
            "nodejs_packages": [],
            "docker": False,
            "other_dependencies": [],
        }

        req_files = (
            "requirements.txt",
            "pyproject.toml",
            "setup.py",
            "setup.cfg",
            "Pipfile",
            "environment.yml",
        )
        for req_file in req_files:
            path = self.repo_path / req_file
            if not path.exists():
                continue
            try:
                deps["python_packages"].extend(
                    self._parse_python_dependencies(path, req_file)
                )
            except Exception as e:
                print(f"⚠️ Error parsing {req_file}: {e}")

        package_json = self.repo_path / "package.json"
        if package_json.exists():
            try:
                data = json.loads(package_json.read_text(encoding="utf-8"))
            except (OSError, ValueError) as e:
                # ValueError covers both JSON and decoding errors.
                print(f"⚠️ Error parsing package.json: {e}")
            else:
                if isinstance(data, dict):
                    for section in ("dependencies", "devDependencies"):
                        entries = data.get(section)
                        # A section may be missing or null in hand-edited files.
                        if isinstance(entries, dict):
                            deps["nodejs_packages"].extend(entries)

        deps["docker"] = any(
            (self.repo_path / name).exists()
            for name in ("Dockerfile", "docker-compose.yml", "docker-compose.yaml")
        )
        return deps

    @classmethod
    def _requirement_name(cls, spec: str) -> Optional[str]:
        """Return the package name of a requirement specifier, or None.

        Version constraints, extras, environment markers and trailing
        comments are dropped; blank lines, comments, options and bare URLs
        yield None.
        """
        match = cls._REQUIREMENT_NAME.match(spec.strip())
        return match.group(1) if match else None

    def _parse_python_dependencies(self, path: Path, file_name: str) -> List[str]:
        """Return the package names declared in *path*.

        Args:
            path: File to read.
            file_name: Selects the parser. ``requirements.txt`` and
                ``pyproject.toml`` are understood; any other name yields an
                empty list.

        Raises:
            ImportError: For ``pyproject.toml`` when neither ``tomllib`` nor
                ``toml`` is importable.
        """
        if file_name == "requirements.txt":
            text = path.read_text(encoding="utf-8", errors="ignore")
            names = (self._requirement_name(line) for line in text.splitlines())
            return [name for name in names if name]

        if file_name == "pyproject.toml":
            try:
                import tomllib as toml_parser  # standard library on Python 3.11+
            except ImportError:
                import toml as toml_parser  # third-party fallback

            data = toml_parser.loads(path.read_text(encoding="utf-8"))
            packages: List[str] = []

            # PEP 621: [project] dependencies = ["name>=1.0", ...]
            project = data.get("project")
            declared = project.get("dependencies") if isinstance(project, dict) else None
            if isinstance(declared, list):
                for spec in declared:
                    name = self._requirement_name(spec) if isinstance(spec, str) else None
                    if name:
                        packages.append(name)

            # Poetry: [tool.poetry.dependencies] is a table keyed by package
            # name; its "python" key is the interpreter constraint.
            tool = data.get("tool")
            poetry = tool.get("poetry") if isinstance(tool, dict) else None
            poetry_deps = poetry.get("dependencies") if isinstance(poetry, dict) else None
            if isinstance(poetry_deps, dict):
                packages.extend(
                    name for name in poetry_deps if name.lower() != "python"
                )
            return packages

        return []

    # ---------------------------------------------------------------------
    # Structure & utilities
    # ---------------------------------------------------------------------
    def extract_structure_info(self) -> Dict:
        """Summarise top-level directories and the ten most common file extensions.

        ``has_agentic_structure`` is True when a top-level directory name
        contains one of the agent-related keywords.
        """
        directories = [
            item.name
            for item in self.repo_path.iterdir()
            if item.is_dir() and item.name != ".git"
        ]

        # Git's internal files are excluded, matching the directory list.
        extension_counts = Counter(
            f.suffix.lower() for f in self._iter_repo_files()
        )

        agentic_dirs = {
            "agent",
            "agents",
            "workflow",
            "workflows",
            "tool",
            "tools",
            "pipeline",
            "pipelines",
            "orchestrator",
        }
        has_agentic_structure = any(
            keyword in name.lower() for name in directories for keyword in agentic_dirs
        )

        return {
            "directories": directories,
            "file_types": dict(extension_counts.most_common(10)),
            "has_agentic_structure": has_agentic_structure,
        }

    def find_entry_points(self) -> List[str]:
        """Return up to five repo-relative paths of likely entry-point scripts."""
        patterns = (
            "main.py",
            "app.py",
            "run.py",
            "cli.py",
            "server.py",
            "agent.py",
            "pipeline.py",
            "__main__.py",
        )
        matches = (
            str(rel) for pattern in patterns for rel in self._iter_matches(pattern)
        )
        return list(islice(matches, 5))

    def find_config_files(self) -> List[str]:
        """Return up to ten repo-relative config file paths, sorted and de-duplicated."""
        patterns = (
            "config*.py",
            "settings*.py",
            ".env*",
            "*.toml",
            "*.yaml",
            "*.yml",
            "*.json",
            "*.cfg",
            "*.ini",
        )
        # A set removes files matched by more than one pattern.
        found = {
            str(rel) for pattern in patterns for rel in self._iter_matches(pattern)
        }
        return sorted(found)[:10]

    # ---------------------------------------------------------------------
    # Internals
    # ---------------------------------------------------------------------
    def _iter_matches(self, pattern: str) -> Iterator[Path]:
        """Yield repo-relative paths of files matching *pattern* outside ignored dirs."""
        for path in self.repo_path.rglob(pattern):
            rel = path.relative_to(self.repo_path)
            # Compare whole path components so ".github" is not mistaken
            # for ".git"; is_file() drops directories such as a ".env" venv.
            if self._IGNORED_DIRS.isdisjoint(rel.parts) and path.is_file():
                yield rel

    def _iter_repo_files(self) -> Iterator[Path]:
        """Yield every file under the repository except Git's own ``.git`` data."""
        for path in self.repo_path.rglob("*"):
            if ".git" in path.relative_to(self.repo_path).parts:
                continue
            if path.is_file():
                yield path

    def _get_repo_size_mb(self) -> float:
        """Return the total size of all files (``.git`` included) in MB, rounded to 2 places."""
        total = 0
        for path in self.repo_path.rglob("*"):
            try:
                if path.is_file():
                    total += path.stat().st_size
            except OSError:
                # The file vanished or is unreadable; leave it out of the sum.
                continue
        return round(total / (1024 * 1024), 2)

    def _count_files(self) -> int:
        """Return the number of files in the repository, excluding ``.git`` contents."""
        return sum(1 for _ in self._iter_repo_files())

    def _run_git_command(self, args: List[str]) -> Optional[str]:
        """Run ``git -C <repo_path> <args>`` and return stripped stdout.

        Returns:
            The output, or None when it is empty, git exits non-zero, git is
            not installed, or the output cannot be decoded.
        """
        try:
            result = subprocess.run(
                ["git", "-C", str(self.repo_path)] + args,
                capture_output=True,
                text=True,
                check=True,
            )
        except (OSError, subprocess.SubprocessError, ValueError):
            return None
        return result.stdout.strip() or None