author     Eric Anderson  2017-12-28 23:39:49 -0600
committer  Eric Anderson  2017-12-28 23:39:49 -0600
commit     ead7cc8bdba22f846237e6af92fbd89a483d5e79 (patch)
tree       3a21a0e24038fa152f640a93ee615b0fd45c669a
parent     9e79f3864bb93cf494da1e00d572be8eefe30a36 (diff)
download   aur-ead7cc8bdba22f846237e6af92fbd89a483d5e79.tar.gz
Bump to 0.5.0 for daemon-based cache
This keeps the cache continually up to date, as the daemon receives updates from Avahi when devices come and go. The daemon automatically exits after a while to clean itself up.
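
The handoff described above works roughly as follows. The sketch below is illustrative, not the committed code: the Avahi browsing is replaced by a static peer list, and SOCKET_PATH, IDLE_EXIT_SECONDS, and the --daemon flag are invented for the example. The client binds a Unix socket, hands the listening descriptor to a freshly spawned daemon as its stdin, and the daemon serves its current peer list to each connection until a GLib timeout shuts it down:

#!/usr/bin/env python3
# Minimal sketch of the client/daemon socket handoff; not the committed code.
# The peer list is a stub standing in for Avahi results, and SOCKET_PATH,
# IDLE_EXIT_SECONDS and the --daemon flag are invented for this example.
import os
import pickle
import socket
import subprocess
import sys

from gi.repository import GLib

SOCKET_PATH = '/tmp/pkgdistcache-demo'  # illustrative path
IDLE_EXIT_SECONDS = 60                  # daemon cleans itself up after this


def daemon_main():
    # The listening socket arrives as fd 0 (stdin), the same way the real
    # daemon receives it from the client that spawned it.
    lis = socket.fromfd(0, socket.AF_UNIX, socket.SOCK_STREAM)
    peers = ['host-a.local', 'host-b.local']  # stand-in for Avahi results

    loop = GLib.MainLoop()
    GLib.timeout_add_seconds(IDLE_EXIT_SECONDS, loop.quit)

    def accept(source, condition, data):
        sock, _ = lis.accept()
        with sock, sock.makefile('wb') as f:
            pickle.dump(peers, f, -1)
        return True  # keep the watch installed

    channel = GLib.IOChannel.unix_new(lis.fileno())
    GLib.io_add_watch(channel, GLib.IO_IN, accept, None)
    loop.run()
    lis.close()
    return 0


def spawn_daemon():
    # Bind and listen in the client, then hand the socket to a new daemon
    # process as its stdin; the bound socket file outlives this scope
    # because the child keeps an inherited descriptor open.
    try:
        os.unlink(SOCKET_PATH)  # clean up a stale socket file
    except FileNotFoundError:
        pass
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.bind(SOCKET_PATH)
        sock.listen()
        subprocess.Popen([sys.executable, __file__, '--daemon'], stdin=sock)


def client_main():
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.connect(SOCKET_PATH)
    except (FileNotFoundError, ConnectionRefusedError):
        spawn_daemon()
        sock.connect(SOCKET_PATH)  # connection waits in the listen backlog
    with sock, sock.makefile('rb') as f:
        print('peers:', pickle.load(f))
    return 0


if __name__ == '__main__':
    sys.exit(daemon_main() if '--daemon' in sys.argv[1:] else client_main())

In the real script the daemon also calls os.setsid() and keeps its peer dictionary updated from Avahi's ItemNew/ItemRemove signals; only the socket handoff and the self-terminating main loop are shown here.
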
-rw-r--r--  .SRCINFO              10
-rw-r--r--  PKGBUILD               8
-rwxr-xr-x  pkgdistcache-client  260
-rw-r--r--  pkgdistcache.conf      5
4 files changed, 183 insertions(+), 100 deletions(-)
diff --git a/.SRCINFO b/.SRCINFO
index e99736f1b2e3..721ce940875b 100644
--- a/.SRCINFO
+++ b/.SRCINFO
@@ -1,9 +1,9 @@
# Generated by mksrcinfo v8
-# Thu Dec 28 20:23:42 UTC 2017
+# Fri Dec 29 05:39:42 UTC 2017
pkgbase = pkgdistcache
pkgdesc = A distributed local-network cache for pacman packages
- pkgver = 0.4.6
- pkgrel = 3
+ pkgver = 0.5.0
+ pkgrel = 1
url = http://venator.ath.cx/dw/doku.php?id=linux:pkgdistcache
install = pkgdistcache.install
arch = any
@@ -19,9 +19,9 @@ pkgbase = pkgdistcache
source = pkgdistcache-daemon
source = pkgdistcache.conf
source = pkgdistcached.service
- sha256sums = 4141d8d07a7c67ebd0dba24ca8aa2a97bcf93a2915b7afbebbd85728bbbc356c
+ sha256sums = f80850ff4cfe775e0dd73961924d5eb934e2c4be6a8d8e3e2c3c404835e7f05b
sha256sums = 10379b95265e7aa3c6334197ef255327281e35b958c3c062ae893dd3a646a66e
- sha256sums = d77ac418aa651bc622cd91204d6907554c6cdb4bb989e484cc54da32342faa51
+ sha256sums = 5628cf4d81adaf7a3dd7243f51af1a30fff73a31b421615a7c0051280618f457
sha256sums = b5fb3b3d40b31af92a1676364dfc42a35f564067988697ea4d56fd71b4b414e3
pkgname = pkgdistcache
diff --git a/PKGBUILD b/PKGBUILD
index 1023485d724c..dccd7705e7c2 100644
--- a/PKGBUILD
+++ b/PKGBUILD
@@ -2,8 +2,8 @@
# Maintainer: Eric Anderson <ejona86@gmail.com>
pkgname=pkgdistcache
-pkgver=0.4.6
-pkgrel=3
+pkgver=0.5.0
+pkgrel=1
pkgdesc='A distributed local-network cache for pacman packages'
arch=('any')
url='http://venator.ath.cx/dw/doku.php?id=linux:pkgdistcache'
@@ -14,9 +14,9 @@ source=('pkgdistcache-client'
'pkgdistcache-daemon'
'pkgdistcache.conf'
'pkgdistcached.service')
-sha256sums=('4141d8d07a7c67ebd0dba24ca8aa2a97bcf93a2915b7afbebbd85728bbbc356c'
+sha256sums=('f80850ff4cfe775e0dd73961924d5eb934e2c4be6a8d8e3e2c3c404835e7f05b'
'10379b95265e7aa3c6334197ef255327281e35b958c3c062ae893dd3a646a66e'
- 'd77ac418aa651bc622cd91204d6907554c6cdb4bb989e484cc54da32342faa51'
+ '5628cf4d81adaf7a3dd7243f51af1a30fff73a31b421615a7c0051280618f457'
'b5fb3b3d40b31af92a1676364dfc42a35f564067988697ea4d56fd71b4b414e3')
package() {
diff --git a/pkgdistcache-client b/pkgdistcache-client
index c153d40052d8..dd499cc040cf 100755
--- a/pkgdistcache-client
+++ b/pkgdistcache-client
@@ -5,9 +5,11 @@
# by Alessio Bianchi <venator85@gmail.com>
#
+import collections
import os
import os.path
import pickle
+import socket
import string
import subprocess
import sys
@@ -18,7 +20,7 @@ import dbus
import dbus.glib
import requests
import xdg.BaseDirectory
-from gi.repository import GObject
+from gi.repository import GLib
colors = {
'none': '\033[0m',
@@ -73,6 +75,25 @@ class Service(object):
return hash(self) == hash(other)
+def once(func):
+ def wrapped(*args):
+ if wrapped.ran:
+ return
+ wrapped.ran = True
+ return func(*args)
+ wrapped.ran = False
+ return wrapped
+
+
+AvahiService = collections.namedtuple(
+ 'AvahiService',
+ ['interface', 'protocol', 'name', 'stype', 'domain', 'flags'])
+AvahiResolvedService = collections.namedtuple(
+ 'AvahiResolvedService',
+ ['interface', 'protocol', 'name', 'type', 'domain', 'host',
+ 'aprotocol', 'address', 'port', 'txt', 'flags'])
+
+
class AvahiBrowser(object):
def __init__(self):
# Connect to the system bus...
@@ -84,10 +105,11 @@ class AvahiBrowser(object):
self.server = dbus.Interface(avahi_proxy, avahi.DBUS_INTERFACE_SERVER)
self.version_string = self.server.GetVersionString()
self.domain = "local"
- self.loop = GObject.MainLoop()
- self.services = []
+ self.services = {}
+
+ def browse(self, stype, all_for_now_callback):
+ all_for_now_callback = once(all_for_now_callback)
- def browse(self, stype):
# Ask the server for a path to the browser object for the service we're
# interested in...
browser_path = self.server.ServiceBrowserNew(
@@ -103,34 +125,155 @@ class AvahiBrowser(object):
browser_proxy, avahi.DBUS_INTERFACE_SERVICE_BROWSER)
# Now connect the call backs to the relevant signals.
- browser.connect_to_signal('ItemNew', self.new_service)
- browser.connect_to_signal('AllForNow', self.all_for_now)
- self.loop.run()
- return self.services
-
- def new_service(self, interface, protocol, name, stype, domain, flags):
- if flags & avahi.LOOKUP_RESULT_LOCAL:
+ browser.connect_to_signal('ItemNew', self._item_new)
+ browser.connect_to_signal('ItemRemove', self._item_remove)
+ browser.connect_to_signal('AllForNow', all_for_now_callback)
+
+ def failure(*args):
+ self._failure(*args)
+ all_for_now_callback()
+ browser.connect_to_signal('Failure', failure)
+
+ def _item_new(self, *args):
+ service = AvahiService(*args)
+ if service.flags & avahi.LOOKUP_RESULT_LOCAL:
# The service is on this machine; ignore
return
try:
s = self.server.ResolveService(
- interface,
- protocol,
- name,
- stype,
- domain,
+ service.interface,
+ service.protocol,
+ service.name,
+ service.stype,
+ service.domain,
avahi.PROTO_UNSPEC,
dbus.UInt32(0))
- # service name, host, ip, port
- service = Service(str(s[3]), str(s[2]), str(s[7]), int(s[8]))
- self.services.append(service)
+ self.services[service] = AvahiResolvedService(*s)
except dbus.DBusException as ex:
# Mainly expect to see:
# org.freedesktop.Avahi.TimeoutError: Timeout reached
printwarn(ex)
- def all_for_now(self):
- self.loop.quit()
+ def _item_remove(self, *args):
+ self.services.pop(AvahiService(*args), None)  # services is a dict, so pop rather than remove
+
+ def _failure(self, error):
+ printwarn(error)
+
+ def discovered_services(self):
+ return self.services.values()
+
+
+def cache_main(argv):
+ os.setsid()
+ os.chdir('/')
+
+ loop = GLib.MainLoop()
+
+ GLib.timeout_add_seconds(60, loop.quit)
+
+ lis = socket.fromfd(0, socket.AF_UNIX, socket.SOCK_STREAM)
+
+ def start_accepting():
+ channel = GLib.IOChannel.unix_new(lis.fileno())
+ GLib.io_add_watch(channel, GLib.IO_IN, accept, None)
+
+ def accept(source, condition, data):
+ # TODO: reset quit timer?
+ (sock, addr) = lis.accept()
+ with sock:
+ sock.shutdown(socket.SHUT_RD)
+ # remove duplicates (e.g. services offered on more than one
+ # network card)
+ clients = set()
+ for client in browser.discovered_services():
+ clients.add(Service(str(client.name), str(client.host),
+ str(client.address), int(client.port)))
+ pkgdistcache_clients = list(clients)
+ with sock.makefile('wb') as f:
+ pickle.dump(pkgdistcache_clients, f, -1)
+ return True
+
+ # discover other pkgdistcache capable hosts via avahi
+ browser = AvahiBrowser()
+ browser.browse("_pkgdistcache._tcp", start_accepting)
+
+ loop.run()
+
+ # We don't unlink the socket file, since we don't easily know whether our
+ # current fd corresponds to the current file
+ lis.close()
+ return 0
+
+
+def spawn_cache_process(cache_file):
+ try:
+ # Clean up stale file
+ os.unlink(cache_file)
+ except FileNotFoundError:
+ pass
+ with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
+ try:
+ sock.bind(cache_file)
+ except OSError as e:
+ if e.errno != 98:
+ raise
+ # OSError: [Errno 98] Address already in use
+ return # Assume race, where a cache process is already running
+
+ sock.listen()
+ subprocess.Popen([__file__, '--cache'], stdin=sock)
+
+
+def connect_to_cache_process(cache_file):
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ while True:
+ try:
+ sock.connect(cache_file)
+ return sock
+ except (FileNotFoundError, ConnectionRefusedError):  # socket file may not exist yet
+ spawn_cache_process(cache_file)
+
+
+def fetch_from_peer(pkg, dst, download_cmd_template):
+ runtime_dir = xdg.BaseDirectory.get_runtime_dir(strict=False)
+ cache_file = os.path.join(runtime_dir, 'pkgdistcache')
+
+ with connect_to_cache_process(cache_file) as sock:
+ sock.shutdown(socket.SHUT_WR)
+ with sock.makefile('rb') as f:
+ pkgdistcache_clients = pickle.load(f)
+
+ if pkgdistcache_clients:
+ print("-- Discovered hosts: %s"
+ % ", ".join(set([c.host for c in pkgdistcache_clients])))
+ else:
+ print("-- No hosts discovered")
+
+ for client in pkgdistcache_clients:
+ clientip = client.ip
+ if ":" in clientip:
+ clientip = "[" + clientip + "]"
+ url = "http://" + clientip + ":" + str(client.port) + "/" + pkg
+ try:
+ r = requests.head(url, timeout=1)
+ except Exception as e:
+ printwarn("Failed checking host '%s' with ip '%s': %s"
+ % (client.host, clientip, repr(e)))
+ continue
+ if r.status_code != 200:
+ continue
+
+ printmsg("Downloading %s from host '%s'" % (pkg, client.host))
+ download_cmd = download_cmd_template.substitute(
+ {'u': url, 'o': dst})
+ ret = runcmd2(download_cmd)
+ if ret == 0:
+ return True
+ else:
+ printwarn("Host '%s' doesn't have %s in cache"
+ % (client.host, pkg))
+ return False
def main(argv):
@@ -145,74 +288,16 @@ def main(argv):
download_cmd_template = string.Template(config['download_cmd'])
pkg = os.path.basename(argv[1]) # argv[1] = %u passed by pacman
+ dst = argv[2] # argv[2] = %o passed by pacman
must_download = True
if not (pkg.endswith('.db') or pkg.endswith('.db.sig')):
- runtime_dir = xdg.BaseDirectory.get_runtime_dir(strict=False)
- cache_file = os.path.join(runtime_dir, 'pkgdistcache')
try:
- # No exception means file exists
- stat = os.stat(cache_file)
- # Check age of file
- cache_life_secs = int(config['cache_file_life']) * 60
- cache_valid = time.time() < stat.st_mtime + cache_life_secs
- except FileNotFoundError:
- cache_valid = False
- if cache_valid:
- with open(cache_file, 'rb') as f:
- pkgdistcache_clients = pickle.load(f)
- else:
- # recent cache file not found, discover other pkgdistcache capable
- # hosts via avahi and save result to a new cache file
- browser = AvahiBrowser()
- clients = browser.browse("_pkgdistcache._tcp")
- # remove duplicates (eg services offered on more than a network
- # card etc.)
- clients = set(clients)
- pkgdistcache_clients = list(clients)
-
- try:
- os.unlink(cache_file) # remove any old cache file
- except FileNotFoundError:
- pass
- with open(cache_file, 'wb') as f:
- pickle.dump(pkgdistcache_clients, f, -1)
-
- if pkgdistcache_clients:
- print("-- Discovered hosts: %s"
- % ", ".join(set([c.host for c in pkgdistcache_clients])))
- else:
- print("-- No hosts discovered")
-
- for client in pkgdistcache_clients:
- clientip = client.ip
- if ":" in clientip:
- clientip = "[" + clientip + "]"
- url = "http://" + clientip + ":" + str(client.port) + "/" + pkg
- try:
- r = requests.head(url, timeout=1)
- except Exception as e:
- printwarn("Failed checking host '%s' with ip '%s': %s"
- % (client.host, clientip, repr(e)))
- continue
- if r.status_code != 200:
- continue
-
- dst = argv[2]
- printmsg("Downloading %s from host '%s'" % (pkg, client.host))
- download_cmd = download_cmd_template.substitute(
- {'u': url, 'o': dst})
- try:
- ret = runcmd2(download_cmd)
- if ret == 0:
- must_download = False
- break
- else:
- printwarn("Host '%s' doesn't have %s in cache"
- % (client.host, pkg))
- except KeyboardInterrupt:
- printerr("Aborted")
- return 1
+ must_download = not fetch_from_peer(pkg, dst,
+ download_cmd_template)
+ except KeyboardInterrupt:
+ printerr("Aborted")
+ return 1
# download package file from mirror if necessary
if must_download:
@@ -229,4 +314,7 @@ def main(argv):
if __name__ == '__main__':
- sys.exit(main(sys.argv))
+ if len(sys.argv) == 2 and sys.argv[1] == '--cache':
+ sys.exit(cache_main(sys.argv))
+ else:
+ sys.exit(main(sys.argv))
diff --git a/pkgdistcache.conf b/pkgdistcache.conf
index 1271087c0464..8573795b5aa2 100644
--- a/pkgdistcache.conf
+++ b/pkgdistcache.conf
@@ -2,11 +2,6 @@
# pkgdistcache configuration file
{
- # The maximum life for the cache file in minutes. After this
- # amount of time, a new Avahi query will be done to discover
- # pkgdistcache-capable hosts on local net.
- 'cache_file_life': 10,
-
# The TCP port for the daemon to listen on
'port': 12500,