#!/usr/bin/env python3
"""Extract files shared by several directory trees into a common directory.

Every target directory is scanned with ``find``; files found at the same
relative path are compared with ``diff -q``.  Depending on the selected mode
a file is moved into the common directory and removed from the targets:

* ``--identical``: the file exists in every target and all copies match.
* ``--nonconflicting``: all existing copies match, even if some targets do
  not contain the file at all.
"""

from argparse import ArgumentParser
from asyncio import (
    BoundedSemaphore,
    IncompleteReadError,
    create_subprocess_exec,
    create_task,
    gather,
    get_running_loop,
    run,
)
from asyncio.subprocess import DEVNULL, PIPE
from contextlib import asynccontextmanager
from enum import Enum, auto
from logging import DEBUG, INFO, basicConfig, getLogger
from os import cpu_count, fsdecode, remove, removedirs, renames
from os.path import isdir, join as pjoin, relpath, split as psplit
from sys import exit

logger = getLogger(__name__)


class Mode(Enum):
    """Extraction policy; each member is exposed as a ``--<name>`` flag."""

    identical = auto()
    nonconflicting = auto()


def parse_args(args=None):
    """Parse the command line.

    Args:
        args: Argument list to parse; ``None`` makes argparse use ``sys.argv``.

    Returns:
        A ``(prog, namespace)`` tuple: the program name and the parsed
        settings (``mode``, ``dry_run``, ``verbose``, ``common_dir``,
        ``targets``).
    """
    parser = ArgumentParser(description="Extract common files.")
    modes = parser.add_mutually_exclusive_group(required=True)
    for m in Mode:
        modes.add_argument(
            f"--{m.name}",
            dest="mode",
            action="store_const",
            const=m,
            help=f"extract {m.name} common files",
        )
    parser.add_argument("-n", "--dry-run", action="store_true", help="Dry run (do nothing)")
    parser.add_argument("-v", "--verbose", action="store_true", help="Raise verbosity")
    parser.add_argument(
        "common_dir", metavar="COMMON_DIR", help="common files directory to move to"
    )
    parser.add_argument("targets", nargs="+", metavar="INPUT_DIR", help="directory to move from")
    return parser.prog, parser.parse_args(args)


@asynccontextmanager
async def bounded_exec(*args, **kwargs):
    """Run a subprocess while holding a slot of a per-event-loop semaphore.

    All arguments are forwarded to ``asyncio.create_subprocess_exec``.  The
    context yields the process object; on exit a still-running process is
    killed and then waited for.
    """
    loop = get_running_loop()
    try:
        semaphore = loop.__bounded_exec_semaphore
    except AttributeError:
        # The semaphore is cached on the loop object so that every call made
        # under the same loop shares one limit.  cpu_count() may return None
        # when the count cannot be determined, hence the fallback to 1.
        semaphore = loop.__bounded_exec_semaphore = BoundedSemaphore(
            value=(cpu_count() or 1) + 1
        )
    async with semaphore:
        process = await create_subprocess_exec(*args, **kwargs)
        try:
            yield process
        finally:
            if process.returncode is None:
                try:
                    process.kill()
                except OSError:
                    # Best effort: the process may already be gone.
                    pass
            await process.wait()


class DiffError(RuntimeError):
    """Raised by ``diff`` when the ``diff`` command exits with a non-zero status."""


async def diff(file1, file2):
    """Compare two files with ``diff -q``.

    Raises:
        DiffError: if ``diff`` exits with any non-zero status.
    """
    async with bounded_exec(
        "diff", "-q", file1, file2, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL
    ) as p:
        ret = await p.wait()
    if ret != 0:
        raise DiffError()


async def identical(files):
    """Return True when every file in *files* compares equal to the first one.

    Fewer than two files are trivially identical.  As soon as one comparison
    fails, the comparisons still in flight are cancelled.
    """
    if len(files) < 2:
        return True
    first, *rest = files
    tasks = [create_task(diff(first, f)) for f in rest]
    try:
        await gather(*tasks)
    except DiffError:
        return False
    else:
        return True
    finally:
        # Cancelling the gather() future after it has already failed is a
        # no-op, so the unfinished comparisons are cancelled one by one.
        pending = [t for t in tasks if not t.done()]
        for t in pending:
            t.cancel()
        if pending:
            # Let the cancelled tasks unwind (killing and reaping their
            # subprocesses) before returning.
            await gather(*pending, return_exceptions=True)


