Skip to content

Commit

Permalink
Remove Provider Deprecations in Elasticsearch (apache#44629)
Browse files Browse the repository at this point in the history
* Remove Provider Deprecations in Elasticsearch

* Update Changelog

* Fix Changelog typo and indent

* Fix provider.yaml and test
  • Loading branch information
jason810496 authored Dec 6, 2024
1 parent 37236b6 commit f326e47
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 74 deletions.
13 changes: 13 additions & 0 deletions providers/src/airflow/providers/elasticsearch/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
Changelog
---------

main
.....

.. warning::
All deprecated classes, parameters and features have been removed from the ElasticSearch provider package.
The following breaking changes were introduced:

* Hooks
* Remove ``airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook``. Use ``airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`` instead.
* Log
* Removed ``log_id_template`` parameter from ``ElasticsearchTaskHandler``.
* Removed ``retry_timeout`` parameter from ``ElasticsearchTaskHandler``. Use ``retry_on_timeout`` instead

5.5.3
.....

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
from typing import TYPE_CHECKING, Any
from urllib import parse

from deprecated import deprecated
from elasticsearch import Elasticsearch

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
from airflow.providers.common.sql.hooks.sql import DbApiHook

Expand Down Expand Up @@ -142,21 +140,6 @@ def get_uri(self) -> str:
return uri


@deprecated(
reason="Please use `airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`.",
category=AirflowProviderDeprecationWarning,
)
class ElasticsearchHook(ElasticsearchSQLHook):
"""
This class is deprecated and was renamed to ElasticsearchSQLHook.
Please use :class:`airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


class ElasticsearchPythonHook(BaseHook):
"""
Interacts with Elasticsearch. This hook uses the official Elasticsearch Python Client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import logging
import sys
import time
import warnings
from collections import defaultdict
from operator import attrgetter
from typing import TYPE_CHECKING, Any, Callable, Literal
Expand All @@ -36,7 +35,7 @@

from airflow import __version__ as airflow_version
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException
from airflow.models.dagrun import DagRun
from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter
from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit
Expand Down Expand Up @@ -77,20 +76,6 @@ def get_es_kwargs_from_config() -> dict[str, Any]:
if elastic_search_config
else {}
)
# TODO: Remove in next major release (drop support for elasticsearch<8 parameters)
if (
elastic_search_config
and "retry_timeout" in elastic_search_config
and not kwargs_dict.get("retry_on_timeout")
):
warnings.warn(
"retry_timeout is not supported with elasticsearch>=8. Please use `retry_on_timeout`.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
retry_timeout = elastic_search_config.get("retry_timeout")
if retry_timeout is not None:
kwargs_dict["retry_on_timeout"] = retry_timeout
return kwargs_dict


Expand Down Expand Up @@ -162,8 +147,6 @@ def __init__(
index_patterns: str = conf.get("elasticsearch", "index_patterns"),
index_patterns_callable: str = conf.get("elasticsearch", "index_patterns_callable", fallback=""),
es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs",
*,
log_id_template: str | None = None,
**kwargs,
):
es_kwargs = es_kwargs or {}
Expand All @@ -175,14 +158,7 @@ def __init__(

self.client = elasticsearch.Elasticsearch(host, **es_kwargs)
# in airflow.cfg, host of elasticsearch has to be http://dockerhostXxxx:9200
if USE_PER_RUN_LOG_ID and log_id_template is not None:
warnings.warn(
"Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
AirflowProviderDeprecationWarning,
stacklevel=2,
)

self.log_id_template = log_id_template # Only used on Airflow < 2.3.2.
self.frontend = frontend
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark.strip()
Expand Down Expand Up @@ -244,8 +220,6 @@ def _render_log_id(self, ti: TaskInstance | TaskInstanceKey, try_number: int) ->
dag_run = ti.get_dagrun(session=session)
if USE_PER_RUN_LOG_ID:
log_id_template = dag_run.get_log_template(session=session).elasticsearch_id
else:
log_id_template = self.log_id_template

if TYPE_CHECKING:
assert ti.task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ hooks:
- airflow.providers.elasticsearch.hooks.elasticsearch

connection-types:
- hook-class-name: airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook
- hook-class-name: airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook
connection-type: elasticsearch

logging:
Expand Down
30 changes: 4 additions & 26 deletions providers/tests/elasticsearch/hooks/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,24 @@
from unittest import mock
from unittest.mock import MagicMock

import pytest
from elasticsearch import Elasticsearch

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import Connection
from airflow.providers.elasticsearch.hooks.elasticsearch import (
ElasticsearchHook,
ElasticsearchPythonHook,
ElasticsearchSQLHook,
ESConnection,
)


class TestElasticsearchHook:
def test_throws_warning(self):
self.cur = mock.MagicMock(rowcount=0)
self.conn = mock.MagicMock()
self.conn.cursor.return_value = self.cur
conn = self.conn
self.connection = Connection(host="localhost", port=9200, schema="http")

with pytest.warns(AirflowProviderDeprecationWarning):

class UnitTestElasticsearchHook(ElasticsearchHook):
conn_name_attr = "test_conn_id"

def get_conn(self):
return conn

self.db_hook = UnitTestElasticsearchHook()


class TestElasticsearchSQLHookConn:
def setup_method(self):
self.connection = Connection(host="localhost", port=9200, schema="http")

class UnitTestElasticsearchHook(ElasticsearchSQLHook):
class UnitTestElasticsearchSQLHook(ElasticsearchSQLHook):
conn_name_attr = "elasticsearch_conn_id"

self.db_hook = UnitTestElasticsearchHook()
self.db_hook = UnitTestElasticsearchSQLHook()
self.db_hook.get_connection = mock.Mock()
self.db_hook.get_connection.return_value = self.connection

Expand All @@ -77,13 +55,13 @@ def setup_method(self):
self.conn.cursor.return_value = self.cur
conn = self.conn

class UnitTestElasticsearchHook(ElasticsearchSQLHook):
class UnitTestElasticsearchSQLHook(ElasticsearchSQLHook):
conn_name_attr = "test_conn_id"

def get_conn(self):
return conn

self.db_hook = UnitTestElasticsearchHook()
self.db_hook = UnitTestElasticsearchSQLHook()

def test_get_first_record(self):
statement = "SQL"
Expand Down
3 changes: 0 additions & 3 deletions tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,6 @@ class TestElasticsearchProviderProjectStructure(ExampleCoverageTest):
PROVIDER = "elasticsearch"
CLASS_DIRS = {"hooks"}
CLASS_SUFFIXES = ["Hook"]
DEPRECATED_CLASSES = {
"airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook",
}


class TestCncfProviderProjectStructure(ExampleCoverageTest):
Expand Down

0 comments on commit f326e47

Please sign in to comment.