This commit is contained in:
2026-04-12 01:06:31 +07:00
commit 10d660cbcb
1066 changed files with 228596 additions and 0 deletions

View File

@@ -0,0 +1,349 @@
#!/usr/bin/env python3
"""
Compression Evaluator - Evaluate compression quality with probe-based testing.
Usage:
python compression_evaluator.py evaluate <original_file> <compressed_file>
python compression_evaluator.py generate-probes <context_file>
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
# Refuse to load inputs larger than this (guards against accidental huge files).
MAX_FILE_SIZE_MB = 100
def load_file(path: str, as_json: bool = True):
    """Load file with proper error handling and size validation.

    Args:
        path: Filesystem path to read.
        as_json: Parse contents as JSON when True; return raw text otherwise.

    Exits the process with status 1 (message on stderr) on a missing file,
    permission error, oversized file, or invalid JSON — callers never see an
    exception. The exact stderr wording is asserted by the edge-case tests.
    """
    try:
        # os.path.getsize may itself raise FileNotFoundError; handled below.
        size_mb = os.path.getsize(path) / (1024 * 1024)
        if size_mb > MAX_FILE_SIZE_MB:
            print(f"Error: File too large ({size_mb:.1f}MB). Max {MAX_FILE_SIZE_MB}MB", file=sys.stderr)
            sys.exit(1)
        with open(path, encoding='utf-8') as f:
            return json.load(f) if as_json else f.read()
    except FileNotFoundError:
        print(f"Error: File not found: {path}", file=sys.stderr)
        sys.exit(1)
    except PermissionError:
        print(f"Error: Permission denied: {path}", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in {path}: {e}", file=sys.stderr)
        sys.exit(1)
class ProbeType(Enum):
    """Categories of evaluation probes; each targets one retention skill."""
    RECALL = "recall"  # Factual retention
    ARTIFACT = "artifact"  # File tracking
    CONTINUATION = "continuation"  # Task planning
    DECISION = "decision"  # Reasoning chains
@dataclass
class Probe:
    """A single evaluation question with its expected answer."""
    type: ProbeType  # which retention skill this probe tests
    question: str  # question posed against the compressed context
    ground_truth: str  # expected answer extracted from the original context
    # Unused in this module; presumably a pointer back to the source message — TODO confirm.
    context_reference: Optional[str] = None
@dataclass
class ProbeResult:
    """Outcome of evaluating one probe against a compressed context."""
    probe: Probe  # the probe that was asked
    response: str  # answer produced (placeholder text in this heuristic version)
    scores: dict  # per-dimension scores, keys drawn from DIMENSIONS
    overall_score: float  # unweighted mean of `scores` values
@dataclass
class EvaluationReport:
    """Aggregate result of a compression-quality evaluation."""
    compression_ratio: float  # fraction of estimated tokens removed (0..1)
    quality_score: float  # weighted sum over DIMENSIONS (0..1)
    dimension_scores: dict  # per-dimension averages across probes
    probe_results: list  # list of ProbeResult
    recommendations: list = field(default_factory=list)  # human-readable advice strings
# Six evaluation dimensions with weights (weights sum to 1.0, so the
# weighted quality score stays in [0, 1]).
DIMENSIONS = {
    "accuracy": {"weight": 0.20, "description": "Technical correctness"},
    "context_awareness": {"weight": 0.15, "description": "Conversation state"},
    "artifact_trail": {"weight": 0.20, "description": "File tracking"},
    "completeness": {"weight": 0.20, "description": "Coverage and depth"},
    "continuity": {"weight": 0.15, "description": "Work continuation"},
    "instruction_following": {"weight": 0.10, "description": "Constraint adherence"}
}
def estimate_tokens(text: str) -> int:
    """Rough token estimate using the ~4-characters-per-token rule for English."""
    approx_chars_per_token = 4
    return len(text) // approx_chars_per_token
def extract_facts(messages: list) -> list:
    """Extract probe-able factual statements from *messages*.

    Args:
        messages: Items that are either plain strings or dicts with a
            "content" key.

    Returns:
        List of ``{"type": <category>, "content": <matched text>}`` dicts,
        in message order.
    """
    # Compile each pattern once instead of calling re.findall with a raw
    # string inside the per-message loop (avoids repeated cache lookups).
    patterns = [
        (re.compile(r"error[:\s]+([^.]+)", re.IGNORECASE), "error"),
        (re.compile(r"next step[s]?[:\s]+([^.]+)", re.IGNORECASE), "next_step"),
        (re.compile(r"decided to\s+([^.]+)", re.IGNORECASE), "decision"),
        (re.compile(r"implemented\s+([^.]+)", re.IGNORECASE), "implementation"),
        (re.compile(r"found that\s+([^.]+)", re.IGNORECASE), "finding"),
    ]
    facts = []
    for msg in messages:
        content = str(msg.get("content", "") if isinstance(msg, dict) else msg)
        for pattern, fact_type in patterns:
            for match in pattern.findall(content):
                facts.append({"type": fact_type, "content": match.strip()})
    return facts
def extract_files(messages: list) -> list:
    """Extract unique file references from *messages*, in first-seen order.

    Matches "<verb> path/to/file.ext" (verb in created/modified/updated/
    edited/read) and "file: path.ext" mentions.

    Returns:
        Each path once, ordered by first appearance. The previous
        ``list(set(...))`` implementation produced a nondeterministic order,
        which made the downstream artifact-probe ground truth unstable.
    """
    patterns = [
        re.compile(r"(?:created|modified|updated|edited|read)\s+[`'\"]?([a-zA-Z0-9_/.-]+\.[a-zA-Z]+)[`'\"]?"),
        re.compile(r"file[:\s]+[`'\"]?([a-zA-Z0-9_/.-]+\.[a-zA-Z]+)[`'\"]?"),
    ]
    files = []
    for msg in messages:
        content = str(msg.get("content", "") if isinstance(msg, dict) else msg)
        for pattern in patterns:
            files.extend(pattern.findall(content))
    # dict.fromkeys dedupes while preserving insertion order (Python 3.7+).
    return list(dict.fromkeys(files))
def extract_decisions(messages: list) -> list:
    """Extract decision statements (the chosen-option text) from *messages*.

    Returns:
        List of captured decision phrases, in message order. Matching is
        case-insensitive; captures stop at the first period.
    """
    # Compile once, outside the per-message loop.
    patterns = [
        re.compile(r"chose\s+([^.]+)\s+(?:because|since|over)", re.IGNORECASE),
        re.compile(r"decided\s+(?:to\s+)?([^.]+)", re.IGNORECASE),
        re.compile(r"went with\s+([^.]+)", re.IGNORECASE),
    ]
    decisions = []
    for msg in messages:
        content = str(msg.get("content", "") if isinstance(msg, dict) else msg)
        for pattern in patterns:
            decisions.extend(pattern.findall(content))
    return decisions
def generate_probes(messages: list, max_recall: int = 3, max_decision: int = 2) -> list:
    """Generate probe set for evaluation.

    Args:
        messages: Original context messages (dicts with "content" or strings).
        max_recall: Cap on recall probes built from extracted facts
            (default 3, matching the previous hard-coded limit).
        max_decision: Cap on decision probes built from extracted decisions
            (default 2, matching the previous hard-coded limit).

    Returns:
        List of Probe objects covering recall, artifact, continuation and
        decision checks. The continuation probe always carries a placeholder
        ground truth — producing a real one would require an LLM.
    """
    probes = []
    # Recall probes from facts
    facts = extract_facts(messages)
    for fact in facts[:max_recall]:
        probes.append(Probe(
            type=ProbeType.RECALL,
            question=f"What was the {fact['type'].replace('_', ' ')}?",
            ground_truth=fact["content"]
        ))
    # Single artifact probe listing every referenced file
    files = extract_files(messages)
    if files:
        probes.append(Probe(
            type=ProbeType.ARTIFACT,
            question="Which files have been modified or created?",
            ground_truth=", ".join(files)
        ))
    # Continuation probe (always emitted, even for empty contexts)
    probes.append(Probe(
        type=ProbeType.CONTINUATION,
        question="What should be done next?",
        ground_truth="[Extracted from context]"  # Would need LLM to generate
    ))
    # Decision probes
    decisions = extract_decisions(messages)
    for decision in decisions[:max_decision]:
        probes.append(Probe(
            type=ProbeType.DECISION,
            question=f"Why was the decision made to {decision[:50]}...?",
            ground_truth=decision
        ))
    return probes
def evaluate_response(probe: Probe, response: str) -> dict:
    """
    Score *response* against *probe* on the quality dimensions relevant
    to the probe's type.
    Note: Production should use LLM-as-Judge.
    """
    response_lower = response.lower()
    truth_lower = probe.ground_truth.lower()
    # Heuristic base score: full containment > partial word overlap > miss.
    if truth_lower in response_lower:
        base_score = 1.0
    elif any(word in response_lower for word in truth_lower.split()[:3]):
        base_score = 0.6
    else:
        base_score = 0.3
    scores = {}
    if probe.type == ProbeType.ARTIFACT:
        # Reward each file-extension-looking mention on top of the base score.
        files_mentioned = len(re.findall(r'\.[a-z]+', response_lower))
        scores["artifact_trail"] = min(1.0, base_score + files_mentioned * 0.1)
        scores["accuracy"] = base_score
    else:
        # Remaining probe types map straight onto a pair of dimensions.
        dims_by_type = {
            ProbeType.RECALL: ("accuracy", "completeness"),
            ProbeType.CONTINUATION: ("continuity", "context_awareness"),
            ProbeType.DECISION: ("accuracy", "context_awareness"),
        }
        for dim in dims_by_type.get(probe.type, ()):
            scores[dim] = base_score
    return scores
def calculate_compression_ratio(original: str, compressed: str) -> float:
    """Fraction of estimated tokens removed by compression (0.0 for empty input)."""
    before = estimate_tokens(original)
    after = estimate_tokens(compressed)
    if before == 0:
        return 0.0
    return 1.0 - (after / before)
def evaluate_compression(original_messages: list, compressed_text: str,
                         probes: Optional[list] = None) -> EvaluationReport:
    """
    Evaluate compression quality.
    Args:
        original_messages: Original context messages
        compressed_text: Compressed summary
        probes: Optional pre-generated probes
    Returns:
        EvaluationReport with scores and recommendations
    """
    # Generate probes if not provided
    if probes is None:
        probes = generate_probes(original_messages)
    # Ratio is measured against the JSON serialization of the originals, so
    # message metadata (roles, keys, quoting) counts toward the original size.
    original_text = json.dumps(original_messages)
    compression_ratio = calculate_compression_ratio(original_text, compressed_text)
    # Evaluate each probe (simulated - production uses LLM)
    probe_results = []
    dimension_scores = {dim: [] for dim in DIMENSIONS}
    for probe in probes:
        # In production, send compressed_text + probe.question to LLM
        # Here we simulate with heuristic check
        scores = evaluate_response(probe, compressed_text)
        overall = sum(scores.values()) / len(scores) if scores else 0
        probe_results.append(ProbeResult(
            probe=probe,
            response="[Would be LLM response]",
            scores=scores,
            overall_score=overall
        ))
        # Aggregate by dimension (only dimensions declared in DIMENSIONS count)
        for dim, score in scores.items():
            if dim in dimension_scores:
                dimension_scores[dim].append(score)
    # Calculate dimension averages; dimensions no probe touched default to a
    # neutral 0.5 rather than 0, so they neither reward nor punish.
    # NOTE(review): 'scores' below shadows the probe-loop variable above; the
    # loops don't overlap so behavior is fine, but a rename would aid reading.
    avg_dimensions = {}
    for dim, scores in dimension_scores.items():
        avg_dimensions[dim] = sum(scores) / len(scores) if scores else 0.5
    # Weighted quality score over the six DIMENSIONS (weights sum to 1.0)
    quality_score = sum(
        avg_dimensions.get(dim, 0.5) * info["weight"]
        for dim, info in DIMENSIONS.items()
    )
    # Generate recommendations
    recommendations = []
    if compression_ratio > 0.99:
        recommendations.append("Very high compression. Risk of information loss.")
    if avg_dimensions.get("artifact_trail", 1) < 0.5:
        recommendations.append("Artifact tracking weak. Add explicit file section to summary.")
    if avg_dimensions.get("continuity", 1) < 0.5:
        recommendations.append("Continuity low. Add 'Next Steps' section to summary.")
    if quality_score < 0.6:
        recommendations.append("Quality below threshold. Consider less aggressive compression.")
    return EvaluationReport(
        compression_ratio=compression_ratio,
        quality_score=quality_score,
        dimension_scores=avg_dimensions,
        probe_results=probe_results,
        recommendations=recommendations
    )
def main():
    """CLI entry point: dispatch to the 'evaluate' or 'generate-probes' subcommand."""
    parser = argparse.ArgumentParser(description="Compression quality evaluator")
    subparsers = parser.add_subparsers(dest="command", required=True)
    # Evaluate command
    eval_parser = subparsers.add_parser("evaluate", help="Evaluate compression quality")
    eval_parser.add_argument("original_file", help="JSON file with original messages")
    eval_parser.add_argument("compressed_file", help="Text file with compressed summary")
    # Generate probes command
    probe_parser = subparsers.add_parser("generate-probes", help="Generate evaluation probes")
    probe_parser.add_argument("context_file", help="JSON file with context messages")
    args = parser.parse_args()
    if args.command == "evaluate":
        original = load_file(args.original_file, as_json=True)
        # Accept either a bare list of messages or a {"messages": [...]} wrapper.
        messages = original if isinstance(original, list) else original.get("messages", [])
        compressed = load_file(args.compressed_file, as_json=False)
        report = evaluate_compression(messages, compressed)
        # Emit a compact JSON summary; numeric fields are pre-formatted strings.
        print(json.dumps({
            "compression_ratio": f"{report.compression_ratio:.1%}",
            "quality_score": f"{report.quality_score:.2f}",
            "dimension_scores": {k: f"{v:.2f}" for k, v in report.dimension_scores.items()},
            "probe_count": len(report.probe_results),
            "recommendations": report.recommendations
        }, indent=2))
    elif args.command == "generate-probes":
        data = load_file(args.context_file, as_json=True)
        messages = data if isinstance(data, list) else data.get("messages", [])
        probes = generate_probes(messages)
        # Serialize probes as plain dicts (Probe dataclasses are not JSON-serializable).
        output = []
        for probe in probes:
            output.append({
                "type": probe.type.value,
                "question": probe.question,
                "ground_truth": probe.ground_truth
            })
        print(json.dumps(output, indent=2))
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,317 @@
#!/usr/bin/env python3
"""
Context Analyzer - Health analysis and degradation detection for agent contexts.
Usage:
python context_analyzer.py analyze <context_file>
python context_analyzer.py budget --system 2000 --tools 1500 --docs 3000 --history 5000
"""
import argparse
import json
import math
import os
import re
import sys
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
# Refuse to load inputs larger than this (guards against accidental huge files).
MAX_FILE_SIZE_MB = 100
def load_json_file(path: str):
    """Load JSON file with proper error handling and size validation.

    Exits the process with status 1 (message on stderr) on a missing file,
    permission error, oversized file, or invalid JSON — callers never see an
    exception. The exact stderr wording is asserted by the edge-case tests.
    """
    try:
        # os.path.getsize may itself raise FileNotFoundError; handled below.
        size_mb = os.path.getsize(path) / (1024 * 1024)
        if size_mb > MAX_FILE_SIZE_MB:
            print(f"Error: File too large ({size_mb:.1f}MB). Max {MAX_FILE_SIZE_MB}MB", file=sys.stderr)
            sys.exit(1)
        with open(path, encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"Error: File not found: {path}", file=sys.stderr)
        sys.exit(1)
    except PermissionError:
        print(f"Error: Permission denied: {path}", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in {path}: {e}", file=sys.stderr)
        sys.exit(1)
class HealthStatus(Enum):
    """Context health tiers, ordered best to worst."""
    HEALTHY = "healthy"
    WARNING = "warning"
    DEGRADED = "degraded"
    CRITICAL = "critical"
@dataclass
class ContextAnalysis:
    """Result bundle produced by analyze_context()."""
    total_tokens: int  # estimated tokens across all messages
    token_limit: int  # model context window size used for the analysis
    utilization: float  # total_tokens / token_limit (may exceed 1.0)
    health_status: HealthStatus  # bucketed tier derived from health_score
    health_score: float  # composite score in [0, 1]; 1.0 = healthy
    degradation_risk: float  # lost-in-the-middle risk in [0, 1]
    poisoning_risk: float  # error/contradiction-based risk in [0, 1]
    recommendations: list = field(default_factory=list)  # human-readable advice
def estimate_tokens(text: str) -> int:
    """Estimate token count (~4 chars per token for English)."""
    chars_per_token = 4
    return len(text) // chars_per_token
def estimate_message_tokens(messages: list) -> int:
    """Estimate total tokens in *messages*; dict messages add a flat
    10-token overhead for role and metadata."""
    per_message_overhead = 10
    total = 0
    for msg in messages:
        if isinstance(msg, dict):
            total += estimate_tokens(str(msg.get("content", ""))) + per_message_overhead
        else:
            total += estimate_tokens(str(msg))
    return total
def measure_attention_distribution(context_length: int, sample_size: int = 100) -> list:
    """
    Simulate U-shaped attention distribution.
    Real implementation would extract from model attention weights.
    """
    # NOTE(review): context_length is currently unused by the simulation;
    # kept for interface stability.
    def attention_at(position: float) -> float:
        # High at the start, high again at the end, sinusoidal dip in between.
        if position < 0.1:
            return 0.9 - position * 2
        if position > 0.9:
            return 0.7 + (position - 0.9) * 2
        return 0.3 + 0.1 * math.sin(position * math.pi)
    return [attention_at(i / sample_size) for i in range(sample_size)]
def detect_lost_in_middle(messages: list, critical_keywords: list) -> list:
    """Flag critical keywords sitting in the attention-weak middle (10%-90%)
    of the message sequence; middle-of-the-middle (30%-70%) hits are 'high' risk."""
    if not messages:
        return []
    total = len(messages)
    flagged = []
    for idx, msg in enumerate(messages):
        rel_pos = idx / total
        # Positions at the start or end enjoy strong attention; skip them.
        if not (0.1 < rel_pos < 0.9):
            continue
        text = str(msg.get("content", "") if isinstance(msg, dict) else msg).lower()
        for keyword in critical_keywords:
            if keyword.lower() in text:
                flagged.append({
                    "position": idx,
                    "position_pct": f"{rel_pos:.1%}",
                    "keyword": keyword,
                    "risk": "high" if 0.3 < rel_pos < 0.7 else "medium"
                })
    return flagged
def detect_poisoning_patterns(messages: list) -> dict:
    """Scan messages for poisoning indicators: error-word density and
    simple positive/negative self-contradictions within one message."""
    error_patterns = [
        r"error", r"failed", r"exception", r"cannot", r"unable",
        r"invalid", r"not found", r"undefined", r"null"
    ]
    # Phrase pairs whose co-occurrence in a single message suggests confusion.
    contradiction_keywords = [
        ("is correct", "is not correct"),
        ("should work", "should not work"),
        ("will succeed", "will fail"),
        ("is valid", "is invalid"),
    ]
    errors_found = []
    contradictions = []
    for idx, msg in enumerate(messages):
        text = str(msg.get("content", "") if isinstance(msg, dict) else msg).lower()
        errors_found.extend(
            {"position": idx, "pattern": pat}
            for pat in error_patterns if re.search(pat, text)
        )
        contradictions.extend(
            {"position": idx, "type": "self-contradiction"}
            for pos_phrase, neg_phrase in contradiction_keywords
            if pos_phrase in text and neg_phrase in text
        )
    total = max(len(messages), 1)
    return {
        "error_density": len(errors_found) / total,
        "contradiction_count": len(contradictions),
        "poisoning_risk": min(1.0, (len(errors_found) * 0.1 + len(contradictions) * 0.3))
    }
def calculate_health_score(utilization: float, degradation_risk: float, poisoning_risk: float) -> float:
    """
    Composite context-health score.
    1.0 = healthy, 0.0 = critical
    """
    score = 1.0
    # Utilization only hurts past the 70% threshold, and hurts quickly after.
    over_budget = utilization - 0.7
    if over_budget > 0:
        score -= over_budget * 1.5
    # Lost-in-the-middle and poisoning risks subtract proportionally.
    score -= degradation_risk * 0.3
    score -= poisoning_risk * 0.2
    # Clamp into [0, 1].
    return min(1.0, max(0.0, score))
def get_health_status(score: float) -> HealthStatus:
    """Bucket a health score into a HealthStatus tier (strict > cutoffs)."""
    tiers = (
        (0.8, HealthStatus.HEALTHY),
        (0.6, HealthStatus.WARNING),
        (0.4, HealthStatus.DEGRADED),
    )
    for cutoff, status in tiers:
        if score > cutoff:
            return status
    return HealthStatus.CRITICAL
def analyze_context(messages: list, token_limit: int = 128000,
                    critical_keywords: Optional[list] = None) -> ContextAnalysis:
    """
    Comprehensive context health analysis.
    Args:
        messages: List of context messages
        token_limit: Model's context window size
        critical_keywords: Keywords that should be at attention-favored positions
    Returns:
        ContextAnalysis with health metrics and recommendations
    """
    # Default keyword set marks goals/instructions that must not be lost mid-context.
    critical_keywords = critical_keywords or ["goal", "task", "important", "critical", "must"]
    # Calculate token utilization
    total_tokens = estimate_message_tokens(messages)
    utilization = total_tokens / token_limit
    # Check for lost-in-middle issues; each hit adds 0.2 risk, capped at 1.0
    middle_warnings = detect_lost_in_middle(messages, critical_keywords)
    degradation_risk = min(1.0, len(middle_warnings) * 0.2)
    # Check for poisoning
    poisoning = detect_poisoning_patterns(messages)
    poisoning_risk = poisoning["poisoning_risk"]
    # Calculate health
    health_score = calculate_health_score(utilization, degradation_risk, poisoning_risk)
    health_status = get_health_status(health_score)
    # Generate recommendations (70%/80% thresholds mirror calculate_health_score's knee)
    recommendations = []
    if utilization > 0.8:
        recommendations.append("URGENT: Context utilization >80%. Trigger compaction immediately.")
    elif utilization > 0.7:
        recommendations.append("WARNING: Context utilization >70%. Plan for compaction.")
    if middle_warnings:
        recommendations.append(f"Found {len(middle_warnings)} critical items in middle region. "
                               "Consider moving to beginning/end.")
    if poisoning_risk > 0.3:
        recommendations.append("High poisoning risk detected. Review recent tool outputs for errors.")
    if health_status == HealthStatus.CRITICAL:
        recommendations.append("CRITICAL: Consider context reset with clean state.")
    return ContextAnalysis(
        total_tokens=total_tokens,
        token_limit=token_limit,
        utilization=utilization,
        health_status=health_status,
        health_score=health_score,
        degradation_risk=degradation_risk,
        poisoning_risk=poisoning_risk,
        recommendations=recommendations
    )
def calculate_budget(system: int, tools: int, docs: int, history: int,
                     buffer_pct: float = 0.15) -> dict:
    """Allocate a context budget across components plus a reserved response buffer."""
    subtotal = system + tools + docs + history
    # Buffer is a fraction of the component subtotal, truncated to whole tokens.
    buffer = int(subtotal * buffer_pct)
    total = subtotal + buffer
    warn_at = int(total * 0.7)
    crit_at = int(total * 0.8)
    allocation = {
        "system_prompt": system,
        "tool_definitions": tools,
        "retrieved_docs": docs,
        "message_history": history,
        "reserved_buffer": buffer,
    }
    return {
        "allocation": allocation,
        "total_budget": total,
        "warning_threshold": warn_at,
        "critical_threshold": crit_at,
        "recommendations": [
            f"Trigger compaction at {warn_at:,} tokens",
            f"Aggressive optimization at {crit_at:,} tokens",
            f"Reserved {buffer:,} tokens ({buffer_pct:.0%}) for responses"
        ]
    }
def main():
    """CLI entry point: dispatch to the 'analyze' or 'budget' subcommand."""
    parser = argparse.ArgumentParser(description="Context health analyzer")
    subparsers = parser.add_subparsers(dest="command", required=True)
    # Analyze command
    analyze_parser = subparsers.add_parser("analyze", help="Analyze context health")
    analyze_parser.add_argument("context_file", help="JSON file with messages array")
    analyze_parser.add_argument("--limit", type=int, default=128000, help="Token limit")
    # nargs="+" means keywords are passed space-separated: --keywords goal task
    analyze_parser.add_argument("--keywords", nargs="+", help="Critical keywords to track")
    # Budget command
    budget_parser = subparsers.add_parser("budget", help="Calculate context budget")
    budget_parser.add_argument("--system", type=int, default=2000, help="System prompt tokens")
    budget_parser.add_argument("--tools", type=int, default=1500, help="Tool definitions tokens")
    budget_parser.add_argument("--docs", type=int, default=3000, help="Retrieved docs tokens")
    budget_parser.add_argument("--history", type=int, default=5000, help="Message history tokens")
    budget_parser.add_argument("--buffer", type=float, default=0.15, help="Buffer percentage")
    args = parser.parse_args()
    if args.command == "analyze":
        data = load_json_file(args.context_file)
        # Accept either a bare list of messages or a {"messages": [...]} wrapper.
        messages = data if isinstance(data, list) else data.get("messages", [])
        result = analyze_context(messages, args.limit, args.keywords)
        # Emit a compact JSON summary; numeric fields are pre-formatted strings.
        print(json.dumps({
            "total_tokens": result.total_tokens,
            "token_limit": result.token_limit,
            "utilization": f"{result.utilization:.1%}",
            "health_status": result.health_status.value,
            "health_score": f"{result.health_score:.2f}",
            "degradation_risk": f"{result.degradation_risk:.2f}",
            "poisoning_risk": f"{result.poisoning_risk:.2f}",
            "recommendations": result.recommendations
        }, indent=2))
    elif args.command == "budget":
        result = calculate_budget(args.system, args.tools, args.docs, args.history, args.buffer)
        print(json.dumps(result, indent=2))
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,246 @@
"""Tests for context-engineering edge case handling.
Tests the error handling improvements in compression_evaluator.py and context_analyzer.py:
- File not found
- Permission denied
- Invalid JSON
- File too large
- UTF-8 encoding
"""
import json
import os
import stat
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
# Assumes the scripts under test live one directory above this test file — TODO confirm layout.
SCRIPTS_DIR = Path(__file__).parent.parent
# Invoke the scripts with the same interpreter that runs the tests.
PYTHON = sys.executable
class TestCompressionEvaluatorEdgeCases:
    """Test edge cases in compression_evaluator.py.

    Each test shells out to the real script via subprocess, so exit codes and
    stderr text are verified end-to-end, exactly as a user would see them.
    """
    @pytest.fixture
    def valid_json_file(self, tmp_path):
        """Create valid JSON file."""
        f = tmp_path / "valid.json"
        f.write_text('{"messages": [{"role": "user", "content": "hello"}]}', encoding='utf-8')
        return str(f)
    @pytest.fixture
    def valid_text_file(self, tmp_path):
        """Create valid text file."""
        f = tmp_path / "compressed.txt"
        f.write_text("Summary of conversation", encoding='utf-8')
        return str(f)
    def run_script(self, *args, timeout=30):
        """Run compression_evaluator.py with args; returns CompletedProcess."""
        cmd = [PYTHON, str(SCRIPTS_DIR / "compression_evaluator.py")] + list(args)
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        return result
    def test_missing_file_exits_1(self, tmp_path):
        """Test exit code 1 when file not found."""
        result = self.run_script("evaluate", "/nonexistent/file.json", str(tmp_path / "c.txt"))
        assert result.returncode == 1
        assert "File not found" in result.stderr
    def test_missing_file_error_message(self, tmp_path):
        """Test error message format for missing file."""
        missing = "/this/path/does/not/exist/file.json"
        result = self.run_script("evaluate", missing, str(tmp_path / "c.txt"))
        assert result.returncode == 1
        assert missing in result.stderr or "not found" in result.stderr.lower()
    def test_invalid_json_exits_1(self, tmp_path, valid_text_file):
        """Test exit code 1 when JSON is invalid."""
        bad_json = tmp_path / "bad.json"
        bad_json.write_text("{invalid json content", encoding='utf-8')
        result = self.run_script("evaluate", str(bad_json), valid_text_file)
        assert result.returncode == 1
        assert "Invalid JSON" in result.stderr or "JSON" in result.stderr
    def test_valid_files_succeed(self, valid_json_file, valid_text_file):
        """Test success with valid inputs."""
        result = self.run_script("evaluate", valid_json_file, valid_text_file)
        assert result.returncode == 0
        # Script prints a JSON report on stdout.
        output = json.loads(result.stdout)
        assert "compression_ratio" in output
        assert "quality_score" in output
    def test_generate_probes_missing_file(self):
        """Test generate-probes with missing file."""
        result = self.run_script("generate-probes", "/nonexistent/context.json")
        assert result.returncode == 1
        assert "File not found" in result.stderr
    def test_generate_probes_invalid_json(self, tmp_path):
        """Test generate-probes with invalid JSON."""
        bad = tmp_path / "bad.json"
        bad.write_text("not valid json {{{", encoding='utf-8')
        result = self.run_script("generate-probes", str(bad))
        assert result.returncode == 1
        assert "Invalid JSON" in result.stderr or "JSON" in result.stderr
    def test_generate_probes_success(self, valid_json_file):
        """Test generate-probes with valid file."""
        result = self.run_script("generate-probes", valid_json_file)
        assert result.returncode == 0
        output = json.loads(result.stdout)
        assert isinstance(output, list)
    def test_utf8_content(self, tmp_path):
        """Test UTF-8 encoding with special characters."""
        utf8_file = tmp_path / "utf8.json"
        content = {"messages": [{"role": "user", "content": "日本語テスト émojis 🎉"}]}
        utf8_file.write_text(json.dumps(content), encoding='utf-8')
        compressed = tmp_path / "compressed.txt"
        compressed.write_text("Summary with 日本語 and émojis 🎉", encoding='utf-8')
        result = self.run_script("evaluate", str(utf8_file), str(compressed))
        assert result.returncode == 0
    @pytest.mark.skipif(os.name == 'nt', reason="Permission test not reliable on Windows")
    def test_permission_denied(self, tmp_path):
        """Test permission denied error."""
        protected = tmp_path / "protected.json"
        protected.write_text('{"messages": []}', encoding='utf-8')
        # Strip all permissions so the script's open() raises PermissionError.
        os.chmod(protected, 0o000)
        try:
            result = self.run_script("generate-probes", str(protected))
            assert result.returncode == 1
            assert "Permission denied" in result.stderr or "permission" in result.stderr.lower()
        finally:
            # Restore permissions so pytest can clean up tmp_path.
            os.chmod(protected, stat.S_IRUSR | stat.S_IWUSR)
class TestContextAnalyzerEdgeCases:
    """Test edge cases in context_analyzer.py.

    Each test shells out to the real script via subprocess, so exit codes and
    stderr text are verified end-to-end, exactly as a user would see them.
    """
    @pytest.fixture
    def valid_context_file(self, tmp_path):
        """Create valid context file."""
        f = tmp_path / "context.json"
        content = {
            "messages": [
                {"role": "user", "content": "implement feature X"},
                {"role": "assistant", "content": "I'll help with that"}
            ]
        }
        f.write_text(json.dumps(content), encoding='utf-8')
        return str(f)
    def run_script(self, *args, timeout=30):
        """Run context_analyzer.py with args; returns CompletedProcess."""
        cmd = [PYTHON, str(SCRIPTS_DIR / "context_analyzer.py")] + list(args)
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        return result
    def test_missing_file_exits_1(self):
        """Test exit code 1 when file not found."""
        result = self.run_script("analyze", "/nonexistent/context.json")
        assert result.returncode == 1
        assert "File not found" in result.stderr
    def test_invalid_json_exits_1(self, tmp_path):
        """Test exit code 1 when JSON is invalid."""
        bad = tmp_path / "bad.json"
        bad.write_text("not json", encoding='utf-8')
        result = self.run_script("analyze", str(bad))
        assert result.returncode == 1
        assert "Invalid JSON" in result.stderr or "JSON" in result.stderr
    def test_valid_file_succeeds(self, valid_context_file):
        """Test success with valid input."""
        result = self.run_script("analyze", valid_context_file)
        assert result.returncode == 0
        output = json.loads(result.stdout)
        assert "health_status" in output or "health_score" in output
    def test_utf8_content(self, tmp_path):
        """Test UTF-8 encoding with international characters."""
        utf8_file = tmp_path / "utf8.json"
        content = {
            "messages": [
                {"role": "user", "content": "日本語で説明してください"},
                {"role": "assistant", "content": "はい、説明します。émojis: 🎉🚀"}
            ]
        }
        utf8_file.write_text(json.dumps(content, ensure_ascii=False), encoding='utf-8')
        result = self.run_script("analyze", str(utf8_file))
        assert result.returncode == 0
    def test_empty_messages_array(self, tmp_path):
        """Test handling of empty messages array."""
        f = tmp_path / "empty.json"
        f.write_text('{"messages": []}', encoding='utf-8')
        result = self.run_script("analyze", str(f))
        assert result.returncode == 0
    def test_direct_messages_list(self, tmp_path):
        """Test handling of direct messages list (no wrapper)."""
        f = tmp_path / "direct.json"
        content = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"}
        ]
        f.write_text(json.dumps(content), encoding='utf-8')
        result = self.run_script("analyze", str(f))
        assert result.returncode == 0
    @pytest.mark.skipif(os.name == 'nt', reason="Permission test not reliable on Windows")
    def test_permission_denied(self, tmp_path):
        """Test permission denied error."""
        protected = tmp_path / "protected.json"
        protected.write_text('{"messages": []}', encoding='utf-8')
        # Strip all permissions so the script's open() raises PermissionError.
        os.chmod(protected, 0o000)
        try:
            result = self.run_script("analyze", str(protected))
            assert result.returncode == 1
            assert "Permission denied" in result.stderr or "permission" in result.stderr.lower()
        finally:
            # Restore permissions so pytest can clean up tmp_path.
            os.chmod(protected, stat.S_IRUSR | stat.S_IWUSR)
    def test_with_keywords_filter(self, valid_context_file):
        """Test analyze with keywords filter."""
        # --keywords uses nargs="+", so each keyword must be its own argv item.
        # Passing a single "feature,implement" string would register one literal
        # keyword containing a comma that never matches any message content.
        result = self.run_script("analyze", valid_context_file, "--keywords", "feature", "implement")
        assert result.returncode == 0
    def test_with_limit(self, valid_context_file):
        """Test analyze with limit parameter (tiny limit still exits cleanly)."""
        result = self.run_script("analyze", valid_context_file, "--limit", "10")
        assert result.returncode == 0
class TestFileSizeValidation:
    """Test file size validation (100MB limit)."""
    def test_large_file_warning_in_code(self):
        """Verify the MAX_FILE_SIZE_MB constant exists in both scripts."""
        for script_name in ("compression_evaluator.py", "context_analyzer.py"):
            source = (SCRIPTS_DIR / script_name).read_text()
            assert "MAX_FILE_SIZE_MB = 100" in source
if __name__ == "__main__":
    pytest.main([__file__, "-v"])