diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4d63574 --- /dev/null +++ b/.gitignore @@ -0,0 +1,91 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual Environments +venv/ +env/ +ENV/ +env.bak/ +venv.bak/ +pyvenv.cfg +bin/ +include/ + +# PyInstaller +*.manifest +*.spec + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.log +.pytest_cache/ + +# Project-specific working directories +input/ +output/ +temp/ +logs/ + +# Per-package metadata files (these are generated per submission) +metadata_*.json + +# IDE and Editor files +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# OS-specific +Thumbs.db +Desktop.ini + +# Jupyter Notebooks +.ipynb_checkpoints + +# PyCharm +.idea/ + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Memory bank (optional - uncomment if you don't want to track memory) +# .memory-bank/ +# External dependencies (clone separately) +HathiTrustYAMLgenerator/ diff --git a/DEMO_step2.md b/DEMO_step2.md new file mode 100644 index 0000000..c18c83e --- /dev/null +++ b/DEMO_step2.md @@ -0,0 +1,41 @@ +## Step 2: Directory Discovery - DEMO + +### Create test files: +```bash +cd /home/schipp0/Digitization/HathiTrust + +# Create 5 test TIFF files with barcode 39015012345678 +python3 volume_discovery.py --create-test --barcode 39015012345678 --num-files 5 + +# Create another volume with different barcode +python3 volume_discovery.py --create-test --barcode 39015099887766 --num-files 3 +``` + +### Discover volumes: +```bash +python3 volume_discovery.py input/ +``` + +Expected output: +``` +============================================================ +VOLUME DISCOVERY SUMMARY 
+============================================================ + +šŸ“¦ Volume: 39015012345678 + Files: 5 + Range: 00000001 to 00000005 + Status: āœ“ Valid + +šŸ“¦ Volume: 39015099887766 + Files: 3 + Range: 00000001 to 00000003 + Status: āœ“ Valid +``` + +### Run tests: +```bash +python3 test_volume_discovery.py -v +``` + +All 7 tests should pass āœ“ diff --git a/DEMO_step3.md b/DEMO_step3.md new file mode 100644 index 0000000..0c986db --- /dev/null +++ b/DEMO_step3.md @@ -0,0 +1,81 @@ +## Step 3: OCR Processing Pipeline - DEMO + +### Prerequisites +Ensure Tesseract is installed: +```bash +# Check if tesseract is installed +tesseract --version + +# If not installed: +sudo apt-get update +sudo apt-get install tesseract-ocr tesseract-ocr-eng +``` + +### Test Setup + +#### 1. Create test TIFF files (if not already done): +```bash +cd /home/schipp0/Digitization/HathiTrust +python3 volume_discovery.py --create-test --barcode 39015012345678 --num-files 3 +``` + +#### 2. Run OCR on all discovered volumes: +```bash +python3 ocr_processor.py input/ +``` + +Expected output: +``` +šŸ“‚ Discovering volumes... +Found 1 volume(s) + +============================================================ +Processing Volume: 39015012345678 +============================================================ +Processing 3 files with OCR + [1/3] 39015012345678_00000001.tif + [2/3] 39015012345678_00000002.tif + [3/3] 39015012345678_00000003.tif + +āœ“ OCR Results: + Successful: 3 + Failed: 0 + Output: temp/39015012345678 +``` + +#### 3. Process specific volume only: +```bash +python3 ocr_processor.py input/ --volume-id 39015012345678 +``` + +#### 4. 
Check output files: +```bash +ls -l temp/39015012345678/ +``` + +Should show: +``` +00000001.txt # Plain text OCR +00000001.html # hOCR coordinate data +00000002.txt +00000002.html +00000003.txt +00000003.html +``` + +### Run Tests +```bash +python3 test_ocr_processor.py -v +``` + +### Output Format + +**Plain Text (.txt):** +- UTF-8 encoded +- Control characters removed (except tab, CR, LF) +- Raw text from Tesseract + +**hOCR (.html):** +- XML/HTML format with coordinate data +- Contains bounding box information for each word +- Compatible with HathiTrust requirements diff --git a/README.md b/README.md new file mode 100644 index 0000000..b9ad3f8 --- /dev/null +++ b/README.md @@ -0,0 +1,158 @@ +# HathiTrust Package Automation Pipeline + +## Project Structure +``` +HathiTrust/ +ā”œā”€ā”€ .memory-bank/ # Project memory storage +ā”œā”€ā”€ input/ # Source TIFF files (organized by barcode/ARK) +ā”œā”€ā”€ output/ # Final ZIP packages +ā”œā”€ā”€ temp/ # Intermediate processing files +ā”œā”€ā”€ logs/ # Processing logs +ā”œā”€ā”€ config.yaml # Global configuration +ā”œā”€ā”€ metadata_template.json # Template for package metadata +ā”œā”€ā”€ collect_metadata.py # Interactive metadata collection +ā”œā”€ā”€ requirements.txt # Python dependencies +└── README.md # This file +``` + +## Setup Instructions + +### 1. Install System Dependencies +```bash +sudo apt-get update +sudo apt-get install tesseract-ocr tesseract-ocr-eng +``` + +### 2. Install Python Dependencies +```bash +pip install -r requirements.txt +``` + +### 3. Clone YAML Generator +```bash +cd /home/schipp0/Digitization/HathiTrust +git clone https://github.com/moriahcaruso/HathiTrustYAMLgenerator.git +``` + +## Workflow: Creating a Submission Package + +### Step 1: Prepare TIFF Files +Place digitized TIFF files in `input/` directory: +- Files should follow naming: `_00000001.tif`, `_00000002.tif`, etc. +- Or: `_00000001.tif`, `_00000002.tif`, etc. 
+ +### Step 2: Collect Package Metadata +Run the interactive metadata collection tool: +```bash +./collect_metadata.py +``` + +This will prompt you for: +- **Volume identifier** (barcode or ARK) +- **Capture info** (date, operator, CaptureOne version) +- **Image specs** (DPI, color mode, compression) +- **Page order** (scanning/reading order) +- **Content type** (book, journal, manuscript, etc.) + +Metadata is saved as: `metadata_{identifier}.json` + +### Step 3: Process Package +(Main processing script to be implemented) +```bash +./process_package.py --metadata metadata_{identifier}.json +``` + +This will: +1. Validate TIFF files +2. Run OCR (text + hOCR coordinates) +3. Generate meta.yml +4. Create checksum.md5 +5. Package into ZIP + +## Key Features + +### Per-Package Metadata +Unlike scanner-based workflows with static settings, this pipeline supports **variable capture settings** per submission: +- Different DPI (300, 400, 600, etc.) +- Various color modes (bitonal, grayscale, color) +- Multiple compression types +- Flexible reading orders + +### CaptureOne Integration +Designed for content digitized via **CaptureOne Cultural Heritage Edition**, not physical scanners. 
+ +### HathiTrust Compliance +Output packages meet all HathiTrust requirements: +- 8-digit sequential file naming +- Plain text OCR (.txt) +- Coordinate OCR (.html hOCR format) +- meta.yml metadata +- checksum.md5 fixity file +- Proper ZIP structure (no subdirectories) + +## Next Development Steps +- [ ] Implement main processing script +- [ ] Integrate with HathiTrustYAMLgenerator +- [ ] Add validation checks +- [ ] Test with sample packages +- [ ] Add batch processing support + + +## Implementation Status + +### āœ… Step 1: Configuration & Setup +- Directory structure created +- Per-package metadata collection (`collect_metadata.py`) +- Configuration files (`config.yaml`, `metadata_template.json`) + +### āœ… Step 2: Directory Discovery & Organization +- Volume discovery module (`volume_discovery.py`) +- Barcode and ARK identifier extraction +- Sequential file validation +- Test suite with 7 passing tests +- Test file generator for development + +**Usage:** +```bash +# Discover volumes in input directory +python3 volume_discovery.py input/ + +# Create test files +python3 volume_discovery.py --create-test --barcode 39015012345678 --num-files 5 + +# Run tests +python3 test_volume_discovery.py +``` + +### āœ… Step 3: OCR Processing Pipeline +- OCR processor module (`ocr_processor.py`) +- Plain text OCR generation (.txt files) +- Coordinate OCR generation (.html hOCR format) +- Text sanitization (control character removal) +- UTF-8 encoding enforcement +- Batch processing with error handling +- Test suite with Tesseract integration tests + +**Usage:** +```bash +# Process all volumes with OCR +python3 ocr_processor.py input/ + +# Process specific volume +python3 ocr_processor.py input/ --volume-id 39015012345678 + +# Custom language/output +python3 ocr_processor.py input/ --language fra --output-dir /tmp/ocr + +# Run tests +python3 test_ocr_processor.py +``` + +### šŸ”„ Next Steps +- Step 4: File Validation & Naming Convention +- Step 5: YAML Metadata Generation +- 
def prompt_choice(prompt_text, choices, default=None):
    """
    Show a numbered menu and return the option the user selects

    Args:
        prompt_text: Heading printed above the menu
        choices: Sequence of options; they are numbered starting at 1
        default: Optional 1-based index used when the user just presses Enter

    Returns:
        The selected element of ``choices``
    """
    total = len(choices)

    print(f"\n{prompt_text}")
    for number, option in enumerate(choices, 1):
        print(f" {number}. {option}")

    # The question never changes between attempts, so build it once.
    default_hint = f" [{default}]" if default else ""
    question = f"Select [1-{total}]" + default_hint + ": "

    while True:
        answer = input(question).strip()

        # Empty answer falls back to the default, when one was supplied.
        if default and not answer:
            return choices[default - 1]

        try:
            picked = int(answer)
        except ValueError:
            print(" ⚠ Please enter a valid number")
            continue

        if picked < 1 or picked > total:
            print(f" ⚠ Please select a number between 1 and {total}")
            continue

        return choices[picked - 1]
