diff --git a/tests/async/test_browsertype_connect.py b/tests/async/test_browsertype_connect.py index 1bbadeae1..cafdfaab3 100644 --- a/tests/async/test_browsertype_connect.py +++ b/tests/async/test_browsertype_connect.py @@ -14,6 +14,7 @@ import asyncio import re +from pathlib import Path from typing import Callable import pytest @@ -22,6 +23,7 @@ from playwright.async_api import BrowserType, Error, Playwright, Route from tests.conftest import RemoteServer from tests.server import Server +from tests.utils import parse_trace async def test_browser_type_connect_should_be_able_to_reconnect_to_a_browser( @@ -299,3 +301,61 @@ async def test_should_upload_large_file( ) assert match.group("name") == b"file1" assert match.group("filename") == b"200MB.zip" + + +async def test_should_record_trace_with_source( + launch_server: Callable[[], RemoteServer], + server: Server, + tmp_path: Path, + browser_type: BrowserType, +): + remote = launch_server() + browser = await browser_type.connect(remote.ws_endpoint) + context = await browser.new_context() + page = await context.new_page() + + await context.tracing.start(sources=True) + await page.goto(server.EMPTY_PAGE) + await page.set_content("") + await page.click("'Click'") + path = tmp_path / "trace1.zip" + await context.tracing.stop(path=path) + + await context.close() + await browser.close() + + (resources, events) = parse_trace(path) + current_file_content = Path(__file__).read_bytes() + found_current_file = False + for name, resource in resources.items(): + if resource == current_file_content: + found_current_file = True + break + assert found_current_file + + +async def test_should_record_trace_with_relative_trace_path( + launch_server: Callable[[], RemoteServer], + server: Server, + tmp_path: Path, + browser_type: BrowserType, +): + remote = launch_server() + browser = await browser_type.connect(remote.ws_endpoint) + context = await browser.new_context() + page = await context.new_page() + + await context.tracing.start(sources=True) + await page.goto(server.EMPTY_PAGE) + await page.set_content("") + await page.click("'Click'") + try: + await context.tracing.stop(path="trace1.zip") + + await context.close() + await browser.close() + + # make sure trace1.zip exists + assert Path("trace1.zip").exists() + finally: + Path("trace1.zip").unlink() diff --git a/tests/async/test_tracing.py b/tests/async/test_tracing.py index 1473ab648..702f1fd45 100644 --- a/tests/async/test_tracing.py +++ b/tests/async/test_tracing.py @@ -12,14 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import re -import zipfile from pathlib import Path -from typing import Any, Dict, List, Tuple +from typing import Dict, List from playwright.async_api import Browser, BrowserContext, BrowserType, Page from tests.server import Server +from tests.utils import get_trace_actions, parse_trace async def test_browser_context_output_trace( @@ -110,7 +109,7 @@ async def test_should_collect_trace_with_resources_but_no_js( (_, events) = parse_trace(trace_file_path) assert events[0]["type"] == "context-options" - assert get_actions(events) == [ + assert get_trace_actions(events) == [ "Page.goto", "Page.set_content", "Page.click", @@ -165,7 +164,7 @@ async def test_should_collect_two_traces( (_, events) = parse_trace(tracing1_path) assert events[0]["type"] == "context-options" - assert get_actions(events) == [ + assert get_trace_actions(events) == [ "Page.goto", "Page.set_content", "Page.click", @@ -173,7 +172,7 @@ async def test_should_collect_two_traces( (_, events) = parse_trace(tracing2_path) assert events[0]["type"] == "context-options" - assert get_actions(events) == ["Page.dblclick", "Page.close"] + assert get_trace_actions(events) == ["Page.dblclick", "Page.close"] async def test_should_not_throw_when_stopping_without_start_but_not_exporting( @@ -200,7 +199,7 @@ async def test_should_work_with_playwright_context_managers( (_, events) = parse_trace(trace_file_path) assert events[0]["type"] == "context-options" - assert get_actions(events) == [ + assert get_trace_actions(events) == [ "Page.goto", "Page.set_content", "Page.expect_console_message", @@ -224,7 +223,7 @@ async def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it( await context.tracing.stop(path=trace_file_path) (_, events) = parse_trace(trace_file_path) - assert get_actions(events) == [ + assert get_trace_actions(events) == [ "Page.goto", "Page.wait_for_load_state", "Page.wait_for_load_state", @@ -265,7 +264,7 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]: ) (resources, events) = parse_trace(tmpdir / "trace1.zip") - assert get_actions(events) == ["Page.goto"] + assert get_trace_actions(events) == ["Page.goto"] assert resource_names(resources) == [ "resources/XXX.css", "resources/XXX.html", @@ -275,7 +274,7 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]: ] (resources, events) = parse_trace(tmpdir / "trace2.zip") - assert get_actions(events) == ["Page.goto"] + assert get_trace_actions(events) == ["Page.goto"] assert resource_names(resources) == [ "resources/XXX.css", "resources/XXX.html", @@ -284,42 +283,3 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]: "trace.stacks", "trace.trace", ] - - -def parse_trace(path: Path) -> Tuple[Dict[str, bytes], List[Any]]: - resources: Dict[str, bytes] = {} - with zipfile.ZipFile(path, "r") as zip: - for name in zip.namelist(): - resources[name] = zip.read(name) - action_map: Dict[str, Any] = {} - events: List[Any] = [] - for name in ["trace.trace", "trace.network"]: - for line in resources[name].decode().splitlines(): - if not line: - continue - event = json.loads(line) - if event["type"] == "before": - event["type"] = "action" - action_map[event["callId"]] = event - events.append(event) - elif event["type"] == "input": - pass - elif event["type"] == "after": - existing = action_map[event["callId"]] - existing["error"] = event.get("error", None) - else: - events.append(event) - return (resources, events) - - -def get_actions(events: List[Any]) -> List[str]: - action_events = sorted( - list( - filter( - lambda e: e["type"] == "action", - events, - ) - ), - key=lambda e: e["startTime"], - ) - return [e["apiName"] for e in action_events] diff --git a/tests/sync/test_tracing.py b/tests/sync/test_tracing.py index 61e67009d..eaef24e00 100644 --- a/tests/sync/test_tracing.py +++ b/tests/sync/test_tracing.py @@ -12,14 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import re -import zipfile from pathlib import Path -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List from playwright.sync_api import Browser, BrowserContext, BrowserType, Page from tests.server import Server +from tests.utils import get_trace_actions, parse_trace def test_browser_context_output_trace( @@ -103,7 +102,7 @@ def test_should_collect_trace_with_resources_but_no_js( (_, events) = parse_trace(trace_file_path) assert events[0]["type"] == "context-options" - assert get_actions(events) == [ + assert get_trace_actions(events) == [ "Page.goto", "Page.set_content", "Page.click", @@ -158,7 +157,7 @@ def test_should_collect_two_traces( (_, events) = parse_trace(tracing1_path) assert events[0]["type"] == "context-options" - assert get_actions(events) == [ + assert get_trace_actions(events) == [ "Page.goto", "Page.set_content", "Page.click", @@ -166,7 +165,7 @@ def test_should_collect_two_traces( (_, events) = parse_trace(tracing2_path) assert events[0]["type"] == "context-options" - assert get_actions(events) == ["Page.dblclick", "Page.close"] + assert get_trace_actions(events) == ["Page.dblclick", "Page.close"] def test_should_not_throw_when_stopping_without_start_but_not_exporting( @@ -193,7 +192,7 @@ def test_should_work_with_playwright_context_managers( (_, events) = parse_trace(trace_file_path) assert events[0]["type"] == "context-options" - assert get_actions(events) == [ + assert get_trace_actions(events) == [ "Page.goto", "Page.set_content", "Page.expect_console_message", @@ -217,7 +216,7 @@ def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it( context.tracing.stop(path=trace_file_path) (_, events) = parse_trace(trace_file_path) - assert get_actions(events) == [ + assert get_trace_actions(events) == [ "Page.goto", "Page.wait_for_load_state", "Page.wait_for_load_state", @@ -258,7 +257,7 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]: ) (resources, events) = parse_trace(tmpdir / "trace1.zip") - assert get_actions(events) == ["Page.goto"] + assert get_trace_actions(events) == ["Page.goto"] assert resource_names(resources) == [ "resources/XXX.css", "resources/XXX.html", @@ -268,7 +267,7 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]: ] (resources, events) = parse_trace(tmpdir / "trace2.zip") - assert get_actions(events) == ["Page.goto"] + assert get_trace_actions(events) == ["Page.goto"] assert resource_names(resources) == [ "resources/XXX.css", "resources/XXX.html", @@ -277,42 +276,3 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]: "trace.stacks", "trace.trace", ] - - -def parse_trace(path: Path) -> Tuple[Dict[str, bytes], List[Any]]: - resources: Dict[str, bytes] = {} - with zipfile.ZipFile(path, "r") as zip: - for name in zip.namelist(): - resources[name] = zip.read(name) - action_map: Dict[str, Any] = {} - events: List[Any] = [] - for name in ["trace.trace", "trace.network"]: - for line in resources[name].decode().splitlines(): - if not line: - continue - event = json.loads(line) - if event["type"] == "before": - event["type"] = "action" - action_map[event["callId"]] = event - events.append(event) - elif event["type"] == "input": - pass - elif event["type"] == "after": - existing = action_map[event["callId"]] - existing["error"] = event.get("error", None) - else: - events.append(event) - return (resources, events) - - -def get_actions(events: List[Any]) -> List[str]: - action_events = sorted( - list( - filter( - lambda e: e["type"] == "action", - events, - ) - ), - key=lambda e: e["startTime"], - ) - return [e["apiName"] for e in action_events] diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 000000000..287900faa --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,57 @@ +# Copyright (c) Microsoft Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import zipfile +from pathlib import Path +from typing import Any, Dict, List, Tuple + + +def parse_trace(path: Path) -> Tuple[Dict[str, bytes], List[Any]]: + resources: Dict[str, bytes] = {} + with zipfile.ZipFile(path, "r") as zip: + for name in zip.namelist(): + resources[name] = zip.read(name) + action_map: Dict[str, Any] = {} + events: List[Any] = [] + for name in ["trace.trace", "trace.network"]: + for line in resources[name].decode().splitlines(): + if not line: + continue + event = json.loads(line) + if event["type"] == "before": + event["type"] = "action" + action_map[event["callId"]] = event + events.append(event) + elif event["type"] == "input": + pass + elif event["type"] == "after": + existing = action_map[event["callId"]] + existing["error"] = event.get("error", None) + else: + events.append(event) + return (resources, events) + + +def get_trace_actions(events: List[Any]) -> List[str]: + action_events = sorted( + list( + filter( + lambda e: e["type"] == "action", + events, + ) + ), + key=lambda e: e["startTime"], + ) + return [e["apiName"] for e in action_events]