#!/usr/bin/env python3
"""Run ANGLE end-to-end tests locally on macOS, on Mac Catalyst, on an iOS
simulator/device, or on a visionOS device.

Forwards any unrecognized flags (e.g. --gtest_filter, --gtest_list_tests)
straight to the test binary/app. With --lldb, runs under lldb (on local mac
and simulator this auto-attaches; on device prints instructions). With
--per-suite, runs each test suite separately to identify (and optionally
isolate) crashing tests. Defaults to the Debug build directory; pass
--release for Release.

Examples:
    run-angle-tests
    run-angle-tests --release
    run-angle-tests --gtest_filter='ClearTestES3.*'
    run-angle-tests --maccatalyst --gtest_filter='WebGLTransformFeedbackTest.*'
    run-angle-tests --simulator --gtest_list_tests
    run-angle-tests --ios-device CE42FB0C-... --no-install
    run-angle-tests --xros-device --gtest_filter='*Metal*'
    run-angle-tests --lldb --gtest_filter='ClearTestES3.ClearMaxAttachments/ES3_Metal'
    run-angle-tests --per-suite
    run-angle-tests --ios-device --per-suite --suites-only
"""

import argparse
import json
import os
import plistlib
import re
import shlex
import subprocess
import sys
import tempfile
from pathlib import Path

DEFAULT_TIMEOUT = 120

# gtest's end-of-run banner is printed once a run completes cleanly (whether
# tests pass or fail). Its absence in captured output means the test process
# aborted before reaching the end — used as a crash signal on simulator/device,
# where simctl --console returns 0 even on SIGABRT.
END_OF_RUN_RE = re.compile(r"^\[==========\].*ran\.", re.MULTILINE)
NO_BANNER_EXIT = -999


def default_paths(config):
    """Return mode → default path mapping for the given build config."""
    return {
        "mac":          f"WebKitBuild/{config}/ANGLEEnd2EndTests",
        "maccatalyst":  f"WebKitBuild/{config}-maccatalyst/ANGLEEnd2EndTestsApp.app",
        "simulator":    f"WebKitBuild/{config}-iphonesimulator/ANGLEEnd2EndTestsApp.app",
        "ios":          f"WebKitBuild/{config}-iphoneos/ANGLEEnd2EndTestsApp.app",
        "xros":         f"WebKitBuild/{config}-xros/ANGLEEnd2EndTestsApp.app",
    }


def read_bundle_id(app_path):
    with (app_path / "Info.plist").open("rb") as f:
        return plistlib.load(f)["CFBundleIdentifier"]


def is_crash(returncode):
    # gtest exits 0 on pass, 1 on test failure; anything else (signals show as
    # 128+sig or negative) means the process aborted.
    if returncode is None:
        return False
    return returncode > 1 or returncode < 0

def run(cmd, verbose, *, capture, timeout=None):
    """subprocess.run wrapper that, when verbose, prints the command before
    launch and (for capture=True) the captured stdout/stderr afterwards.

    capture=False inherits the parent's stdio so test output streams live;
    capture=True returns a CompletedProcess whose stdout/stderr are strings.
    Raises subprocess.TimeoutExpired on timeout (caller decides how to recover).
    """
    if verbose:
        print(f"+ {shlex.join(cmd)}", flush=True)
    if not capture:
        return subprocess.run(cmd)
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if verbose:
        if result.stdout:
            sys.stdout.write(result.stdout)
            if not result.stdout.endswith("\n"):
                sys.stdout.write("\n")
            sys.stdout.flush()
        if result.stderr:
            sys.stderr.write(result.stderr)
            if not result.stderr.endswith("\n"):
                sys.stderr.write("\n")
            sys.stderr.flush()
    return result


class LocalRunner:
    """Runs the test binary directly on the local machine."""

    def __init__(self, executable):
        self.executable = Path(executable)
        self.verbose = False

    def install(self):
        pass  # no install step for local binaries

    def launch(self, gtest_args):
        cmd = [str(self.executable), *gtest_args]
        return run(cmd, self.verbose, capture=False).returncode

    def run_capturing(self, gtest_args, timeout):
        cmd = [str(self.executable), *gtest_args]
        try:
            result = run(cmd, self.verbose, capture=True, timeout=timeout)
            return result.returncode, result.stdout
        except subprocess.TimeoutExpired:
            return None, ""

    def format_command(self, filter_str):
        return (f"{shlex.quote(str(self.executable))} "
                f"--gtest_filter={shlex.quote(filter_str)}")

    def launch_for_debug(self, gtest_args):
        # Replace this process with lldb so the user gets the prompt directly.
        os.execvp("lldb", ["lldb", "--", str(self.executable), *gtest_args])


