"""Validate a partial (sub-directory) manifest against cluster research data.

The manifest is a text file with an optional ``# host=<name>`` header and one
``<hash> <path>`` entry per line.  The research JSON is the ground truth and
is read through three top-level keys: ``meta``, ``consistent_files`` and
``varying_files``.
"""

import argparse
import json
import os
import sys

# Marker stored in the research data for "this file is absent on this host".
_MISSING_MARKER = "MISSING"

# Number of entries each warning section lists before summarising the rest.
_WARN_PREVIEW_LIMIT = 5


def parse_manifest_file(filepath):
    """Parse a sub-manifest file into its host name and per-file hashes.

    Blank lines are skipped.  Lines starting with ``#`` are comments, except
    ``# host=<name>`` which sets the host (the last such line wins).  Every
    other line is split on its first run of whitespace into ``<hash> <path>``;
    lines that do not yield two fields are ignored.

    Args:
        filepath: Path of the manifest file to read.

    Returns:
        A ``(host, file_data)`` tuple.  ``host`` is the header value, or
        ``None`` when no header was seen.  ``file_data`` maps each path to
        its hash string.

    Raises:
        SystemExit: With code 1 when the file cannot be read or decoded.
    """
    host = None
    file_data = {}

    try:
        with open(filepath, 'r') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue

                # Header / comment lines never carry file entries.
                if line.startswith('#'):
                    if line.startswith('# host='):
                        # partition() keeps everything after the FIRST '=',
                        # so a value that itself contains '=' is not cut off.
                        host = line.partition('=')[2].strip()
                    continue

                # maxsplit=1 keeps paths containing spaces in one piece.
                parts = line.split(maxsplit=1)
                if len(parts) == 2:
                    md5, path = parts
                    file_data[path] = md5
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError and invalid path strings.
        print(f"Critical Error: Could not read manifest file: {e}")
        sys.exit(1)

    if not host:
        print("Warning: No '# host=' header found. Validation of 'varying' files will be impossible.")

    return host, file_data


def _classify_files(target_host, target_files, research):
    """Sort every scanned file into a result bucket; performs no I/O.

    Args:
        target_host: Host name from the manifest header, or a falsy value.
        target_files: Mapping of path -> hash found on the target.
        research: Parsed research data with ``consistent_files`` and
            ``varying_files`` mappings.

    Returns:
        Dict of bucket name -> list.  The ``fail_*`` buckets hold dicts with
        ``path``/``found``/``expected`` keys; all other buckets hold paths.

    Raises:
        KeyError: If a ``varying_files`` entry has no ``states`` key.
    """
    consistent = research['consistent_files']
    varying = research['varying_files']

    results = {
        "ok_consistent": [],      # Matches cluster-wide invariant
        "ok_varying": [],         # Matches expected value for this specific host
        "fail_consistent": [],    # Violated a cluster-wide invariant
        "fail_varying": [],       # Violated the specific expectation for this host
        "unknown_file": [],       # File not present in original research
        "unverifiable_host": [],  # File is varying, but we don't know this host
    }

    for fpath, fhash in target_files.items():
        # CHECK A: known consistent file -- one hash for the whole cluster.
        if fpath in consistent:
            expected_hash = consistent[fpath]
            if fhash == expected_hash:
                results['ok_consistent'].append(fpath)
            else:
                results['fail_consistent'].append(
                    {"path": fpath, "found": fhash, "expected": expected_hash}
                )
            continue

        # CHECK C: the research has never seen this path.
        if fpath not in varying:
            results['unknown_file'].append(fpath)
            continue

        # CHECK B: known varying file -- the expectation is per host, so an
        # absent host name or a host unknown to the research is unverifiable.
        node_states = varying[fpath]['states']
        if not target_host or target_host not in node_states:
            results['unverifiable_host'].append(fpath)
            continue

        expected_hash = node_states[target_host]
        if expected_hash == _MISSING_MARKER:
            # The file exists here although research says it should not.
            results['fail_varying'].append({
                "path": fpath,
                "found": fhash,
                "expected": "File should not exist on this node",
            })
        elif fhash == expected_hash:
            results['ok_varying'].append(fpath)
        else:
            results['fail_varying'].append(
                {"path": fpath, "found": fhash, "expected": expected_hash}
            )

    return results


