Skip to content

Add GitHub Scoring Pipeline #2

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .coverage
Binary file not shown.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
__pycache__/
*.py[cod]
.coverage
*.coverage
File renamed without changes.
4 changes: 4 additions & 0 deletions run
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def do_test():
return 0 if r.returncode == 0 else 1


def do_score(url_file):
    """Run the acmecli scoring pipeline over the URLs listed in *url_file*."""
    # Local import defers loading the acmecli package until scoring is
    # actually requested.
    from acmecli.cli import main
    return main(["score", url_file])

def main():
if len(sys.argv) < 2:
print("Usage: run install|test|score <URL_FILE>")
Expand Down
11 changes: 10 additions & 1 deletion src/acmecli.egg-info/SOURCES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pyproject.toml
src/acmecli/__init__.py
src/acmecli/cache.py
src/acmecli/cli.py
src/acmecli/reporter.py
src/acmecli/scoring.py
Expand All @@ -8,8 +9,16 @@ src/acmecli.egg-info/PKG-INFO
src/acmecli.egg-info/SOURCES.txt
src/acmecli.egg-info/dependency_links.txt
src/acmecli.egg-info/top_level.txt
src/acmecli/handlers/__init__.py
src/acmecli/handlers/github.py
src/acmecli/metrics/__init__.py
src/acmecli/metrics/base.py
src/acmecli/metrics/heuristic_metrics.py
src/acmecli/metrics/license_metric.py
tests/test_metrics_contract.py
tests/test_reporter_schema.py
requirements.txt
tests/test_cli_flow.py
tests/test_collect_all.py
tests/test_handlers_github.py
tests/test_metrics_behavior.py
Binary file removed src/acmecli/__pycache__/__init__.cpython-313.pyc
Binary file not shown.
Binary file removed src/acmecli/__pycache__/types.cpython-313.pyc
Binary file not shown.
19 changes: 19 additions & 0 deletions src/acmecli/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import annotations

from typing import Optional


class MemoryCache:
    """In-memory cache satisfying the Cache protocol used by metrics.

    Entries are stored as ``(payload, etag)`` tuples keyed by string;
    ``get`` hands back only the payload bytes.
    """

    def __init__(self) -> None:
        # key -> (data bytes, optional ETag)
        self._store: dict[str, tuple[bytes, Optional[str]]] = {}

    def get(self, key: str) -> bytes | None:
        """Return the cached bytes for *key*, or ``None`` on a miss."""
        hit = self._store.get(key)
        return None if hit is None else hit[0]

    def set(self, key: str, data: bytes, etag: str | None = None) -> None:
        """Store *data* under *key*, remembering an optional ETag."""
        self._store[key] = (data, etag)
149 changes: 129 additions & 20 deletions src/acmecli/cli.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,151 @@
import sys
from __future__ import annotations

import logging
import os
import time
from pathlib import Path
from .types import ReportRow
from typing import Dict

from .handlers import GitHubHandler
from .metrics import heuristic_metrics # noqa: F401 - ensure registration
from .metrics import license_metric # noqa: F401 - ensure registration
from .metrics.base import MetricRunResult, collect_all
from .reporter import write_ndjson
from .scoring import ScoringEngine
from .types import ReportRow, TargetSpec

_LOGGER = logging.getLogger("acmecli")


def _classify(url: str) -> str:
u = url.strip().lower()
if "huggingface.co/datasets/" in u:
return "DATASET"
if "github.com/" in u:
return "CODE"
return "MODEL"
if "huggingface.co/" in u:
return "MODEL"
return "CODE"

def _stub_row(name: str) -> ReportRow:
    """Build an all-zero ReportRow placeholder for targets not yet scored.

    The pre-fix text was missing the closing parenthesis of the
    ``ReportRow(...)`` call (diff-paste truncation); restored here.
    """
    zero = 0.0
    return ReportRow(
        name=name,
        category="MODEL",
        net_score=zero, net_score_latency=0,
        ramp_up_time=zero, ramp_up_time_latency=0,
        bus_factor=zero, bus_factor_latency=0,
        performance_claims=zero, performance_claims_latency=0,
        license=zero, license_latency=0,
        size_score={"raspberry_pi": 0.0, "jetson_nano": 0.0, "desktop_pc": 0.0, "aws_server": 0.0},
        size_score_latency=0,
        dataset_and_code_score=zero, dataset_and_code_score_latency=0,
        dataset_quality=zero, dataset_quality_latency=0,
        code_quality=zero, code_quality_latency=0,
    )

