Skip to content

Commit

Permalink
Add support for IAM database authentication for CloudSQL connection
Browse files Browse the repository at this point in the history
  • Loading branch information
MaksYermak committed Oct 28, 2024
1 parent 9772dbe commit 65c2641
Show file tree
Hide file tree
Showing 3 changed files with 519 additions and 2 deletions.
38 changes: 38 additions & 0 deletions docs/apache-airflow-providers-google/connections/gcp_sql.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,41 @@ Extra (optional)
.. code-block:: bash
export AIRFLOW_CONN_GOOGLE_CLOUD_SQL_DEFAULT='gcpcloudsql://user:[email protected]:3306/mydb?database_type=mysql&project_id=example-project&location=europe-west1&instance=testinstance&use_proxy=True&sql_proxy_use_tcp=False'
Configuring and using IAM authentication
----------------------------------------

.. warning::
This functionality requires ``gcloud`` command (Google Cloud SDK) must be `installed
<https://cloud.google.com/sdk/docs/install>`_ on the Airflow worker.

.. warning::
IAM authentication working only for Google Service Accounts.

Configure Service Accounts on Google Cloud IAM side
"""""""""""""""""""""""""""""""""""""""""""""""""""

For connecting via IAM you need to use Service Account. It can be the same service account which you use for
the ``gcloud`` authentication or an another account. If you decide to use a different account then this
account should be impersonated from the account which used for ``gcloud`` authentication and granted
a ``Service Account Token Creator`` role. More information how to grant a role `here
<https://cloud.google.com/iam/docs/manage-access-service-accounts?hl=en&_gl=1*3bsv5i*_ga*NDY4NDIyNTcxLjE3MjkxNzQ4MTM.*_ga_WH2QY8WWF5*MTcyOTE5MzU1OS4yLjEuMTcyOTE5NTM0My4wLjAuMA..#single-role>`_.

Also the Service Account should be configured for working with IAM.
Here are links describing what should be done before the start:
`PostgreSQL<https://cloud.google.com/sql/docs/postgres/iam-logins#before_you_begin>`_ and
`MySQL<https://cloud.google.com/sql/docs/mysql/iam-logins#before_you_begin>`_.

Configure ``gcpcloudsql`` connection with IAM enabling
""""""""""""""""""""""""""""""""""""""""""""""""""""""

For using IAM you need to enable ``"use_iam": "True"`` in the ``extra`` field. And specify IAM account in this format
``USERNAME@PROJECT_ID.iam.gserviceaccount.com`` in ``login`` field and empty string in the ``password`` field.

For example:

.. exampleinclude:: /../../providers/tests/system/google/cloud/cloud_sql/example_cloud_sql_query_iam.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_iam_connections]
:end-before: [END howto_operator_cloudsql_iam_connections]
41 changes: 39 additions & 2 deletions providers/src/airflow/providers/google/cloud/hooks/cloud_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import platform
import random
import re
import shlex
import shutil
import socket
import string
Expand Down Expand Up @@ -777,6 +778,8 @@ class CloudSQLDatabaseHook(BaseHook):
SQL DB.
* **use_ssl** - (default False) Whether SSL should be used to connect to Cloud SQL DB.
You cannot use proxy and SSL together.
* **use_iam** - (default False) Whether IAM should be used to connect to Cloud SQL DB.
With using IAM password field should be empty string.
* **sql_proxy_use_tcp** - (default False) If set to true, TCP is used to connect via
proxy, otherwise UNIX sockets are used.
* **sql_proxy_version** - Specific version of the proxy to download (for example
Expand Down Expand Up @@ -839,11 +842,16 @@ def __init__(
self.database_type = self.extras.get("database_type")
self.use_proxy = self._get_bool(self.extras.get("use_proxy", "False"))
self.use_ssl = self._get_bool(self.extras.get("use_ssl", "False"))
self.use_iam = self._get_bool(self.extras.get("use_iam", "False"))
self.sql_proxy_use_tcp = self._get_bool(self.extras.get("sql_proxy_use_tcp", "False"))
self.sql_proxy_version = self.extras.get("sql_proxy_version")
self.sql_proxy_binary_path = sql_proxy_binary_path
self.user = self.cloudsql_connection.login
self.password = self.cloudsql_connection.password
if self.use_iam:
self.user = self._get_iam_db_login()
self.password = self._generate_login_token(service_account=self.cloudsql_connection.login)
else:
self.user = self.cloudsql_connection.login
self.password = self.cloudsql_connection.password
self.public_ip = self.cloudsql_connection.host
self.public_port = self.cloudsql_connection.port
self.ssl_cert = ssl_cert
Expand Down Expand Up @@ -1187,3 +1195,32 @@ def free_reserved_port(self) -> None:
if self.reserved_tcp_socket:
self.reserved_tcp_socket.close()
self.reserved_tcp_socket = None

def _get_iam_db_login(self) -> str:
"""Get an IAM login for Cloud SQL database."""
if not self.cloudsql_connection.login:
raise AirflowException("The login parameter needs to be set in connection")

if self.database_type == "postgres":
return self.cloudsql_connection.login.split(".gserviceaccount.com")[0]
else:
return self.cloudsql_connection.login.split("@")[0]

def _generate_login_token(self, service_account) -> str:
"""Generate an IAM login token for Cloud SQL and return the token."""
cmd = ["gcloud", "sql", "generate-login-token", f"--impersonate-service-account={service_account}"]
self.log.info("Executing command: %s", " ".join(shlex.quote(c) for c in cmd))
cloud_sql_hook = CloudSQLHook(api_version="v1", gcp_conn_id=self.gcp_conn_id)

with cloud_sql_hook.provide_authorized_gcloud():
proc = subprocess.run(cmd, capture_output=True)

if proc.returncode != 0:
stderr_last_20_lines = "\n".join(proc.stderr.decode().strip().splitlines()[-20:])
raise AirflowException(
f"Process exited with non-zero exit code. Exit code: {proc.returncode}. Error Details: "
f"{stderr_last_20_lines}"
)

auth_token = proc.stdout.decode().strip()
return auth_token
Loading

0 comments on commit 65c2641

Please sign in to comment.