def _print_report(target_host, results):
    """Print the PASS / FAIL / WARN sections for classified results."""
    # OK sections
    if results['ok_consistent']:
        print(f"\n[PASS] Consistent Files Verified: {len(results['ok_consistent'])}")
    if results['ok_varying']:
        print(f"[PASS] Varying Files Verified: {len(results['ok_varying'])}")

    # FAIL sections: (bucket, headline, explanation)
    fail_sections = (
        (
            'fail_consistent',
            "GLOBAL CONSISTENCY VIOLATIONS",
            "These files MUST be identical across the cluster but differed:",
        ),
        (
            'fail_varying',
            "NODE SPECIFIC VIOLATIONS",
            f"These files did not match the expected state for {target_host}:",
        ),
    )
    for bucket, headline, explanation in fail_sections:
        failures = results[bucket]
        if not failures:
            continue
        print(f"\n[FAIL] {headline} ({len(failures)})")
        print(explanation)
        for item in failures:
            print(f" X {item['path']}")
            print(f" Expected: {item['expected']}")
            print(f" Found: {item['found']}")

    # WARNING sections: previews are capped, and the remainder is always
    # summarised so a truncated list is never mistaken for a complete one.
    warn_sections = (
        (
            'unknown_file',
            "UNKNOWN FILES",
            "These files were not seen in the original research scan:",
        ),
        (
            'unverifiable_host',
            "UNVERIFIABLE FILES",
            "These files vary per-node, but this host is new/unknown to the research:",
        ),
    )
    for bucket, headline, explanation in warn_sections:
        paths = results[bucket]
        if not paths:
            continue
        print(f"\n[WARN] {headline} ({len(paths)})")
        print(explanation)
        for path in paths[:_WARN_PREVIEW_LIMIT]:
            print(f" ? {path}")
        if len(paths) > _WARN_PREVIEW_LIMIT:
            print(f" ... and {len(paths) - _WARN_PREVIEW_LIMIT} more")


def compare_against_research(manifest_path, research_path):
    """Validate a sub-manifest against the research JSON and exit.

    Loads the research data, parses the manifest, classifies every scanned
    file, prints a report to stdout and terminates the process.

    Args:
        manifest_path: Path of the sub-manifest to validate.
        research_path: Path of the research (ground truth) JSON file.

    Raises:
        SystemExit: Always.  Code 1 when an input cannot be loaded, the
            research data lacks its ``consistent_files``/``varying_files``
            mappings, or any file violates its expectation; code 0 otherwise
            (warnings alone do not fail the run).
    """
    # 1. Load the ground truth (research).
    try:
        with open(research_path, 'r', encoding='utf-8') as f:
            research = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"Critical Error: Could not load research JSON: {e}")
        sys.exit(1)

    # Fail with a readable message instead of a traceback deep in the loop.
    if not isinstance(research, dict):
        print("Critical Error: Could not load research JSON: top-level value is not an object")
        sys.exit(1)
    for section in ('consistent_files', 'varying_files'):
        if not isinstance(research.get(section), dict):
            print(f"Critical Error: Could not load research JSON: missing '{section}' mapping")
            sys.exit(1)

    # 2. Parse the sub-manifest (target).
    target_host, target_files = parse_manifest_file(manifest_path)

    # The host count is informational only, so its absence must not abort.
    meta = research.get('meta')
    total_hosts = meta.get('total_hosts', 'UNKNOWN') if isinstance(meta, dict) else 'UNKNOWN'

    print("--- VALIDATION REPORT ---")
    print(f"Target Host: {target_host if target_host else 'UNKNOWN'}")
    print(f"Files Scanned: {len(target_files)}")
    print(f"Research Baseline: {total_hosts} hosts known")
    print("-" * 30)

    # 3. Compare, then 4. report.
    results = _classify_files(target_host, target_files, research)
    _print_report(target_host, results)

    # Exit code: only hard violations fail the run.
    has_failures = bool(results['fail_consistent'] or results['fail_varying'])
    sys.exit(1 if has_failures else 0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Validate a sub-manifest against cluster research.")
    parser.add_argument("sub_manifest", help="The partial manifest file (subdir scan)")
    parser.add_argument("research_json", help="The cluster_ground_truth.json file")
    args = parser.parse_args()

    compare_against_research(args.sub_manifest, args.research_json)