pluggy/_callers.py

"""
Call loop machinery
"""
from __future__ import annotations

import warnings
from typing import cast
from typing import Generator
from typing import Mapping
from typing import NoReturn
from typing import Sequence
from typing import Tuple
from typing import Union

from ._hooks import HookImpl
from ._result import HookCallError
from ._result import Result
from ._warnings import PluggyTeardownRaisedWarning


# Need to distinguish between old- and new-style hook wrappers.
# Wrapping with a tuple is the fastest type-safe way I found to do it.
#
# The two shapes stored in the teardown list of ``_multicall`` are:
#   * old-style (``hookwrapper``): a ``(generator, hook_impl)`` tuple; the
#     generator is later sent a ``Result`` object and the ``HookImpl`` is kept
#     so a teardown failure can be attributed to its plugin.
#   * new-style (``wrapper``): the bare generator; it is later sent the plain
#     result (or has the exception thrown into it) and its return value
#     becomes the new result.
Teardown = Union[
    Tuple[Generator[None, Result[object], None], HookImpl],
    Generator[None, object, object],
]


def _raise_wrapfail(
    wrap_controller: (
        Generator[None, Result[object], None] | Generator[None, object, object]
    ),
    msg: str,
) -> NoReturn:
    """Raise a ``RuntimeError`` describing a misbehaving wrapper generator.

    The error text identifies the generator through its code object (function
    name, source file and first line number) and ends with ``msg``.

    :param wrap_controller: The wrapper generator that violated the protocol.
    :param msg: Short description of the violation, appended to the message.
    :raises RuntimeError: Always.
    """
    code = wrap_controller.gi_code
    location = f"{code.co_filename}:{code.co_firstlineno:d}"
    raise RuntimeError(f"wrap_controller at {code.co_name!r} {location} {msg}")


def _warn_teardown_exception(
    hook_name: str, hook_impl: HookImpl, e: BaseException
) -> None:
    """Issue a ``PluggyTeardownRaisedWarning`` for a failed old-style teardown.

    The warning text names the plugin (``hook_impl.plugin_name``), the hook,
    and the exception type and message, followed by a documentation link.

    :param hook_name: Name of the hook whose wrapper teardown raised.
    :param hook_impl: The hook implementation whose teardown raised.
    :param e: The exception raised by the teardown.
    """
    message = (
        "A plugin raised an exception during an old-style hookwrapper teardown.\n"
        f"Plugin: {hook_impl.plugin_name}, Hook: {hook_name}\n"
        f"{type(e).__name__}: {e}\n"
        "For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning"  # noqa: E501
    )
    # stacklevel=5 attributes the warning to a frame several levels above this
    # helper rather than to this function itself.
    warnings.warn(PluggyTeardownRaisedWarning(message), stacklevel=5)