def _setup_logging() -> None:
    """Configure the package logger from LOG_LEVEL / LOG_FILE env vars.

    LOG_LEVEL of 0, unset, or non-numeric disables logging (CRITICAL-only
    basicConfig); 1 selects INFO; anything higher selects DEBUG. Without a
    LOG_FILE path, logging is likewise disabled.
    """
    raw_level = os.getenv("LOG_LEVEL", "0")
    log_path = os.getenv("LOG_FILE")

    try:
        requested = int(raw_level)
    except ValueError:
        # Non-numeric value behaves exactly like "disabled".
        requested = 0

    if requested <= 0 or not log_path:
        logging.basicConfig(level=logging.CRITICAL)
        return

    chosen = logging.INFO if requested == 1 else logging.DEBUG

    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    file_handler.setLevel(chosen)
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
    )

    # Replace any previously installed handlers so repeated calls don't
    # duplicate log lines.
    _LOGGER.setLevel(chosen)
    _LOGGER.handlers.clear()
    _LOGGER.addHandler(file_handler)


def _build_spec(url: str) -> TargetSpec:
    """Derive a TargetSpec (source, name, category) from a raw URL string."""
    # Last path segment doubles as the display name; fall back to the full
    # URL when the path is empty.
    name = url.rstrip("/").split("/")[-1] or url
    is_github = "github.com" in url.lower()
    return TargetSpec(
        url=url,
        source="GITHUB" if is_github else "HUGGINGFACE",
        name=name,
        category=_classify(url),
    )


def _clamp(value: float) -> float:
return max(0.0, min(1.0, value))


def _value_for(metrics: Dict[str, float], name: str) -> float:
    """Look up metric *name*, defaulting to 0.0, clamped into [0, 1]."""
    score = metrics.get(name, 0.0)
    return _clamp(score)


def _latency_for(latencies: Dict[str, int], name: str) -> int:
return int(latencies.get(name, 0))


def _metrics_to_maps(result: MetricRunResult) -> tuple[Dict[str, float], Dict[str, int]]:
value_map: Dict[str, float] = {}
latency_map: Dict[str, int] = {}
for metric in result.values:
value_map[metric.name] = metric.value
latency_map[metric.name] = metric.latency_ms
return value_map, latency_map


def _size_average(breakdown: Dict[str, float]) -> float:
if not breakdown:
return 0.0
return sum(breakdown.values()) / len(breakdown)


def _process_url(url: str, handler: GitHubHandler, engine: ScoringEngine) -> None:
    """Score one URL end-to-end and emit a single NDJSON report row.

    Non-GitHub sources are logged and skipped; GitHub repos are run through
    every registered metric, combined by the scoring engine, and written
    out via the reporter.
    """
    spec = _build_spec(url)
    if spec.source != "GITHUB":
        _LOGGER.info("Skipping unsupported source for URL: %s", url)
        return

    run = collect_all(spec, handler)
    value_map, latency_map = _metrics_to_maps(run)

    breakdown = run.size_breakdown
    avg_size = _size_average(breakdown)

    # Net-score latency = sum of metric latencies + time spent combining them.
    started = time.perf_counter()
    net, metric_latency_total = engine.compute(run.values, size_avg=avg_size)
    net_latency = metric_latency_total + int((time.perf_counter() - started) * 1000)

    display_name = run.meta.get("full_name") or spec.name

    write_ndjson(
        ReportRow(
            name=display_name,
            category=spec.category,
            net_score=_clamp(net),
            net_score_latency=net_latency,
            ramp_up_time=_value_for(value_map, "ramp_up_time"),
            ramp_up_time_latency=_latency_for(latency_map, "ramp_up_time"),
            bus_factor=_value_for(value_map, "bus_factor"),
            bus_factor_latency=_latency_for(latency_map, "bus_factor"),
            performance_claims=_value_for(value_map, "performance_claims"),
            performance_claims_latency=_latency_for(latency_map, "performance_claims"),
            license=_value_for(value_map, "license"),
            license_latency=_latency_for(latency_map, "license"),
            size_score=breakdown,
            size_score_latency=_latency_for(latency_map, "size"),
            dataset_and_code_score=_value_for(value_map, "dataset_and_code_score"),
            dataset_and_code_score_latency=_latency_for(latency_map, "dataset_and_code_score"),
            dataset_quality=_value_for(value_map, "dataset_quality"),
            dataset_quality_latency=_latency_for(latency_map, "dataset_quality"),
            code_quality=_value_for(value_map, "code_quality"),
            code_quality_latency=_latency_for(latency_map, "code_quality"),
        )
    )


