Skip to content

Commit

Permalink
Merge branch 'main' into remove-pinned-conda
Browse files Browse the repository at this point in the history
  • Loading branch information
yury-s authored Nov 7, 2023
2 parents e4ac089 + 7278dad commit f498263
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 98 deletions.
60 changes: 60 additions & 0 deletions tests/async/test_browsertype_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
import re
from pathlib import Path
from typing import Callable

import pytest
Expand All @@ -22,6 +23,7 @@
from playwright.async_api import BrowserType, Error, Playwright, Route
from tests.conftest import RemoteServer
from tests.server import Server
from tests.utils import parse_trace


async def test_browser_type_connect_should_be_able_to_reconnect_to_a_browser(
Expand Down Expand Up @@ -299,3 +301,61 @@ async def test_should_upload_large_file(
)
assert match.group("name") == b"file1"
assert match.group("filename") == b"200MB.zip"


async def test_should_record_trace_with_source(
launch_server: Callable[[], RemoteServer],
server: Server,
tmp_path: Path,
browser_type: BrowserType,
):
remote = launch_server()
browser = await browser_type.connect(remote.ws_endpoint)
context = await browser.new_context()
page = await context.new_page()

await context.tracing.start(sources=True)
await page.goto(server.EMPTY_PAGE)
await page.set_content("<button>Click</button>")
await page.click("'Click'")
path = tmp_path / "trace1.zip"
await context.tracing.stop(path=path)

await context.close()
await browser.close()

(resources, events) = parse_trace(path)
current_file_content = Path(__file__).read_bytes()
found_current_file = False
for name, resource in resources.items():
if resource == current_file_content:
found_current_file = True
break
assert found_current_file


async def test_should_record_trace_with_relative_trace_path(
launch_server: Callable[[], RemoteServer],
server: Server,
tmp_path: Path,
browser_type: BrowserType,
):
remote = launch_server()
browser = await browser_type.connect(remote.ws_endpoint)
context = await browser.new_context()
page = await context.new_page()

await context.tracing.start(sources=True)
await page.goto(server.EMPTY_PAGE)
await page.set_content("<button>Click</button>")
await page.click("'Click'")
try:
await context.tracing.stop(path="trace1.zip")

await context.close()
await browser.close()

# make sure trace1.zip exists
assert Path("trace1.zip").exists()
finally:
Path("trace1.zip").unlink()
58 changes: 9 additions & 49 deletions tests/async/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Tuple
from typing import Dict, List

from playwright.async_api import Browser, BrowserContext, BrowserType, Page
from tests.server import Server
from tests.utils import get_trace_actions, parse_trace


async def test_browser_context_output_trace(
Expand Down Expand Up @@ -110,7 +109,7 @@ async def test_should_collect_trace_with_resources_but_no_js(

(_, events) = parse_trace(trace_file_path)
assert events[0]["type"] == "context-options"
assert get_actions(events) == [
assert get_trace_actions(events) == [
"Page.goto",
"Page.set_content",
"Page.click",
Expand Down Expand Up @@ -165,15 +164,15 @@ async def test_should_collect_two_traces(

(_, events) = parse_trace(tracing1_path)
assert events[0]["type"] == "context-options"
assert get_actions(events) == [
assert get_trace_actions(events) == [
"Page.goto",
"Page.set_content",
"Page.click",
]

(_, events) = parse_trace(tracing2_path)
assert events[0]["type"] == "context-options"
assert get_actions(events) == ["Page.dblclick", "Page.close"]
assert get_trace_actions(events) == ["Page.dblclick", "Page.close"]


async def test_should_not_throw_when_stopping_without_start_but_not_exporting(
Expand All @@ -200,7 +199,7 @@ async def test_should_work_with_playwright_context_managers(

(_, events) = parse_trace(trace_file_path)
assert events[0]["type"] == "context-options"
assert get_actions(events) == [
assert get_trace_actions(events) == [
"Page.goto",
"Page.set_content",
"Page.expect_console_message",
Expand All @@ -224,7 +223,7 @@ async def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it(
await context.tracing.stop(path=trace_file_path)

(_, events) = parse_trace(trace_file_path)
assert get_actions(events) == [
assert get_trace_actions(events) == [
"Page.goto",
"Page.wait_for_load_state",
"Page.wait_for_load_state",
Expand Down Expand Up @@ -265,7 +264,7 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]:
)

(resources, events) = parse_trace(tmpdir / "trace1.zip")
assert get_actions(events) == ["Page.goto"]
assert get_trace_actions(events) == ["Page.goto"]
assert resource_names(resources) == [
"resources/XXX.css",
"resources/XXX.html",
Expand All @@ -275,7 +274,7 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]:
]

(resources, events) = parse_trace(tmpdir / "trace2.zip")
assert get_actions(events) == ["Page.goto"]
assert get_trace_actions(events) == ["Page.goto"]
assert resource_names(resources) == [
"resources/XXX.css",
"resources/XXX.html",
Expand All @@ -284,42 +283,3 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]:
"trace.stacks",
"trace.trace",
]


def parse_trace(path: Path) -> Tuple[Dict[str, bytes], List[Any]]:
resources: Dict[str, bytes] = {}
with zipfile.ZipFile(path, "r") as zip:
for name in zip.namelist():
resources[name] = zip.read(name)
action_map: Dict[str, Any] = {}
events: List[Any] = []
for name in ["trace.trace", "trace.network"]:
for line in resources[name].decode().splitlines():
if not line:
continue
event = json.loads(line)
if event["type"] == "before":
event["type"] = "action"
action_map[event["callId"]] = event
events.append(event)
elif event["type"] == "input":
pass
elif event["type"] == "after":
existing = action_map[event["callId"]]
existing["error"] = event.get("error", None)
else:
events.append(event)
return (resources, events)


def get_actions(events: List[Any]) -> List[str]:
action_events = sorted(
list(
filter(
lambda e: e["type"] == "action",
events,
)
),
key=lambda e: e["startTime"],
)
return [e["apiName"] for e in action_events]
58 changes: 9 additions & 49 deletions tests/sync/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List

from playwright.sync_api import Browser, BrowserContext, BrowserType, Page
from tests.server import Server
from tests.utils import get_trace_actions, parse_trace


def test_browser_context_output_trace(
Expand Down Expand Up @@ -103,7 +102,7 @@ def test_should_collect_trace_with_resources_but_no_js(

(_, events) = parse_trace(trace_file_path)
assert events[0]["type"] == "context-options"
assert get_actions(events) == [
assert get_trace_actions(events) == [
"Page.goto",
"Page.set_content",
"Page.click",
Expand Down Expand Up @@ -158,15 +157,15 @@ def test_should_collect_two_traces(

(_, events) = parse_trace(tracing1_path)
assert events[0]["type"] == "context-options"
assert get_actions(events) == [
assert get_trace_actions(events) == [
"Page.goto",
"Page.set_content",
"Page.click",
]

(_, events) = parse_trace(tracing2_path)
assert events[0]["type"] == "context-options"
assert get_actions(events) == ["Page.dblclick", "Page.close"]
assert get_trace_actions(events) == ["Page.dblclick", "Page.close"]


def test_should_not_throw_when_stopping_without_start_but_not_exporting(
Expand All @@ -193,7 +192,7 @@ def test_should_work_with_playwright_context_managers(

(_, events) = parse_trace(trace_file_path)
assert events[0]["type"] == "context-options"
assert get_actions(events) == [
assert get_trace_actions(events) == [
"Page.goto",
"Page.set_content",
"Page.expect_console_message",
Expand All @@ -217,7 +216,7 @@ def test_should_display_wait_for_load_state_even_if_did_not_wait_for_it(
context.tracing.stop(path=trace_file_path)

(_, events) = parse_trace(trace_file_path)
assert get_actions(events) == [
assert get_trace_actions(events) == [
"Page.goto",
"Page.wait_for_load_state",
"Page.wait_for_load_state",
Expand Down Expand Up @@ -258,7 +257,7 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]:
)

(resources, events) = parse_trace(tmpdir / "trace1.zip")
assert get_actions(events) == ["Page.goto"]
assert get_trace_actions(events) == ["Page.goto"]
assert resource_names(resources) == [
"resources/XXX.css",
"resources/XXX.html",
Expand All @@ -268,7 +267,7 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]:
]

(resources, events) = parse_trace(tmpdir / "trace2.zip")
assert get_actions(events) == ["Page.goto"]
assert get_trace_actions(events) == ["Page.goto"]
assert resource_names(resources) == [
"resources/XXX.css",
"resources/XXX.html",
Expand All @@ -277,42 +276,3 @@ def resource_names(resources: Dict[str, bytes]) -> List[str]:
"trace.stacks",
"trace.trace",
]


def parse_trace(path: Path) -> Tuple[Dict[str, bytes], List[Any]]:
resources: Dict[str, bytes] = {}
with zipfile.ZipFile(path, "r") as zip:
for name in zip.namelist():
resources[name] = zip.read(name)
action_map: Dict[str, Any] = {}
events: List[Any] = []
for name in ["trace.trace", "trace.network"]:
for line in resources[name].decode().splitlines():
if not line:
continue
event = json.loads(line)
if event["type"] == "before":
event["type"] = "action"
action_map[event["callId"]] = event
events.append(event)
elif event["type"] == "input":
pass
elif event["type"] == "after":
existing = action_map[event["callId"]]
existing["error"] = event.get("error", None)
else:
events.append(event)
return (resources, events)


def get_actions(events: List[Any]) -> List[str]:
action_events = sorted(
list(
filter(
lambda e: e["type"] == "action",
events,
)
),
key=lambda e: e["startTime"],
)
return [e["apiName"] for e in action_events]
57 changes: 57 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Tuple


def parse_trace(path: Path) -> Tuple[Dict[str, bytes], List[Any]]:
resources: Dict[str, bytes] = {}
with zipfile.ZipFile(path, "r") as zip:
for name in zip.namelist():
resources[name] = zip.read(name)
action_map: Dict[str, Any] = {}
events: List[Any] = []
for name in ["trace.trace", "trace.network"]:
for line in resources[name].decode().splitlines():
if not line:
continue
event = json.loads(line)
if event["type"] == "before":
event["type"] = "action"
action_map[event["callId"]] = event
events.append(event)
elif event["type"] == "input":
pass
elif event["type"] == "after":
existing = action_map[event["callId"]]
existing["error"] = event.get("error", None)
else:
events.append(event)
return (resources, events)


def get_trace_actions(events: List[Any]) -> List[str]:
action_events = sorted(
list(
filter(
lambda e: e["type"] == "action",
events,
)
),
key=lambda e: e["startTime"],
)
return [e["apiName"] for e in action_events]

0 comments on commit f498263

Please sign in to comment.