def removes(path):
    """Remove the file *path*, then prune directories left empty above it."""
    remove(path)
    head, tail = psplit(path)
    if head and tail:
        try:
            removedirs(head)
        except OSError:
            # The directory still has content (or cannot be removed); keep it.
            pass


def commonify_file(common_file, files):
    """Move the first of *files* to *common_file* and delete the other copies.

    ``os.renames`` creates missing destination directories and prunes source
    directories that become empty.
    """
    first, *rest = files
    renames(first, common_file)
    for f in rest:
        removes(f)


async def find_files(path):
    """Asynchronously yield the path of every regular file that ``find`` lists under *path*.

    Names are read NUL-separated (``-print0``) and decoded with
    ``os.fsdecode`` so that names which are not valid in the filesystem
    encoding still round-trip instead of raising ``UnicodeDecodeError``.

    Raises:
        IncompleteReadError: if the output ends in the middle of a name.
    """
    async with bounded_exec(
        "find", path, "-type", "f", "-print0", stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL
    ) as p:
        while True:
            try:
                line = await p.stdout.readuntil(b"\x00")
            except IncompleteReadError as e:
                if not e.partial:
                    # Clean end of stream: nothing left after the last NUL.
                    break
                raise
            yield fsdecode(line[:-1])


async def scan_one(target, files):
    """Record every file under *target* in *files*.

    *files* maps a path relative to its target onto the list of targets that
    contain it; it is updated in place.
    """
    n = 0
    async for f in find_files(target):
        files.setdefault(relpath(f, target), []).append(target)
        n += 1
    logger.info("Found %d files in %r", n, target)


async def scan(targets):
    """Scan all *targets* concurrently.

    Returns:
        A dict mapping each relative file path onto the list of targets
        containing a file at that path.
    """
    files = {}
    await gather(*[scan_one(t, files) for t in targets])
    return files


def arg_dir(s):
    """Return *s* unchanged if it is an existing directory.

    Raises:
        ValueError: if *s* is not a directory.
    """
    if not isdir(s):
        raise ValueError(f"{s!r} is not a directory")
    return s


async def commonify(settings, common_file, targets):
    """Decide whether *common_file* is common and, if so, extract it.

    Args:
        settings: Parsed command-line namespace (see ``parse_args``).
        common_file: Path of the file relative to each target.
        targets: The targets that contain *common_file*.

    Returns:
        True if the file qualifies under ``settings.mode`` (it is moved
        unless ``settings.dry_run`` is set), False otherwise.
    """
    files = [pjoin(t, common_file) for t in targets]
    if not await identical(files):
        logger.info("Divergent file %r", common_file)
        return False
    if len(files) == len(settings.targets):
        logger.debug("Identical file %r", common_file)
    elif settings.mode == Mode.nonconflicting:
        logger.debug("Nonconflicting file %r in %r", common_file, targets)
    else:
        # Identical mode requires the file to be present in every target.
        logger.debug("Partly identical file %r in %r", common_file, targets)
        return False
    if not settings.dry_run:
        commonify_file(pjoin(settings.common_dir, common_file), files)
    return True


async def main(settings):
    """Scan the targets, extract the common files and log a summary.

    Returns:
        The process exit status (always 0).

    Raises:
        ValueError: if one of ``settings.targets`` is not a directory.
    """
    # Validate every target before any scan starts.
    targets = [arg_dir(t) for t in settings.targets]
    files = await scan(targets)
    results = await gather(*[commonify(settings, *ft) for ft in files.items()])
    logger.info("%d %s files in %r", sum(results), settings.mode.name, settings.common_dir)
    return 0


if __name__ == "__main__":
    prog, settings = parse_args()
    basicConfig(level=DEBUG if settings.verbose else INFO, format=f"{prog}: %(message)s")
    exit(run(main(settings), debug=settings.verbose))