#!/usr/bin/env python

# Drop-in replacement for calibre's parallel_build.py.
# Avoids multiprocessing.pool.ThreadPool, which triggers a resource_tracker
# BrokenPipeError in chroot environments with Python 3.14+.
# Uses concurrent.futures.ThreadPoolExecutor instead, which provides the
# same parallelism for these I/O-bound builds without creating any
# semaphores or shared memory.
#
# Original: src/calibre/setup/parallel_build.py
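#
# The swap amounts to the following (an illustrative sketch, not calibre's
# verbatim code):
#
#   before:  from multiprocessing.pool import ThreadPool
#            pool = ThreadPool(cpu_count)
#            for result in pool.imap(run_worker, jobs):
#                ...
#
#   after:   from concurrent.futures import ThreadPoolExecutor
#            with ThreadPoolExecutor(max_workers=cpu_count) as pool:
#                for result in pool.map(run_worker, jobs):
#                    ...
#
# Both iterate results in input order; the executor needs no close()/join()
# bookkeeping and pulls in none of the multiprocessing machinery.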

__license__ = 'GPL v3'
__copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'

import itertools
import json
import os
import shlex
import subprocess
import sys
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from threading import Thread

from polyglot.builtins import as_bytes

Job = namedtuple('Job', 'cmd human_text cwd')

# os.cpu_count() can return None; fall back to a single worker in that case.
cpu_count = min(16, max(1, os.cpu_count() or 1))


def run_worker(job, decorate=True):
    cmd, human_text = job.cmd, job.human_text
    human_text = human_text or shlex.join(cmd)
    cwd = job.cwd
    if cmd[0].lower().endswith('cl.exe'):
        # cl.exe (MSVC) must be run from the directory recorded by the
        # build setup in the COMPILER_CWD environment variable.
        cwd = os.environ.get('COMPILER_CWD')
    try:
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd)
    except Exception as err:
        return False, human_text, str(err)
    stdout, stderr = p.communicate()
    # stderr was redirected into stdout above, so stderr is always None
    # here; the decode is kept in case the redirection ever changes.
    if stdout:
        stdout = stdout.decode('utf-8')
    if stderr:
        stderr = stderr.decode('utf-8')
    if decorate:
        stdout = human_text + '\n' + (stdout or '')
    ok = p.returncode == 0
    return ok, stdout, (stderr or '')


def create_job(cmd, human_text=None, cwd=None):
    return Job(cmd, human_text, cwd)


def parallel_build(jobs, log, verbose=True):
    with ThreadPoolExecutor(max_workers=cpu_count) as pool:
        # map() yields results in job order; on failure the early return
        # below still waits, via the with-block, for jobs already submitted.
        for ok, stdout, stderr in pool.map(run_worker, jobs):
            if verbose or not ok:
                log(stdout)
                if stderr:
                    log(stderr)
            if not ok:
                return False
        return True
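

# Illustrative usage of parallel_build (hypothetical commands; calibre's
# real callers build the job list elsewhere):
#
#   jobs = [create_job(['gcc', '-c', src, '-o', src + '.o']) for src in sources]
#   if not parallel_build(jobs, log=print, verbose=False):
#       raise SystemExit(1)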


def parallel_build_silent(jobs):
    results = []
    failed = False
    with ThreadPoolExecutor(max_workers=cpu_count) as pool:
        for (ok, stdout, stderr), job in zip(pool.map(partial(run_worker, decorate=False), jobs), jobs):
            results.append((ok, job.cmd, job.human_text, stdout, stderr))
            if not ok:
                failed = True
    return failed, results


def parallel_check_output(jobs, log):
    with ThreadPoolExecutor(max_workers=cpu_count) as pool:
        # Each j is a bare command line; wrap it in a Job (empty human_text,
        # cwd=None) so run_worker can access .cmd, .human_text and .cwd.
        for ok, stdout, stderr in pool.map(
                partial(run_worker, decorate=False), (Job(j, '', None) for j in jobs)):
            if not ok:
                log(stdout)
                if stderr:
                    log(stderr)
                raise SystemExit(1)
            yield stdout


def get_tasks(it):
    # Split the jobs into at most cpu_count contiguous batches of roughly
    # equal size (the last batch may be smaller).
    it = tuple(it)
    size, extra = divmod(len(it), cpu_count)
    if extra:
        size += 1
    it = iter(it)
    while True:
        x = tuple(itertools.islice(it, size))
        if not x:
            return
        yield x
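

# Worked example of the batching (assuming cpu_count == 4): 10 jobs give
# size = ceil(10 / 4) = 3, so get_tasks yields batches of 3, 3, 3 and 1.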


def batched_parallel_jobs(cmd, jobs, cwd=None):
    workers = []

    def get_output(p):
        # Feed the worker its batch as JSON on stdin and collect
        # (stdout, stderr); communicate() also closes stdin and waits
        # for the pipes to drain.
        p.output = p.communicate(as_bytes(json.dumps(p.jobs_batch)))

    for batch in get_tasks(jobs):
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
        p.jobs_batch = batch
        # Read each worker's output on its own thread so a worker that
        # fills its pipe buffer cannot deadlock the join loop below.
        p.output_thread = t = Thread(target=get_output, args=(p,))
        t.daemon = True
        t.start()
        workers.append(p)

    failed = False
    ans = []
    for p in workers:
        p.output_thread.join()
        if p.wait() != 0:
            sys.stderr.buffer.write(p.output[1])
            sys.stderr.buffer.flush()
            failed = True
        else:
            ans.extend(json.loads(p.output[0]))
    if failed:
        raise SystemExit('Worker process failed')
    return ans
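

# The worker protocol implied above: cmd must read one JSON array of jobs
# from stdin and print a JSON array of results to stdout. A minimal sketch
# of such a worker (hypothetical, not calibre's actual worker script):
#
#   import json, sys
#   jobs = json.load(sys.stdin)              # the batch written by communicate()
#   results = [handle(job) for job in jobs]  # handle() is the worker's own logic
#   json.dump(results, sys.stdout)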


def threaded_func_jobs(func, jobs_args):
    # Call func(*args) for every args tuple in jobs_args on a thread pool,
    # returning the results in input order.

    def f(args):
        return func(*args)

    with ThreadPoolExecutor(max_workers=cpu_count) as pool:
        return list(pool.map(f, jobs_args))
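

# Illustrative usage of threaded_func_jobs (hypothetical arguments):
#
#   results = threaded_func_jobs(os.path.join, [('a', 'b'), ('c', 'd')])
#   # -> ['a/b', 'c/d'] on POSIX, in the same order as jobs_args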