forked from python/cpython
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy path_llvm.py
More file actions
110 lines (87 loc) · 3.25 KB
/
_llvm.py
File metadata and controls
110 lines (87 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""Utilities for invoking LLVM tools."""
import asyncio
import functools
import os
import re
import shlex
import subprocess
import typing
import weakref
_LLVM_VERSION = 19
_LLVM_VERSION_PATTERN = re.compile(rf"version\s+{_LLVM_VERSION}\.\d+\.\d+\S*\s+")
_P = typing.ParamSpec("_P")
_R = typing.TypeVar("_R")
_C = typing.Callable[_P, typing.Awaitable[_R]]
def _async_cache(f: _C[_P, _R]) -> _C[_P, _R]:
cache = {}
lock = asyncio.Lock()
@functools.wraps(f)
async def wrapper(
*args: _P.args, **kwargs: _P.kwargs # pylint: disable = no-member
) -> _R:
async with lock:
if args not in cache:
cache[args] = await f(*args, **kwargs)
return cache[args]
return wrapper
_CORES_BY_LOOP: weakref.WeakKeyDictionary[
asyncio.AbstractEventLoop, asyncio.BoundedSemaphore
] = weakref.WeakKeyDictionary()
def _get_cores() -> asyncio.BoundedSemaphore:
loop = asyncio.get_running_loop()
if loop not in _CORES_BY_LOOP:
_CORES_BY_LOOP[loop] = asyncio.BoundedSemaphore(os.cpu_count())
return _CORES_BY_LOOP[loop]
async def _run(tool: str, args: typing.Iterable[str], echo: bool = False) -> str | None:
command = [tool, *args]
async with _get_cores():
if echo:
print(shlex.join(command))
try:
process = await asyncio.create_subprocess_exec(
*command, stdout=subprocess.PIPE
)
except FileNotFoundError:
return None
out, _ = await process.communicate()
if process.returncode:
raise RuntimeError(f"{tool} exited with return code {process.returncode}")
return out.decode()
@_async_cache
async def _check_tool_version(name: str, *, echo: bool = False) -> bool:
output = await _run(name, ["--version"], echo=echo)
return bool(output and _LLVM_VERSION_PATTERN.search(output))
@_async_cache
async def _get_brew_llvm_prefix(*, echo: bool = False) -> str | None:
output = await _run("brew", ["--prefix", f"llvm@{_LLVM_VERSION}"], echo=echo)
return output and output.removesuffix("\n")
@_async_cache
async def _find_tool(tool: str, *, echo: bool = False) -> str | None:
# Unversioned executables:
path = tool
if await _check_tool_version(path, echo=echo):
return path
# Versioned executables:
path = f"{tool}-{_LLVM_VERSION}"
if await _check_tool_version(path, echo=echo):
return path
# Homebrew-installed executables:
prefix = await _get_brew_llvm_prefix(echo=echo)
if prefix is not None:
path = os.path.join(prefix, "bin", tool)
if await _check_tool_version(path, echo=echo):
return path
# Nothing found:
return None
async def maybe_run(
tool: str, args: typing.Iterable[str], echo: bool = False
) -> str | None:
"""Run an LLVM tool if it can be found. Otherwise, return None."""
path = await _find_tool(tool, echo=echo)
return path and await _run(path, args, echo=echo)
async def run(tool: str, args: typing.Iterable[str], echo: bool = False) -> str:
"""Run an LLVM tool if it can be found. Otherwise, raise RuntimeError."""
output = await maybe_run(tool, args, echo=echo)
if output is None:
raise RuntimeError(f"Can't find {tool}-{_LLVM_VERSION}!")
return output