def main(argv: list[str]) -> int:
    """CLI entry point. argv pattern: ["score", "/abs/path/URL_FILE"].

    Emits one NDJSON row per URL in the file: MODEL URLs get an all-zero
    stub row, GitHub URLs are fully scored, blank lines are skipped.
    Returns 0 on success, 1 on usage error.
    """
    if len(argv) < 2:
        print("Usage: acmecli score <URL_FILE>")
        return 1

    _setup_logging()
    handler = GitHubHandler(logger=_LOGGER)
    engine = ScoringEngine()

    # Index rather than `_, url_file = argv`: the strict tuple unpack raised
    # ValueError whenever extra trailing arguments were passed, even though
    # the guard above only requires *at least* two entries.
    url_file = argv[1]
    lines = Path(url_file).read_text(encoding="utf-8").splitlines()

    for raw in lines:
        url = raw.strip()
        if not url:
            continue
        if _classify(url) == "MODEL":
            write_ndjson(_stub_row(url))
        _process_url(url, handler, engine)

    return 0
3 changes: 3 additions & 0 deletions src/acmecli/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .github import GitHubHandler

__all__ = ["GitHubHandler"]
145 changes: 145 additions & 0 deletions src/acmecli/handlers/github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from __future__ import annotations

import json
import logging
import os
import re
from typing import Iterable
from urllib import error, request

from ..types import TargetSpec

_GITHUB_API = "https://api.github.com"
_RAW_BASE = "https://raw.githubusercontent.com"


