diff options
author | HLFH | 2015-12-16 19:19:01 +0100 |
---|---|---|
committer | HLFH | 2015-12-16 19:19:01 +0100 |
commit | 023851e89fd3c94942a1ca179e76e36def1626a2 (patch) | |
tree | 6d938abe19b765970b3b35f8bc36f8be786ae4db | |
download | aur-023851e89fd3c94942a1ca179e76e36def1626a2.tar.gz |
Ajout de agetpkg dans AUR
-rw-r--r-- | .SRCINFO | 16 | ||||
-rw-r--r-- | PKGBUILD | 19 | ||||
-rwxr-xr-x | agetpkg | 404 |
3 files changed, 439 insertions, 0 deletions
diff --git a/.SRCINFO b/.SRCINFO new file mode 100644 index 000000000000..479b8633be44 --- /dev/null +++ b/.SRCINFO @@ -0,0 +1,16 @@ +# Generated by mksrcinfo v8 +# Wed Dec 16 18:18:21 UTC 2015 +pkgbase = agetpkg-git + pkgdesc = Archlinux Archive Get Package (Git version) + pkgver = 0 + pkgrel = 1 + url = https://github.com/seblu/agetpkg + arch = any + license = GPL2 + makedepends = git + depends = python + depends = python-xdg + conflicts = agetpkg + +pkgname = agetpkg-git + diff --git a/PKGBUILD b/PKGBUILD new file mode 100644 index 000000000000..be9f0a414229 --- /dev/null +++ b/PKGBUILD @@ -0,0 +1,19 @@ +# Maintainer: Sébastien Luttringer + +pkgname=agetpkg-git +pkgver="$(git log --pretty=format:''|wc -l)" +pkgrel=1 +pkgdesc='Archlinux Archive Get Package (Git version)' +arch=('any') +url='https://github.com/seblu/agetpkg' +license=('GPL2') +makedepends=('git') +depends=('python' 'python-xdg') +conflicts=('agetpkg') + +package() { + cd "$startdir" + install -Dm755 agetpkg "$pkgdir/usr/bin/agetpkg" +} + +# vim:set ts=2 sw=2 et: diff --git a/agetpkg b/agetpkg new file mode 100755 index 000000000000..27e9705931a7 --- /dev/null +++ b/agetpkg @@ -0,0 +1,404 @@ +#!/usr/bin/python3 +# coding: utf-8 + +# agetpkg - Archive Get Package +# Copyright © 2015 Sébastien Luttringer +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +'''Archlinux Archive Get Package''' + +from argparse import ArgumentParser +from collections import OrderedDict +from email.utils import parsedate +from logging import getLogger, error, warn, debug, DEBUG +from lzma import open as xzopen +from os import stat, uname, getcwd, chdir, geteuid, environ +from os.path import basename, exists, join +from pprint import pprint +from re import match, compile as recompile +from shutil import copyfileobj +from subprocess import call +from tempfile import TemporaryDirectory +from time import mktime, time +from urllib.request import urlopen, Request +from xdg.BaseDirectory import save_cache_path + +# magics +NAME = "agetpkg" +VERSION = "1" +ARCHIVE_URL = "http://ala.seblu.net/packages/.all/" +INDEX_FILENAME = "index.0.xz" +PKG_EXT = ".pkg.tar.xz" +SIG_EXT = ".sig" + +class Error(BaseException): + """Error""" + ERR_USAGE = 1 + ERR_FATAL = 2 + ERR_ABORT = 3 + ERR_UNKNOWN = 4 + +class Url(object): + """Remote Ressource""" + + HTTP_HEADERS = { + "User-Agent": "%s v%s" % (NAME, VERSION), + } + + def __init__(self, url, timeout): + self.url = url + self.timeout = timeout + + def __str__(self): + return self.url + + @property + def exists(self): + try: + self.headers + return True + except Exception: + return False + + @property + def size(self): + """Return the remote file size""" + try: + return int(self.headers["Content-Length"]) + except Exception as exp: + raise Error("Failed to get size of %s: %s" % (self.url, exp)) + + @property + def lastmod(self): + try: + return int(mktime(parsedate(self.headers["Last-Modified"]))) + except Exception as exp: + raise Error("Failed to get last modification date of %s: %s" % (self.url, exp)) + + @property + def headers(self): + """Return a dict with url headers""" + if not hasattr(self, "_headers"): + try: + debug("Request headers on URL: %s" % self.url) + url_req = Request(self.url, method="HEAD", headers=self.HTTP_HEADERS) + remote_fd = urlopen(url_req, timeout=self.timeout) + self._headers = dict(remote_fd.getheaders()) + except Exception as exp: + raise Error("Failed to get headers at %s: %s" % (self, exp)) + return getattr(self, "_headers") + + def download(self, destination): + """Download URL to destination""" + debug("Downloading from : %s" % self.url) + debug(" to : %s" % destination) + try: + url_req = Request(self.url, headers=self.HTTP_HEADERS) + remote_fd = urlopen(url_req, timeout=self.timeout) + local_fd = open(destination, "wb") + copyfileobj(remote_fd, local_fd) + except Exception as exp: + raise Error("Failed to download %s: %s" % (self, exp)) + +class Package(Url): + """Abstract a multi versionned package""" + + def __init__(self, url, timeout): + self.url = Url(url, timeout) + self.sigurl = Url(url + SIG_EXT, timeout) + self.timeout = timeout + self.filename = basename(url) + self.sigfilename = self.filename + SIG_EXT + # regex is not strict, but we are not validating something here + m = match("^([\w@._+-]+)-((?:(\d+):)?([^-]+)-([^-]+))-(\w+)", self.filename) + if m is None: + raise Error("Unable to parse package info from filename %s" % self.filename) + (self.name, self.full_version, self.epoch, self.version, self.release, + self.arch) = m.groups() + # no epoch means 0 (man PKGBUILD) + if self.epoch is None: + self.epoch = 0 + + def __str__(self): + return self.filename + + def __getitem__(self, key): + try: + return getattr(self, key) + except AttributeError: + raise KeyError() + + @property + def size(self): + """Return package Content-Length (size in bytes)""" + return self.url.size + + @property + def lastmod(self): + """Return package Last-Modified date (in seconds since epoch)""" + return self.url.lastmod + + def get(self, sign=True, force=False): + """Download the package locally""" + if not force: + if exists(self.filename): + raise Error("Local file %s already exists" % self.filename) + if sign and exists(self.sigfilename): + raise Error("Local file %s already exists" % self.sigfilename) + self.url.download(self.filename) + if sign and self.sigurl.exists: + self.sigurl.download(self.sigfilename) + +class Archive(object): + """Abstract access to the package Archive""" + + def __init__(self, url, timeout, update=1): + """Init the Archive interface + url of the archive (flat style) + update = update the local index cache (0: never, 1: when needed, 2: always) + timeout = the socket timeout for network requests + """ + if url[-1] != "/": + raise Error("Archive URL must end with a /") + self.url = url + self.remote_index = Url(self.url + INDEX_FILENAME, timeout) + self.local_index = join(save_cache_path(NAME), INDEX_FILENAME) + self.timeout = timeout + if update > 0: + self.update_index(update == 2) + self._load_index() + + def _load_index(self): + debug("Loading index from %s" % self.local_index) + fd = xzopen(self.local_index, "rb") + self._index = OrderedDict() + for line in fd.readlines(): + key = line.decode().rstrip() + self._index[key] = Package("%s%s%s" % (self.url, key, PKG_EXT), self.timeout) + debug("Index loaded: %s packages" % len(self._index)) + + def update_index(self, force=False): + """Update index remotely when needed""" + debug("Check remote index for upgrade") + if force: + return self.remote_index.download(self.local_index) + # get remote info + try: + remote_size = self.remote_index.size + remote_lastmod = self.remote_index.lastmod + except Exception as exp: + debug("Failed to get remote index size/lastmod: %s" % exp) + return self.remote_index.download(self.local_index) + # get local info + try: + local_st = stat(self.local_index) + except Exception as exp: + debug("Failed to get local stat: %s" % exp) + return self.remote_index.download(self.local_index) + # compare size + if remote_size != local_st.st_size: + debug("Size differ between remote and local index (%s vs %s)" % (remote_size, local_st.st_size)) + return self.remote_index.download(self.local_index) + # compare date + elif remote_lastmod > local_st.st_mtime: + debug("Remote index is newer than local, updating it (%s vs %s)" % (remote_lastmod, local_st.st_mtime)) + return self.remote_index.download(self.local_index) + debug("Remote and local indexes seems equal. No update.") + + def search(self, name_pattern, version_pattern, release_pattern, arch_list=None): + """Search for a package """ + name_regex = recompile(name_pattern) + version_regex = recompile(version_pattern) if version_pattern is not None else None + release_regex = recompile(release_pattern) if release_pattern is not None else None + res = list() + for pkg in self._index.values(): + if name_regex.search(pkg.name): + # check against arch + if arch_list is not None and len(arch_list) > 0: + if pkg.arch not in arch_list: + continue + # check against version + if version_regex is not None: + if not version_regex.search(pkg.version): + continue + # check against release + if release_regex is not None: + if not release_regex.search(pkg.release): + continue + res += [pkg] + return res + +def which(binary): + """lookup if bin exists into PATH""" + dirs = environ.get("PATH", "").split(":") + for d in dirs: + if exists(join(d, binary)): + return True + return False + +def pacman(args, asroot=True): + """execute pacman (optionally as root)""" + cmd = ["pacman" ] + args + # add sudo or su if not root and + if asroot and geteuid() != 0: + if which("sudo"): + cmd = ["sudo"] + cmd + elif which("su"): + cmd = ["su", "root", "-c=%s" % " ".join(cmd) ] + else: + error("Unable to execute as root: %s" % " ".join(cmd)) + debug("calling: %s" % cmd) + call(cmd, close_fds=True) + +def list_packages(packages, long=False): + """display a list of packages on stdout""" + if long: + pattern = "%(name)s %(full_version)s %(arch)s %(size)s %(lastmod)s %(url)s" + else: + pattern = "%(name)s %(full_version)s %(arch)s" + for package in packages: + print(pattern % package) + +def select_packages(packages, select_all=False): + """select a package in a list""" + # shortcut to one package + if len(packages) == 1: + yield packages[0] + elif select_all: + for pkg in packages: + yield pkg + else: + # display a list of packages to select + index = dict(enumerate(packages)) + pad = len("%d" % max(index.keys())) + for i, pkg in index.items(): + print("{:{pad}} {}".format(i, pkg, pad=pad)) + selection = "" + while not match("^(\d+ ){0,}\d+$", selection): + selection = input("Select packages (* for all): ").strip() + if selection == "": + return + # shortcut to select all packages + if selection == "*": + for pkg in packages: + yield pkg + return + # parse selection + numbers = [ int(x) for x in selection.split(" ") ] + for num in numbers: + if num in index.keys(): + yield index[num] + else: + warn("No package n°%s" % num) + +def get_packages(packages, select_all=False): + """download packages""" + for pkg in select_packages(packages, select_all): + pkg.get() + +def install_packages(packages, select_all=False): + """install packages in one shot to allow deps to work""" + packages = list(select_packages(packages, select_all)) + with TemporaryDirectory() as tmpdir: + cwd = getcwd() + chdir(tmpdir) + for pkg in packages: + pkg.get() + pacman(["-U"] + [ pkg.filename for pkg in packages ]) + chdir(cwd) + +def parse_argv(): + '''Parse command line arguments''' + local_arch = uname().machine + p_main = ArgumentParser() + # update index options + g_update = p_main.add_mutually_exclusive_group() + g_update.add_argument("-u", "--force-update", + action="store_const", dest="update", const=2, default=1, + help="force index update") + g_update.add_argument("-U", "--no-update", + action="store_const", dest="update", const=0, + help="disable index update") + # action mode options + g_action = p_main.add_mutually_exclusive_group() + g_action.add_argument("-g", "--get", action="store_const", dest="mode", + const="get", help="get matching packages (default mode)") + g_action.add_argument("-l", "--list", action="store_const", dest="mode", + const="list", help="only list matching packages") + g_action.add_argument("-i", "--install", action="store_const", dest="mode", + const="install", help="install matching packages") + # common options + p_main.add_argument("-a", "--all", action="store_true", + help="select all packages without prompting") + p_main.add_argument("-A", "--arch", nargs="*", default=[local_arch, "any"], + help="filter by architectures (default: %s and any. empty means all)" % local_arch) + p_main.add_argument("-v", "--verbose", action="store_true", + help="display more information") + p_main.add_argument("--url", help="archive URL, default: %s" % ARCHIVE_URL, + default=environ.get("ARCHIVE_URL", ARCHIVE_URL)) + p_main.add_argument("-t", "--timeout", default=10, + help="connection timeout (default: 10s)") + p_main.add_argument("--version", action="version", + version="%(prog)s version " + VERSION) + p_main.add_argument("--debug", action="store_true", + help="debug mode") + # positional args + p_main.add_argument("package", + help="regex to match a package name") + p_main.add_argument("version", nargs="?", + help="regex to match a package version") + p_main.add_argument("release", nargs="?", + help="regex to match a package release") + return p_main.parse_args() + +def main(): + '''Program entry point''' + try: + # parse command line + args = parse_argv() + # set global debug mode + if args.debug: + getLogger().setLevel(DEBUG) + # init archive interface + archive = Archive(args.url, args.timeout, args.update) + # select target pacakges + packages = archive.search(args.package, args.version, args.release, args.arch) + if len(packages) == 0: + print("No match found.") + exit(0) + if args.mode == "list": + list_packages(packages, long=args.verbose) + elif args.mode == "install": + install_packages(packages, args.all) + else: + get_packages(packages, args.all) + except KeyboardInterrupt: + exit(Error.ERR_ABORT) + except Error as exp: + error(exp) + exit(Error.ERR_FATAL) + except Exception as exp: + error("Unknown error. Please report it with --debug") + error("at https://github.com/seblu/agetpkg/issues.") + error(exp) + if getLogger().getEffectiveLevel() == DEBUG: + raise + exit(Error.ERR_UNKNOWN) + +if __name__ == '__main__': + main() + +# vim:set ts=4 sw=4 et ai: |