def _multicall(
    hook_name: str,
    hook_impls: Sequence[HookImpl],
    caller_kwargs: Mapping[str, object],
    firstresult: bool,
) -> object | list[object]:
    """Execute a call into multiple python functions/methods and return the
    result(s).

    ``caller_kwargs`` comes from HookCaller.__call__().

    The call proceeds in two phases:

    1. Setup/call: ``hook_impls`` is walked in reverse order. Wrapper
       implementations (old-style ``hookwrapper`` and new-style ``wrapper``)
       are started and advanced to their first ``yield``; plain
       implementations are called and their non-``None`` return values are
       collected. Any exception raised in this phase is captured, not
       propagated immediately.
    2. Teardown: every started wrapper is resumed, in reverse order of
       setup, with the collected result or the captured exception. Wrappers
       may replace the result or the exception.

    :param hook_name: Name of the hook; only used in the warning emitted
        when an old-style wrapper raises during teardown.
    :param hook_impls: The implementations to call; iterated in reverse.
    :param caller_kwargs: Keyword arguments of the hook call; each
        implementation receives the values named by its ``argnames``.
    :param firstresult: If true, stop calling implementations after the first
        non-``None`` result and produce that single value (or ``None``)
        instead of a list.
    :returns: The list of non-``None`` results, or a single value when
        ``firstresult`` is true, possibly replaced by wrappers.
    :raises HookCallError: If ``caller_kwargs`` lacks an argument required by
        an implementation (unless a wrapper teardown replaces the exception).
    :raises RuntimeError: Via ``_raise_wrapfail`` if a wrapper does not yield
        or yields a second time.
    """
    # NOTE(review): presumably consumed by pytest-style traceback filtering to
    # hide this frame - confirm.
    __tracebackhide__ = True
    results: list[object] = []
    # Exception captured during the setup/call phase (or set by a new-style
    # wrapper teardown in the fast path); None while the call is succeeding.
    exception = None
    # Flipped to False as soon as one old-style ``hookwrapper`` is seen, which
    # forces the Result-based slow path in the teardown phase.
    only_new_style_wrappers = True
    try:  # run impl and wrapper setup functions in a loop
        teardowns: list[Teardown] = []
        try:
            for hook_impl in reversed(hook_impls):
                try:
                    args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                except KeyError:
                    # Re-scan to report the first missing argument by name.
                    # NOTE(review): if this scan finds nothing missing, no
                    # error is raised here and ``args`` keeps its previous
                    # binding (or is unbound) - confirm this cannot happen.
                    for argname in hook_impl.argnames:
                        if argname not in caller_kwargs:
                            raise HookCallError(
                                f"hook call must provide argument {argname!r}"
                            )

                if hook_impl.hookwrapper:
                    # Old-style wrapper: remembered together with its HookImpl.
                    only_new_style_wrappers = False
                    try:
                        # If this cast is not valid, a type error is raised below,
                        # which is the desired response.
                        res = hook_impl.function(*args)
                        wrapper_gen = cast(Generator[None, Result[object], None], res)
                        next(wrapper_gen)  # first yield
                        teardowns.append((wrapper_gen, hook_impl))
                    except StopIteration:
                        # NOTE(review): if ``hook_impl.function(*args)`` itself
                        # raises StopIteration, ``wrapper_gen`` has not been
                        # assigned in this iteration - confirm intended.
                        _raise_wrapfail(wrapper_gen, "did not yield")
                elif hook_impl.wrapper:
                    # New-style wrapper: remembered as the bare generator.
                    try:
                        # If this cast is not valid, a type error is raised below,
                        # which is the desired response.
                        res = hook_impl.function(*args)
                        function_gen = cast(Generator[None, object, object], res)
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
                    except StopIteration:
                        # NOTE(review): same caveat as above for ``function_gen``.
                        _raise_wrapfail(function_gen, "did not yield")
                else:
                    # Plain implementation: None results are ignored.
                    res = hook_impl.function(*args)
                    if res is not None:
                        results.append(res)
                        if firstresult:  # halt further impl calls
                            break
        except BaseException as exc:
            # Captured (not re-raised) so that the teardown phase below can
            # hand it to the wrappers that were already started.
            exception = exc
    finally:
        # The teardown phase lives in ``finally``; both branches below end in
        # an explicit ``return`` or ``raise``.

        # Fast path - only new-style wrappers, no Result.
        if only_new_style_wrappers:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results

            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    # Resume the wrapper with the current outcome: the pending
                    # exception is thrown in, otherwise the result is sent.
                    if exception is not None:
                        teardown.throw(exception)  # type: ignore[union-attr]
                    else:
                        teardown.send(result)  # type: ignore[union-attr]
                    # Following is unreachable for a well behaved hook wrapper.
                    # Try to force finalizers otherwise postponed till GC action.
                    # Note: close() may raise if generator handles GeneratorExit.
                    teardown.close()  # type: ignore[union-attr]
                except StopIteration as si:
                    # Wrapper finished normally: its return value becomes the
                    # result and any pending exception is cleared.
                    result = si.value
                    exception = None
                    continue
                except BaseException as e:
                    # Wrapper raised (or re-raised): that exception becomes the
                    # pending one for the remaining wrappers. An exception
                    # raised by close() above also lands here.
                    exception = e
                    continue
                # Only reached when the wrapper yielded again and close()
                # returned without raising.
                _raise_wrapfail(teardown, "has second yield")  # type: ignore[arg-type]

            if exception is not None:
                raise exception.with_traceback(exception.__traceback__)
            else:
                return result

        # Slow path - need to support old-style wrappers.
        else:
            # Old-style wrappers receive a Result object, so the outcome is
            # wrapped in one for the whole teardown phase.
            if firstresult:  # first result hooks return a single value
                outcome: Result[object | list[object]] = Result(
                    results[0] if results else None, exception
                )
            else:
                outcome = Result(results, exception)

            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                if isinstance(teardown, tuple):
                    # Old-style wrapper: (generator, hook_impl).
                    try:
                        teardown[0].send(outcome)
                    except StopIteration:
                        # Generator finished after its single yield.
                        pass
                    except BaseException as e:
                        # Warn, then propagate immediately; the remaining
                        # teardowns are not run.
                        _warn_teardown_exception(hook_name, teardown[1], e)
                        raise
                    else:
                        _raise_wrapfail(teardown[0], "has second yield")
                else:
                    # New-style wrapper: same protocol as the fast path, but
                    # the outcome is read from and written back to ``outcome``.
                    try:
                        if outcome._exception is not None:
                            teardown.throw(outcome._exception)
                        else:
                            teardown.send(outcome._result)
                        # Following is unreachable for a well behaved hook wrapper.
                        # Try to force finalizers otherwise postponed till GC action.
                        # Note: close() may raise if generator handles GeneratorExit.
                        teardown.close()
                    except StopIteration as si:
                        outcome.force_result(si.value)
                        continue
                    except BaseException as e:
                        outcome.force_exception(e)
                        continue
                    _raise_wrapfail(teardown, "has second yield")

            return outcome.get_result()