class GitHubHandler:
    """Fetches lightweight repository metadata from GitHub's public API."""

    def __init__(self, token: str | None = None, *, logger: logging.Logger | None = None) -> None:
        """Create a handler, optionally authenticated.

        The token falls back to the GITHUB_TOKEN / GITHUB_TOKEN_ACME env
        vars; unauthenticated requests still work (lower rate limits
        presumably apply — GitHub API behavior, not visible here).
        """
        self._token = token or os.getenv("GITHUB_TOKEN") or os.getenv("GITHUB_TOKEN_ACME")
        self._log = logger or logging.getLogger("acmecli.github")

    # --- SourceHandler protocol -------------------------------------------------
    def resolve_revision(self, url: str) -> str:
        """Return the repo's default branch, or "main" when the API call fails."""
        owner, repo = _split_repo(url)
        data = self._fetch_json(f"{_GITHUB_API}/repos/{owner}/{repo}")
        if not data:
            return "main"
        return data.get("default_branch") or "main"

    def fetch_meta(self, spec: TargetSpec) -> dict:
        """Collect repo metadata (stars, README text, contributor count, ...).

        On API failure a dict with the same keys but neutral defaults is
        returned, so callers never need to special-case a missing repo.
        Makes up to four HTTP requests (repo, README, contributors, commits).
        """
        owner, repo = _split_repo(spec.url)
        repo_url = f"{_GITHUB_API}/repos/{owner}/{repo}"
        repo_data = self._fetch_json(repo_url)
        if not repo_data:
            # Zero/empty defaults mirroring the success-path keys below.
            return {
                "owner": owner,
                "repo": repo,
                "full_name": f"{owner}/{repo}",
                "stars": 0,
                "forks": 0,
                "watchers": 0,
                "open_issues": 0,
                "default_branch": "main",
                "license": "",
                "size_kb": 0,
                "readme_text": "",
                "contributors_count": 0,
                "recent_commits": 0,
                "topics": [],
                "description": "",
                "has_wiki": False,
                "pushed_at": None,
            }

        default_branch = repo_data.get("default_branch") or "main"
        readme_text = self._fetch_readme(owner, repo, default_branch)
        contributors_count = self._fetch_contributors_count(owner, repo)
        recent_commits = self._fetch_recent_commits(owner, repo)

        return {
            "owner": owner,
            "repo": repo,
            "full_name": repo_data.get("full_name") or f"{owner}/{repo}",
            "stars": repo_data.get("stargazers_count", 0),
            "forks": repo_data.get("forks_count", 0),
            "watchers": repo_data.get("subscribers_count", 0),
            "open_issues": repo_data.get("open_issues_count", 0),
            "default_branch": default_branch,
            # license may be null in the API payload; guard with `or {}`.
            "license": (repo_data.get("license") or {}).get("name") or "",
            "size_kb": repo_data.get("size", 0),
            "readme_text": readme_text,
            "contributors_count": contributors_count,
            "recent_commits": recent_commits,
            "topics": repo_data.get("topics", []),
            "description": repo_data.get("description") or "",
            "has_wiki": bool(repo_data.get("has_wiki", False)),
            "pushed_at": repo_data.get("pushed_at"),
            "created_at": repo_data.get("created_at"),
            "updated_at": repo_data.get("updated_at"),
        }

    def stream_files(self, spec: TargetSpec, patterns: list[str]) -> Iterable[tuple[str, bytes]]:
        """Yield (path, bytes) pairs; currently only the README is streamed.

        NOTE(review): `patterns` is accepted for protocol compatibility but
        ignored, and this re-fetches all metadata just for the README.
        """
        meta = self.fetch_meta(spec)
        if meta.get("readme_text"):
            yield "README.md", meta["readme_text"].encode("utf-8", errors="ignore")

    # --- Internal helpers -------------------------------------------------------
    def _fetch_json(self, url: str) -> dict:
        """GET *url* and parse JSON; returns {} on any failure (logged at DEBUG)."""
        try:
            req = request.Request(url, headers=self._headers())
            with request.urlopen(req, timeout=15) as resp:
                payload = resp.read()
            return json.loads(payload.decode("utf-8"))
        except error.HTTPError as exc:
            # 403 is typically the rate limit for unauthenticated callers.
            if exc.code == 403:
                self._log.debug("GitHub API rate limit hit for %s", url)
            else:
                self._log.debug("GitHub API error %s for %s", exc.code, url)
        except Exception as exc:  # pragma: no cover - defensive
            self._log.debug("GitHub API fetch failed for %s: %s", url, exc)
        return {}

    def _fetch_text(self, url: str) -> str:
        """GET *url* as plain text; returns "" on any failure."""
        try:
            req = request.Request(url, headers=self._headers(accept="text/plain"))
            with request.urlopen(req, timeout=15) as resp:
                payload = resp.read()
            return payload.decode("utf-8", errors="ignore")
        except Exception:
            return ""

    def _fetch_readme(self, owner: str, repo: str, branch: str) -> str:
        """Fetch the raw README.md from the given branch ("" if absent)."""
        return self._fetch_text(f"{_RAW_BASE}/{owner}/{repo}/{branch}/README.md")

    def _fetch_contributors_count(self, owner: str, repo: str) -> int:
        """Count contributors, capped at 100 by the per_page query parameter."""
        data = self._fetch_json(f"{_GITHUB_API}/repos/{owner}/{repo}/contributors?per_page=100")
        if isinstance(data, list):
            return len(data)
        return 0

    def _fetch_recent_commits(self, owner: str, repo: str) -> int:
        """Count recent commits, capped at 30 by the per_page query parameter."""
        commits = self._fetch_json(f"{_GITHUB_API}/repos/{owner}/{repo}/commits?per_page=30")
        if isinstance(commits, list):
            return len(commits)
        return 0

    def _headers(self, *, accept: str = "application/vnd.github+json") -> dict[str, str]:
        """Build request headers; adds a Bearer token only when one is set."""
        headers = {
            "Accept": accept,
            "User-Agent": "acmecli/0.0.1",
        }
        if self._token:
            headers["Authorization"] = f"Bearer {self._token}"
        return headers


def _split_repo(url: str) -> tuple[str, str]:
match = re.search(r"github\.com/([^/]+)/([^/#?]+)", url)
if not match:
raise ValueError(f"Unsupported GitHub URL: {url}")
owner, repo = match.group(1), match.group(2)
if repo.endswith(".git"):
repo = repo[:-4]
return owner, repo
Loading
Loading