def save_metadata(metadata, output_dir="."):
    """
    Save metadata to a JSON file named after the volume identifier

    Args:
        metadata: Metadata dictionary; must contain
            ``metadata['volume_identifier']['value']``
        output_dir: Directory to write into (default: current directory).
            It is created, including missing parents, if it does not exist.

    Returns:
        Path of the written ``metadata_<sanitized identifier>.json`` file

    Raises:
        KeyError: If the volume identifier is missing from ``metadata``
        OSError: If the directory or file cannot be created
    """
    identifier = str(metadata['volume_identifier']['value'])
    # Sanitize identifier for filename: anything that is not a letter or digit
    # (e.g. the ':' and '/' of an ARK) becomes an underscore.
    safe_id = "".join(c if c.isalnum() else "_" for c in identifier)

    target_dir = Path(output_dir)
    # The directory typically comes from free-form user input, so it may not
    # exist yet; create it instead of failing after all answers were collected.
    target_dir.mkdir(parents=True, exist_ok=True)

    filepath = target_dir / f"metadata_{safe_id}.json"

    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)

    return filepath
+ filepath = save_metadata(metadata, output_dir) + print(f"\nāœ… Metadata saved to: {filepath}") + return metadata + else: + print("\nāŒ Metadata not saved") + return None + + except KeyboardInterrupt: + print("\n\nāŒ Cancelled by user") + sys.exit(1) + except Exception as e: + print(f"\nāŒ Error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..aeca01f --- /dev/null +++ b/config.yaml @@ -0,0 +1,38 @@ +# HathiTrust Package Automation Configuration +# =========================================== + +# Directory Paths +directories: + input: "/home/schipp0/Digitization/HathiTrust/input" + output: "/home/schipp0/Digitization/HathiTrust/output" + temp: "/home/schipp0/Digitization/HathiTrust/temp" + logs: "/home/schipp0/Digitization/HathiTrust/logs" + yaml_generator: "/home/schipp0/Digitization/HathiTrust/HathiTrustYAMLgenerator" + +# File Naming Patterns +patterns: + tiff_pattern: '^(\d{8})\.tif$' + barcode_pattern: '^(\d+)_' + # Alternative ARK pattern: '^ark:\/\d+\/([a-z0-9]+)' + +# OCR Configuration +ocr: + language: "eng" + tesseract_config: "--psm 1" # Automatic page segmentation with OSD + output_formats: + - text + - hocr + +# Processing Options +processing: + parallel_volumes: false # Set to true for parallel processing + max_workers: 4 + cleanup_temp: true + generate_report: true + interactive_metadata: true # Prompt for metadata per package + +# Validation +validation: + strict_mode: true + check_sequential_gaps: true + validate_checksums: true diff --git a/ocr_processor.py b/ocr_processor.py new file mode 100755 index 0000000..b2d7a2e --- /dev/null +++ b/ocr_processor.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python3 +""" +OCR Processing Pipeline +Generates plain text and coordinate OCR (hOCR) for TIFF images using Tesseract +""" + +import logging +from pathlib import Path +from typing import Dict, List, Optional, Tuple +import re + +try: + import pytesseract + from PIL import 
class OCRResult:
    """
    Container for the OCR outcome of a single TIFF file

    A new instance describes a file that has not been processed yet:
    no output paths, ``success`` False and no error.
    """

    def __init__(self, tiff_file: Path):
        self.tiff_file = tiff_file
        self.text_file: Optional[Path] = None   # plain-text output, once written
        self.hocr_file: Optional[Path] = None   # hOCR output, once written
        self.success: bool = False              # True only after both outputs were written
        self.error: Optional[str] = None        # str(exception) when processing failed

    def __repr__(self):
        status = "āœ“" if self.success else "āœ—"
        return f"OCRResult({status} {self.tiff_file.name})"


class OCRProcessor:
    """Handles OCR operations for volume processing"""

    # Control characters preserved by remove_control_chars() unless overridden.
    _DEFAULT_KEEP = ('\t', '\r', '\n')

    def __init__(self, language: str = 'eng', config: str = '--psm 1'):
        """
        Initialize OCR processor

        Args:
            language: Tesseract language code (default: 'eng')
            config: Tesseract configuration string (default: '--psm 1' for automatic page segmentation)

        Raises:
            RuntimeError: If Tesseract cannot be queried for its version
        """
        self.language = language
        self.config = config
        self._verify_tesseract()

    def _verify_tesseract(self):
        """
        Verify Tesseract is installed and accessible

        Raises:
            RuntimeError: If the version query fails; the original exception
                is attached as the cause
        """
        try:
            version = pytesseract.get_tesseract_version()
        except Exception as e:
            logging.error(f"Tesseract not found: {e}")
            raise RuntimeError("Tesseract OCR is not installed or not in PATH") from e
        logging.info(f"Tesseract version: {version}")

    @staticmethod
    def remove_control_chars(text: str, keep: Optional[List[str]] = None) -> str:
        """
        Remove control characters except specified ones
        Required for HathiTrust compliance

        Args:
            text: Input text to clean
            keep: Characters to preserve even though they are non-printable
                (default: tab, CR, LF)

        Returns:
            Cleaned text string

        Note:
            "Non-printable" follows ``str.isprintable()``, so besides control
            characters it also drops separators other than the ASCII space
            (for example a no-break space).
        """
        # None instead of a list literal avoids a shared mutable default.
        preserved = OCRProcessor._DEFAULT_KEEP if keep is None else keep
        return ''.join(ch for ch in text if ch in preserved or ch.isprintable())

    def process_image_to_text(self, image_path: Path) -> str:
        """
        Extract plain text from image using Tesseract

        Args:
            image_path: Path to TIFF image file

        Returns:
            Cleaned OCR text
        """
        logging.debug(f"Processing text OCR: {image_path.name}")

        # The context manager closes the underlying file handle; without it
        # a large volume leaves one open handle per page behind.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(
                image,
                lang=self.language,
                config=self.config
            )

        return self.remove_control_chars(text)

    def process_image_to_hocr(self, image_path: Path) -> str:
        """
        Extract hOCR (coordinate OCR) from image using Tesseract

        Args:
            image_path: Path to TIFF image file

        Returns:
            hOCR XML/HTML string
        """
        logging.debug(f"Processing hOCR: {image_path.name}")

        with Image.open(image_path) as image:
            hocr = pytesseract.image_to_pdf_or_hocr(
                image,
                lang=self.language,
                extension='hocr',
                config=self.config
            )

        # hOCR comes as bytes, decode to string
        if isinstance(hocr, bytes):
            hocr = hocr.decode('utf-8')

        return hocr

    def process_single_file(self, tiff_path: Path, output_dir: Path) -> OCRResult:
        """
        Process a single TIFF file to generate both text and hOCR outputs

        Failures are not raised: they are logged and recorded on the
        returned result so that one bad page does not stop a volume.

        Args:
            tiff_path: Path to input TIFF file
            output_dir: Directory for output files (must already exist)

        Returns:
            OCRResult object with processing results
        """
        result = OCRResult(tiff_path)

        try:
            base_name = tiff_path.stem  # e.g., "39015012345678_00000001"

            # Outputs are named after the trailing 8-digit sequence only;
            # fall back to the full stem when the name does not end in one.
            match = re.search(r'(\d{8})$', base_name)
            sequence = match.group(1) if match else base_name

            # Plain text OCR
            text_content = self.process_image_to_text(tiff_path)
            text_file = output_dir / f"{sequence}.txt"
            with open(text_file, 'w', encoding='utf-8') as f:
                f.write(text_content)
            result.text_file = text_file
            logging.debug(f" Saved text: {text_file.name}")

            # Coordinate OCR (hOCR)
            hocr_content = self.process_image_to_hocr(tiff_path)
            hocr_file = output_dir / f"{sequence}.html"
            with open(hocr_file, 'w', encoding='utf-8') as f:
                f.write(hocr_content)
            result.hocr_file = hocr_file
            logging.debug(f" Saved hOCR: {hocr_file.name}")

            result.success = True

        except Exception as e:
            # Deliberate best-effort boundary: record and continue.
            result.success = False
            result.error = str(e)
            logging.error(f" OCR failed for {tiff_path.name}: {e}")

        return result

    def process_volume(self, tiff_files: List[Path], output_dir: Path) -> Dict[str, object]:
        """
        Process all TIFF files for a volume

        Args:
            tiff_files: List of TIFF file paths to process
            output_dir: Directory for output files (created if missing)

        Returns:
            Dictionary with keys 'text_files', 'hocr_files' (lists of paths),
            'errors' (list of {'file', 'error'} dicts), 'successful' and
            'failed' (counts)
        """
        total = len(tiff_files)
        logging.info(f"Processing {total} files with OCR")

        # Ensure output directory exists
        output_dir.mkdir(parents=True, exist_ok=True)

        results = {
            'text_files': [],
            'hocr_files': [],
            'errors': [],
            'successful': 0,
            'failed': 0
        }

        for i, tiff_file in enumerate(tiff_files, 1):
            logging.info(f" [{i}/{total}] {tiff_file.name}")

            ocr_result = self.process_single_file(tiff_file, output_dir)

            if ocr_result.success:
                results['text_files'].append(ocr_result.text_file)
                results['hocr_files'].append(ocr_result.hocr_file)
                results['successful'] += 1
            else:
                results['errors'].append({
                    'file': tiff_file,
                    'error': ocr_result.error
                })
                results['failed'] += 1

        logging.info(f"OCR complete: {results['successful']} successful, {results['failed']} failed")

        if results['errors']:
            logging.warning("Errors encountered:")
            for error in results['errors']:
                logging.warning(f" {error['file'].name}: {error['error']}")

        return results
argparse.ArgumentParser(description='Process TIFF files with OCR') + parser.add_argument('input_dir', + default='/home/schipp0/Digitization/HathiTrust/input', + nargs='?', + help='Input directory containing TIFF files') + parser.add_argument('--output-dir', + default='/home/schipp0/Digitization/HathiTrust/temp', + help='Output directory for OCR files') + parser.add_argument('--language', default='eng', + help='Tesseract language code') + parser.add_argument('--volume-id', + help='Process only specific volume ID') + + args = parser.parse_args() + + try: + # Initialize OCR processor + processor = OCRProcessor(language=args.language) + + # Discover volumes + print("\nšŸ“‚ Discovering volumes...") + volumes = discover_volumes(args.input_dir) + + if not volumes: + print("No volumes found.") + exit(0) + + print(f"Found {len(volumes)} volume(s)\n") + + # Filter to specific volume if requested + if args.volume_id: + if args.volume_id not in volumes: + print(f"Volume '{args.volume_id}' not found.") + exit(1) + volumes = {args.volume_id: volumes[args.volume_id]} + + # Process each volume + for volume_id, volume_group in volumes.items(): + print(f"{'='*60}") + print(f"Processing Volume: {volume_id}") + print(f"{'='*60}") + + # Create output directory for this volume + output_dir = Path(args.output_dir) / volume_id + + # Process OCR + results = processor.process_volume(volume_group.tiff_files, output_dir) + + # Print summary + print(f"\nāœ“ OCR Results:") + print(f" Successful: {results['successful']}") + print(f" Failed: {results['failed']}") + print(f" Output: {output_dir}\n") + + except Exception as e: + logging.error(f"Error: {e}") + exit(1) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..096660c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +# Python Dependencies for HathiTrust Package Automation +pytesseract>=0.3.10 +PyYAML>=6.0 +Pillow>=10.0.0 +tqdm>=4.65.0 diff --git a/test_ocr_processor.py b/test_ocr_processor.py new file 
class TestOCRProcessor(unittest.TestCase):
    """
    Unit and integration tests for OCRProcessor

    Tests that need the Tesseract binary or the generated test image skip
    themselves at run time when the prerequisite is missing.
    """

    @classmethod
    def setUpClass(cls):
        """Create a temporary directory and a small TIFF with rendered text"""
        cls.temp_dir = Path(tempfile.mkdtemp())
        cls.test_tiff = cls.temp_dir / "39015012345678_00000001.tif"
        cls.has_test_image = False

        try:
            # Only Pillow is needed here; importing anything else would make
            # the fixture depend on packages that requirements.txt does not list.
            from PIL import Image, ImageDraw

            # Create test image with text
            img = Image.new('L', (800, 600), color=255)
            draw = ImageDraw.Draw(img)
            text = "This is a test page\nfor OCR processing\n123456789"
            draw.text((50, 250), text, fill=0)

            # Save test TIFF
            img.save(cls.test_tiff, format='TIFF')
            cls.has_test_image = True
        except Exception as e:
            print(f"Warning: Could not create test image: {e}")

    @classmethod
    def tearDownClass(cls):
        """Clean up test directory"""
        if cls.temp_dir.exists():
            shutil.rmtree(cls.temp_dir)

    def test_remove_control_chars(self):
        """Test control character removal"""
        # remove_control_chars is a static method, so it is called on the
        # class: constructing OCRProcessor would require Tesseract.
        text = "Hello\x00World\x01\t\n\rTest"
        cleaned = OCRProcessor.remove_control_chars(text)

        # Should keep tab, newline, carriage return
        # Should remove null byte and SOH
        self.assertNotIn('\x00', cleaned)
        self.assertNotIn('\x01', cleaned)
        self.assertIn('\t', cleaned)
        self.assertIn('\n', cleaned)
        self.assertIn('\r', cleaned)

    def test_processor_initialization(self):
        """Test OCR processor initialization"""
        try:
            processor = OCRProcessor(language='eng', config='--psm 1')
        except RuntimeError as e:
            self.skipTest(f"Tesseract not available: {e}")

        self.assertEqual(processor.language, 'eng')
        self.assertEqual(processor.config, '--psm 1')

    def test_process_single_file(self):
        """Test processing a single file"""
        # The availability check must happen at run time: a skip decorator is
        # evaluated while the class body executes, before setUpClass has run,
        # and would therefore always skip this test.
        if not self.has_test_image:
            self.skipTest("Test image not created")

        try:
            processor = OCRProcessor()
        except RuntimeError as e:
            self.skipTest(f"Tesseract not available: {e}")

        output_dir = self.temp_dir / "output"
        output_dir.mkdir(exist_ok=True)

        result = processor.process_single_file(self.test_tiff, output_dir)

        # Check result object
        self.assertIsNotNone(result)
        self.assertTrue(result.success, f"OCR failed: {result.error}")

        # Check output files exist
        self.assertIsNotNone(result.text_file)
        self.assertIsNotNone(result.hocr_file)
        self.assertTrue(result.text_file.exists())
        self.assertTrue(result.hocr_file.exists())

        # Check file naming (should be 00000001.txt and 00000001.html)
        self.assertEqual(result.text_file.name, "00000001.txt")
        self.assertEqual(result.hocr_file.name, "00000001.html")

        # Check file contents
        with open(result.text_file, 'r', encoding='utf-8') as f:
            text_content = f.read()
        self.assertGreater(len(text_content), 0, "Text file is empty")

        with open(result.hocr_file, 'r', encoding='utf-8') as f:
            hocr_content = f.read()
        self.assertIn('hocr', hocr_content.lower(), "Not valid hOCR format")
        self.assertGreater(len(hocr_content), 0, "hOCR file is empty")
TestVolumeDiscovery(unittest.TestCase): + + def test_extract_sequence_number(self): + """Test sequence number extraction""" + test_cases = [ + ("39015012345678_00000001.tif", 1), + ("39015012345678_00000023.tif", 23), + ("ark_12345_abc_00000100.tif", 100), + ("12345678_99999999.tif", 99999999), + ] + + for filename, expected in test_cases: + result = extract_sequence_number(filename) + self.assertEqual(result, expected, + f"Failed for {filename}: expected {expected}, got {result}") + + def test_extract_barcode(self): + """Test barcode extraction""" + test_cases = [ + ("39015012345678_00000001.tif", "39015012345678"), + ("12345678_00000001.tif", "12345678"), + ("9876543210_00000050.tif", "9876543210"), + ] + + for filename, expected in test_cases: + result = extract_barcode_or_ark(filename) + self.assertEqual(result, expected, + f"Failed for {filename}: expected {expected}, got {result}") + + def test_extract_ark(self): + """Test ARK identifier extraction""" + test_cases = [ + ("ark_12345_abc123_00000001.tif", "ark_12345_abc123"), + ("ark-12345-xyz789_00000001.tif", "ark_12345_xyz789"), + ] + + for filename, expected in test_cases: + result = extract_barcode_or_ark(filename) + self.assertEqual(result, expected, + f"Failed for {filename}: expected {expected}, got {result}") + + def test_volume_group_sequential_valid(self): + """Test valid sequential validation""" + group = VolumeGroup("test123") + group.add_file(Path("test_00000001.tif"), 1) + group.add_file(Path("test_00000002.tif"), 2) + group.add_file(Path("test_00000003.tif"), 3) + + is_valid, error = group.validate_sequential() + self.assertTrue(is_valid) + self.assertIsNone(error) + + def test_volume_group_gap_detection(self): + """Test gap detection in sequence""" + group = VolumeGroup("test123") + group.add_file(Path("test_00000001.tif"), 1) + group.add_file(Path("test_00000002.tif"), 2) + group.add_file(Path("test_00000005.tif"), 5) # Gap: missing 3, 4 + + is_valid, error = group.validate_sequential() + 
class VolumeGroup:
    """
    Container for the TIFF files that belong to one volume

    ``tiff_files`` and ``sequence_numbers`` are parallel lists: entry *i* of
    one corresponds to entry *i* of the other.
    """

    def __init__(self, identifier: str):
        self.identifier = identifier
        self.tiff_files: List[Path] = []
        self.sequence_numbers: List[int] = []

    def add_file(self, filepath: Path, sequence: int):
        """Add a TIFF file and its sequence number to this volume group"""
        self.tiff_files.append(filepath)
        self.sequence_numbers.append(sequence)

    def sort_by_sequence(self):
        """Sort both lists in place by sequence number (no-op when empty)"""
        ordered = sorted(zip(self.sequence_numbers, self.tiff_files))
        # Rebuilding with comprehensions also works for an empty group, where
        # unpacking ``zip(*ordered)`` into two names would raise ValueError.
        self.sequence_numbers = [sequence for sequence, _ in ordered]
        self.tiff_files = [filepath for _, filepath in ordered]

    def validate_sequential(self) -> Tuple[bool, Optional[str]]:
        """
        Validate that sequence numbers start at 1 and increase by exactly 1

        Sorts the group as a side effect.

        Returns: (is_valid, error_message); error_message is None when valid
        """
        if not self.sequence_numbers:
            return False, "No files in volume"

        self.sort_by_sequence()

        expected_start = 1
        first = self.sequence_numbers[0]
        if first != expected_start:
            return False, f"First sequence should be {expected_start}, found {first}"

        for current, next_num in zip(self.sequence_numbers, self.sequence_numbers[1:]):
            if next_num == current:
                # Two files claim the same page; calling this a "gap" would
                # send the operator looking for a missing file.
                return False, f"Duplicate sequence number: {current}"
            if next_num != current + 1:
                return False, f"Gap in sequence: {current} -> {next_num}"

        return True, None

    def __repr__(self):
        return f"VolumeGroup(id={self.identifier}, files={len(self.tiff_files)})"
f"ark_{match.group(1)}_{match.group(2)}" + + return None + + +def discover_volumes(input_directory: str) -> Dict[str, VolumeGroup]: + """ + Scan input directory and group files by volume identifier + + Args: + input_directory: Path to directory containing TIFF files + + Returns: + Dictionary mapping volume identifiers to VolumeGroup objects + """ + logging.info(f"Scanning directory: {input_directory}") + + input_path = Path(input_directory) + if not input_path.exists(): + raise FileNotFoundError(f"Input directory does not exist: {input_directory}") + + if not input_path.is_dir(): + raise NotADirectoryError(f"Path is not a directory: {input_directory}") + + volume_groups: Dict[str, VolumeGroup] = {} + + # Scan for TIFF files + tiff_files = list(input_path.glob("*.tif")) + list(input_path.glob("*.TIF")) + + if not tiff_files: + logging.warning(f"No TIFF files found in {input_directory}") + return volume_groups + + logging.info(f"Found {len(tiff_files)} TIFF files") + + # Group files by identifier + for tiff_file in tiff_files: + filename = tiff_file.name + + # Extract identifier (barcode or ARK) + identifier = extract_barcode_or_ark(filename) + if not identifier: + logging.warning(f"Could not extract identifier from: {filename}") + continue + + # Extract sequence number + sequence = extract_sequence_number(filename) + if sequence is None: + logging.warning(f"Could not extract sequence number from: {filename}") + continue + + # Create volume group if needed + if identifier not in volume_groups: + volume_groups[identifier] = VolumeGroup(identifier) + logging.debug(f"Created volume group: {identifier}") + + # Add file to group + volume_groups[identifier].add_file(tiff_file, sequence) + + # Validate and sort each volume group + logging.info(f"Discovered {len(volume_groups)} volume(s)") + + for identifier, group in volume_groups.items(): + logging.info(f" {identifier}: {len(group.tiff_files)} files") + + # Sort by sequence + group.sort_by_sequence() + + # Validate 
sequential numbering + is_valid, error = group.validate_sequential() + if not is_valid: + logging.error(f" āœ— Validation failed for {identifier}: {error}") + else: + logging.info(f" āœ“ Valid sequence: {group.sequence_numbers[0]} to {group.sequence_numbers[-1]}") + + return volume_groups + + +def print_volume_summary(volume_groups: Dict[str, VolumeGroup]): + """Print a summary of discovered volumes""" + print("\n" + "="*60) + print("VOLUME DISCOVERY SUMMARY") + print("="*60) + + if not volume_groups: + print("No volumes discovered.") + return + + for identifier, group in volume_groups.items(): + print(f"\nšŸ“¦ Volume: {identifier}") + print(f" Files: {len(group.tiff_files)}") + print(f" Range: {group.sequence_numbers[0]:08d} to {group.sequence_numbers[-1]:08d}") + + is_valid, error = group.validate_sequential() + if is_valid: + print(f" Status: āœ“ Valid") + else: + print(f" Status: āœ— Invalid - {error}") + + +# Test/Demo functionality +def create_test_files(output_dir: str, barcode: str = "39015012345678", num_files: int = 5): + """ + Create test TIFF files for development/testing + + Args: + output_dir: Directory to create test files in + barcode: Barcode identifier to use + num_files: Number of test files to create + """ + from PIL import Image + import numpy as np + + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + print(f"\nšŸ“ Creating {num_files} test TIFF files in {output_dir}") + + for i in range(1, num_files + 1): + sequence = f"{i:08d}" + filename = f"{barcode}_{sequence}.tif" + filepath = output_path / filename + + # Create a simple 400x600 grayscale test image + img_array = np.random.randint(200, 255, (600, 400), dtype=np.uint8) + img = Image.fromarray(img_array, mode='L') + + # Add text to image + from PIL import ImageDraw, ImageFont + draw = ImageDraw.Draw(img) + text = f"Test Page {i}\n{barcode}" + draw.text((150, 280), text, fill=0) + + img.save(filepath, format='TIFF', compression='none') + print(f" Created: 
{filename}") + + print(f"āœ“ Test files created successfully") + + +if __name__ == "__main__": + import argparse + + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' + ) + + parser = argparse.ArgumentParser(description='Discover and validate HathiTrust volume files') + parser.add_argument('input_dir', nargs='?', + default='/home/schipp0/Digitization/HathiTrust/input', + help='Input directory to scan') + parser.add_argument('--create-test', action='store_true', + help='Create test files for development') + parser.add_argument('--barcode', default='39015012345678', + help='Barcode for test files') + parser.add_argument('--num-files', type=int, default=5, + help='Number of test files to create') + + args = parser.parse_args() + + if args.create_test: + create_test_files(args.input_dir, args.barcode, args.num_files) + print(f"\nNow run without --create-test to discover volumes:") + print(f" python {__file__} {args.input_dir}") + else: + try: + volumes = discover_volumes(args.input_dir) + print_volume_summary(volumes) + except Exception as e: + logging.error(f"Error: {e}") + exit(1)