Skip to content

Commit

Permalink
Merge branch 'main' into add-canales-s3-functions
Browse files Browse the repository at this point in the history
  • Loading branch information
sharinetmc authored Sep 6, 2023
2 parents cd1d6d9 + 3e1001b commit 6d5ada6
Show file tree
Hide file tree
Showing 15 changed files with 564 additions and 80 deletions.
22 changes: 22 additions & 0 deletions .github/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
changelog:
categories:
- title: New Features
labels:
- connector-update
- new-connector
- parsons-core
- title: Automated Testing
labels:
- testing
- title: Bug Fixes
labels:
- bug-fix
- title: Documentation
labels:
- documentation
# - title: New Contributors
# labels:
# -🎉-first-PR
- title: Other Changes
labels:
- "*"
2 changes: 2 additions & 0 deletions parsons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
("parsons.controlshift.controlshift", "Controlshift"),
("parsons.copper.copper", "Copper"),
("parsons.crowdtangle.crowdtangle", "CrowdTangle"),
("parsons.databases.database_connector", "DatabaseConnector"),
("parsons.databases.discover_database", "discover_database"),
("parsons.databases.db_sync", "DBSync"),
("parsons.databases.mysql.mysql", "MySQL"),
("parsons.databases.postgres.postgres", "Postgres"),
Expand Down
190 changes: 190 additions & 0 deletions parsons/databases/database_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
from abc import ABC, abstractmethod
from typing import Optional
from parsons.etl.table import Table


class DatabaseConnector(ABC):
"""
An abstract base class that provides a uniform interface for all Parsons database connectors.
This class should be used in functions instead of the specific database connector classes
when the functions don't rely on database-specific functionality.
It ensures that any class that inherits from it implements the methods that are uniform
operations when working with databases.
Should you use `DatabaseConnector` instead of `Redshift`/`BigQuery`/etc?
Overall this class is mostly useful for code in the Parsons library, not code using it.
There could be some exceptions. In general though, if you are writing a script to do a task
like moving data out of an API service and into a data warehouse, you probably do not need
to use DatabaseConnector. You can probably just use the Parsons class that directly corresponds
with the database that you use.
Here are more examples of situations where you may or may not need to use DatabaseConnector:
1. You do not use type annotations, or you don't know what "type annotations" are - No
If you do not use type annotations for your code, then you do not need to think about
`DatabaseConnector` when writing your code. This is the most common case. If none
of the cases below apply to you, then you probably don't need it.
In this simple example, we are not using type annotations in our code. We don't need
to think about exactly what class is being passed in. Python will figure it out.
```python
def my_database_function(db):
some_data = get_some_data()
db.copy("some_table", some_data)
# These will all just work:
my_database_function(Redshift())
my_database_function(MySQL())
my_database_functon(BigQuery())
```
2. You only use one database in your work - No
This is where most people will fall. Usually code is not intended to run on
multiple databases without modification. For example, if you are working for
an organization that uses Amazon Redshift as your data warehouse, you do not
need to use `DatabaseConnector` to write ETL scripts to load data into your
Redshift. It is rare that organizations switch databases. In the cases where
that does occur, usually more work is required to migrate your environment and
your vendor-specific SQL than would be saved by using `DatabaseConnector`.
3. You are writing a sample script or a tutorial - Yes
If you are using Parsons to write a sample script or tutorial, you should use
`DatabaseConnector`! If you use `DatabaseConnector` type annotations and the
`discover_database` function, then your sample code will run on any system.
This makes it much easier for new programmers to get your code working on
their system.
4. Utility code inside Parsons or other libraries - Yes
If you are writing a utility script inside Parsons or another library meant
for broad distribution, you should probably use `DatabaseConnector` type
annotations. This will ensure that your library code will be usable by the
widest possible set of users, not just users on one specific database.
Developer Notes:
This class is an Abstract Base Class (ABC). It's designed to ensure that all classes
inheriting from it implement certain methods, enforcing a consistent interface across
database connectors.
If you need to add a new method to the database connectors, there are three options:
1. Add the method to this ABC and implement it for all databases.
2. Add the method to this ABC and implement it for some databases while adding stubs for
others.
3. Implement the method on a specific database connector without touching the ABC.
If you go the second route, you can add a stub method like this:
.. code-block:: python
def new_method(self, arg1, arg2):
raise NotImplementedError("Method not implemented for this database connector.")
```
This communicates clearly to users that the method does not exist for certain connectors.
If you go the third route, remember that you're responsible for making sure your new
method matches the existing methods in other database connectors. For example, if you're
adding a method that already exists in another connector, like Redshift, you need to ensure
your new method behaves the same way and has the same parameters with the same types in the
same order. See the note below for more detail.
Note:
The Python type system (as of 3.10.6) will not stop you from breaking the type contract
of method signatures when implementing a subclass. It is up to the author of a database
connector to ensure that it satisfies this interface. Be careful to, for example, not
change the types of the parameters or leave out optional parameters that are specified
in the interface.
Any such inconsistencies can cause unexpected runtime errors that will not be caught by
the type checker.
It is safe to add additional features to subclasses, such as new methods or extra *optional*
parameters to specified methods. In general adding new methods is safe, but adding optional
parameters to methods specified in the interface should be considered bad practice, because
it could result in unexpected behavior.
Example usage:
.. code-block:: python
def my_function(db: DatabaseConnector, data: Table):
# Your code here, using the db object
# Pass an instance of a class that inherits from DatabaseConnector, e.g. Redshift
my_function(some_db_instance, some_data)
"""

