-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Oluwarotimi Quadri
committed
Sep 24, 2025
1 parent
619f1a2
commit 0542993
Showing
26 changed files
with
1,117 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
__pycache__/ | ||
*.py[cod] | ||
.coverage | ||
*.coverage |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Optional | ||
|
||
|
||
class MemoryCache:
    """In-memory cache that satisfies the Cache protocol used by metrics."""

    def __init__(self) -> None:
        # Maps cache key -> (payload bytes, optional etag).
        self._store: dict[str, tuple[bytes, Optional[str]]] = {}

    def get(self, key: str) -> bytes | None:
        """Return the cached payload for *key*, or None when absent."""
        try:
            data, _etag = self._store[key]
        except KeyError:
            return None
        return data

    def set(self, key: str, data: bytes, etag: str | None = None) -> None:
        """Store *data* (with optional *etag*) under *key*, replacing any entry."""
        self._store[key] = (data, etag)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,42 +1,151 @@ | ||
import sys | ||
from __future__ import annotations | ||
|
||
import logging | ||
import os | ||
import time | ||
from pathlib import Path | ||
from .types import ReportRow | ||
from typing import Dict | ||
|
||
from .handlers import GitHubHandler | ||
from .metrics import heuristic_metrics # noqa: F401 - ensure registration | ||
from .metrics import license_metric # noqa: F401 - ensure registration | ||
from .metrics.base import MetricRunResult, collect_all | ||
from .reporter import write_ndjson | ||
from .scoring import ScoringEngine | ||
from .types import ReportRow, TargetSpec | ||
|
||
_LOGGER = logging.getLogger("acmecli") | ||
|
||
|
||
def _classify(url: str) -> str: | ||
u = url.strip().lower() | ||
if "huggingface.co/datasets/" in u: | ||
return "DATASET" | ||
if "github.com/" in u: | ||
return "CODE" | ||
return "MODEL" | ||
if "huggingface.co/" in u: | ||
return "MODEL" | ||
return "CODE" | ||
|
||
def _stub_row(name: str) -> ReportRow:
    """Build a placeholder ReportRow with every score zeroed.

    Used for MODEL URLs, which are not scored by the GitHub pipeline yet.
    BUG FIX: the scraped original was truncated — the ``ReportRow(...)``
    call was missing its closing parenthesis; restored here.
    """
    zero = 0.0
    return ReportRow(
        name=name, category="MODEL",
        net_score=zero, net_score_latency=0,
        ramp_up_time=zero, ramp_up_time_latency=0,
        bus_factor=zero, bus_factor_latency=0,
        performance_claims=zero, performance_claims_latency=0,
        license=zero, license_latency=0,
        size_score={"raspberry_pi": 0.0, "jetson_nano": 0.0, "desktop_pc": 0.0, "aws_server": 0.0},
        size_score_latency=0,
        dataset_and_code_score=zero, dataset_and_code_score_latency=0,
        dataset_quality=zero, dataset_quality_latency=0,
        code_quality=zero, code_quality_latency=0,
    )
|
||
def _setup_logging() -> None:
    """Configure the acmecli logger from LOG_LEVEL / LOG_FILE env vars.

    LOG_LEVEL "1" means INFO, any other positive integer means DEBUG, and
    0 / non-numeric means silent. Unless both a positive level and a
    LOG_FILE path are supplied, logging is effectively disabled.
    """
    raw_level = os.getenv("LOG_LEVEL", "0")
    log_path = os.getenv("LOG_FILE")

    try:
        requested = int(raw_level)
    except ValueError:
        # Non-numeric LOG_LEVEL is treated as "disabled".
        requested = 0

    if requested <= 0 or not log_path:
        # No file target or logging disabled: silence everything.
        logging.basicConfig(level=logging.CRITICAL)
        return

    chosen_level = logging.DEBUG if requested != 1 else logging.INFO

    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    file_handler.setLevel(chosen_level)
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
    )

    # Replace any previously-installed handlers so repeat calls don't stack.
    _LOGGER.setLevel(chosen_level)
    _LOGGER.handlers.clear()
    _LOGGER.addHandler(file_handler)
