diff --git a/README.md b/README.md
index d94692919..e99460db3 100644
--- a/README.md
+++ b/README.md
@@ -4,9 +4,9 @@ Playwright is a Python library to automate [Chromium](https://www.chromium.org/H
| | Linux | macOS | Windows |
| :--- | :---: | :---: | :---: |
-| Chromium 129.0.6668.29 | ✅ | ✅ | ✅ |
+| Chromium 130.0.6723.31 | ✅ | ✅ | ✅ |
| WebKit 18.0 | ✅ | ✅ | ✅ |
-| Firefox 130.0 | ✅ | ✅ | ✅ |
+| Firefox 131.0 | ✅ | ✅ | ✅ |
## Documentation
diff --git a/playwright/_impl/_browser_context.py b/playwright/_impl/_browser_context.py
index 7da85e9a4..4645e2415 100644
--- a/playwright/_impl/_browser_context.py
+++ b/playwright/_impl/_browser_context.py
@@ -62,6 +62,7 @@
TimeoutSettings,
URLMatch,
URLMatcher,
+ WebSocketRouteHandlerCallback,
async_readfile,
async_writefile,
locals_to_params,
@@ -69,7 +70,14 @@
prepare_record_har_options,
to_impl,
)
-from playwright._impl._network import Request, Response, Route, serialize_headers
+from playwright._impl._network import (
+ Request,
+ Response,
+ Route,
+ WebSocketRoute,
+ WebSocketRouteHandler,
+ serialize_headers,
+)
from playwright._impl._page import BindingCall, Page, Worker
from playwright._impl._str_utils import escape_regex_flags
from playwright._impl._tracing import Tracing
@@ -106,6 +114,7 @@ def __init__(
self._browser._contexts.append(self)
self._pages: List[Page] = []
self._routes: List[RouteHandler] = []
+ self._web_socket_routes: List[WebSocketRouteHandler] = []
self._bindings: Dict[str, Any] = {}
self._timeout_settings = TimeoutSettings(None)
self._owner_page: Optional[Page] = None
@@ -132,7 +141,14 @@ def __init__(
)
),
)
-
+ self._channel.on(
+ "webSocketRoute",
+ lambda params: self._loop.create_task(
+ self._on_web_socket_route(
+ from_channel(params["webSocketRoute"]),
+ )
+ ),
+ )
self._channel.on(
"backgroundPage",
lambda params: self._on_background_page(from_channel(params["page"])),
@@ -244,10 +260,24 @@ async def _on_route(self, route: Route) -> None:
try:
# If the page is closed or unrouteAll() was called without waiting and interception disabled,
# the method will throw an error - silence it.
- await route._internal_continue(is_internal=True)
+ await route._inner_continue(True)
except Exception:
pass
+ async def _on_web_socket_route(self, web_socket_route: WebSocketRoute) -> None:
+ route_handler = next(
+ (
+ route_handler
+ for route_handler in self._web_socket_routes
+ if route_handler.matches(web_socket_route.url)
+ ),
+ None,
+ )
+ if route_handler:
+ await route_handler.handle(web_socket_route)
+ else:
+ web_socket_route.connect_to_server()
+
def _on_binding(self, binding_call: BindingCall) -> None:
func = self._bindings.get(binding_call._initializer["name"])
if func is None:
@@ -418,6 +448,17 @@ async def _unroute_internal(
return
await asyncio.gather(*map(lambda router: router.stop(behavior), removed)) # type: ignore
+ async def route_web_socket(
+ self, url: URLMatch, handler: WebSocketRouteHandlerCallback
+ ) -> None:
+ self._web_socket_routes.insert(
+ 0,
+ WebSocketRouteHandler(
+ URLMatcher(self._options.get("baseURL"), url), handler
+ ),
+ )
+ await self._update_web_socket_interception_patterns()
+
def _dispose_har_routers(self) -> None:
for router in self._har_routers:
router.dispose()
@@ -488,6 +529,14 @@ async def _update_interception_patterns(self) -> None:
"setNetworkInterceptionPatterns", {"patterns": patterns}
)
+ async def _update_web_socket_interception_patterns(self) -> None:
+ patterns = WebSocketRouteHandler.prepare_interception_patterns(
+ self._web_socket_routes
+ )
+ await self._channel.send(
+ "setWebSocketInterceptionPatterns", {"patterns": patterns}
+ )
+
def expect_event(
self,
event: str,
diff --git a/playwright/_impl/_connection.py b/playwright/_impl/_connection.py
index 19b68fb13..95c87deb8 100644
--- a/playwright/_impl/_connection.py
+++ b/playwright/_impl/_connection.py
@@ -132,6 +132,7 @@ def __init__(
self._channel: Channel = Channel(self._connection, self)
self._initializer = initializer
self._was_collected = False
+ self._is_internal_type = False
self._connection._objects[guid] = self
if self._parent:
@@ -156,6 +157,9 @@ def _adopt(self, child: "ChannelOwner") -> None:
self._objects[child._guid] = child
child._parent = self
+ def mark_as_internal_type(self) -> None:
+ self._is_internal_type = True
+
def _set_event_to_subscription_mapping(self, mapping: Dict[str, str]) -> None:
self._event_to_subscription_mapping = mapping
@@ -355,7 +359,7 @@ def _send_message_to_server(
"params": self._replace_channels_with_guids(params),
"metadata": metadata,
}
- if self._tracing_count > 0 and frames and object._guid != "localUtils":
+ if self._tracing_count > 0 and frames and not object._is_internal_type:
self.local_utils.add_stack_to_tracing_no_reply(id, frames)
self._transport.send(message)
diff --git a/playwright/_impl/_fetch.py b/playwright/_impl/_fetch.py
index a4de751bd..93144ac55 100644
--- a/playwright/_impl/_fetch.py
+++ b/playwright/_impl/_fetch.py
@@ -18,7 +18,6 @@
import typing
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast
-from urllib.parse import parse_qs
import playwright._impl._network as network
from playwright._impl._api_structures import (
@@ -405,7 +404,8 @@ async def _inner_fetch(
"fetch",
{
"url": url,
- "params": params_to_protocol(params),
+ "params": object_to_array(params) if isinstance(params, dict) else None,
+ "encodedParams": params if isinstance(params, str) else None,
"method": method,
"headers": serialized_headers,
"postData": post_data,
@@ -430,23 +430,6 @@ async def storage_state(
return result
-def params_to_protocol(params: Optional[ParamsType]) -> Optional[List[NameValue]]:
- if not params:
- return None
- if isinstance(params, dict):
- return object_to_array(params)
- if params.startswith("?"):
- params = params[1:]
- parsed = parse_qs(params)
- if not parsed:
- return None
- out = []
- for name, values in parsed.items():
- for value in values:
- out.append(NameValue(name=name, value=value))
- return out
-
-
def file_payload_to_json(payload: FilePayload) -> ServerFilePayload:
return ServerFilePayload(
name=payload["name"],
diff --git a/playwright/_impl/_helper.py b/playwright/_impl/_helper.py
index a27f4a789..027b3e1f5 100644
--- a/playwright/_impl/_helper.py
+++ b/playwright/_impl/_helper.py
@@ -50,7 +50,7 @@
if TYPE_CHECKING: # pragma: no cover
from playwright._impl._api_structures import HeadersArray
- from playwright._impl._network import Request, Response, Route
+ from playwright._impl._network import Request, Response, Route, WebSocketRoute
URLMatch = Union[str, Pattern[str], Callable[[str], bool]]
URLMatchRequest = Union[str, Pattern[str], Callable[["Request"], bool]]
@@ -58,6 +58,7 @@
RouteHandlerCallback = Union[
Callable[["Route"], Any], Callable[["Route", "Request"], Any]
]
+WebSocketRouteHandlerCallback = Callable[["WebSocketRoute"], Any]
ColorScheme = Literal["dark", "light", "no-preference", "null"]
ForcedColors = Literal["active", "none", "null"]
diff --git a/playwright/_impl/_local_utils.py b/playwright/_impl/_local_utils.py
index 7172ee58a..26a3417c4 100644
--- a/playwright/_impl/_local_utils.py
+++ b/playwright/_impl/_local_utils.py
@@ -25,6 +25,7 @@ def __init__(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
) -> None:
super().__init__(parent, type, guid, initializer)
+ self.mark_as_internal_type()
self.devices = {
device["name"]: parse_device_descriptor(device["descriptor"])
for device in initializer["deviceDescriptors"]
diff --git a/playwright/_impl/_network.py b/playwright/_impl/_network.py
index 91c2a460c..376b2b8cb 100644
--- a/playwright/_impl/_network.py
+++ b/playwright/_impl/_network.py
@@ -18,6 +18,7 @@
import json
import json as json_utils
import mimetypes
+import re
from collections import defaultdict
from pathlib import Path
from types import SimpleNamespace
@@ -51,7 +52,13 @@
)
from playwright._impl._errors import Error
from playwright._impl._event_context_manager import EventContextManagerImpl
-from playwright._impl._helper import async_readfile, locals_to_params
+from playwright._impl._helper import (
+ URLMatcher,
+ WebSocketRouteHandlerCallback,
+ async_readfile,
+ locals_to_params,
+)
+from playwright._impl._str_utils import escape_regex_flags
from playwright._impl._waiter import Waiter
if TYPE_CHECKING: # pragma: no cover
@@ -310,6 +317,7 @@ def __init__(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
) -> None:
super().__init__(parent, type, guid, initializer)
+ self.mark_as_internal_type()
self._handling_future: Optional[asyncio.Future["bool"]] = None
self._context: "BrowserContext" = cast("BrowserContext", None)
self._did_throw = False
@@ -342,7 +350,6 @@ async def abort(self, errorCode: str = None) -> None:
"abort",
{
"errorCode": errorCode,
- "requestUrl": self.request._initializer["url"],
},
)
)
@@ -425,7 +432,6 @@ async def _inner_fulfill(
if length and "content-length" not in headers:
headers["content-length"] = str(length)
params["headers"] = serialize_headers(headers)
- params["requestUrl"] = self.request._initializer["url"]
await self._race_with_page_close(self._channel.send("fulfill", params))
@@ -484,43 +490,30 @@ async def continue_(
async def _inner() -> None:
self.request._apply_fallback_overrides(overrides)
- await self._internal_continue()
+ await self._inner_continue(False)
return await self._handle_route(_inner)
- def _internal_continue(
- self, is_internal: bool = False
- ) -> Coroutine[Any, Any, None]:
- async def continue_route() -> None:
- try:
- params: Dict[str, Any] = {}
- params["url"] = self.request._fallback_overrides.url
- params["method"] = self.request._fallback_overrides.method
- params["headers"] = self.request._fallback_overrides.headers
- if self.request._fallback_overrides.post_data_buffer is not None:
- params["postData"] = base64.b64encode(
- self.request._fallback_overrides.post_data_buffer
- ).decode()
- params = locals_to_params(params)
-
- if "headers" in params:
- params["headers"] = serialize_headers(params["headers"])
- params["requestUrl"] = self.request._initializer["url"]
- params["isFallback"] = is_internal
- await self._connection.wrap_api_call(
- lambda: self._race_with_page_close(
- self._channel.send(
- "continue",
- params,
- )
+ async def _inner_continue(self, is_fallback: bool = False) -> None:
+ options = self.request._fallback_overrides
+ await self._race_with_page_close(
+ self._channel.send(
+ "continue",
+ {
+ "url": options.url,
+ "method": options.method,
+ "headers": (
+ serialize_headers(options.headers) if options.headers else None
),
- is_internal,
- )
- except Exception as e:
- if not is_internal:
- raise e
-
- return continue_route()
+ "postData": (
+ base64.b64encode(options.post_data_buffer).decode()
+ if options.post_data_buffer is not None
+ else None
+ ),
+ "isFallback": is_fallback,
+ },
+ )
+ )
async def _redirected_navigation_request(self, url: str) -> None:
await self._handle_route(
@@ -548,6 +541,205 @@ async def _race_with_page_close(self, future: Coroutine) -> None:
await asyncio.gather(fut, return_exceptions=True)
+def _create_task_and_ignore_exception(coro: Coroutine) -> None:
+ async def _ignore_exception() -> None:
+ try:
+ await coro
+ except Exception:
+ pass
+
+ asyncio.create_task(_ignore_exception())
+
+
+class ServerWebSocketRoute:
+ def __init__(self, ws: "WebSocketRoute"):
+ self._ws = ws
+
+ def on_message(self, handler: Callable[[Union[str, bytes]], Any]) -> None:
+ self._ws._on_server_message = handler
+
+ def on_close(self, handler: Callable[[Optional[int], Optional[str]], Any]) -> None:
+ self._ws._on_server_close = handler
+
+ def connect_to_server(self) -> None:
+ raise NotImplementedError(
+ "connectToServer must be called on the page-side WebSocketRoute"
+ )
+
+ @property
+ def url(self) -> str:
+ return self._ws._initializer["url"]
+
+ def close(self, code: int = None, reason: str = None) -> None:
+ _create_task_and_ignore_exception(
+ self._ws._channel.send(
+ "closeServer",
+ {
+ "code": code,
+ "reason": reason,
+ "wasClean": True,
+ },
+ )
+ )
+
+ def send(self, message: Union[str, bytes]) -> None:
+ if isinstance(message, str):
+ _create_task_and_ignore_exception(
+ self._ws._channel.send(
+ "sendToServer", {"message": message, "isBase64": False}
+ )
+ )
+ else:
+ _create_task_and_ignore_exception(
+ self._ws._channel.send(
+ "sendToServer",
+ {"message": base64.b64encode(message).decode(), "isBase64": True},
+ )
+ )
+
+
+class WebSocketRoute(ChannelOwner):
+ def __init__(
+ self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
+ ) -> None:
+ super().__init__(parent, type, guid, initializer)
+ self.mark_as_internal_type()
+ self._on_page_message: Optional[Callable[[Union[str, bytes]], Any]] = None
+ self._on_page_close: Optional[Callable[[Optional[int], Optional[str]], Any]] = (
+ None
+ )
+ self._on_server_message: Optional[Callable[[Union[str, bytes]], Any]] = None
+ self._on_server_close: Optional[
+ Callable[[Optional[int], Optional[str]], Any]
+ ] = None
+ self._server = ServerWebSocketRoute(self)
+ self._connected = False
+
+ self._channel.on("messageFromPage", self._channel_message_from_page)
+ self._channel.on("messageFromServer", self._channel_message_from_server)
+ self._channel.on("closePage", self._channel_close_page)
+ self._channel.on("closeServer", self._channel_close_server)
+
+ def _channel_message_from_page(self, event: Dict) -> None:
+ if self._on_page_message:
+ self._on_page_message(
+ base64.b64decode(event["message"])
+ if event["isBase64"]
+ else event["message"]
+ )
+ elif self._connected:
+ _create_task_and_ignore_exception(self._channel.send("sendToServer", event))
+
+ def _channel_message_from_server(self, event: Dict) -> None:
+ if self._on_server_message:
+ self._on_server_message(
+ base64.b64decode(event["message"])
+ if event["isBase64"]
+ else event["message"]
+ )
+ else:
+ _create_task_and_ignore_exception(self._channel.send("sendToPage", event))
+
+ def _channel_close_page(self, event: Dict) -> None:
+ if self._on_page_close:
+ self._on_page_close(event["code"], event["reason"])
+ else:
+ _create_task_and_ignore_exception(self._channel.send("closeServer", event))
+
+ def _channel_close_server(self, event: Dict) -> None:
+ if self._on_server_close:
+ self._on_server_close(event["code"], event["reason"])
+ else:
+ _create_task_and_ignore_exception(self._channel.send("closePage", event))
+
+ @property
+ def url(self) -> str:
+ return self._initializer["url"]
+
+ async def close(self, code: int = None, reason: str = None) -> None:
+ try:
+ await self._channel.send(
+ "closePage", {"code": code, "reason": reason, "wasClean": True}
+ )
+ except Exception:
+ pass
+
+ def connect_to_server(self) -> "WebSocketRoute":
+ if self._connected:
+ raise Error("Already connected to the server")
+ self._connected = True
+ asyncio.create_task(self._channel.send("connect"))
+ return cast("WebSocketRoute", self._server)
+
+ def send(self, message: Union[str, bytes]) -> None:
+ if isinstance(message, str):
+ _create_task_and_ignore_exception(
+ self._channel.send(
+ "sendToPage", {"message": message, "isBase64": False}
+ )
+ )
+ else:
+ _create_task_and_ignore_exception(
+ self._channel.send(
+ "sendToPage",
+ {
+ "message": base64.b64encode(message).decode(),
+ "isBase64": True,
+ },
+ )
+ )
+
+ def on_message(self, handler: Callable[[Union[str, bytes]], Any]) -> None:
+ self._on_page_message = handler
+
+ def on_close(self, handler: Callable[[Optional[int], Optional[str]], Any]) -> None:
+ self._on_page_close = handler
+
+ async def _after_handle(self) -> None:
+ if self._connected:
+ return
+ # Ensure that websocket is "open" and can send messages without an actual server connection.
+ await self._channel.send("ensureOpened")
+
+
+class WebSocketRouteHandler:
+ def __init__(self, matcher: URLMatcher, handler: WebSocketRouteHandlerCallback):
+ self.matcher = matcher
+ self.handler = handler
+
+ @staticmethod
+ def prepare_interception_patterns(
+ handlers: List["WebSocketRouteHandler"],
+ ) -> List[dict]:
+ patterns = []
+ all_urls = False
+ for handler in handlers:
+ if isinstance(handler.matcher.match, str):
+ patterns.append({"glob": handler.matcher.match})
+ elif isinstance(handler.matcher._regex_obj, re.Pattern):
+ patterns.append(
+ {
+ "regexSource": handler.matcher._regex_obj.pattern,
+ "regexFlags": escape_regex_flags(handler.matcher._regex_obj),
+ }
+ )
+ else:
+ all_urls = True
+
+ if all_urls:
+ return [{"glob": "**/*"}]
+ return patterns
+
+ def matches(self, ws_url: str) -> bool:
+ return self.matcher.matches(ws_url)
+
+ async def handle(self, websocket_route: "WebSocketRoute") -> None:
+ coro_or_future = self.handler(websocket_route)
+ if asyncio.iscoroutine(coro_or_future):
+ await coro_or_future
+ await websocket_route._after_handle()
+
+
class Response(ChannelOwner):
def __init__(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
diff --git a/playwright/_impl/_object_factory.py b/playwright/_impl/_object_factory.py
index 2652e41fe..5f38b781b 100644
--- a/playwright/_impl/_object_factory.py
+++ b/playwright/_impl/_object_factory.py
@@ -26,7 +26,13 @@
from playwright._impl._frame import Frame
from playwright._impl._js_handle import JSHandle
from playwright._impl._local_utils import LocalUtils
-from playwright._impl._network import Request, Response, Route, WebSocket
+from playwright._impl._network import (
+ Request,
+ Response,
+ Route,
+ WebSocket,
+ WebSocketRoute,
+)
from playwright._impl._page import BindingCall, Page, Worker
from playwright._impl._playwright import Playwright
from playwright._impl._selectors import SelectorsOwner
@@ -88,6 +94,8 @@ def create_remote_object(
return Tracing(parent, type, guid, initializer)
if type == "WebSocket":
return WebSocket(parent, type, guid, initializer)
+ if type == "WebSocketRoute":
+ return WebSocketRoute(parent, type, guid, initializer)
if type == "Worker":
return Worker(parent, type, guid, initializer)
if type == "WritableStream":
diff --git a/playwright/_impl/_page.py b/playwright/_impl/_page.py
index 88c6da720..15195b28b 100644
--- a/playwright/_impl/_page.py
+++ b/playwright/_impl/_page.py
@@ -74,6 +74,7 @@
URLMatcher,
URLMatchRequest,
URLMatchResponse,
+ WebSocketRouteHandlerCallback,
async_readfile,
async_writefile,
locals_to_params,
@@ -88,7 +89,14 @@
parse_result,
serialize_argument,
)
-from playwright._impl._network import Request, Response, Route, serialize_headers
+from playwright._impl._network import (
+ Request,
+ Response,
+ Route,
+ WebSocketRoute,
+ WebSocketRouteHandler,
+ serialize_headers,
+)
from playwright._impl._video import Video
from playwright._impl._waiter import Waiter
@@ -163,6 +171,7 @@ def __init__(
self._workers: List["Worker"] = []
self._bindings: Dict[str, Any] = {}
self._routes: List[RouteHandler] = []
+ self._web_socket_routes: List[WebSocketRouteHandler] = []
self._owned_context: Optional["BrowserContext"] = None
self._timeout_settings: TimeoutSettings = TimeoutSettings(
self._browser_context._timeout_settings
@@ -210,6 +219,12 @@ def __init__(
self._on_route(from_channel(params["route"]))
),
)
+ self._channel.on(
+ "webSocketRoute",
+ lambda params: self._loop.create_task(
+ self._on_web_socket_route(from_channel(params["webSocketRoute"]))
+ ),
+ )
self._channel.on("video", lambda params: self._on_video(params))
self._channel.on(
"webSocket",
@@ -298,6 +313,20 @@ async def _update_interceptor_patterns_ignore_exceptions() -> None:
return
await self._browser_context._on_route(route)
+ async def _on_web_socket_route(self, web_socket_route: WebSocketRoute) -> None:
+ route_handler = next(
+ (
+ route_handler
+ for route_handler in self._web_socket_routes
+ if route_handler.matches(web_socket_route.url)
+ ),
+ None,
+ )
+ if route_handler:
+ await route_handler.handle(web_socket_route)
+ else:
+ await self._browser_context._on_web_socket_route(web_socket_route)
+
def _on_binding(self, binding_call: "BindingCall") -> None:
func = self._bindings.get(binding_call._initializer["name"])
if func:
@@ -572,6 +601,9 @@ async def go_forward(
await self._channel.send("goForward", locals_to_params(locals()))
)
+ async def request_gc(self) -> None:
+ await self._channel.send("requestGC")
+
async def emulate_media(
self,
media: Literal["null", "print", "screen"] = None,
@@ -661,6 +693,17 @@ async def _unroute_internal(
)
)
+ async def route_web_socket(
+ self, url: URLMatch, handler: WebSocketRouteHandlerCallback
+ ) -> None:
+ self._web_socket_routes.insert(
+ 0,
+ WebSocketRouteHandler(
+ URLMatcher(self._browser_context._options.get("baseURL"), url), handler
+ ),
+ )
+ await self._update_web_socket_interception_patterns()
+
def _dispose_har_routers(self) -> None:
for router in self._har_routers:
router.dispose()
@@ -705,6 +748,14 @@ async def _update_interception_patterns(self) -> None:
"setNetworkInterceptionPatterns", {"patterns": patterns}
)
+ async def _update_web_socket_interception_patterns(self) -> None:
+ patterns = WebSocketRouteHandler.prepare_interception_patterns(
+ self._web_socket_routes
+ )
+ await self._channel.send(
+ "setWebSocketInterceptionPatterns", {"patterns": patterns}
+ )
+
async def screenshot(
self,
timeout: float = None,
diff --git a/playwright/_impl/_tracing.py b/playwright/_impl/_tracing.py
index b2d4b5df9..5c59b749f 100644
--- a/playwright/_impl/_tracing.py
+++ b/playwright/_impl/_tracing.py
@@ -25,6 +25,7 @@ def __init__(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
) -> None:
super().__init__(parent, type, guid, initializer)
+ self.mark_as_internal_type()
self._include_sources: bool = False
self._stacks_id: Optional[str] = None
self._is_tracing: bool = False
@@ -41,13 +42,10 @@ async def start(
params = locals_to_params(locals())
self._include_sources = bool(sources)
- async def _inner_start() -> str:
- await self._channel.send("tracingStart", params)
- return await self._channel.send(
- "tracingStartChunk", {"title": title, "name": name}
- )
-
- trace_name = await self._connection.wrap_api_call(_inner_start, True)
+ await self._channel.send("tracingStart", params)
+ trace_name = await self._channel.send(
+ "tracingStartChunk", {"title": title, "name": name}
+ )
await self._start_collecting_stacks(trace_name)
async def start_chunk(self, title: str = None, name: str = None) -> None:
@@ -64,14 +62,11 @@ async def _start_collecting_stacks(self, trace_name: str) -> None:
)
async def stop_chunk(self, path: Union[pathlib.Path, str] = None) -> None:
- await self._connection.wrap_api_call(lambda: self._do_stop_chunk(path), True)
+ await self._do_stop_chunk(path)
async def stop(self, path: Union[pathlib.Path, str] = None) -> None:
- async def _inner() -> None:
- await self._do_stop_chunk(path)
- await self._channel.send("tracingStop")
-
- await self._connection.wrap_api_call(_inner, True)
+ await self._do_stop_chunk(path)
+ await self._channel.send("tracingStop")
async def _do_stop_chunk(self, file_path: Union[pathlib.Path, str] = None) -> None:
self._reset_stack_counter()
diff --git a/playwright/async_api/__init__.py b/playwright/async_api/__init__.py
index 12ea5febd..a64a066c2 100644
--- a/playwright/async_api/__init__.py
+++ b/playwright/async_api/__init__.py
@@ -61,6 +61,7 @@
Touchscreen,
Video,
WebSocket,
+ WebSocketRoute,
Worker,
)
@@ -190,5 +191,6 @@ def __call__(
"Video",
"ViewportSize",
"WebSocket",
+ "WebSocketRoute",
"Worker",
]
diff --git a/playwright/async_api/_generated.py b/playwright/async_api/_generated.py
index 1d4badbe7..3730d8127 100644
--- a/playwright/async_api/_generated.py
+++ b/playwright/async_api/_generated.py
@@ -75,6 +75,7 @@
from playwright._impl._network import Response as ResponseImpl
from playwright._impl._network import Route as RouteImpl
from playwright._impl._network import WebSocket as WebSocketImpl
+from playwright._impl._network import WebSocketRoute as WebSocketRouteImpl
from playwright._impl._page import Page as PageImpl
from playwright._impl._page import Worker as WorkerImpl
from playwright._impl._playwright import Playwright as PlaywrightImpl
@@ -1146,6 +1147,133 @@ def is_closed(self) -> bool:
mapping.register(WebSocketImpl, WebSocket)
+class WebSocketRoute(AsyncBase):
+
+ @property
+ def url(self) -> str:
+ """WebSocketRoute.url
+
+ URL of the WebSocket created in the page.
+
+ Returns
+ -------
+ str
+ """
+ return mapping.from_maybe_impl(self._impl_obj.url)
+
+ async def close(
+ self, *, code: typing.Optional[int] = None, reason: typing.Optional[str] = None
+ ) -> None:
+ """WebSocketRoute.close
+
+ Closes one side of the WebSocket connection.
+
+ Parameters
+ ----------
+ code : Union[int, None]
+ Optional [close code](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#code).
+ reason : Union[str, None]
+ Optional [close reason](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#reason).
+ """
+
+ return mapping.from_maybe_impl(
+ await self._impl_obj.close(code=code, reason=reason)
+ )
+
+ def connect_to_server(self) -> "WebSocketRoute":
+ """WebSocketRoute.connect_to_server
+
+ By default, routed WebSocket does not connect to the server, so you can mock entire WebSocket communication. This
+ method connects to the actual WebSocket server, and returns the server-side `WebSocketRoute` instance, giving the
+ ability to send and receive messages from the server.
+
+ Once connected to the server:
+ - Messages received from the server will be **automatically forwarded** to the WebSocket in the page, unless
+ `web_socket_route.on_message()` is called on the server-side `WebSocketRoute`.
+ - Messages sent by the [`WebSocket.send()`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send) call
+ in the page will be **automatically forwarded** to the server, unless `web_socket_route.on_message()` is
+ called on the original `WebSocketRoute`.
+
+ See examples at the top for more details.
+
+ Returns
+ -------
+ WebSocketRoute
+ """
+
+ return mapping.from_impl(self._impl_obj.connect_to_server())
+
+ def send(self, message: typing.Union[str, bytes]) -> None:
+ """WebSocketRoute.send
+
+ Sends a message to the WebSocket. When called on the original WebSocket, sends the message to the page. When called
+ on the result of `web_socket_route.connect_to_server()`, sends the message to the server. See examples at the
+ top for more details.
+
+ Parameters
+ ----------
+ message : Union[bytes, str]
+ Message to send.
+ """
+
+ return mapping.from_maybe_impl(self._impl_obj.send(message=message))
+
+ def on_message(
+ self, handler: typing.Callable[[typing.Union[str, bytes]], typing.Any]
+ ) -> None:
+ """WebSocketRoute.on_message
+
+ This method allows to handle messages that are sent by the WebSocket, either from the page or from the server.
+
+ When called on the original WebSocket route, this method handles messages sent from the page. You can handle this
+ messages by responding to them with `web_socket_route.send()`, forwarding them to the server-side connection
+ returned by `web_socket_route.connect_to_server()` or do something else.
+
+ Once this method is called, messages are not automatically forwarded to the server or to the page - you should do
+ that manually by calling `web_socket_route.send()`. See examples at the top for more details.
+
+ Calling this method again will override the handler with a new one.
+
+ Parameters
+ ----------
+ handler : Callable[[Union[bytes, str]], Any]
+ Function that will handle messages.
+ """
+
+ return mapping.from_maybe_impl(
+ self._impl_obj.on_message(handler=self._wrap_handler(handler))
+ )
+
+ def on_close(
+ self,
+ handler: typing.Callable[
+ [typing.Optional[int], typing.Optional[str]], typing.Any
+ ],
+ ) -> None:
+ """WebSocketRoute.on_close
+
+ Allows to handle [`WebSocket.close`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close).
+
+ By default, closing one side of the connection, either in the page or on the server, will close the other side.
+ However, when `web_socket_route.on_close()` handler is set up, the default forwarding of closure is disabled,
+ and handler should take care of it.
+
+ Parameters
+ ----------
+ handler : Callable[[Union[int, None], Union[str, None]], Any]
+ Function that will handle WebSocket closure. Received an optional
+ [close code](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#code) and an optional
+ [close reason](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#reason).
+ """
+
+ return mapping.from_maybe_impl(
+ self._impl_obj.on_close(handler=self._wrap_handler(handler))
+ )
+
+
+mapping.register(WebSocketRouteImpl, WebSocketRoute)
+
+
class Keyboard(AsyncBase):
async def down(self, key: str) -> None:
@@ -4212,7 +4340,9 @@ async def click(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -4291,7 +4421,9 @@ async def dblclick(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -4362,7 +4494,9 @@ async def tap(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -4761,6 +4895,7 @@ def get_by_role(
**NOTE** Unlike most other attributes, `disabled` is inherited through the DOM hierarchy. Learn more about
[`aria-disabled`](https://www.w3.org/TR/wai-aria-1.2/#aria-disabled).
+
expanded : Union[bool, None]
An attribute that is usually set by `aria-expanded`.
@@ -5202,7 +5337,9 @@ async def hover(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -5902,7 +6039,7 @@ def owner(self) -> "Locator":
**Usage**
```py
- frame_locator = page.frame_locator(\"iframe[name=\\\"embedded\\\"]\")
+ frame_locator = page.locator(\"iframe[name=\\\"embedded\\\"]\").content_frame
# ...
locator = frame_locator.owner
await expect(locator).to_be_visible()
@@ -6240,6 +6377,7 @@ def get_by_role(
**NOTE** Unlike most other attributes, `disabled` is inherited through the DOM hierarchy. Learn more about
[`aria-disabled`](https://www.w3.org/TR/wai-aria-1.2/#aria-disabled).
+
expanded : Union[bool, None]
An attribute that is usually set by `aria-expanded`.
@@ -9090,6 +9228,28 @@ async def go_forward(
await self._impl_obj.go_forward(timeout=timeout, waitUntil=wait_until)
)
+ async def request_gc(self) -> None:
+ """Page.request_gc
+
+ Request the page to perform garbage collection. Note that there is no guarantee that all unreachable objects will
+ be collected.
+
+ This is useful to help detect memory leaks. For example, if your page has a large object `'suspect'` that might be
+ leaked, you can check that it does not leak by using a
+ [`WeakRef`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WeakRef).
+
+ ```py
+ # 1. In your page, save a WeakRef for the \"suspect\".
+ await page.evaluate(\"globalThis.suspectWeakRef = new WeakRef(suspect)\")
+ # 2. Request garbage collection.
+ await page.request_gc()
+ # 3. Check that weak ref does not deref to the original object.
+ assert await page.evaluate(\"!globalThis.suspectWeakRef.deref()\")
+ ```
+ """
+
+ return mapping.from_maybe_impl(await self._impl_obj.request_gc())
+
async def emulate_media(
self,
*,
@@ -9259,7 +9419,7 @@ async def route(
**NOTE** `page.route()` will not intercept requests intercepted by Service Worker. See
[this](https://github.com/microsoft/playwright/issues/1090) issue. We recommend disabling Service Workers when
- using request interception by setting `Browser.newContext.serviceWorkers` to `'block'`.
+ using request interception by setting `serviceWorkers` to `'block'`.
**NOTE** `page.route()` will not intercept the first request of a popup page. Use
`browser_context.route()` instead.
@@ -9352,6 +9512,49 @@ async def unroute(
)
)
+ async def route_web_socket(
+ self,
+ url: typing.Union[str, typing.Pattern[str], typing.Callable[[str], bool]],
+ handler: typing.Callable[["WebSocketRoute"], typing.Any],
+ ) -> None:
+ """Page.route_web_socket
+
+ This method allows to modify websocket connections that are made by the page.
+
+ Note that only `WebSocket`s created after this method was called will be routed. It is recommended to call this
+ method before navigating the page.
+
+ **Usage**
+
+ Below is an example of a simple mock that responds to a single message. See `WebSocketRoute` for more details and
+ examples.
+
+ ```py
+ def message_handler(ws: WebSocketRoute, message: Union[str, bytes]):
+ if message == \"request\":
+ ws.send(\"response\")
+
+ def handler(ws: WebSocketRoute):
+ ws.on_message(lambda message: message_handler(ws, message))
+
+ await page.route_web_socket(\"/ws\", handler)
+ ```
+
+ Parameters
+ ----------
+ url : Union[Callable[[str], bool], Pattern[str], str]
+ Only WebSockets with the url matching this pattern will be routed. A string pattern can be relative to the
+ `baseURL` context option.
+ handler : Callable[[WebSocketRoute], Any]
+ Handler function to route the WebSocket.
+ """
+
+ return mapping.from_maybe_impl(
+ await self._impl_obj.route_web_socket(
+ url=self._wrap_handler(url), handler=self._wrap_handler(handler)
+ )
+ )
+
async def unroute_all(
self,
*,
@@ -9393,7 +9596,7 @@ async def route_from_har(
Playwright will not serve requests intercepted by Service Worker from the HAR file. See
[this](https://github.com/microsoft/playwright/issues/1090) issue. We recommend disabling Service Workers when
- using request interception by setting `Browser.newContext.serviceWorkers` to `'block'`.
+ using request interception by setting `serviceWorkers` to `'block'`.
Parameters
----------
@@ -9636,7 +9839,9 @@ async def click(
Deprecated: This option will default to `true` in the future.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
strict : Union[bool, None]
When true, the call requires selector to resolve to a single element. If given selector resolves to more than one
element, the call throws an exception.
@@ -9717,7 +9922,9 @@ async def dblclick(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -9788,7 +9995,9 @@ async def tap(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -10185,6 +10394,7 @@ def get_by_role(
**NOTE** Unlike most other attributes, `disabled` is inherited through the DOM hierarchy. Learn more about
[`aria-disabled`](https://www.w3.org/TR/wai-aria-1.2/#aria-disabled).
+
expanded : Union[bool, None]
An attribute that is usually set by `aria-expanded`.
@@ -10626,7 +10836,9 @@ async def hover(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -11254,8 +11466,7 @@ async def pause(self) -> None:
User can inspect selectors or perform manual steps while paused. Resume will continue running the original script
from the place it was paused.
- **NOTE** This method requires Playwright to be started in a headed mode, with a falsy `headless` value in the
- `browser_type.launch()`.
+ **NOTE** This method requires Playwright to be started in a headed mode, with a falsy `headless` option.
"""
return mapping.from_maybe_impl(await self._impl_obj.pause())
@@ -11916,13 +12127,16 @@ async def add_locator_handler(
**NOTE** Running the handler will alter your page state mid-test. For example it will change the currently focused
element and move the mouse. Make sure that actions that run after the handler are self-contained and do not rely on
- the focus and mouse state being unchanged.
For example, consider a test that calls
- `locator.focus()` followed by `keyboard.press()`. If your handler clicks a button between these two
- actions, the focused element most likely will be wrong, and key press will happen on the unexpected element. Use
- `locator.press()` instead to avoid this problem.
Another example is a series of mouse
- actions, where `mouse.move()` is followed by `mouse.down()`. Again, when the handler runs between
- these two actions, the mouse position will be wrong during the mouse down. Prefer self-contained actions like
- `locator.click()` that do not rely on the state being unchanged by a handler.
+ the focus and mouse state being unchanged.
+
+ For example, consider a test that calls `locator.focus()` followed by `keyboard.press()`. If your
+ handler clicks a button between these two actions, the focused element most likely will be wrong, and key press
+ will happen on the unexpected element. Use `locator.press()` instead to avoid this problem.
+
+ Another example is a series of mouse actions, where `mouse.move()` is followed by `mouse.down()`.
+ Again, when the handler runs between these two actions, the mouse position will be wrong during the mouse down.
+ Prefer self-contained actions like `locator.click()` that do not rely on the state being unchanged by a
+ handler.
**Usage**
@@ -12931,7 +13145,7 @@ async def route(
**NOTE** `browser_context.route()` will not intercept requests intercepted by Service Worker. See
[this](https://github.com/microsoft/playwright/issues/1090) issue. We recommend disabling Service Workers when
- using request interception by setting `Browser.newContext.serviceWorkers` to `'block'`.
+ using request interception by setting `serviceWorkers` to `'block'`.
**Usage**
@@ -13025,6 +13239,51 @@ async def unroute(
)
)
+ async def route_web_socket(
+ self,
+ url: typing.Union[str, typing.Pattern[str], typing.Callable[[str], bool]],
+ handler: typing.Callable[["WebSocketRoute"], typing.Any],
+ ) -> None:
+ """BrowserContext.route_web_socket
+
+ This method allows to modify websocket connections that are made by any page in the browser context.
+
+ Note that only `WebSocket`s created after this method was called will be routed. It is recommended to call this
+ method before creating any pages.
+
+ **Usage**
+
+ Below is an example of a simple handler that blocks some websocket messages. See `WebSocketRoute` for more details
+ and examples.
+
+ ```py
+ def message_handler(ws: WebSocketRoute, message: Union[str, bytes]):
+ if message == \"to-be-blocked\":
+ return
+ ws.send(message)
+
+ async def handler(ws: WebSocketRoute):
+ ws.route_send(lambda message: message_handler(ws, message))
+ await ws.connect()
+
+ await context.route_web_socket(\"/ws\", handler)
+ ```
+
+ Parameters
+ ----------
+ url : Union[Callable[[str], bool], Pattern[str], str]
+ Only WebSockets with the url matching this pattern will be routed. A string pattern can be relative to the
+ `baseURL` context option.
+ handler : Callable[[WebSocketRoute], Any]
+ Handler function to route the WebSocket.
+ """
+
+ return mapping.from_maybe_impl(
+ await self._impl_obj.route_web_socket(
+ url=self._wrap_handler(url), handler=self._wrap_handler(handler)
+ )
+ )
+
async def unroute_all(
self,
*,
@@ -13066,7 +13325,7 @@ async def route_from_har(
Playwright will not serve requests intercepted by Service Worker from the HAR file. See
[this](https://github.com/microsoft/playwright/issues/1090) issue. We recommend disabling Service Workers when
- using request interception by setting `Browser.newContext.serviceWorkers` to `'block'`.
+ using request interception by setting `serviceWorkers` to `'block'`.
Parameters
----------
@@ -13616,11 +13875,10 @@ async def new_context(
`passphrase` property should be provided if the certificate is encrypted. The `origin` property should be provided
with an exact match to the request origin that the certificate is valid for.
- **NOTE** Using Client Certificates in combination with Proxy Servers is not supported.
-
**NOTE** When using WebKit on macOS, accessing `localhost` will not pick up client certificates. You can make it
work by replacing `localhost` with `local.playwright`.
+
Returns
-------
BrowserContext
@@ -13842,11 +14100,10 @@ async def new_page(
`passphrase` property should be provided if the certificate is encrypted. The `origin` property should be provided
with an exact match to the request origin that the certificate is valid for.
- **NOTE** Using Client Certificates in combination with Proxy Servers is not supported.
-
**NOTE** When using WebKit on macOS, accessing `localhost` will not pick up client certificates. You can make it
work by replacing `localhost` with `local.playwright`.
+
Returns
-------
Page
@@ -14402,11 +14659,10 @@ async def launch_persistent_context(
`passphrase` property should be provided if the certificate is encrypted. The `origin` property should be provided
with an exact match to the request origin that the certificate is valid for.
- **NOTE** Using Client Certificates in combination with Proxy Servers is not supported.
-
**NOTE** When using WebKit on macOS, accessing `localhost` will not pick up client certificates. You can make it
work by replacing `localhost` with `local.playwright`.
+
Returns
-------
BrowserContext
@@ -14733,8 +14989,8 @@ async def start(
----------
name : Union[str, None]
If specified, intermediate trace files are going to be saved into the files with the given name prefix inside the
- `tracesDir` folder specified in `browser_type.launch()`. To specify the final trace zip file name, you need
- to pass `path` option to `tracing.stop()` instead.
+ `tracesDir` directory specified in `browser_type.launch()`. To specify the final trace zip file name, you
+ need to pass `path` option to `tracing.stop()` instead.
title : Union[str, None]
Trace name to be shown in the Trace Viewer.
snapshots : Union[bool, None]
@@ -14790,8 +15046,8 @@ async def start_chunk(
Trace name to be shown in the Trace Viewer.
name : Union[str, None]
If specified, intermediate trace files are going to be saved into the files with the given name prefix inside the
- `tracesDir` folder specified in `browser_type.launch()`. To specify the final trace zip file name, you need
- to pass `path` option to `tracing.stop_chunk()` instead.
+ `tracesDir` directory specified in `browser_type.launch()`. To specify the final trace zip file name, you
+ need to pass `path` option to `tracing.stop_chunk()` instead.
"""
return mapping.from_maybe_impl(
@@ -15082,7 +15338,9 @@ async def click(
Deprecated: This option will default to `true` in the future.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -15154,7 +15412,9 @@ async def dblclick(
Deprecated: This option has no effect.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -15793,6 +16053,7 @@ def get_by_role(
**NOTE** Unlike most other attributes, `disabled` is inherited through the DOM hierarchy. Learn more about
[`aria-disabled`](https://www.w3.org/TR/wai-aria-1.2/#aria-disabled).
+
expanded : Union[bool, None]
An attribute that is usually set by `aria-expanded`.
@@ -16131,7 +16392,10 @@ def filter(
def or_(self, locator: "Locator") -> "Locator":
"""Locator.or_
- Creates a locator that matches either of the two locators.
+ Creates a locator matching all elements that match one or both of the two locators.
+
+ Note that when both locators match something, the resulting locator will have multiple matches and violate
+ [locator strictness](https://playwright.dev/python/docs/locators#strictness) guidelines.
**Usage**
@@ -16219,9 +16483,13 @@ async def all(self) -> typing.List["Locator"]:
elements.
**NOTE** `locator.all()` does not wait for elements to match the locator, and instead immediately returns
- whatever is present in the page. When the list of elements changes dynamically, `locator.all()` will
- produce unpredictable and flaky results. When the list of elements is stable, but loaded dynamically, wait for the
- full list to finish loading before calling `locator.all()`.
+ whatever is present in the page.
+
+ When the list of elements changes dynamically, `locator.all()` will produce unpredictable and flaky
+ results.
+
+ When the list of elements is stable, but loaded dynamically, wait for the full list to finish loading before
+ calling `locator.all()`.
**Usage**
@@ -16408,7 +16676,9 @@ async def hover(
Whether to bypass the [actionability](../actionability.md) checks. Defaults to `false`.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -17086,7 +17356,9 @@ async def tap(
Deprecated: This option has no effect.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -18147,7 +18419,7 @@ async def fetch(
```
The common way to send file(s) in the body of a request is to upload them as form fields with `multipart/form-data`
- encoding. Use `FormData` to construct request body and pass it to the request as `multipart` parameter:
+ encoding, by specifiying the `multipart` parameter:
Parameters
----------
@@ -18295,11 +18567,10 @@ async def new_context(
`passphrase` property should be provided if the certificate is encrypted. The `origin` property should be provided
with an exact match to the request origin that the certificate is valid for.
- **NOTE** Using Client Certificates in combination with Proxy Servers is not supported.
-
**NOTE** When using WebKit on macOS, accessing `localhost` will not pick up client certificates. You can make it
work by replacing `localhost` with `local.playwright`.
+
Returns
-------
APIRequestContext
diff --git a/playwright/sync_api/__init__.py b/playwright/sync_api/__init__.py
index e326fd9f5..80eaf71db 100644
--- a/playwright/sync_api/__init__.py
+++ b/playwright/sync_api/__init__.py
@@ -61,6 +61,7 @@
Touchscreen,
Video,
WebSocket,
+ WebSocketRoute,
Worker,
)
@@ -190,5 +191,6 @@ def __call__(
"Video",
"ViewportSize",
"WebSocket",
+ "WebSocketRoute",
"Worker",
]
diff --git a/playwright/sync_api/_generated.py b/playwright/sync_api/_generated.py
index 1553c2598..773c763dd 100644
--- a/playwright/sync_api/_generated.py
+++ b/playwright/sync_api/_generated.py
@@ -69,6 +69,7 @@
from playwright._impl._network import Response as ResponseImpl
from playwright._impl._network import Route as RouteImpl
from playwright._impl._network import WebSocket as WebSocketImpl
+from playwright._impl._network import WebSocketRoute as WebSocketRouteImpl
from playwright._impl._page import Page as PageImpl
from playwright._impl._page import Worker as WorkerImpl
from playwright._impl._playwright import Playwright as PlaywrightImpl
@@ -1142,6 +1143,133 @@ def is_closed(self) -> bool:
mapping.register(WebSocketImpl, WebSocket)
+class WebSocketRoute(SyncBase):
+
+ @property
+ def url(self) -> str:
+ """WebSocketRoute.url
+
+ URL of the WebSocket created in the page.
+
+ Returns
+ -------
+ str
+ """
+ return mapping.from_maybe_impl(self._impl_obj.url)
+
+ def close(
+ self, *, code: typing.Optional[int] = None, reason: typing.Optional[str] = None
+ ) -> None:
+ """WebSocketRoute.close
+
+ Closes one side of the WebSocket connection.
+
+ Parameters
+ ----------
+ code : Union[int, None]
+ Optional [close code](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#code).
+ reason : Union[str, None]
+ Optional [close reason](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#reason).
+ """
+
+ return mapping.from_maybe_impl(
+ self._sync(self._impl_obj.close(code=code, reason=reason))
+ )
+
+ def connect_to_server(self) -> "WebSocketRoute":
+ """WebSocketRoute.connect_to_server
+
+ By default, routed WebSocket does not connect to the server, so you can mock entire WebSocket communication. This
+ method connects to the actual WebSocket server, and returns the server-side `WebSocketRoute` instance, giving the
+ ability to send and receive messages from the server.
+
+ Once connected to the server:
+ - Messages received from the server will be **automatically forwarded** to the WebSocket in the page, unless
+ `web_socket_route.on_message()` is called on the server-side `WebSocketRoute`.
+ - Messages sent by the [`WebSocket.send()`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send) call
+ in the page will be **automatically forwarded** to the server, unless `web_socket_route.on_message()` is
+ called on the original `WebSocketRoute`.
+
+ See examples at the top for more details.
+
+ Returns
+ -------
+ WebSocketRoute
+ """
+
+ return mapping.from_impl(self._impl_obj.connect_to_server())
+
+ def send(self, message: typing.Union[str, bytes]) -> None:
+ """WebSocketRoute.send
+
+ Sends a message to the WebSocket. When called on the original WebSocket, sends the message to the page. When called
+ on the result of `web_socket_route.connect_to_server()`, sends the message to the server. See examples at the
+ top for more details.
+
+ Parameters
+ ----------
+ message : Union[bytes, str]
+ Message to send.
+ """
+
+ return mapping.from_maybe_impl(self._impl_obj.send(message=message))
+
+ def on_message(
+ self, handler: typing.Callable[[typing.Union[str, bytes]], typing.Any]
+ ) -> None:
+ """WebSocketRoute.on_message
+
+ This method allows to handle messages that are sent by the WebSocket, either from the page or from the server.
+
+ When called on the original WebSocket route, this method handles messages sent from the page. You can handle this
+ messages by responding to them with `web_socket_route.send()`, forwarding them to the server-side connection
+ returned by `web_socket_route.connect_to_server()` or do something else.
+
+ Once this method is called, messages are not automatically forwarded to the server or to the page - you should do
+ that manually by calling `web_socket_route.send()`. See examples at the top for more details.
+
+ Calling this method again will override the handler with a new one.
+
+ Parameters
+ ----------
+ handler : Callable[[Union[bytes, str]], Any]
+ Function that will handle messages.
+ """
+
+ return mapping.from_maybe_impl(
+ self._impl_obj.on_message(handler=self._wrap_handler(handler))
+ )
+
+ def on_close(
+ self,
+ handler: typing.Callable[
+ [typing.Optional[int], typing.Optional[str]], typing.Any
+ ],
+ ) -> None:
+ """WebSocketRoute.on_close
+
+ Allows to handle [`WebSocket.close`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close).
+
+ By default, closing one side of the connection, either in the page or on the server, will close the other side.
+ However, when `web_socket_route.on_close()` handler is set up, the default forwarding of closure is disabled,
+ and handler should take care of it.
+
+ Parameters
+ ----------
+ handler : Callable[[Union[int, None], Union[str, None]], Any]
+ Function that will handle WebSocket closure. Received an optional
+ [close code](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#code) and an optional
+ [close reason](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#reason).
+ """
+
+ return mapping.from_maybe_impl(
+ self._impl_obj.on_close(handler=self._wrap_handler(handler))
+ )
+
+
+mapping.register(WebSocketRouteImpl, WebSocketRoute)
+
+
class Keyboard(SyncBase):
def down(self, key: str) -> None:
@@ -4291,7 +4419,9 @@ def click(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -4372,7 +4502,9 @@ def dblclick(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -4445,7 +4577,9 @@ def tap(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -4848,6 +4982,7 @@ def get_by_role(
**NOTE** Unlike most other attributes, `disabled` is inherited through the DOM hierarchy. Learn more about
[`aria-disabled`](https://www.w3.org/TR/wai-aria-1.2/#aria-disabled).
+
expanded : Union[bool, None]
An attribute that is usually set by `aria-expanded`.
@@ -5297,7 +5432,9 @@ def hover(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -6016,7 +6153,7 @@ def owner(self) -> "Locator":
**Usage**
```py
- frame_locator = page.frame_locator(\"iframe[name=\\\"embedded\\\"]\")
+ frame_locator = page.locator(\"iframe[name=\\\"embedded\\\"]\").content_frame
# ...
locator = frame_locator.owner
expect(locator).to_be_visible()
@@ -6354,6 +6491,7 @@ def get_by_role(
**NOTE** Unlike most other attributes, `disabled` is inherited through the DOM hierarchy. Learn more about
[`aria-disabled`](https://www.w3.org/TR/wai-aria-1.2/#aria-disabled).
+
expanded : Union[bool, None]
An attribute that is usually set by `aria-expanded`.
@@ -9131,6 +9269,28 @@ def go_forward(
self._sync(self._impl_obj.go_forward(timeout=timeout, waitUntil=wait_until))
)
+ def request_gc(self) -> None:
+ """Page.request_gc
+
+ Request the page to perform garbage collection. Note that there is no guarantee that all unreachable objects will
+ be collected.
+
+ This is useful to help detect memory leaks. For example, if your page has a large object `'suspect'` that might be
+ leaked, you can check that it does not leak by using a
+ [`WeakRef`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WeakRef).
+
+ ```py
+ # 1. In your page, save a WeakRef for the \"suspect\".
+ page.evaluate(\"globalThis.suspectWeakRef = new WeakRef(suspect)\")
+ # 2. Request garbage collection.
+ page.request_gc()
+ # 3. Check that weak ref does not deref to the original object.
+ assert page.evaluate(\"!globalThis.suspectWeakRef.deref()\")
+ ```
+ """
+
+ return mapping.from_maybe_impl(self._sync(self._impl_obj.request_gc()))
+
def emulate_media(
self,
*,
@@ -9301,7 +9461,7 @@ def route(
**NOTE** `page.route()` will not intercept requests intercepted by Service Worker. See
[this](https://github.com/microsoft/playwright/issues/1090) issue. We recommend disabling Service Workers when
- using request interception by setting `Browser.newContext.serviceWorkers` to `'block'`.
+ using request interception by setting `serviceWorkers` to `'block'`.
**NOTE** `page.route()` will not intercept the first request of a popup page. Use
`browser_context.route()` instead.
@@ -9398,6 +9558,51 @@ def unroute(
)
)
+ def route_web_socket(
+ self,
+ url: typing.Union[str, typing.Pattern[str], typing.Callable[[str], bool]],
+ handler: typing.Callable[["WebSocketRoute"], typing.Any],
+ ) -> None:
+ """Page.route_web_socket
+
+ This method allows to modify websocket connections that are made by the page.
+
+ Note that only `WebSocket`s created after this method was called will be routed. It is recommended to call this
+ method before navigating the page.
+
+ **Usage**
+
+ Below is an example of a simple mock that responds to a single message. See `WebSocketRoute` for more details and
+ examples.
+
+ ```py
+ def message_handler(ws: WebSocketRoute, message: Union[str, bytes]):
+ if message == \"request\":
+ ws.send(\"response\")
+
+ def handler(ws: WebSocketRoute):
+ ws.on_message(lambda message: message_handler(ws, message))
+
+ page.route_web_socket(\"/ws\", handler)
+ ```
+
+ Parameters
+ ----------
+ url : Union[Callable[[str], bool], Pattern[str], str]
+ Only WebSockets with the url matching this pattern will be routed. A string pattern can be relative to the
+ `baseURL` context option.
+ handler : Callable[[WebSocketRoute], Any]
+ Handler function to route the WebSocket.
+ """
+
+ return mapping.from_maybe_impl(
+ self._sync(
+ self._impl_obj.route_web_socket(
+ url=self._wrap_handler(url), handler=self._wrap_handler(handler)
+ )
+ )
+ )
+
def unroute_all(
self,
*,
@@ -9439,7 +9644,7 @@ def route_from_har(
Playwright will not serve requests intercepted by Service Worker from the HAR file. See
[this](https://github.com/microsoft/playwright/issues/1090) issue. We recommend disabling Service Workers when
- using request interception by setting `Browser.newContext.serviceWorkers` to `'block'`.
+ using request interception by setting `serviceWorkers` to `'block'`.
Parameters
----------
@@ -9688,7 +9893,9 @@ def click(
Deprecated: This option will default to `true` in the future.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
strict : Union[bool, None]
When true, the call requires selector to resolve to a single element. If given selector resolves to more than one
element, the call throws an exception.
@@ -9771,7 +9978,9 @@ def dblclick(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -9844,7 +10053,9 @@ def tap(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -10245,6 +10456,7 @@ def get_by_role(
**NOTE** Unlike most other attributes, `disabled` is inherited through the DOM hierarchy. Learn more about
[`aria-disabled`](https://www.w3.org/TR/wai-aria-1.2/#aria-disabled).
+
expanded : Union[bool, None]
An attribute that is usually set by `aria-expanded`.
@@ -10694,7 +10906,9 @@ def hover(
element, the call throws an exception.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -11339,8 +11553,7 @@ def pause(self) -> None:
User can inspect selectors or perform manual steps while paused. Resume will continue running the original script
from the place it was paused.
- **NOTE** This method requires Playwright to be started in a headed mode, with a falsy `headless` value in the
- `browser_type.launch()`.
+ **NOTE** This method requires Playwright to be started in a headed mode, with a falsy `headless` option.
"""
return mapping.from_maybe_impl(self._sync(self._impl_obj.pause()))
@@ -12005,13 +12218,16 @@ def add_locator_handler(
**NOTE** Running the handler will alter your page state mid-test. For example it will change the currently focused
element and move the mouse. Make sure that actions that run after the handler are self-contained and do not rely on
- the focus and mouse state being unchanged.
For example, consider a test that calls
- `locator.focus()` followed by `keyboard.press()`. If your handler clicks a button between these two
- actions, the focused element most likely will be wrong, and key press will happen on the unexpected element. Use
- `locator.press()` instead to avoid this problem.
Another example is a series of mouse
- actions, where `mouse.move()` is followed by `mouse.down()`. Again, when the handler runs between
- these two actions, the mouse position will be wrong during the mouse down. Prefer self-contained actions like
- `locator.click()` that do not rely on the state being unchanged by a handler.
+ the focus and mouse state being unchanged.
+
+ For example, consider a test that calls `locator.focus()` followed by `keyboard.press()`. If your
+ handler clicks a button between these two actions, the focused element most likely will be wrong, and key press
+ will happen on the unexpected element. Use `locator.press()` instead to avoid this problem.
+
+ Another example is a series of mouse actions, where `mouse.move()` is followed by `mouse.down()`.
+ Again, when the handler runs between these two actions, the mouse position will be wrong during the mouse down.
+ Prefer self-contained actions like `locator.click()` that do not rely on the state being unchanged by a
+ handler.
**Usage**
@@ -12956,7 +13172,7 @@ def route(
**NOTE** `browser_context.route()` will not intercept requests intercepted by Service Worker. See
[this](https://github.com/microsoft/playwright/issues/1090) issue. We recommend disabling Service Workers when
- using request interception by setting `Browser.newContext.serviceWorkers` to `'block'`.
+ using request interception by setting `serviceWorkers` to `'block'`.
**Usage**
@@ -13055,6 +13271,53 @@ def unroute(
)
)
+ def route_web_socket(
+ self,
+ url: typing.Union[str, typing.Pattern[str], typing.Callable[[str], bool]],
+ handler: typing.Callable[["WebSocketRoute"], typing.Any],
+ ) -> None:
+ """BrowserContext.route_web_socket
+
+ This method allows to modify websocket connections that are made by any page in the browser context.
+
+ Note that only `WebSocket`s created after this method was called will be routed. It is recommended to call this
+ method before creating any pages.
+
+ **Usage**
+
+ Below is an example of a simple handler that blocks some websocket messages. See `WebSocketRoute` for more details
+ and examples.
+
+ ```py
+ def message_handler(ws: WebSocketRoute, message: Union[str, bytes]):
+ if message == \"to-be-blocked\":
+ return
+ ws.send(message)
+
+ def handler(ws: WebSocketRoute):
+ ws.route_send(lambda message: message_handler(ws, message))
+ ws.connect()
+
+ context.route_web_socket(\"/ws\", handler)
+ ```
+
+ Parameters
+ ----------
+ url : Union[Callable[[str], bool], Pattern[str], str]
+ Only WebSockets with the url matching this pattern will be routed. A string pattern can be relative to the
+ `baseURL` context option.
+ handler : Callable[[WebSocketRoute], Any]
+ Handler function to route the WebSocket.
+ """
+
+ return mapping.from_maybe_impl(
+ self._sync(
+ self._impl_obj.route_web_socket(
+ url=self._wrap_handler(url), handler=self._wrap_handler(handler)
+ )
+ )
+ )
+
def unroute_all(
self,
*,
@@ -13096,7 +13359,7 @@ def route_from_har(
Playwright will not serve requests intercepted by Service Worker from the HAR file. See
[this](https://github.com/microsoft/playwright/issues/1090) issue. We recommend disabling Service Workers when
- using request interception by setting `Browser.newContext.serviceWorkers` to `'block'`.
+ using request interception by setting `serviceWorkers` to `'block'`.
Parameters
----------
@@ -13648,11 +13911,10 @@ def new_context(
`passphrase` property should be provided if the certificate is encrypted. The `origin` property should be provided
with an exact match to the request origin that the certificate is valid for.
- **NOTE** Using Client Certificates in combination with Proxy Servers is not supported.
-
**NOTE** When using WebKit on macOS, accessing `localhost` will not pick up client certificates. You can make it
work by replacing `localhost` with `local.playwright`.
+
Returns
-------
BrowserContext
@@ -13876,11 +14138,10 @@ def new_page(
`passphrase` property should be provided if the certificate is encrypted. The `origin` property should be provided
with an exact match to the request origin that the certificate is valid for.
- **NOTE** Using Client Certificates in combination with Proxy Servers is not supported.
-
**NOTE** When using WebKit on macOS, accessing `localhost` will not pick up client certificates. You can make it
work by replacing `localhost` with `local.playwright`.
+
Returns
-------
Page
@@ -14442,11 +14703,10 @@ def launch_persistent_context(
`passphrase` property should be provided if the certificate is encrypted. The `origin` property should be provided
with an exact match to the request origin that the certificate is valid for.
- **NOTE** Using Client Certificates in combination with Proxy Servers is not supported.
-
**NOTE** When using WebKit on macOS, accessing `localhost` will not pick up client certificates. You can make it
work by replacing `localhost` with `local.playwright`.
+
Returns
-------
BrowserContext
@@ -14776,8 +15036,8 @@ def start(
----------
name : Union[str, None]
If specified, intermediate trace files are going to be saved into the files with the given name prefix inside the
- `tracesDir` folder specified in `browser_type.launch()`. To specify the final trace zip file name, you need
- to pass `path` option to `tracing.stop()` instead.
+ `tracesDir` directory specified in `browser_type.launch()`. To specify the final trace zip file name, you
+ need to pass `path` option to `tracing.stop()` instead.
title : Union[str, None]
Trace name to be shown in the Trace Viewer.
snapshots : Union[bool, None]
@@ -14835,8 +15095,8 @@ def start_chunk(
Trace name to be shown in the Trace Viewer.
name : Union[str, None]
If specified, intermediate trace files are going to be saved into the files with the given name prefix inside the
- `tracesDir` folder specified in `browser_type.launch()`. To specify the final trace zip file name, you need
- to pass `path` option to `tracing.stop_chunk()` instead.
+ `tracesDir` directory specified in `browser_type.launch()`. To specify the final trace zip file name, you
+ need to pass `path` option to `tracing.stop_chunk()` instead.
"""
return mapping.from_maybe_impl(
@@ -15129,7 +15389,9 @@ def click(
Deprecated: This option will default to `true` in the future.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -15203,7 +15465,9 @@ def dblclick(
Deprecated: This option has no effect.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -15856,6 +16120,7 @@ def get_by_role(
**NOTE** Unlike most other attributes, `disabled` is inherited through the DOM hierarchy. Learn more about
[`aria-disabled`](https://www.w3.org/TR/wai-aria-1.2/#aria-disabled).
+
expanded : Union[bool, None]
An attribute that is usually set by `aria-expanded`.
@@ -16195,7 +16460,10 @@ def filter(
def or_(self, locator: "Locator") -> "Locator":
"""Locator.or_
- Creates a locator that matches either of the two locators.
+ Creates a locator matching all elements that match one or both of the two locators.
+
+ Note that when both locators match something, the resulting locator will have multiple matches and violate
+ [locator strictness](https://playwright.dev/python/docs/locators#strictness) guidelines.
**Usage**
@@ -16285,9 +16553,13 @@ def all(self) -> typing.List["Locator"]:
elements.
**NOTE** `locator.all()` does not wait for elements to match the locator, and instead immediately returns
- whatever is present in the page. When the list of elements changes dynamically, `locator.all()` will
- produce unpredictable and flaky results. When the list of elements is stable, but loaded dynamically, wait for the
- full list to finish loading before calling `locator.all()`.
+ whatever is present in the page.
+
+ When the list of elements changes dynamically, `locator.all()` will produce unpredictable and flaky
+ results.
+
+ When the list of elements is stable, but loaded dynamically, wait for the full list to finish loading before
+ calling `locator.all()`.
**Usage**
@@ -16476,7 +16748,9 @@ def hover(
Whether to bypass the [actionability](../actionability.md) checks. Defaults to `false`.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -17178,7 +17452,9 @@ def tap(
Deprecated: This option has no effect.
trial : Union[bool, None]
When set, this method only performs the [actionability](../actionability.md) checks and skips the action. Defaults
- to `false`. Useful to wait until the element is ready for the action without performing it.
+ to `false`. Useful to wait until the element is ready for the action without performing it. Note that keyboard
+ `modifiers` will be pressed regardless of `trial` to allow testing elements which are only visible when those keys
+ are pressed.
"""
return mapping.from_maybe_impl(
@@ -18255,7 +18531,7 @@ def fetch(
JSON objects can be passed directly to the request:
The common way to send file(s) in the body of a request is to upload them as form fields with `multipart/form-data`
- encoding. Use `FormData` to construct request body and pass it to the request as `multipart` parameter:
+ encoding, by specifiying the `multipart` parameter:
```python
api_request_context.fetch(
@@ -18417,11 +18693,10 @@ def new_context(
`passphrase` property should be provided if the certificate is encrypted. The `origin` property should be provided
with an exact match to the request origin that the certificate is valid for.
- **NOTE** Using Client Certificates in combination with Proxy Servers is not supported.
-
**NOTE** When using WebKit on macOS, accessing `localhost` will not pick up client certificates. You can make it
work by replacing `localhost` with `local.playwright`.
+
Returns
-------
APIRequestContext
diff --git a/scripts/documentation_provider.py b/scripts/documentation_provider.py
index 9acbe6c7d..608c4319d 100644
--- a/scripts/documentation_provider.py
+++ b/scripts/documentation_provider.py
@@ -132,7 +132,11 @@ def print_entry(
doc_is_property = (
not method.get("async") and not len(method["args"]) and "type" in method
)
- if method["name"].startswith("is_") or method["name"].startswith("as_"):
+ if (
+ method["name"].startswith("is_")
+ or method["name"].startswith("as_")
+ or method["name"] == "connect_to_server"
+ ):
doc_is_property = False
if doc_is_property != is_property:
self.errors.add(f"Method vs property mismatch: {fqname}")
diff --git a/scripts/expected_api_mismatch.txt b/scripts/expected_api_mismatch.txt
index c101bba16..c6b3c7a95 100644
--- a/scripts/expected_api_mismatch.txt
+++ b/scripts/expected_api_mismatch.txt
@@ -15,3 +15,8 @@ Parameter type mismatch in Page.unroute(handler=): documented as Union[Callable[
# One vs two arguments in the callback, Python explicitly unions.
Parameter type mismatch in Page.add_locator_handler(handler=): documented as Callable[[Locator], Any], code has Union[Callable[[Locator], Any], Callable[[], Any]]
+
+Parameter type mismatch in BrowserContext.route_web_socket(handler=): documented as Callable[[WebSocketRoute], Union[Any, Any]], code has Callable[[WebSocketRoute], Any]
+Parameter type mismatch in Page.route_web_socket(handler=): documented as Callable[[WebSocketRoute], Union[Any, Any]], code has Callable[[WebSocketRoute], Any]
+Parameter type mismatch in WebSocketRoute.on_close(handler=): documented as Callable[[Union[int, undefined]], Union[Any, Any]], code has Callable[[Union[int, None], Union[str, None]], Any]
+Parameter type mismatch in WebSocketRoute.on_message(handler=): documented as Callable[[str], Union[Any, Any]], code has Callable[[Union[bytes, str]], Any]
diff --git a/scripts/generate_api.py b/scripts/generate_api.py
index 7966dbc25..e609dae73 100644
--- a/scripts/generate_api.py
+++ b/scripts/generate_api.py
@@ -40,7 +40,13 @@
from playwright._impl._input import Keyboard, Mouse, Touchscreen
from playwright._impl._js_handle import JSHandle, Serializable
from playwright._impl._locator import FrameLocator, Locator
-from playwright._impl._network import Request, Response, Route, WebSocket
+from playwright._impl._network import (
+ Request,
+ Response,
+ Route,
+ WebSocket,
+ WebSocketRoute,
+)
from playwright._impl._page import Page, Worker
from playwright._impl._playwright import Playwright
from playwright._impl._selectors import Selectors
@@ -233,7 +239,7 @@ def return_value(value: Any) -> List[str]:
from playwright._impl._frame import Frame as FrameImpl
from playwright._impl._input import Keyboard as KeyboardImpl, Mouse as MouseImpl, Touchscreen as TouchscreenImpl
from playwright._impl._js_handle import JSHandle as JSHandleImpl
-from playwright._impl._network import Request as RequestImpl, Response as ResponseImpl, Route as RouteImpl, WebSocket as WebSocketImpl
+from playwright._impl._network import Request as RequestImpl, Response as ResponseImpl, Route as RouteImpl, WebSocket as WebSocketImpl, WebSocketRoute as WebSocketRouteImpl
from playwright._impl._page import Page as PageImpl, Worker as WorkerImpl
from playwright._impl._web_error import WebError as WebErrorImpl
from playwright._impl._playwright import Playwright as PlaywrightImpl
@@ -252,6 +258,7 @@ def return_value(value: Any) -> List[str]:
Response,
Route,
WebSocket,
+ WebSocketRoute,
Keyboard,
Mouse,
Touchscreen,
diff --git a/setup.py b/setup.py
index 97fc4c5d2..8a67ab2c8 100644
--- a/setup.py
+++ b/setup.py
@@ -30,7 +30,7 @@
InWheel = None
from wheel.bdist_wheel import bdist_wheel as BDistWheelCommand
-driver_version = "1.47.0-beta-1726138322000"
+driver_version = "1.48.1"
def extractall(zip: zipfile.ZipFile, path: str) -> None:
diff --git a/tests/async/test_navigation.py b/tests/async/test_navigation.py
index de4a2f5e9..fb34fb75b 100644
--- a/tests/async/test_navigation.py
+++ b/tests/async/test_navigation.py
@@ -264,7 +264,7 @@ async def test_goto_should_fail_when_main_resources_failed_to_load(
if is_chromium:
assert "net::ERR_CONNECTION_REFUSED" in exc_info.value.message
elif is_webkit and is_win:
- assert "Couldn't connect to server" in exc_info.value.message
+ assert "Could not connect to server" in exc_info.value.message
elif is_webkit:
assert "Could not connect" in exc_info.value.message
else:
diff --git a/tests/async/test_page_request_gc.py b/tests/async/test_page_request_gc.py
new file mode 100644
index 000000000..7d0cce9ef
--- /dev/null
+++ b/tests/async/test_page_request_gc.py
@@ -0,0 +1,34 @@
+# Copyright (c) Microsoft Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from playwright.async_api import Page
+from tests.server import Server
+
+
+async def test_should_work(page: Page, server: Server) -> None:
+ await page.evaluate(
+ """() => {
+ globalThis.objectToDestroy = { hello: 'world' };
+ globalThis.weakRef = new WeakRef(globalThis.objectToDestroy);
+ }"""
+ )
+ await page.request_gc()
+ assert await page.evaluate("() => globalThis.weakRef.deref()") == {"hello": "world"}
+
+ await page.request_gc()
+ assert await page.evaluate("() => globalThis.weakRef.deref()") == {"hello": "world"}
+
+ await page.evaluate("() => globalThis.objectToDestroy = null")
+ await page.request_gc()
+ assert await page.evaluate("() => globalThis.weakRef.deref()") is None
diff --git a/tests/async/test_route_web_socket.py b/tests/async/test_route_web_socket.py
new file mode 100644
index 000000000..4996aff60
--- /dev/null
+++ b/tests/async/test_route_web_socket.py
@@ -0,0 +1,321 @@
+# Copyright (c) Microsoft Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import re
+from typing import Any, Awaitable, Callable, Literal, Tuple, Union
+
+from playwright.async_api import Frame, Page, WebSocketRoute
+from tests.server import Server, WebSocketProtocol
+
+
+async def assert_equal(
+ actual_cb: Callable[[], Union[Any, Awaitable[Any]]], expected: Any
+) -> None:
+ __tracebackhide__ = True
+ start_time = asyncio.get_event_loop().time()
+ attempts = 0
+ while True:
+ actual = actual_cb()
+ if asyncio.iscoroutine(actual):
+ actual = await actual
+ if actual == expected:
+ return
+ attempts += 1
+ if asyncio.get_event_loop().time() - start_time > 5:
+ raise TimeoutError(f"Timed out after 10 seconds. Last actual was: {actual}")
+ await asyncio.sleep(0.2)
+
+
+async def setup_ws(
+ target: Union[Page, Frame],
+ port: int,
+ protocol: Union[Literal["blob"], Literal["arraybuffer"]],
+) -> None:
+ await target.goto("about:blank")
+ await target.evaluate(
+ """({ port, binaryType }) => {
+ window.log = [];
+ window.ws = new WebSocket('ws://localhost:' + port + '/ws');
+ window.ws.binaryType = binaryType;
+ window.ws.addEventListener('open', () => window.log.push('open'));
+ window.ws.addEventListener('close', event => window.log.push(`close code=${event.code} reason=${event.reason} wasClean=${event.wasClean}`));
+ window.ws.addEventListener('error', event => window.log.push(`error`));
+ window.ws.addEventListener('message', async event => {
+ let data;
+ if (typeof event.data === 'string')
+ data = event.data;
+ else if (event.data instanceof Blob)
+ data = 'blob:' + await event.data.text();
+ else
+ data = 'arraybuffer:' + await (new Blob([event.data])).text();
+ window.log.push(`message: data=${data} origin=${event.origin} lastEventId=${event.lastEventId}`);
+ });
+ window.wsOpened = new Promise(f => window.ws.addEventListener('open', () => f()));
+ }""",
+ {"port": port, "binaryType": protocol},
+ )
+
+
+async def test_should_work_with_ws_close(page: Page, server: Server) -> None:
+ future: asyncio.Future[WebSocketRoute] = asyncio.Future()
+
+ def _handle_ws(ws: WebSocketRoute) -> None:
+ ws.connect_to_server()
+ future.set_result(ws)
+
+ await page.route_web_socket(re.compile(".*"), _handle_ws)
+
+ ws_task = server.wait_for_web_socket()
+ await setup_ws(page, server.PORT, "blob")
+ ws = await ws_task
+
+ route = await future
+ route.send("hello")
+ await assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=hello origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+
+ closed_promise: asyncio.Future[Tuple[int, str]] = asyncio.Future()
+ ws.events.once(
+ "close", lambda code, reason: closed_promise.set_result((code, reason))
+ )
+ await route.close(code=3009, reason="oops")
+ await assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=hello origin=ws://localhost:{server.PORT} lastEventId=",
+ "close code=3009 reason=oops wasClean=true",
+ ],
+ )
+ assert await closed_promise == (3009, "oops")
+
+
+async def test_should_pattern_match(page: Page, server: Server) -> None:
+ await page.route_web_socket(
+ re.compile(r".*/ws$"), lambda ws: ws.connect_to_server()
+ )
+ await page.route_web_socket(
+ "**/mock-ws", lambda ws: ws.on_message(lambda message: ws.send("mock-response"))
+ )
+
+ ws_task = server.wait_for_web_socket()
+ await page.goto("about:blank")
+ await page.evaluate(
+ """async ({ port }) => {
+ window.log = [];
+ window.ws1 = new WebSocket('ws://localhost:' + port + '/ws');
+ window.ws1.addEventListener('message', event => window.log.push(`ws1:${event.data}`));
+ window.ws2 = new WebSocket('ws://localhost:' + port + '/something/something/mock-ws');
+ window.ws2.addEventListener('message', event => window.log.push(`ws2:${event.data}`));
+ await Promise.all([
+ new Promise(f => window.ws1.addEventListener('open', f)),
+ new Promise(f => window.ws2.addEventListener('open', f)),
+ ]);
+ }""",
+ {"port": server.PORT},
+ )
+
+ ws = await ws_task
+ ws.events.on("message", lambda payload, isBinary: ws.sendMessage(b"response"))
+
+ await page.evaluate("window.ws1.send('request')")
+ await assert_equal(lambda: page.evaluate("window.log"), ["ws1:response"])
+
+ await page.evaluate("window.ws2.send('request')")
+ await assert_equal(
+ lambda: page.evaluate("window.log"), ["ws1:response", "ws2:mock-response"]
+ )
+
+
+async def test_should_work_with_server(page: Page, server: Server) -> None:
+ future: asyncio.Future[WebSocketRoute] = asyncio.Future()
+
+ async def _handle_ws(ws: WebSocketRoute) -> None:
+ server = ws.connect_to_server()
+
+ def _ws_on_message(message: Union[str, bytes]) -> None:
+ if message == "to-respond":
+ ws.send("response")
+ return
+ if message == "to-block":
+ return
+ if message == "to-modify":
+ server.send("modified")
+ return
+ server.send(message)
+
+ ws.on_message(_ws_on_message)
+
+ def _server_on_message(message: Union[str, bytes]) -> None:
+ if message == "to-block":
+ return
+ if message == "to-modify":
+ ws.send("modified")
+ return
+ ws.send(message)
+
+ server.on_message(_server_on_message)
+ server.send("fake")
+ future.set_result(ws)
+
+ await page.route_web_socket(re.compile(".*"), _handle_ws)
+ ws_task = server.wait_for_web_socket()
+ log = []
+
+ def _once_web_socket_connection(ws: WebSocketProtocol) -> None:
+ ws.events.on(
+ "message", lambda data, is_binary: log.append(f"message: {data.decode()}")
+ )
+ ws.events.on(
+ "close",
+ lambda code, reason: log.append(f"close: code={code} reason={reason}"),
+ )
+
+ server.once_web_socket_connection(_once_web_socket_connection)
+
+ await setup_ws(page, server.PORT, "blob")
+ ws = await ws_task
+ await assert_equal(lambda: log, ["message: fake"])
+
+ ws.sendMessage(b"to-modify")
+ ws.sendMessage(b"to-block")
+ ws.sendMessage(b"pass-server")
+ await assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=modified origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=pass-server origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+
+ await page.evaluate(
+ """() => {
+ window.ws.send('to-respond');
+ window.ws.send('to-modify');
+ window.ws.send('to-block');
+ window.ws.send('pass-client');
+ }"""
+ )
+ await assert_equal(
+ lambda: log, ["message: fake", "message: modified", "message: pass-client"]
+ )
+ await assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=modified origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=pass-server origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+
+ route = await future
+ route.send("another")
+ await assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=modified origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=pass-server origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=another origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+
+ await page.evaluate(
+ """() => {
+ window.ws.send('pass-client-2');
+ }"""
+ )
+ await assert_equal(
+ lambda: log,
+ [
+ "message: fake",
+ "message: modified",
+ "message: pass-client",
+ "message: pass-client-2",
+ ],
+ )
+
+ await page.evaluate(
+ """() => {
+ window.ws.close(3009, 'problem');
+ }"""
+ )
+ await assert_equal(
+ lambda: log,
+ [
+ "message: fake",
+ "message: modified",
+ "message: pass-client",
+ "message: pass-client-2",
+ "close: code=3009 reason=problem",
+ ],
+ )
+
+
+async def test_should_work_without_server(page: Page, server: Server) -> None:
+ future: asyncio.Future[WebSocketRoute] = asyncio.Future()
+
+ async def _handle_ws(ws: WebSocketRoute) -> None:
+ def _ws_on_message(message: Union[str, bytes]) -> None:
+ if message == "to-respond":
+ ws.send("response")
+
+ ws.on_message(_ws_on_message)
+ future.set_result(ws)
+
+ await page.route_web_socket(re.compile(".*"), _handle_ws)
+ await setup_ws(page, server.PORT, "blob")
+
+ await page.evaluate(
+ """async () => {
+ await window.wsOpened;
+ window.ws.send('to-respond');
+ window.ws.send('to-block');
+ window.ws.send('to-respond');
+ }"""
+ )
+
+ await assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+
+ route = await future
+ route.send("another")
+ # wait for the message to be processed
+ await page.wait_for_timeout(100)
+ await route.close(code=3008, reason="oops")
+ await assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=another origin=ws://localhost:{server.PORT} lastEventId=",
+ "close code=3008 reason=oops wasClean=true",
+ ],
+ )
diff --git a/tests/server.py b/tests/server.py
index f9072d448..89048b0ba 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -32,6 +32,7 @@
Set,
Tuple,
TypeVar,
+ Union,
cast,
)
from urllib.parse import urlparse
@@ -39,6 +40,7 @@
from autobahn.twisted.resource import WebSocketResource
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol
from OpenSSL import crypto
+from pyee import EventEmitter
from twisted.internet import reactor as _twisted_reactor
from twisted.internet import ssl
from twisted.internet.selectreactor import SelectReactor
@@ -197,6 +199,11 @@ async def wait_for_request(self, path: str) -> TestServerRequest:
self.request_subscribers[path] = future
return await future
+ def wait_for_web_socket(self) -> 'asyncio.Future["WebSocketProtocol"]':
+ future: asyncio.Future[WebSocketProtocol] = asyncio.Future()
+ self.once_web_socket_connection(future.set_result)
+ return future
+
@contextlib.contextmanager
def expect_request(
self, path: str
@@ -211,6 +218,20 @@ def done_cb(task: asyncio.Task) -> None:
future.add_done_callback(done_cb)
yield cb_wrapper
+ @contextlib.contextmanager
+ def expect_websocket(
+ self,
+ ) -> Generator[ExpectResponse["WebSocketProtocol"], None, None]:
+ future = self.wait_for_web_socket()
+
+ cb_wrapper: ExpectResponse["WebSocketProtocol"] = ExpectResponse()
+
+ def done_cb(_: asyncio.Future) -> None:
+ cb_wrapper._value = future.result()
+
+ future.add_done_callback(done_cb)
+ yield cb_wrapper
+
def set_auth(self, path: str, username: str, password: str) -> None:
self.auth[path] = (username, password)
@@ -280,6 +301,21 @@ def listen(self, factory: http.HTTPFactory) -> None:
class WebSocketProtocol(WebSocketServerProtocol):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+ self.events = EventEmitter()
+
+ def onClose(self, wasClean: bool, code: int, reason: str) -> None:
+ super().onClose(wasClean, code, reason)
+ self.events.emit(
+ "close",
+ code,
+ reason,
+ )
+
+ def onMessage(self, payload: Union[str, bytes], isBinary: bool) -> None:
+ self.events.emit("message", payload, isBinary)
+
def onOpen(self) -> None:
for handler in getattr(self.factory, "server_instance")._ws_handlers.copy():
getattr(self.factory, "server_instance")._ws_handlers.remove(handler)
diff --git a/tests/sync/test_page_request_gc.py b/tests/sync/test_page_request_gc.py
new file mode 100644
index 000000000..bfddc2320
--- /dev/null
+++ b/tests/sync/test_page_request_gc.py
@@ -0,0 +1,34 @@
+# Copyright (c) Microsoft Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from playwright.sync_api import Page
+from tests.server import Server
+
+
+def test_should_work(page: Page, server: Server) -> None:
+ page.evaluate(
+ """() => {
+ globalThis.objectToDestroy = { hello: 'world' };
+ globalThis.weakRef = new WeakRef(globalThis.objectToDestroy);
+ }"""
+ )
+ page.request_gc()
+ assert page.evaluate("() => globalThis.weakRef.deref()") == {"hello": "world"}
+
+ page.request_gc()
+ assert page.evaluate("() => globalThis.weakRef.deref()") == {"hello": "world"}
+
+ page.evaluate("() => globalThis.objectToDestroy = null")
+ page.request_gc()
+ assert page.evaluate("() => globalThis.weakRef.deref()") is None
diff --git a/tests/sync/test_route_web_socket.py b/tests/sync/test_route_web_socket.py
new file mode 100644
index 000000000..11e509cee
--- /dev/null
+++ b/tests/sync/test_route_web_socket.py
@@ -0,0 +1,316 @@
+# Copyright (c) Microsoft Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import time
+from typing import Any, Awaitable, Callable, Literal, Optional, Union
+
+from playwright.sync_api import Frame, Page, WebSocketRoute
+from tests.server import Server, WebSocketProtocol
+
+
+def assert_equal(
+ actual_cb: Callable[[], Union[Any, Awaitable[Any]]], expected: Any
+) -> None:
+ __tracebackhide__ = True
+ start_time = time.time()
+ attempts = 0
+ while True:
+ actual = actual_cb()
+ if actual == expected:
+ return
+ attempts += 1
+ if time.time() - start_time > 10:
+ raise TimeoutError(f"Timed out after 10 seconds. Last actual was: {actual}")
+ time.sleep(0.1)
+
+
+def setup_ws(
+ target: Union[Page, Frame],
+ port: int,
+ protocol: Union[Literal["blob"], Literal["arraybuffer"]],
+) -> None:
+ target.goto("about:blank")
+ target.evaluate(
+ """({ port, binaryType }) => {
+ window.log = [];
+ window.ws = new WebSocket('ws://localhost:' + port + '/ws');
+ window.ws.binaryType = binaryType;
+ window.ws.addEventListener('open', () => window.log.push('open'));
+ window.ws.addEventListener('close', event => window.log.push(`close code=${event.code} reason=${event.reason} wasClean=${event.wasClean}`));
+ window.ws.addEventListener('error', event => window.log.push(`error`));
+ window.ws.addEventListener('message', async event => {
+ let data;
+ if (typeof event.data === 'string')
+ data = event.data;
+ else if (event.data instanceof Blob)
+ data = 'blob:' + event.data.text();
+ else
+ data = 'arraybuffer:' + (new Blob([event.data])).text();
+ window.log.push(`message: data=${data} origin=${event.origin} lastEventId=${event.lastEventId}`);
+ });
+ window.wsOpened = new Promise(f => window.ws.addEventListener('open', () => f()));
+ }""",
+ {"port": port, "binaryType": protocol},
+ )
+
+
+def test_should_work_with_ws_close(page: Page, server: Server) -> None:
+ route: Optional["WebSocketRoute"] = None
+
+ def _handle_ws(ws: WebSocketRoute) -> None:
+ ws.connect_to_server()
+ nonlocal route
+ route = ws
+
+ page.route_web_socket(re.compile(".*"), _handle_ws)
+
+ with server.expect_websocket() as ws_task:
+ setup_ws(page, server.PORT, "blob")
+ page.evaluate("window.wsOpened")
+ ws = ws_task.value
+ assert route
+ route.send("hello")
+ assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=hello origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+
+ closed_event = []
+ ws.events.once("close", lambda code, reason: closed_event.append((code, reason)))
+ route.close(code=3009, reason="oops")
+ assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=hello origin=ws://localhost:{server.PORT} lastEventId=",
+ "close code=3009 reason=oops wasClean=true",
+ ],
+ )
+ assert_equal(lambda: closed_event, [(3009, "oops")])
+
+
+def test_should_pattern_match(page: Page, server: Server) -> None:
+ page.route_web_socket(re.compile(r".*/ws$"), lambda ws: ws.connect_to_server())
+ page.route_web_socket(
+ "**/mock-ws", lambda ws: ws.on_message(lambda message: ws.send("mock-response"))
+ )
+
+ page.goto("about:blank")
+ with server.expect_websocket() as ws_info:
+ page.evaluate(
+ """async ({ port }) => {
+ window.log = [];
+ window.ws1 = new WebSocket('ws://localhost:' + port + '/ws');
+ window.ws1.addEventListener('message', event => window.log.push(`ws1:${event.data}`));
+ window.ws2 = new WebSocket('ws://localhost:' + port + '/something/something/mock-ws');
+ window.ws2.addEventListener('message', event => window.log.push(`ws2:${event.data}`));
+ await Promise.all([
+ new Promise(f => window.ws1.addEventListener('open', f)),
+ new Promise(f => window.ws2.addEventListener('open', f)),
+ ]);
+ }""",
+ {"port": server.PORT},
+ )
+ ws = ws_info.value
+ ws.events.on("message", lambda payload, isBinary: ws.sendMessage(b"response"))
+
+ page.evaluate("window.ws1.send('request')")
+ assert_equal(lambda: page.evaluate("window.log"), ["ws1:response"])
+
+ page.evaluate("window.ws2.send('request')")
+ assert_equal(
+ lambda: page.evaluate("window.log"), ["ws1:response", "ws2:mock-response"]
+ )
+
+
+def test_should_work_with_server(page: Page, server: Server) -> None:
+ route = None
+
+ def _handle_ws(ws: WebSocketRoute) -> None:
+ server = ws.connect_to_server()
+
+ def _ws_on_message(message: Union[str, bytes]) -> None:
+ if message == "to-respond":
+ ws.send("response")
+ return
+ if message == "to-block":
+ return
+ if message == "to-modify":
+ server.send("modified")
+ return
+ server.send(message)
+
+ ws.on_message(_ws_on_message)
+
+ def _server_on_message(message: Union[str, bytes]) -> None:
+ if message == "to-block":
+ return
+ if message == "to-modify":
+ ws.send("modified")
+ return
+ ws.send(message)
+
+ server.on_message(_server_on_message)
+ server.send("fake")
+ nonlocal route
+ route = ws
+
+ page.route_web_socket(re.compile(".*"), _handle_ws)
+ log = []
+
+ def _once_web_socket_connection(ws: WebSocketProtocol) -> None:
+ ws.events.on(
+ "message", lambda data, is_binary: log.append(f"message: {data.decode()}")
+ )
+ ws.events.on(
+ "close",
+ lambda code, reason: log.append(f"close: code={code} reason={reason}"),
+ )
+
+ server.once_web_socket_connection(_once_web_socket_connection)
+
+ with server.expect_websocket() as ws_info:
+ setup_ws(page, server.PORT, "blob")
+ page.evaluate("window.wsOpened")
+ ws = ws_info.value
+ assert_equal(lambda: log, ["message: fake"])
+
+ ws.sendMessage(b"to-modify")
+ ws.sendMessage(b"to-block")
+ ws.sendMessage(b"pass-server")
+ assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=modified origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=pass-server origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+
+ page.evaluate(
+ """() => {
+ window.ws.send('to-respond');
+ window.ws.send('to-modify');
+ window.ws.send('to-block');
+ window.ws.send('pass-client');
+ }"""
+ )
+ assert_equal(
+ lambda: log, ["message: fake", "message: modified", "message: pass-client"]
+ )
+ assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=modified origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=pass-server origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+ assert route
+ route.send("another")
+ assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=modified origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=pass-server origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=another origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+
+ page.evaluate(
+ """() => {
+ window.ws.send('pass-client-2');
+ }"""
+ )
+ assert_equal(
+ lambda: log,
+ [
+ "message: fake",
+ "message: modified",
+ "message: pass-client",
+ "message: pass-client-2",
+ ],
+ )
+
+ page.evaluate(
+ """() => {
+ window.ws.close(3009, 'problem');
+ }"""
+ )
+ assert_equal(
+ lambda: log,
+ [
+ "message: fake",
+ "message: modified",
+ "message: pass-client",
+ "message: pass-client-2",
+ "close: code=3009 reason=problem",
+ ],
+ )
+
+
+def test_should_work_without_server(page: Page, server: Server) -> None:
+ route = None
+
+ def _handle_ws(ws: WebSocketRoute) -> None:
+ def _ws_on_message(message: Union[str, bytes]) -> None:
+ if message == "to-respond":
+ ws.send("response")
+
+ ws.on_message(_ws_on_message)
+ nonlocal route
+ route = ws
+
+ page.route_web_socket(re.compile(".*"), _handle_ws)
+ setup_ws(page, server.PORT, "blob")
+
+ page.evaluate(
+ """async () => {
+ await window.wsOpened;
+ window.ws.send('to-respond');
+ window.ws.send('to-block');
+ window.ws.send('to-respond');
+ }"""
+ )
+
+ assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ ],
+ )
+ assert route
+ route.send("another")
+ # wait for the message to be processed
+ page.wait_for_timeout(100)
+ route.close(code=3008, reason="oops")
+ assert_equal(
+ lambda: page.evaluate("window.log"),
+ [
+ "open",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=response origin=ws://localhost:{server.PORT} lastEventId=",
+ f"message: data=another origin=ws://localhost:{server.PORT} lastEventId=",
+ "close code=3008 reason=oops wasClean=true",
+ ],
+ )