#!/usr/bin/env python3 from argparse import ArgumentParser from asyncio.subprocess import DEVNULL from asyncio import get_event_loop, gather, BoundedSemaphore, create_subprocess_exec from enum import Enum, auto from logging import getLogger, basicConfig, INFO from os import cpu_count from pathlib import Path from sys import exit logger = getLogger(__name__) class Mode(Enum): identical = auto() nonconflicting = auto() def parse_args(args=None): parser = ArgumentParser(description="Extract common files.") modes = parser.add_mutually_exclusive_group(required=True) for m in Mode: modes.add_argument(f"--{m.name}", dest="mode", action="store_const", const=m, help=f"extract {m.name} common files") parser.add_argument("common_dir", metavar="COMMON_DIR", help="common files directory to move to") parser.add_argument("targets", nargs="+", metavar="INPUT_DIR", help="directory to move from") return parser.parse_args(args) diff_semaphore = BoundedSemaphore(value=cpu_count()) class DiffError(RuntimeError): def __init__(self, file1, file2): self.file1 = file1 self.file2 = file2 async def diff(file1, file2): async with diff_semaphore: p = await create_subprocess_exec("diff", "-q", str(file1), str(file2), stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL) try: ret = await p.wait() except: try: p.kill() except: pass if ret != 0: raise DiffError(file1, file2) async def identical(files): if len(files) < 2: return True first, *rest = files fut = gather(*[diff(first, f) for f in rest]) try: await fut except DiffError as e: logger.info("Divergent files: %s <> %s", e.file1, e.file2) return False else: return True finally: fut.cancel() def commonify_file(common_file, files): common_file.parent.mkdir(parents=True, exist_ok=True) first, *rest = files first.rename(common_file) for f in rest: f.unlink() async def commonify_identical(common_file, files): if await identical(files): commonify_file(common_file, files) return 1 return 0 def arg_dir(s): p = Path(s) if not p.is_dir(): raise ValueError(f"{s!r} is not a directory") return p async def main(settings): ignore_len = settings.mode == Mode.nonconflicting common_dir = arg_dir(settings.common_dir) targets = [arg_dir(t) for t in settings.targets] d = {} for t in targets: n = 0 for f in t.glob("**/*"): if f.is_file(): d.setdefault(f.relative_to(t), []).append(f) n += 1 logger.info("%s: Found %d files", t, n) results = await gather(*[ commonify_identical(common_dir / f, tf) for f, tf in d.items() if ignore_len or len(tf) == len(targets) ]) logger.info("%s: %d %s files", common_dir, sum(results), settings.mode.name) return 0 if __name__ == "__main__": basicConfig(level=INFO) settings = parse_args() loop = get_event_loop() exit(loop.run_until_complete(main(settings)))