|
||
|
||
def _build_spec(url: str) -> TargetSpec:
    """Create a TargetSpec for *url*, inferring source, name, and category."""
    kind = _classify(url)
    # Last non-empty path segment becomes the display name; fall back to
    # the whole URL for degenerate inputs like "/" or "".
    tail = url.rstrip("/").split("/")[-1]
    label = tail if tail else url
    origin = "HUGGINGFACE" if "github.com" not in url.lower() else "GITHUB"
    return TargetSpec(url=url, source=origin, name=label, category=kind)
|
||
|
||
def _clamp(value: float) -> float: | ||
return max(0.0, min(1.0, value)) | ||
|
||
|
||
def _value_for(metrics: Dict[str, float], name: str) -> float: | ||
return _clamp(metrics.get(name, 0.0)) | ||
|
||
|
||
def _latency_for(latencies: Dict[str, int], name: str) -> int: | ||
return int(latencies.get(name, 0)) | ||
|
||
|
||
def _metrics_to_maps(result: MetricRunResult) -> tuple[Dict[str, float], Dict[str, int]]: | ||
value_map: Dict[str, float] = {} | ||
latency_map: Dict[str, int] = {} | ||
for metric in result.values: | ||
value_map[metric.name] = metric.value | ||
latency_map[metric.name] = metric.latency_ms | ||
return value_map, latency_map | ||
|
||
|
||
def _size_average(breakdown: Dict[str, float]) -> float: | ||
if not breakdown: | ||
return 0.0 | ||
return sum(breakdown.values()) / len(breakdown) | ||
|
||
|
||
def _process_url(url: str, handler: GitHubHandler, engine: ScoringEngine) -> None:
    """Score one GitHub URL and append a single NDJSON row to the report.

    Non-GitHub sources are skipped with an INFO log — only the GitHub
    pipeline is wired up here.
    """
    target = _build_spec(url)
    if target.source != "GITHUB":
        _LOGGER.info("Skipping unsupported source for URL: %s", url)
        return

    run = collect_all(target, handler)
    value_map, latency_map = _metrics_to_maps(run)

    breakdown = run.size_breakdown
    avg_size = _size_average(breakdown)

    # Net-score latency = sum of per-metric latencies plus the wall time
    # spent combining them into the final score.
    started = time.perf_counter()
    net_score, latency_total = engine.compute(run.values, size_avg=avg_size)
    combine_ms = int((time.perf_counter() - started) * 1000)
    net_latency = latency_total + combine_ms

    # Prefer the canonical "owner/repo" name when the API supplied one.
    display_name = run.meta.get("full_name") or target.name

    row = ReportRow(
        name=display_name,
        category=target.category,
        net_score=_clamp(net_score),
        net_score_latency=net_latency,
        ramp_up_time=_value_for(value_map, "ramp_up_time"),
        ramp_up_time_latency=_latency_for(latency_map, "ramp_up_time"),
        bus_factor=_value_for(value_map, "bus_factor"),
        bus_factor_latency=_latency_for(latency_map, "bus_factor"),
        performance_claims=_value_for(value_map, "performance_claims"),
        performance_claims_latency=_latency_for(latency_map, "performance_claims"),
        license=_value_for(value_map, "license"),
        license_latency=_latency_for(latency_map, "license"),
        size_score=breakdown,
        size_score_latency=_latency_for(latency_map, "size"),
        dataset_and_code_score=_value_for(value_map, "dataset_and_code_score"),
        dataset_and_code_score_latency=_latency_for(latency_map, "dataset_and_code_score"),
        dataset_quality=_value_for(value_map, "dataset_quality"),
        dataset_quality_latency=_latency_for(latency_map, "dataset_quality"),
        code_quality=_value_for(value_map, "code_quality"),
        code_quality_latency=_latency_for(latency_map, "code_quality"),
    )

    write_ndjson(row)
