diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index e50a0fabac2..abaf9c4afd5 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -123,9 +123,13 @@ def _handle_exception(self, exception: Exception): logger.exception("Request failed with exception: %s", str(exception)) raise httpx.HTTPError("Request failed with exception: " + str(exception)) - def _send_request(self, method: str, url: str, params=None, data=None, headers=None): + def _send_request( + self, method: str, url: str, params=None, data=None, headers=None + ): try: - request = httpx.Request(method=method, url=url, params=params, data=data, headers=headers) + request = httpx.Request( + method=method, url=url, params=params, data=data, headers=headers + ) response = self.http_client.send(request) response.raise_for_status() return response.json() @@ -139,7 +143,9 @@ def apply_project(self, project: str, commit: bool = True) -> ProjectMetadataMod url = f"{self.base_url}/projects" params = {"project": project, "commit": commit} headers = {"client_id": self.client_id} - response_data = self._send_request("PUT", url, params=params, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, headers=headers + ) return ProjectMetadataModel.parse_obj(response_data) except Exception as exception: self._handle_exception(exception) @@ -150,7 +156,9 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): data = EntityModel.from_entity(entity).json() params = {"commit": commit} headers = {"client_id": self.client_id} - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return EntityModel.parse_obj(response_data).to_entity() except Exception as exception: self._handle_exception(exception) @@ -185,7 +193,9 @@ def get_entity( # type: ignore[return] url = f"{self.base_url}/projects/{project}/entities/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return EntityModel.parse_obj(response_data).to_entity() except EntityNotFoundException as exception: logger.error( @@ -209,7 +219,9 @@ def list_entities( # type: ignore[return] url = f"{self.base_url}/projects/{project}/entities" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] return [ EntityModel.parse_obj(entity).to_entity() for entity in response_list @@ -226,7 +238,9 @@ def apply_data_source( headers = {"client_id": self.client_id} if isinstance(data_source, SparkSource): data = SparkSourceModel.from_data_source(data_source).json() - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return SparkSourceModel.parse_obj(response_data).to_data_source() elif isinstance(data_source, RequestSource): data = RequestSourceModel.from_data_source(data_source).json() @@ -273,7 +287,9 @@ def get_data_source( # type: ignore[return] url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) if "model_type" in response_data: if response_data["model_type"] == "RequestSourceModel": return RequestSourceModel.parse_obj(response_data).to_data_source() @@ -304,7 +320,9 @@ def list_data_sources( # type: ignore[return] url = f"{self.base_url}/projects/{project}/data_sources" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] data_source_list = [] for data_source in response_list: @@ -330,7 +348,9 @@ def apply_feature_service( data = FeatureServiceModel.from_feature_service(feature_service).json() params = {"commit": commit} headers = {"client_id": self.client_id} - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except Exception as exception: self._handle_exception(exception) @@ -366,7 +386,9 @@ def get_feature_service( # type: ignore[return] url = f"{self.base_url}/projects/{project}/feature_services/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except FeatureServiceNotFoundException as exception: logger.error( @@ -388,7 +410,9 @@ def list_feature_services( # type: ignore[return] url = f"{self.base_url}/projects/{project}/feature_services" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] return [ FeatureServiceModel.parse_obj(feature_service).to_feature_service() @@ -406,12 +430,16 @@ def apply_feature_view( if isinstance(feature_view, FeatureView): url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return FeatureViewModel.parse_obj(response_data).to_feature_view() elif isinstance(feature_view, OnDemandFeatureView): url = f"{self.base_url}/projects/{project}/on_demand_feature_views" data = OnDemandFeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return OnDemandFeatureViewModel.parse_obj( response_data ).to_feature_view() @@ -453,7 +481,9 @@ def get_feature_view( # type: ignore[return] url = f"{self.base_url}/projects/{project}/feature_views/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return FeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: logger.error( @@ -475,7 +505,9 @@ def list_feature_views( # type: ignore[return] url = f"{self.base_url}/projects/{project}/feature_views" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] return [ FeatureViewModel.parse_obj(feature_view).to_feature_view() @@ -496,7 +528,9 @@ def get_on_demand_feature_view( # type: ignore[return] url = f"{self.base_url}/projects/{project}/on_demand_feature_views/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: logger.error( @@ -518,7 +552,9 @@ def list_on_demand_feature_views( # type: ignore[return] url = f"{self.base_url}/projects/{project}/on_demand_feature_views" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] return [ OnDemandFeatureViewModel.parse_obj(feature_view).to_feature_view() @@ -573,7 +609,9 @@ def apply_materialization( headers = {"client_id": self.client_id} url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return FeatureViewModel.parse_obj(response_data).to_feature_view() else: raise TypeError( @@ -787,7 +825,9 @@ def list_project_metadata( # type: ignore[return] url = f"{self.base_url}/projects/{project}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()] except ProjectMetadataNotFoundException as exception: logger.error(