diff options
author | Eric Anderson | 2017-12-28 23:39:49 -0600 |
---|---|---|
committer | Eric Anderson | 2017-12-28 23:39:49 -0600 |
commit | ead7cc8bdba22f846237e6af92fbd89a483d5e79 (patch) | |
tree | 3a21a0e24038fa152f640a93ee615b0fd45c669a | |
parent | 9e79f3864bb93cf494da1e00d572be8eefe30a36 (diff) | |
download | aur-ead7cc8bdba22f846237e6af92fbd89a483d5e79.tar.gz |
Bump to 0.5.0 for daemon-based cache
This keeps the cache continually up-to-date as the daemon is provided
updates from Avahi as devices come and go. The daemon automatically
exits after a while to clean itself up.
-rw-r--r-- | .SRCINFO | 10 | ||||
-rw-r--r-- | PKGBUILD | 8 | ||||
-rwxr-xr-x | pkgdistcache-client | 260 | ||||
-rw-r--r-- | pkgdistcache.conf | 5 |
4 files changed, 183 insertions, 100 deletions
@@ -1,9 +1,9 @@ # Generated by mksrcinfo v8 -# Thu Dec 28 20:23:42 UTC 2017 +# Fri Dec 29 05:39:42 UTC 2017 pkgbase = pkgdistcache pkgdesc = A distributed local-network cache for pacman packages - pkgver = 0.4.6 - pkgrel = 3 + pkgver = 0.5.0 + pkgrel = 1 url = http://venator.ath.cx/dw/doku.php?id=linux:pkgdistcache install = pkgdistcache.install arch = any @@ -19,9 +19,9 @@ pkgbase = pkgdistcache source = pkgdistcache-daemon source = pkgdistcache.conf source = pkgdistcached.service - sha256sums = 4141d8d07a7c67ebd0dba24ca8aa2a97bcf93a2915b7afbebbd85728bbbc356c + sha256sums = f80850ff4cfe775e0dd73961924d5eb934e2c4be6a8d8e3e2c3c404835e7f05b sha256sums = 10379b95265e7aa3c6334197ef255327281e35b958c3c062ae893dd3a646a66e - sha256sums = d77ac418aa651bc622cd91204d6907554c6cdb4bb989e484cc54da32342faa51 + sha256sums = 5628cf4d81adaf7a3dd7243f51af1a30fff73a31b421615a7c0051280618f457 sha256sums = b5fb3b3d40b31af92a1676364dfc42a35f564067988697ea4d56fd71b4b414e3 pkgname = pkgdistcache @@ -2,8 +2,8 @@ # Maintainer: Eric Anderson <ejona86@gmail.com> pkgname=pkgdistcache -pkgver=0.4.6 -pkgrel=3 +pkgver=0.5.0 +pkgrel=1 pkgdesc='A distributed local-network cache for pacman packages' arch=('any') url='http://venator.ath.cx/dw/doku.php?id=linux:pkgdistcache' @@ -14,9 +14,9 @@ source=('pkgdistcache-client' 'pkgdistcache-daemon' 'pkgdistcache.conf' 'pkgdistcached.service') -sha256sums=('4141d8d07a7c67ebd0dba24ca8aa2a97bcf93a2915b7afbebbd85728bbbc356c' +sha256sums=('f80850ff4cfe775e0dd73961924d5eb934e2c4be6a8d8e3e2c3c404835e7f05b' '10379b95265e7aa3c6334197ef255327281e35b958c3c062ae893dd3a646a66e' - 'd77ac418aa651bc622cd91204d6907554c6cdb4bb989e484cc54da32342faa51' + '5628cf4d81adaf7a3dd7243f51af1a30fff73a31b421615a7c0051280618f457' 'b5fb3b3d40b31af92a1676364dfc42a35f564067988697ea4d56fd71b4b414e3') package() { diff --git a/pkgdistcache-client b/pkgdistcache-client index c153d40052d8..dd499cc040cf 100755 --- a/pkgdistcache-client +++ b/pkgdistcache-client @@ -5,9 +5,11 @@ # by Alessio Bianchi <venator85@gmail.com> # +import collections import os import os.path import pickle +import socket import string import subprocess import sys @@ -18,7 +20,7 @@ import dbus import dbus.glib import requests import xdg.BaseDirectory -from gi.repository import GObject +from gi.repository import GLib colors = { 'none': '\033[0m', @@ -73,6 +75,25 @@ class Service(object): return hash(self) == hash(other) +def once(func): + def wrapped(*args): + if wrapped.ran: + return + wrapped.ran = True + return func(*args) + wrapped.ran = False + return wrapped + + +AvahiService = collections.namedtuple( + 'AvahiService', + ['interface', 'protocol', 'name', 'stype', 'domain', 'flags']) +AvahiResolvedService = collections.namedtuple( + 'AvahiResolvedService', + ['interface', 'protocol', 'name', 'type', 'domain', 'host', + 'aprotocol', 'address', 'port', 'txt', 'flags']) + + class AvahiBrowser(object): def __init__(self): # Connect to the system bus... @@ -84,10 +105,11 @@ class AvahiBrowser(object): self.server = dbus.Interface(avahi_proxy, avahi.DBUS_INTERFACE_SERVER) self.version_string = self.server.GetVersionString() self.domain = "local" - self.loop = GObject.MainLoop() - self.services = [] + self.services = {} + + def browse(self, stype, all_for_now_callback): + all_for_now_callback = once(all_for_now_callback) - def browse(self, stype): # Ask the server for a path to the browser object for the service we're # interested in... browser_path = self.server.ServiceBrowserNew( @@ -103,34 +125,155 @@ class AvahiBrowser(object): browser_proxy, avahi.DBUS_INTERFACE_SERVICE_BROWSER) # Now connect the call backs to the relevant signals. - browser.connect_to_signal('ItemNew', self.new_service) - browser.connect_to_signal('AllForNow', self.all_for_now) - self.loop.run() - return self.services - - def new_service(self, interface, protocol, name, stype, domain, flags): - if flags & avahi.LOOKUP_RESULT_LOCAL: + browser.connect_to_signal('ItemNew', self._item_new) + browser.connect_to_signal('ItemRemove', self._item_remove) + browser.connect_to_signal('AllForNow', all_for_now_callback) + + def failure(*args): + self._failure(*args) + all_for_now_callback() + browser.connect_to_signal('Failure', failure) + + def _item_new(self, *args): + service = AvahiService(*args) + if service.flags & avahi.LOOKUP_RESULT_LOCAL: # The service is on this machine; ignore return try: s = self.server.ResolveService( - interface, - protocol, - name, - stype, - domain, + service.interface, + service.protocol, + service.name, + service.stype, + service.domain, avahi.PROTO_UNSPEC, dbus.UInt32(0)) - # service name, host, ip, port - service = Service(str(s[3]), str(s[2]), str(s[7]), int(s[8])) - self.services.append(service) + self.services[service] = AvahiResolvedService(*s) except dbus.DBusException as ex: # Mainly expect to see: # org.freedesktop.Avahi.TimeoutError: Timeout reached printwarn(ex) - def all_for_now(self): - self.loop.quit() + def _item_remove(self, *args): + self.services.remove(AvahiService(*args)) + + def _failure(self, error): + printwarn(error) + + def discovered_services(self): + return self.services.values() + + +def cache_main(argv): + os.setsid() + os.chdir('/') + + loop = GLib.MainLoop() + + GLib.timeout_add_seconds(60, loop.quit) + + lis = socket.fromfd(0, socket.AF_UNIX, socket.SOCK_STREAM) + + def start_accepting(): + channel = GLib.IOChannel.unix_new(lis.fileno()) + GLib.io_add_watch(channel, GLib.IO_IN, accept, None) + + def accept(source, condition, data): + # TODO: reset quit timer? + (sock, addr) = lis.accept() + with sock: + sock.shutdown(socket.SHUT_RD) + # remove duplicates (eg services offered on more than a network + # card etc.) + clients = set() + for client in browser.discovered_services(): + clients.add(Service(str(client.name), str(client.host), + str(client.address), int(client.port))) + pkgdistcache_clients = list(clients) + with sock.makefile('wb') as f: + pickle.dump(pkgdistcache_clients, f, -1) + return True + + # discover other pkgdistcache capable hosts via avahi + browser = AvahiBrowser() + browser.browse("_pkgdistcache._tcp", start_accepting) + + loop.run() + + # We don't unlink the socket file, since we don't easily know whether our + # current fd corresponds to the current file + lis.close() + return 0 + + +def spawn_cache_process(cache_file): + try: + # Clean up stale file + os.unlink(cache_file) + except FileNotFoundError: + pass + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: + try: + sock.bind(cache_file) + except OSError as e: + if e.errno != 98: + raise + # OSError: [Errno 98] Address already in use + return # Assume race, where a cache process is already running + + sock.listen() + subprocess.Popen([__file__, '--cache'], stdin=sock) + + +def connect_to_cache_process(cache_file): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + while True: + try: + sock.connect(cache_file) + return sock + except ConnectionRefusedError as e: + spawn_cache_process(cache_file) + + +def fetch_from_peer(pkg, dst, download_cmd_template): + runtime_dir = xdg.BaseDirectory.get_runtime_dir(strict=False) + cache_file = os.path.join(runtime_dir, 'pkgdistcache') + + with connect_to_cache_process(cache_file) as sock: + sock.shutdown(socket.SHUT_WR) + with sock.makefile('rb') as f: + pkgdistcache_clients = pickle.load(f) + + if pkgdistcache_clients: + print("-- Discovered hosts: %s" + % ", ".join(set([c.host for c in pkgdistcache_clients]))) + else: + print("-- No hosts discovered") + + for client in pkgdistcache_clients: + clientip = client.ip + if ":" in clientip: + clientip = "[" + clientip + "]" + url = "http://" + clientip + ":" + str(client.port) + "/" + pkg + try: + r = requests.head(url, timeout=1) + except Exception as e: + printwarn("Failed checking host '%s' with ip '%s': %s" + % (client.host, clientip, repr(e))) + continue + if r.status_code != 200: + continue + + printmsg("Downloading %s from host '%s'" % (pkg, client.host)) + download_cmd = download_cmd_template.substitute( + {'u': url, 'o': dst}) + ret = runcmd2(download_cmd) + if ret == 0: + return True + else: + printwarn("Host '%s' doesn't have %s in cache" + % (client.host, pkg)) + return False def main(argv): @@ -145,74 +288,16 @@ def main(argv): download_cmd_template = string.Template(config['download_cmd']) pkg = os.path.basename(argv[1]) # argv[1] = %u passed by pacman + dst = argv[2] # argv[2] = %o passed by pacman must_download = True if not (pkg.endswith('.db') or pkg.endswith('.db.sig')): - runtime_dir = xdg.BaseDirectory.get_runtime_dir(strict=False) - cache_file = os.path.join(runtime_dir, 'pkgdistcache') try: - # No exception means file exists - stat = os.stat(cache_file) - # Check age of file - cache_life_secs = int(config['cache_file_life']) * 60 - cache_valid = time.time() < stat.st_mtime + cache_life_secs - except FileNotFoundError: - cache_valid = False - if cache_valid: - with open(cache_file, 'rb') as f: - pkgdistcache_clients = pickle.load(f) - else: - # recent cache file not found, discover other pkgdistcache capable - # hosts via avahi and save result to a new cache file - browser = AvahiBrowser() - clients = browser.browse("_pkgdistcache._tcp") - # remove duplicates (eg services offered on more than a network - # card etc.) - clients = set(clients) - pkgdistcache_clients = list(clients) - - try: - os.unlink(cache_file) # remove any old cache file - except FileNotFoundError: - pass - with open(cache_file, 'wb') as f: - pickle.dump(pkgdistcache_clients, f, -1) - - if pkgdistcache_clients: - print("-- Discovered hosts: %s" - % ", ".join(set([c.host for c in pkgdistcache_clients]))) - else: - print("-- No hosts discovered") - - for client in pkgdistcache_clients: - clientip = client.ip - if ":" in clientip: - clientip = "[" + clientip + "]" - url = "http://" + clientip + ":" + str(client.port) + "/" + pkg - try: - r = requests.head(url, timeout=1) - except Exception as e: - printwarn("Failed checking host '%s' with ip '%s': %s" - % (client.host, clientip, repr(e))) - continue - if r.status_code != 200: - continue - - dst = argv[2] - printmsg("Downloading %s from host '%s'" % (pkg, client.host)) - download_cmd = download_cmd_template.substitute( - {'u': url, 'o': dst}) - try: - ret = runcmd2(download_cmd) - if ret == 0: - must_download = False - break - else: - printwarn("Host '%s' doesn't have %s in cache" - % (client.host, pkg)) - except KeyboardInterrupt: - printerr("Aborted") - return 1 + must_download = not fetch_from_peer(pkg, dst, + download_cmd_template) + except KeyboardInterrupt: + printerr("Aborted") + return 1 # download package file from mirror if necessary if must_download: @@ -229,4 +314,7 @@ def main(argv): if __name__ == '__main__': - sys.exit(main(sys.argv)) + if len(sys.argv) == 2 and sys.argv[1] == '--cache': + sys.exit(cache_main(sys.argv)) + else: + sys.exit(main(sys.argv)) diff --git a/pkgdistcache.conf b/pkgdistcache.conf index 1271087c0464..8573795b5aa2 100644 --- a/pkgdistcache.conf +++ b/pkgdistcache.conf @@ -2,11 +2,6 @@ # pkgdistcache configuration file { - # The maximum life for the cache file in minutes. After this - # amount of time, a new Avahi query will be done to discover - # pkgdistcache-capable hosts on local net. - 'cache_file_life': 10, - # The TCP port for the daemon to listen on 'port': 12500, |