|
||
|
||
def main(argv: list[str]) -> int:
    """CLI entry point.

    Expected argv pattern: ["score", "/abs/path/URL_FILE"]. Reads one URL
    per line from the file, emits an NDJSON report row per non-empty URL,
    and returns 0 on success or 1 on usage error.
    """
    if len(argv) < 2:
        print("Usage: acmecli score <URL_FILE>")
        return 1

    _setup_logging()
    handler = GitHubHandler(logger=_LOGGER)
    engine = ScoringEngine()

    # BUG FIX: the original `_, url_file = argv` was a strict 2-tuple unpack
    # that raised ValueError whenever extra arguments were passed, despite
    # the `len(argv) < 2` guard above. Index explicitly instead.
    url_file = argv[1]
    lines = Path(url_file).read_text(encoding="utf-8").splitlines()

    for raw in lines:
        url = raw.strip()
        if not url:
            continue
        if _classify(url) == "MODEL":
            # Models are not scored by the GitHub pipeline yet; emit an
            # all-zero placeholder row for them.
            write_ndjson(_stub_row(url))
        _process_url(url, handler, engine)

    return 0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .github import GitHubHandler | ||
|
||
__all__ = ["GitHubHandler"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
from __future__ import annotations | ||
|
||
import json | ||
import logging | ||
import os | ||
import re | ||
from typing import Iterable | ||
from urllib import error, request | ||
|
||
from ..types import TargetSpec | ||
|
||
_GITHUB_API = "https://api.github.com" | ||
_RAW_BASE = "https://raw.githubusercontent.com" | ||
|
||
|
||
class GitHubHandler:
    """Fetches lightweight repository metadata from GitHub's public API.

    Every network failure is swallowed: helpers log at DEBUG and return an
    empty/zero fallback, so metric collection never raises on API errors.
    """

    def __init__(self, token: str | None = None, *, logger: logging.Logger | None = None) -> None:
        # An explicit token wins; otherwise fall back to the environment.
        # Unauthenticated requests still work but are heavily rate-limited.
        self._token = token or os.getenv("GITHUB_TOKEN") or os.getenv("GITHUB_TOKEN_ACME")
        self._log = logger or logging.getLogger("acmecli.github")

    # --- SourceHandler protocol -------------------------------------------------
    def resolve_revision(self, url: str) -> str:
        """Return the repository's default branch name, or "main" on failure."""
        owner, repo = _split_repo(url)
        data = self._fetch_json(f"{_GITHUB_API}/repos/{owner}/{repo}")
        if not data:
            return "main"
        return data.get("default_branch") or "main"

    def fetch_meta(self, spec: TargetSpec) -> dict:
        """Collect repository metadata into a flat dict.

        On API failure a dict of neutral defaults is returned so that
        callers can index the result unconditionally.
        """
        owner, repo = _split_repo(spec.url)
        repo_url = f"{_GITHUB_API}/repos/{owner}/{repo}"
        repo_data = self._fetch_json(repo_url)
        if not repo_data:
            return {
                "owner": owner,
                "repo": repo,
                "full_name": f"{owner}/{repo}",
                "stars": 0,
                "forks": 0,
                "watchers": 0,
                "open_issues": 0,
                "default_branch": "main",
                "license": "",
                "size_kb": 0,
                "readme_text": "",
                "contributors_count": 0,
                "recent_commits": 0,
                "topics": [],
                "description": "",
                "has_wiki": False,
                "pushed_at": None,
            }

        default_branch = repo_data.get("default_branch") or "main"
        readme_text = self._fetch_readme(owner, repo, default_branch)
        contributors_count = self._fetch_contributors_count(owner, repo)
        recent_commits = self._fetch_recent_commits(owner, repo)

        return {
            "owner": owner,
            "repo": repo,
            "full_name": repo_data.get("full_name") or f"{owner}/{repo}",
            "stars": repo_data.get("stargazers_count", 0),
            "forks": repo_data.get("forks_count", 0),
            "watchers": repo_data.get("subscribers_count", 0),
            "open_issues": repo_data.get("open_issues_count", 0),
            "default_branch": default_branch,
            "license": (repo_data.get("license") or {}).get("name") or "",
            "size_kb": repo_data.get("size", 0),
            "readme_text": readme_text,
            "contributors_count": contributors_count,
            "recent_commits": recent_commits,
            "topics": repo_data.get("topics", []),
            "description": repo_data.get("description") or "",
            "has_wiki": bool(repo_data.get("has_wiki", False)),
            "pushed_at": repo_data.get("pushed_at"),
            "created_at": repo_data.get("created_at"),
            "updated_at": repo_data.get("updated_at"),
        }

    def stream_files(self, spec: TargetSpec, patterns: list[str]) -> Iterable[tuple[str, bytes]]:
        """Yield (path, content) pairs; currently only the README is streamed.

        NOTE(review): *patterns* is ignored, and this re-fetches metadata
        (fresh API calls) on every invocation — acceptable for now, but a
        candidate for caching.
        """
        meta = self.fetch_meta(spec)
        if meta.get("readme_text"):
            yield "README.md", meta["readme_text"].encode("utf-8", errors="ignore")

    # --- Internal helpers -------------------------------------------------------
    def _fetch_json(self, url: str) -> dict | list:
        """Return the parsed JSON payload, or {} on any failure.

        BUG FIX: the annotation was `-> dict`, but endpoints such as
        /contributors and /commits respond with JSON arrays, and callers
        explicitly check `isinstance(data, list)` — widened to `dict | list`.
        """
        try:
            req = request.Request(url, headers=self._headers())
            with request.urlopen(req, timeout=15) as resp:
                payload = resp.read()
                return json.loads(payload.decode("utf-8"))
        except error.HTTPError as exc:
            if exc.code == 403:
                # 403 from the API almost always means rate limiting here.
                self._log.debug("GitHub API rate limit hit for %s", url)
            else:
                self._log.debug("GitHub API error %s for %s", exc.code, url)
        except Exception as exc:  # pragma: no cover - defensive
            self._log.debug("GitHub API fetch failed for %s: %s", url, exc)
        return {}

    def _fetch_text(self, url: str) -> str:
        """Return the response body decoded as UTF-8 text; "" on any failure."""
        try:
            req = request.Request(url, headers=self._headers(accept="text/plain"))
            with request.urlopen(req, timeout=15) as resp:
                payload = resp.read()
                return payload.decode("utf-8", errors="ignore")
        except Exception:
            return ""

    def _fetch_readme(self, owner: str, repo: str, branch: str) -> str:
        """Fetch the raw README.md from *branch*; "" when missing."""
        return self._fetch_text(f"{_RAW_BASE}/{owner}/{repo}/{branch}/README.md")

    def _fetch_contributors_count(self, owner: str, repo: str) -> int:
        """Count contributors — capped at 100 (first page only)."""
        data = self._fetch_json(f"{_GITHUB_API}/repos/{owner}/{repo}/contributors?per_page=100")
        if isinstance(data, list):
            return len(data)
        return 0

    def _fetch_recent_commits(self, owner: str, repo: str) -> int:
        """Count recent commits — capped at 30 (first page only)."""
        commits = self._fetch_json(f"{_GITHUB_API}/repos/{owner}/{repo}/commits?per_page=30")
        if isinstance(commits, list):
            return len(commits)
        return 0

    def _headers(self, *, accept: str = "application/vnd.github+json") -> dict[str, str]:
        """Build request headers, adding Authorization when a token is set."""
        headers = {
            "Accept": accept,
            "User-Agent": "acmecli/0.0.1",
        }
        if self._token:
            headers["Authorization"] = f"Bearer {self._token}"
        return headers
|
||
|
||
def _split_repo(url: str) -> tuple[str, str]: | ||
match = re.search(r"github\.com/([^/]+)/([^/#?]+)", url) | ||
if not match: | ||
raise ValueError(f"Unsupported GitHub URL: {url}") | ||
owner, repo = match.group(1), match.group(2) | ||
if repo.endswith(".git"): | ||
repo = repo[:-4] | ||
return owner, repo |
Oops, something went wrong.