class SimulatorRunner:
    def __init__(self, app_path, device, bundle_id):
        self.app_path = app_path
        self.device = device
        self.bundle_id = bundle_id
        self.verbose = False

    def install(self):
        print(f"Installing on simulator {self.device} ...")
        subprocess.run(
            ["xcrun", "simctl", "install", self.device, str(self.app_path)],
            check=True,
        )

    def launch(self, gtest_args):
        cmd = [
            "xcrun", "simctl", "launch",
            "--console", "--terminate-running-process",
            self.device, self.bundle_id, *gtest_args,
        ]
        return run(cmd, self.verbose, capture=False).returncode

    def run_capturing(self, gtest_args, timeout):
        cmd = [
            "xcrun", "simctl", "launch",
            "--console", "--terminate-running-process",
            self.device, self.bundle_id, *gtest_args,
        ]
        try:
            result = run(cmd, self.verbose, capture=True, timeout=timeout)
            return result.returncode, result.stdout
        except subprocess.TimeoutExpired:
            subprocess.run(
                ["xcrun", "simctl", "terminate", self.device, self.bundle_id],
                capture_output=True,
            )
            return None, ""

    def format_command(self, filter_str):
        return (
            f"xcrun simctl launch --console --terminate-running-process "
            f"{self.device} {self.bundle_id} "
            f"--gtest_filter={shlex.quote(filter_str)}"
        )

    def launch_for_debug(self, gtest_args):
        log_dir = Path(tempfile.mkdtemp(prefix="run-gtest-"))
        stdout_log = log_dir / "stdout.log"
        stderr_log = log_dir / "stderr.log"
        cmd = [
            "xcrun", "simctl", "launch",
            "--wait-for-debugger", "--terminate-running-process",
            f"--stdout={stdout_log}", f"--stderr={stderr_log}",
            self.device, self.bundle_id, *gtest_args,
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
        # simctl prints "org.webkit.ANGLEEnd2EndTestApp: 12345"
        m = re.search(r"\b(\d+)\s*$", proc.stdout.strip())
        if not m:
            print(f"Could not parse PID from simctl output:\n{proc.stdout}",
                  file=sys.stderr)
            sys.exit(1)
        pid = int(m.group(1))
        print(f"App suspended at entry (waiting for debugger).")
        print(f"  PID:    {pid}")
        print(f"  stdout: {stdout_log}")
        print(f"  stderr: {stderr_log}")
        print(f"  (run `tail -f {stdout_log}` in another terminal to watch output)")
        print(f"Attaching lldb ...")
        # Replace this process with lldb so the user gets the prompt directly.
        os.execvp("lldb", ["lldb", "-p", str(pid), "-o", "continue"])


class DeviceRunner:
    def __init__(self, app_path, device, bundle_id):
        self.app_path = app_path
        self.device = device  # may be None until resolved
        self.bundle_id = bundle_id
        self.verbose = False

    @staticmethod
    def _list_devices_json():
        proc = subprocess.run(
            ["xcrun", "devicectl", "list", "devices", "--json-output", "-"],
            capture_output=True, text=True,
        )
        if proc.returncode != 0:
            print(proc.stderr, file=sys.stderr)
            sys.exit(proc.returncode)
        return json.loads(proc.stdout)

    @staticmethod
    def _is_usable(d):
        cp = d.get("connectionProperties", {})
        return (cp.get("transportType") is not None
                and cp.get("tunnelState") != "unavailable")

    def resolve_device(self):
        if self.device:
            return
        data = self._list_devices_json()
        usable = [d for d in data.get("result", {}).get("devices", []) if self._is_usable(d)]
        if not usable:
            print("Error: no paired/reachable iOS device found.", file=sys.stderr)
            sys.exit(1)
        if len(usable) > 1:
            print("Error: multiple usable devices found; pass --device <udid>:",
                  file=sys.stderr)
            for d in usable:
                name = d.get("deviceProperties", {}).get("name", "?")
                print(f"   {d['identifier']}  {name}", file=sys.stderr)
            sys.exit(1)
        self.device = usable[0]["identifier"]

    def install(self):
        self.resolve_device()
        print(f"Installing on device {self.device} ...")
        proc = subprocess.run(
            ["xcrun", "devicectl", "device", "install", "app",
             "--device", self.device, str(self.app_path)],
            capture_output=True, text=True,
        )
        if proc.returncode != 0:
            sys.stderr.write(proc.stderr)
            blob = proc.stderr + proc.stdout
            if "DeviceLocked" in blob or "device is locked" in blob.lower():
                print("\nHint: unlock the device, then re-run.", file=sys.stderr)
            sys.exit(proc.returncode)

    def launch(self, gtest_args):
        self.resolve_device()
        cmd = [
            "xcrun", "devicectl", "device", "process", "launch",
            "--device", self.device,
            "--console", "--terminate-existing",
            self.bundle_id, *gtest_args,
        ]
        return run(cmd, self.verbose, capture=False).returncode

    def run_capturing(self, gtest_args, timeout):
        self.resolve_device()
        cmd = [
            "xcrun", "devicectl", "device", "process", "launch",
            "--device", self.device,
            "--console", "--terminate-existing",
            self.bundle_id, *gtest_args,
        ]
        try:
            result = run(cmd, self.verbose, capture=True, timeout=timeout)
            return result.returncode, result.stdout
        except subprocess.TimeoutExpired:
            # devicectl has no simple "terminate by bundle id"; the next launch's
            # --terminate-existing will clean up any leftover instance.
            return None, ""

    def format_command(self, filter_str):
        return (
            f"xcrun devicectl device process launch --device {self.device} "
            f"--console --terminate-existing {self.bundle_id} "
            f"--gtest_filter={shlex.quote(filter_str)}"
        )

    def launch_for_debug(self, gtest_args):
        self.resolve_device()
        # --start-stopped suspends the app immediately after launch so a
        # debugger can attach before any user code runs.
        cmd = [
            "xcrun", "devicectl", "device", "process", "launch",
            "--device", self.device,
            "--start-stopped", "--terminate-existing",
            self.bundle_id, *gtest_args,
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
        # devicectl prints something like "Launched application ... with process identifier 1234"
        m = re.search(r"process identifier[:\s]+(\d+)", proc.stdout, re.IGNORECASE)
        if not m:
            print(f"Could not parse PID from devicectl output:\n{proc.stdout}",
                  file=sys.stderr)
            sys.exit(1)
        pid = int(m.group(1))
        print(f"App launched and suspended.")
        print(f"  Device: {self.device}")
        print(f"  PID:    {pid}")
        print()
        print("CLI lldb attach on a real device requires debugserver, which")
        print("Xcode normally runs for you. Easiest path:")
        print()
        print(f"  1. Open Xcode → Debug → Attach to Process by PID or Name… → "
              f"enter PID {pid}")
        print(f"  2. Or: open the .app's Xcode project (ANGLE.xcodeproj), "
              f"set ANGLEEnd2EndTestApp scheme, then Debug → Attach to Process")
        print()
        print("To resume the app without a debugger:")
        print(f"  xcrun devicectl device process resume --device {self.device} "
              f"--pid {pid}")
        return 0


def list_tests(runner, timeout, extra_args=()):
    code, stdout = runner.run_capturing(
        ["--gtest_list_tests", *extra_args], timeout=timeout
    )
    if code is None or code != 0:
        print(f"Error: --gtest_list_tests exited {code}", file=sys.stderr)
        sys.exit(1)
    suites = {}
    current_suite = None
    for line in stdout.splitlines():
        if not line.startswith(" ") and line.endswith("."):
            current_suite = line.rstrip(".")
            suites[current_suite] = []
        elif line.startswith("  ") and current_suite is not None:
            # Parameterized tests may have trailing "  # GetParam() = ..."
            test_name = line.strip().split("  #")[0].strip()
            if test_name:
                suites[current_suite].append(test_name)
    return suites


def run_filter(runner, filter_str, timeout):
    code, stdout = runner.run_capturing(
        [f"--gtest_filter={filter_str}", "--gtest_color=no"],
        timeout,
    )
    if code is None or is_crash(code):
        return code
    # gtest prints "[==========] N tests from M test suites ran. (X ms total)"
    # at the end of every clean run, pass or fail. simctl --console returns 0
    # even when the app aborts with SIGABRT, so the absence of this banner is
    # our crash signal of last resort.
    if not END_OF_RUN_RE.search(stdout):
        return NO_BANNER_EXIT
    return code


def status_for(code):
    if code is None:
        return "TIMEOUT"
    if code == NO_BANNER_EXIT:
        return "CRASH (no end-of-run banner)"
    if is_crash(code):
        return f"CRASH (exit {code})"
    return f"ok (exit {code})"


def run_per_suite(runner, timeout, suites_only, list_args=()):
    print("Listing tests ...")
    suites = list_tests(runner, timeout=max(timeout, 60), extra_args=list_args)
    total_tests = sum(len(t) for t in suites.values())
    print(f"Found {len(suites)} suites, {total_tests} total tests\n")

    # Phase 1: run each suite, detect crashes. Sequential because only one app
    # instance can run on a given simulator/device.
    crashing_suites = {}
    for i, (suite_name, tests) in enumerate(suites.items(), 1):
        print(f"[{i}/{len(suites)}] {suite_name} ({len(tests)} tests) ... ", end='', flush=True)
        code = run_filter(runner, f"{suite_name}.*", timeout)
        if is_crash(code):
            crashing_suites[suite_name] = tests
        print(f"{status_for(code)}")

    if not crashing_suites:
        print("\nNo crashing suites found.")
        return 0

    print(f"\n{'='*60}")
    print(f"CRASHING SUITES ({len(crashing_suites)})")
    print(f"{'='*60}")
    for suite_name, tests in crashing_suites.items():
        print(f"  {suite_name}  ({len(tests)} tests)")

    if suites_only:
        return 1

    # Phase 2: isolate individual crashing tests in crashing suites.
    print(f"\n{'='*60}")
    print(f"Found {len(crashing_suites)} crashing suite(s) — isolating tests ...")
    print(f"{'='*60}\n")

    pairs = [(s, t) for s, ts in crashing_suites.items() for t in ts]
    crashing_tests = {}  # {suite_name: [test, ...]}
    for i, (suite_name, test) in enumerate(pairs, 1):
        print(f"  [{i}/{len(pairs)}] {suite_name}.{test} ... ", end='', flush=True)
        code = run_filter(runner, f"{suite_name}.{test}", timeout)
        if is_crash(code):
            crashing_tests.setdefault(suite_name, []).append(test)
        print(f"{status_for(code)}")

    print(f"\n{'='*60}")
    print("CRASHING TESTS")
    print(f"{'='*60}")
    if not crashing_tests:
        print("  (none isolated — crash may be ordering-dependent)")
    else:
        for suite_name, tests in crashing_tests.items():
            for t in tests:
                print(f"  {suite_name}.{t}")

    print(f"\n{'='*60}")
    print("RUN SUITE SKIPPING CRASHING TESTS")
    print(f"{'='*60}")
    for suite_name, tests in crashing_tests.items():
        skip = ":".join(f"-{suite_name}.{t}" for t in tests)
        filter_expr = f"{suite_name}.*:{skip}"
        print(f"\n# {suite_name}")
        print(f"  {runner.format_command(filter_expr)}")

    return 1


def main():
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument(
        "--mac", action="store_true",
        help="Run the test binary locally on macOS (default mode)."
    )
    mode.add_argument(
        "--maccatalyst", dest="maccatalyst", action="store_true",
        help="Run the .app's binary directly as a Mac Catalyst (iosmac) build."
    )
    mode.add_argument(
        "--simulator", nargs="?", const="booted", default=None, metavar="SIM_ID",
        help="Run on an iOS Simulator. Optional simulator id "
             "(default when bare: 'booted')."
    )
    mode.add_argument(
        "--ios-device", nargs="?", const="auto", default=None, metavar="UDID",
        help="Run on a connected iOS device via xcrun devicectl. Optional "
             "UDID (default when bare: auto-detect a single connected device)."
    )
    mode.add_argument(
        "--xros-device", nargs="?", const="auto", default=None, metavar="UDID",
        help="Run on a connected visionOS device via xcrun devicectl. "
             "Optional UDID (default when bare: auto-detect a single "
             "connected device)."
    )
    config_group = parser.add_mutually_exclusive_group()
    config_group.add_argument(
        "--debug", dest="config", action="store_const", const="Debug",
        help="Use Debug build directory (default)."
    )
    config_group.add_argument(
        "--release", dest="config", action="store_const", const="Release",
        help="Use Release build directory."
    )
    parser.set_defaults(config="Debug")
    parser.add_argument("--app", default=None,
        help="Path to test binary or .app bundle. Defaults to "
             "WebKitBuild/<Debug|Release>[-maccatalyst|-iphonesimulator|"
             "-iphoneos|-xros]/ANGLEEnd2EndTests[App.app] depending on mode.")
    parser.add_argument("--bundle-id", default=None,
        help="Override bundle id (default: read from Info.plist)")
    parser.add_argument("--no-install", action="store_true",
        help="Skip the install step (re-launch already-installed app)")
    parser.add_argument("--lldb", action="store_true",
        help="Launch suspended and (on local mac / simulator) auto-attach lldb")
    parser.add_argument("--per-suite", action="store_true",
        help="Run each test suite separately to identify crashing suites, "
             "then isolate individual crashing tests")
    parser.add_argument("--suites-only", action="store_true",
        help="With --per-suite: stop after the suite phase; skip per-test isolation")
    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT,
        help=f"Per-suite/per-test timeout in seconds (default: {DEFAULT_TIMEOUT})")
    parser.add_argument("--verbose", "-v", action="store_true",
        help="Print each subprocess command and its captured output. With "
             "--per-suite this surfaces the per-suite/per-test command lines "
             "and the test binary's stdout/stderr that would otherwise be "
             "captured silently.")
    args, gtest_args = parser.parse_known_args()

    # Default to local mac mode if no mode flag was given.
    if (not args.mac and not args.maccatalyst and args.simulator is None
            and args.ios_device is None and args.xros_device is None):
        args.mac = True

    paths = default_paths(args.config)

    if args.mac:
        target_path = Path(args.app) if args.app else Path(paths["mac"])
        if not target_path.is_file() or not os.access(target_path, os.X_OK):
            print(f"Error: not an executable: {target_path}", file=sys.stderr)
            sys.exit(1)
        runner = LocalRunner(target_path)
    elif args.maccatalyst:
        # Mac Catalyst .app's binary lives in Contents/MacOS/<bundle name>.
        # Run it directly — it works without `open` and gives us the test
        # binary's real exit code.
        app_path = Path(args.app) if args.app else Path(paths["maccatalyst"])
        if not app_path.is_dir():
            print(f"Error: not a bundle: {app_path}", file=sys.stderr)
            sys.exit(1)
        target_path = app_path / "Contents" / "MacOS" / app_path.stem
        if not target_path.is_file() or not os.access(target_path, os.X_OK):
            print(f"Error: not an executable: {target_path}", file=sys.stderr)
            sys.exit(1)
        runner = LocalRunner(target_path)
    else:
        if args.xros_device is not None:
            device_id = None if args.xros_device == "auto" else args.xros_device
            default_app = paths["xros"]
        elif args.ios_device is not None:
            device_id = None if args.ios_device == "auto" else args.ios_device
            default_app = paths["ios"]
        else:
            device_id = args.simulator  # always a real id ("booted" or user-supplied)
            default_app = paths["simulator"]

        app_path = Path(args.app) if args.app else Path(default_app)
        if not app_path.is_dir():
            print(f"Error: not a bundle: {app_path}", file=sys.stderr)
            sys.exit(1)
        bundle_id = args.bundle_id or read_bundle_id(app_path)

        if args.ios_device is not None or args.xros_device is not None:
            runner = DeviceRunner(app_path, device_id, bundle_id)
        else:
            runner = SimulatorRunner(app_path, device_id, bundle_id)

    if args.suites_only and not args.per_suite:
        parser.error("--suites-only requires --per-suite")
    if args.per_suite and args.lldb:
        parser.error("--per-suite is incompatible with --lldb")

    runner.verbose = args.verbose

    if not args.no_install:
        runner.install()

    if args.per_suite:
        sys.exit(run_per_suite(runner, args.timeout, args.suites_only,
                               list_args=gtest_args))

    if args.lldb:
        # Replaces this process (mac / simulator path execs lldb).
        runner.launch_for_debug(gtest_args)
        return
    sys.exit(runner.launch(gtest_args))


if __name__ == "__main__":
    main()
