summarylogtreecommitdiffstats
diff options
context:
space:
mode:
authorHLFH2015-12-16 19:19:01 +0100
committerHLFH2015-12-16 19:19:01 +0100
commit023851e89fd3c94942a1ca179e76e36def1626a2 (patch)
tree6d938abe19b765970b3b35f8bc36f8be786ae4db
downloadaur-023851e89fd3c94942a1ca179e76e36def1626a2.tar.gz
Ajout de agetpkg dans AUR
-rw-r--r--.SRCINFO16
-rw-r--r--PKGBUILD19
-rwxr-xr-xagetpkg404
3 files changed, 439 insertions, 0 deletions
diff --git a/.SRCINFO b/.SRCINFO
new file mode 100644
index 000000000000..479b8633be44
--- /dev/null
+++ b/.SRCINFO
@@ -0,0 +1,16 @@
+# Generated by mksrcinfo v8
+# Wed Dec 16 18:18:21 UTC 2015
+pkgbase = agetpkg-git
+ pkgdesc = Archlinux Archive Get Package (Git version)
+ pkgver = 0
+ pkgrel = 1
+ url = https://github.com/seblu/agetpkg
+ arch = any
+ license = GPL2
+ makedepends = git
+ depends = python
+ depends = python-xdg
+ conflicts = agetpkg
+
+pkgname = agetpkg-git
+
diff --git a/PKGBUILD b/PKGBUILD
new file mode 100644
index 000000000000..be9f0a414229
--- /dev/null
+++ b/PKGBUILD
@@ -0,0 +1,19 @@
+# Maintainer: Sébastien Luttringer
+
+pkgname=agetpkg-git
+pkgver="$(git log --pretty=format:''|wc -l)"
+pkgrel=1
+pkgdesc='Archlinux Archive Get Package (Git version)'
+arch=('any')
+url='https://github.com/seblu/agetpkg'
+license=('GPL2')
+makedepends=('git')
+depends=('python' 'python-xdg')
+conflicts=('agetpkg')
+
+package() {
+ cd "$startdir"
+ install -Dm755 agetpkg "$pkgdir/usr/bin/agetpkg"
+}
+
+# vim:set ts=2 sw=2 et:
diff --git a/agetpkg b/agetpkg
new file mode 100755
index 000000000000..27e9705931a7
--- /dev/null
+++ b/agetpkg
@@ -0,0 +1,404 @@
+#!/usr/bin/python3
+# coding: utf-8
+
+# agetpkg - Archive Get Package
+# Copyright © 2015 Sébastien Luttringer
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+'''Archlinux Archive Get Package'''
+
+from argparse import ArgumentParser
+from collections import OrderedDict
+from email.utils import parsedate
+from logging import getLogger, error, warn, debug, DEBUG
+from lzma import open as xzopen
+from os import stat, uname, getcwd, chdir, geteuid, environ
+from os.path import basename, exists, join
+from pprint import pprint
+from re import match, compile as recompile
+from shutil import copyfileobj
+from subprocess import call
+from tempfile import TemporaryDirectory
+from time import mktime, time
+from urllib.request import urlopen, Request
+from xdg.BaseDirectory import save_cache_path
+
+# magics
+NAME = "agetpkg"
+VERSION = "1"
+ARCHIVE_URL = "http://ala.seblu.net/packages/.all/"
+INDEX_FILENAME = "index.0.xz"
+PKG_EXT = ".pkg.tar.xz"
+SIG_EXT = ".sig"
+
+class Error(BaseException):
+ """Error"""
+ ERR_USAGE = 1
+ ERR_FATAL = 2
+ ERR_ABORT = 3
+ ERR_UNKNOWN = 4
+
+class Url(object):
+ """Remote Ressource"""
+
+ HTTP_HEADERS = {
+ "User-Agent": "%s v%s" % (NAME, VERSION),
+ }
+
+ def __init__(self, url, timeout):
+ self.url = url
+ self.timeout = timeout
+
+ def __str__(self):
+ return self.url
+
+ @property
+ def exists(self):
+ try:
+ self.headers
+ return True
+ except Exception:
+ return False
+
+ @property
+ def size(self):
+ """Return the remote file size"""
+ try:
+ return int(self.headers["Content-Length"])
+ except Exception as exp:
+ raise Error("Failed to get size of %s: %s" % (self.url, exp))
+
+ @property
+ def lastmod(self):
+ try:
+ return int(mktime(parsedate(self.headers["Last-Modified"])))
+ except Exception as exp:
+ raise Error("Failed to get last modification date of %s: %s" % (self.url, exp))
+
+ @property
+ def headers(self):
+ """Return a dict with url headers"""
+ if not hasattr(self, "_headers"):
+ try:
+ debug("Request headers on URL: %s" % self.url)
+ url_req = Request(self.url, method="HEAD", headers=self.HTTP_HEADERS)
+ remote_fd = urlopen(url_req, timeout=self.timeout)
+ self._headers = dict(remote_fd.getheaders())
+ except Exception as exp:
+ raise Error("Failed to get headers at %s: %s" % (self, exp))
+ return getattr(self, "_headers")
+
+ def download(self, destination):
+ """Download URL to destination"""
+ debug("Downloading from : %s" % self.url)
+ debug(" to : %s" % destination)
+ try:
+ url_req = Request(self.url, headers=self.HTTP_HEADERS)
+ remote_fd = urlopen(url_req, timeout=self.timeout)
+ local_fd = open(destination, "wb")
+ copyfileobj(remote_fd, local_fd)
+ except Exception as exp:
+ raise Error("Failed to download %s: %s" % (self, exp))
+
+class Package(Url):
+ """Abstract a multi versionned package"""
+
+ def __init__(self, url, timeout):
+ self.url = Url(url, timeout)
+ self.sigurl = Url(url + SIG_EXT, timeout)
+ self.timeout = timeout
+ self.filename = basename(url)
+ self.sigfilename = self.filename + SIG_EXT
+ # regex is not strict, but we are not validating something here
+ m = match("^([\w@._+-]+)-((?:(\d+):)?([^-]+)-([^-]+))-(\w+)", self.filename)
+ if m is None:
+ raise Error("Unable to parse package info from filename %s" % self.filename)
+ (self.name, self.full_version, self.epoch, self.version, self.release,
+ self.arch) = m.groups()
+ # no epoch means 0 (man PKGBUILD)
+ if self.epoch is None:
+ self.epoch = 0
+
+ def __str__(self):
+ return self.filename
+
+ def __getitem__(self, key):
+ try:
+ return getattr(self, key)
+ except AttributeError:
+ raise KeyError()
+
+ @property
+ def size(self):
+ """Return package Content-Length (size in bytes)"""
+ return self.url.size
+
+ @property
+ def lastmod(self):
+ """Return package Last-Modified date (in seconds since epoch)"""
+ return self.url.lastmod
+
+ def get(self, sign=True, force=False):
+ """Download the package locally"""
+ if not force:
+ if exists(self.filename):
+ raise Error("Local file %s already exists" % self.filename)
+ if sign and exists(self.sigfilename):
+ raise Error("Local file %s already exists" % self.sigfilename)
+ self.url.download(self.filename)
+ if sign and self.sigurl.exists:
+ self.sigurl.download(self.sigfilename)
+
+class Archive(object):
+ """Abstract access to the package Archive"""
+
+ def __init__(self, url, timeout, update=1):
+ """Init the Archive interface
+ url of the archive (flat style)
+ update = update the local index cache (0: never, 1: when needed, 2: always)
+ timeout = the socket timeout for network requests
+ """
+ if url[-1] != "/":
+ raise Error("Archive URL must end with a /")
+ self.url = url
+ self.remote_index = Url(self.url + INDEX_FILENAME, timeout)
+ self.local_index = join(save_cache_path(NAME), INDEX_FILENAME)
+ self.timeout = timeout
+ if update > 0:
+ self.update_index(update == 2)
+ self._load_index()
+
+ def _load_index(self):
+ debug("Loading index from %s" % self.local_index)
+ fd = xzopen(self.local_index, "rb")
+ self._index = OrderedDict()
+ for line in fd.readlines():
+ key = line.decode().rstrip()
+ self._index[key] = Package("%s%s%s" % (self.url, key, PKG_EXT), self.timeout)
+ debug("Index loaded: %s packages" % len(self._index))
+
+ def update_index(self, force=False):
+ """Update index remotely when needed"""
+ debug("Check remote index for upgrade")
+ if force:
+ return self.remote_index.download(self.local_index)
+ # get remote info
+ try:
+ remote_size = self.remote_index.size
+ remote_lastmod = self.remote_index.lastmod
+ except Exception as exp:
+ debug("Failed to get remote index size/lastmod: %s" % exp)
+ return self.remote_index.download(self.local_index)
+ # get local info
+ try:
+ local_st = stat(self.local_index)
+ except Exception as exp:
+ debug("Failed to get local stat: %s" % exp)
+ return self.remote_index.download(self.local_index)
+ # compare size
+ if remote_size != local_st.st_size:
+ debug("Size differ between remote and local index (%s vs %s)" % (remote_size, local_st.st_size))
+ return self.remote_index.download(self.local_index)
+ # compare date
+ elif remote_lastmod > local_st.st_mtime:
+ debug("Remote index is newer than local, updating it (%s vs %s)" % (remote_lastmod, local_st.st_mtime))
+ return self.remote_index.download(self.local_index)
+ debug("Remote and local indexes seems equal. No update.")
+
+ def search(self, name_pattern, version_pattern, release_pattern, arch_list=None):
+ """Search for a package """
+ name_regex = recompile(name_pattern)
+ version_regex = recompile(version_pattern) if version_pattern is not None else None
+ release_regex = recompile(release_pattern) if release_pattern is not None else None
+ res = list()
+ for pkg in self._index.values():
+ if name_regex.search(pkg.name):
+ # check against arch
+ if arch_list is not None and len(arch_list) > 0:
+ if pkg.arch not in arch_list:
+ continue
+ # check against version
+ if version_regex is not None:
+ if not version_regex.search(pkg.version):
+ continue
+ # check against release
+ if release_regex is not None:
+ if not release_regex.search(pkg.release):
+ continue
+ res += [pkg]
+ return res
+
+def which(binary):
+ """lookup if bin exists into PATH"""
+ dirs = environ.get("PATH", "").split(":")
+ for d in dirs:
+ if exists(join(d, binary)):
+ return True
+ return False
+
+def pacman(args, asroot=True):
+ """execute pacman (optionally as root)"""
+ cmd = ["pacman" ] + args
+ # add sudo or su if not root and
+ if asroot and geteuid() != 0:
+ if which("sudo"):
+ cmd = ["sudo"] + cmd
+ elif which("su"):
+ cmd = ["su", "root", "-c=%s" % " ".join(cmd) ]
+ else:
+ error("Unable to execute as root: %s" % " ".join(cmd))
+ debug("calling: %s" % cmd)
+ call(cmd, close_fds=True)
+
+def list_packages(packages, long=False):
+ """display a list of packages on stdout"""
+ if long:
+ pattern = "%(name)s %(full_version)s %(arch)s %(size)s %(lastmod)s %(url)s"
+ else:
+ pattern = "%(name)s %(full_version)s %(arch)s"
+ for package in packages:
+ print(pattern % package)
+
+def select_packages(packages, select_all=False):
+ """select a package in a list"""
+ # shortcut to one package
+ if len(packages) == 1:
+ yield packages[0]
+ elif select_all:
+ for pkg in packages:
+ yield pkg
+ else:
+ # display a list of packages to select
+ index = dict(enumerate(packages))
+ pad = len("%d" % max(index.keys()))
+ for i, pkg in index.items():
+ print("{:{pad}} {}".format(i, pkg, pad=pad))
+ selection = ""
+ while not match("^(\d+ ){0,}\d+$", selection):
+ selection = input("Select packages (* for all): ").strip()
+ if selection == "":
+ return
+ # shortcut to select all packages
+ if selection == "*":
+ for pkg in packages:
+ yield pkg
+ return
+ # parse selection
+ numbers = [ int(x) for x in selection.split(" ") ]
+ for num in numbers:
+ if num in index.keys():
+ yield index[num]
+ else:
+ warn("No package n°%s" % num)
+
+def get_packages(packages, select_all=False):
+ """download packages"""
+ for pkg in select_packages(packages, select_all):
+ pkg.get()
+
+def install_packages(packages, select_all=False):
+ """install packages in one shot to allow deps to work"""
+ packages = list(select_packages(packages, select_all))
+ with TemporaryDirectory() as tmpdir:
+ cwd = getcwd()
+ chdir(tmpdir)
+ for pkg in packages:
+ pkg.get()
+ pacman(["-U"] + [ pkg.filename for pkg in packages ])
+ chdir(cwd)
+
+def parse_argv():
+ '''Parse command line arguments'''
+ local_arch = uname().machine
+ p_main = ArgumentParser()
+ # update index options
+ g_update = p_main.add_mutually_exclusive_group()
+ g_update.add_argument("-u", "--force-update",
+ action="store_const", dest="update", const=2, default=1,
+ help="force index update")
+ g_update.add_argument("-U", "--no-update",
+ action="store_const", dest="update", const=0,
+ help="disable index update")
+ # action mode options
+ g_action = p_main.add_mutually_exclusive_group()
+ g_action.add_argument("-g", "--get", action="store_const", dest="mode",
+ const="get", help="get matching packages (default mode)")
+ g_action.add_argument("-l", "--list", action="store_const", dest="mode",
+ const="list", help="only list matching packages")
+ g_action.add_argument("-i", "--install", action="store_const", dest="mode",
+ const="install", help="install matching packages")
+ # common options
+ p_main.add_argument("-a", "--all", action="store_true",
+ help="select all packages without prompting")
+ p_main.add_argument("-A", "--arch", nargs="*", default=[local_arch, "any"],
+ help="filter by architectures (default: %s and any. empty means all)" % local_arch)
+ p_main.add_argument("-v", "--verbose", action="store_true",
+ help="display more information")
+ p_main.add_argument("--url", help="archive URL, default: %s" % ARCHIVE_URL,
+ default=environ.get("ARCHIVE_URL", ARCHIVE_URL))
+ p_main.add_argument("-t", "--timeout", default=10,
+ help="connection timeout (default: 10s)")
+ p_main.add_argument("--version", action="version",
+ version="%(prog)s version " + VERSION)
+ p_main.add_argument("--debug", action="store_true",
+ help="debug mode")
+ # positional args
+ p_main.add_argument("package",
+ help="regex to match a package name")
+ p_main.add_argument("version", nargs="?",
+ help="regex to match a package version")
+ p_main.add_argument("release", nargs="?",
+ help="regex to match a package release")
+ return p_main.parse_args()
+
+def main():
+ '''Program entry point'''
+ try:
+ # parse command line
+ args = parse_argv()
+ # set global debug mode
+ if args.debug:
+ getLogger().setLevel(DEBUG)
+ # init archive interface
+ archive = Archive(args.url, args.timeout, args.update)
+ # select target pacakges
+ packages = archive.search(args.package, args.version, args.release, args.arch)
+ if len(packages) == 0:
+ print("No match found.")
+ exit(0)
+ if args.mode == "list":
+ list_packages(packages, long=args.verbose)
+ elif args.mode == "install":
+ install_packages(packages, args.all)
+ else:
+ get_packages(packages, args.all)
+ except KeyboardInterrupt:
+ exit(Error.ERR_ABORT)
+ except Error as exp:
+ error(exp)
+ exit(Error.ERR_FATAL)
+ except Exception as exp:
+ error("Unknown error. Please report it with --debug")
+ error("at https://github.com/seblu/agetpkg/issues.")
+ error(exp)
+ if getLogger().getEffectiveLevel() == DEBUG:
+ raise
+ exit(Error.ERR_UNKNOWN)
+
+if __name__ == '__main__':
+ main()
+
+# vim:set ts=4 sw=4 et ai: