#!/usr/bin/env python3 from __future__ import print_function, absolute_import, division import errno import logging import math import os import signal import stat import subprocess from dataclasses import dataclass from functools import reduce from itertools import groupby from operator import itemgetter from time import time, mktime, time_ns from typing import Sequence, Iterator, Union from dateutil import parser as dateparser from fuse import FUSE, Operations, LoggingMixIn, FuseOSError @dataclass class Entry: name: str parent: 'Directory | None' expiration: float @property def path(self) -> str: if self.parent is None: return self.name else: return f'{self.parent.path}/{self.name}'.lstrip('/') @dataclass class Directory(Entry, dict[str, Entry]): def __iter__(self) -> Iterator[str]: return iter(self.children) def __len__(self) -> int: return len(self.children) def __getitem__(self, name: str) -> Union[Entry, None]: if name in list(self.children.keys()): return self.children[name] else: return None children: dict[str, Entry] @dataclass class File(Entry): content: Union[str, None] mod_time: Union[int, None] loaded: bool = False @property def size(self): return len(self.content) def parse_history_line(line: str) -> int: return int(mktime(dateparser.parse(''.join(line.split('-')[2:5]).strip()).timetuple())) def get_gopass_data(cmd: Sequence[str]) -> Union[str, None]: my_env: dict[str, str] = os.environ.copy() process: subprocess.Popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, env=my_env, stdin=subprocess.PIPE) stdout, stderr = process.communicate() exit_code: int = process.wait() if exit_code == 0: return stdout.decode('utf-8') else: return None def get_secret(entry: File) -> Union[str, None]: cmd: Sequence[str] = ['gopass', 'show', '-f', entry.path] data: Union[str, None] = get_gopass_data(cmd) return data def get_file_history(file: File) -> Union[list[str], None]: cmd: Sequence[str] = ['gopass', 'history', file.path] data: Union[str, None] = get_gopass_data(cmd) if data is None: return None else: return [s for s in data.split('\n') if len(s) > 0] def get_secrets(path: str) -> Union[list[str], None]: cmd: Sequence[str] = ['gopass', 'list', '--flat', '--strip-prefix', path, path] data: Union[str, None] = get_gopass_data(cmd) if data is None: return None else: return data.split() class GopassFilesystem(LoggingMixIn, Operations): logger: logging refresh_interval: int root: Directory user_id: int group_id: int use_ns: bool = True def __init__(self, refresh_interval: int): self.logger = logging.getLogger() self.refresh_interval = refresh_interval self.user_id = os.getuid() self.group_id = os.getgid() self.root = Directory('', None, 0, {}) self.refresh(self.root) # noinspection PyUnusedLocal def reload(self, signum: int, frame) -> None: self.refresh_item(self.root) def refresh_item(self, entry: Entry) -> None: if isinstance(entry, Directory): entry.children = self.get_directory(entry.path, entry).children elif isinstance(entry, File): entry.loaded = False entry.content = None entry.expiration = time() + self.refresh_interval def refresh(self, entry: Entry) -> None: if time() > entry.expiration: self.refresh_item(entry) elif isinstance(entry, Directory): if len(entry.children) == 0: self.refresh_item(entry) entry.expiration = time() + self.refresh_interval if entry.parent is not None: self.refresh(entry.parent) def get_directory(self, path: str, parent: Union[Directory, None]) -> Directory: secrets: Union[Sequence[str], None] = get_secrets(path) children: dict[str, Entry] if secrets is None: children = dict() else: children = self.convert_to_directory_tree(parent, secrets) return Directory(path.split('/')[-1] if parent is not None else '', parent, time() + self.refresh_interval, children) def convert_to_directory_tree(self, parent: Directory, entries: Union[Sequence[str], Sequence[Sequence[str]]]) -> \ dict[str, Entry]: now: float = time() if isinstance(entries[0], list): paths: Sequence[Sequence[str]] = entries else: paths: Sequence[Sequence[str]] = [s.split('/') for s in entries] grouped_paths: groupby[str, Sequence[str]] = groupby(paths, key=itemgetter(0)) tree: dict[str, Entry] = dict() name: str children: Sequence[Sequence[str]] for name, children in grouped_paths: children: Sequence[Sequence[str]] = [s[1:] for s in children] if len(children[0]) > 0: new_directory: Directory = Directory(name, parent, now + self.refresh_interval, {}) new_directory.children = self.convert_to_directory_tree(new_directory, children) tree[name] = new_directory else: tree[name] = File(name, parent, math.inf, None, None) return tree def get_entry(self, path: str) -> Union[Entry, None]: if path == '/': return self.root path = path.lstrip('/') path_parts: Sequence[str] = path.split('/') if path_parts[0] not in list(self.root.children.keys()): return None entry: Entry = reduce((lambda e, p: e[p] if e[p] is not None else e), path_parts, self.root) if entry.name != path_parts[-1]: return None else: return entry def load_file(self, file: File) -> None: if not file.loaded: secret: Union[str, None] = get_secret(file) if secret is None: raise FuseOSError(errno.EIO) file.content = secret history: Union[Sequence[str], None] = get_file_history(file) if history is None: raise FuseOSError(errno.EIO) file.mod_time = parse_history_line(history[0]) * (10 ** 9) file.loaded = True file.expiration = time() + self.refresh_interval def getattr(self, path: str, fh: int = None) -> dict[str, int]: now: int = time_ns() st: dict[str, int] = dict(st_atime=now, st_mode=0, st_uid=self.user_id, st_gid=self.group_id) if path == '/': st['st_mode'] = stat.S_IFDIR | 0o755 st['st_nlink'] = 2 else: entry: Union[Entry, None] = self.get_entry(path) if isinstance(entry, File): st['st_mode'] = stat.S_IFREG | 0o400 st['st_nlink'] = 1 self.load_file(entry) st['st_size'] = len(entry.content) st['st_ctime'] = entry.mod_time st['st_mtime'] = entry.mod_time elif isinstance(entry, Directory): st['st_mode'] = stat.S_IFDIR | 0o500 st['st_nlink'] = len([d for d in entry.children if isinstance(d, Directory)]) st['st_ctime'] = now st['st_mtime'] = now else: raise FuseOSError(errno.ENOENT) return st def read(self, path: str, size: int, offset: int, fh: int) -> bytes: entry: Union[Entry, None] = self.get_entry(path) if isinstance(entry, File): self.load_file(entry) return ('%s\n' % entry.content).encode('utf-8') elif isinstance(entry, Directory): raise FuseOSError(errno.EISDIR) elif entry is None: raise FuseOSError(errno.ENOENT) def readdir(self, path: str, fh: int) -> list[str]: entry: Union[Entry, None] = self.get_entry(path) if isinstance(entry, Directory): return ['.', '..'] + list(entry.children.keys()) if isinstance(entry, File): raise FuseOSError(errno.ENOTDIR) elif entry is None: raise FuseOSError(errno.ENOENT) def access(self, path: str, amode: int) -> None: entry: Union[Entry, None] = self.get_entry(path) if entry is None: raise FuseOSError(errno.ENOENT) else: self.refresh(entry) def open(self, path: str, flags: int) -> int: entry: Union[Entry, None] = self.get_entry(path) if entry is None: raise FuseOSError(errno.ENOENT) else: access_mode: int = os.O_RDONLY | os.O_WRONLY | os.O_RDWR | os.O_APPEND | os.O_CREAT if (flags & access_mode) != os.O_RDONLY: raise FuseOSError(errno.EROFS) else: return 0 if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument('--mount', required=True, type=str) parser.add_argument('--refresh-interval', required=False, default=60, type=int) parser.add_argument('--foreground', required=False, action='store_true') args = parser.parse_args() logging.basicConfig(level=logging.INFO) gfs = GopassFilesystem(args.refresh_interval) signal.signal(signal.SIGUSR1, gfs.reload) fuse = FUSE( gfs, args.mount, foreground=args.foreground, nothreads=True, allow_other=False)