@abstractmethod
def table_exists(self, table_name: str) -> bool:
"""Check if a table or view exists in the database.
`Args:`
table_name: str
The table name and schema (e.g. ``myschema.mytable``).
`Returns:`
boolean
``True`` if the table exists and ``False`` if it does not.
"""
pass

@abstractmethod
def copy(self, tbl: Table, table_name: str, if_exists: str):
"""Copy a :ref:`parsons-table` to the database.
`Args`:
tbl (Table):
Table containing the data to save.
table_name (str):
The destination table name (ex. ``my_schema.my_table``).
if_exists (str):
If the table already exists, either ``fail``, ``append``, ``drop``
or ``truncate`` the table.
"""
pass

@abstractmethod
def query(self, sql: str, parameters: Optional[list] = None) -> Optional[Table]:
"""Execute a query against the database. Will return ``None`` if the query returns empty.
To include python variables in your query, it is recommended to pass them as parameters,
following the `psycopg style
<http://initd.org/psycopg/docs/usage.html#passing-parameters-to-sql-queries>`.
Using the ``parameters`` argument ensures that values are escaped properly, and avoids SQL
injection attacks.
**Parameter Examples**
.. code-block:: python
# Note that the name contains a quote, which could break your query if not escaped
# properly.
name = "Beatrice O'Brady"
sql = "SELECT * FROM my_table WHERE name = %s"
db.query(sql, parameters=[name])
.. code-block:: python
names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"]
placeholders = ', '.join('%s' for item in names)
sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})"
db.query(sql, parameters=names)
`Args:`
sql: str
A valid SQL statement
parameters: Optional[list]
A list of python variables to be converted into SQL values in your query
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
pass
79 changes: 79 additions & 0 deletions parsons/databases/discover_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
from typing import Optional, Union, Type, List

from parsons.databases.database_connector import DatabaseConnector
from parsons.databases.redshift import Redshift
from parsons.databases.mysql import MySQL
from parsons.databases.postgres import Postgres
from parsons.google.google_bigquery import GoogleBigQuery


def discover_database(
default_connector: Optional[
Union[Type[DatabaseConnector], List[Type[DatabaseConnector]]]
] = None
) -> DatabaseConnector:
"""Create an appropriate ``DatabaseConnector`` based on environmental variables.
Will search the environmental variables for the proper credentials for the
Redshift, MySQL, Postgres, and BigQuery connectors. See the documentation
for the connectors to variables required to initialize them.
If no suitable configuration is found, will raise an error.
If multiple suitable configurations are found, will raise an error unless
a default connector class or list of classes is provided.
Note that the variables to be searched for are hard-coded in this function,
since they are unlikely to change. If that is done, for some reason, or a
new database connector is added, ``discover_database`` should be updated
Args:
default_connector: Optional, single Class or list of Classes inheriting from
DatabaseConnector to be used as default in case multiple database configurations
are detected.
Returns:
DatabaseConnector: The database connector configured in the environment.
"""
connectors = {
"Redshift": Redshift,
"MySQL": MySQL,
"Postgres": Postgres,
"GoogleBigQuery": GoogleBigQuery,
}

password_vars = {
"Redshift": "REDSHIFT_PASSWORD",
"MySQL": "MYSQL_PASSWORD",
"Postgres": "PGPASSWORD",
"GoogleBigQuery": "GOOGLE_APPLICATION_CREDENTIALS",
}

detected = [name for name in connectors.keys() if os.getenv(password_vars[name])]

if len(detected) > 1:
if default_connector is None:
raise EnvironmentError(
f"Multiple database configurations detected: {detected}."
" Please specify a default connector."
)

if isinstance(default_connector, list):
for connector in default_connector:
if connector.__name__ in detected:
return connector()
raise EnvironmentError(
f"None of the default connectors {default_connector} were detected."
)
elif default_connector.__name__ in detected:
return default_connector()
else:
raise EnvironmentError(
f"Default connector {default_connector} not detected. Detected: {detected}."
)

elif detected:
return connectors[detected[0]]()
else:
raise EnvironmentError("Could not find any database configuration.")
15 changes: 9 additions & 6 deletions parsons/databases/mysql/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pickle
import logging
import os
from parsons.databases.database_connector import DatabaseConnector
from parsons.databases.table import BaseTable
from parsons.databases.mysql.create_table import MySQLCreateTable
from parsons.databases.alchemy import Alchemy
Expand All @@ -19,7 +20,7 @@
logger = logging.getLogger(__name__)


class MySQL(MySQLCreateTable, Alchemy):
class MySQL(DatabaseConnector, MySQLCreateTable, Alchemy):
"""
Connect to a MySQL database.
Expand Down Expand Up @@ -151,7 +152,6 @@ def query_with_connection(self, sql, connection, parameters=None, commit=True):
See :ref:`parsons-table` for output options.
"""
with self.cursor(connection) as cursor:

# The python connector can only execute a single sql statement, so we will
# break up each statement and execute them separately.
for s in sql.strip().split(";"):
Expand Down Expand Up @@ -193,7 +193,12 @@ def query_with_connection(self, sql, connection, parameters=None, commit=True):
return final_tbl

def copy(
self, tbl, table_name, if_exists="fail", chunk_size=1000, strict_length=True
self,
tbl: Table,
table_name: str,
if_exists: str = "fail",
chunk_size: int = 1000,
strict_length: bool = True,
):
"""
Copy a :ref:`parsons-table` to the database.
Expand Down Expand Up @@ -225,7 +230,6 @@ def copy(
return None

with self.connection() as connection:

# Create table if not exists
if self._create_table_precheck(connection, table_name, if_exists):
sql = self.create_statement(
Expand Down Expand Up @@ -282,7 +286,6 @@ def _create_table_precheck(self, connection, table_name, if_exists):

# If the table exists, evaluate the if_exists argument for next steps.
if self.table_exists(table_name):

if if_exists == "fail":
raise ValueError("Table already exists.")

Expand All @@ -301,7 +304,7 @@ def _create_table_precheck(self, connection, table_name, if_exists):
else:
return True

def table_exists(self, table_name):
def table_exists(self, table_name: str) -> bool:
"""
Check if a table or view exists in the database.
Expand Down
Loading

0 comments on commit 6d5ada6

Please sign in to comment.