✨ Source File: add fixed width file format support (#34678)
Co-authored-by: mgreene <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
4 people authored Feb 13, 2024
Parent: 462970f · Commit: f6b4436
Showing 15 changed files with 183 additions and 88 deletions.
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/source-file/.coveragerc
@@ -0,0 +1,3 @@
+[run]
+omit =
+    source_file/run.py
@@ -28,6 +28,9 @@ acceptance_tests:
         extra_fields: no
         exact_order: no
         extra_records: yes
+        file_types:
+          skip_test: yes
+          bypass_reason: "Source is not based on file based CDK"
   full_refresh:
     tests:
       - config_path: "integration_tests/config.json"
@@ -20,6 +20,7 @@
       "jsonl",
       "excel",
       "excel_binary",
+      "fwf",
       "feather",
       "parquet",
       "yaml"
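With "fwf" added to the spec's format enum, a user can now select fixed-width files in the connector config. A hypothetical config (field values are illustrative, not taken from this commit) might look like:

    {
      "dataset_name": "demo",
      "format": "fwf",
      "url": "/local/demo.txt",
      "provider": { "storage": "local" }
    }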
@@ -30,6 +30,7 @@ def check_read(config, expected_columns=10, expected_rows=42):
         ("jsonl", "jsonl", 2, 6492, "jsonl"),
         ("excel", "xls", 8, 50, "demo"),
         ("excel", "xlsx", 8, 50, "demo"),
+        ("fwf", "txt", 4, 2, "demo"),
         ("feather", "feather", 9, 3, "demo"),
         ("parquet", "parquet", 9, 3, "demo"),
         ("yaml", "yaml", 8, 3, "demo"),
@@ -0,0 +1,25 @@
+{
+  "streams": [
+    {
+      "stream": {
+        "name": "test",
+        "json_schema": {
+          "$schema": "http://json-schema.org/draft-07/schema#",
+          "type": "object",
+          "properties": {
+            "$schema": "http://json-schema.org/schema#",
+            "type": "object",
+            "properties": {
+              "text": { "type": "string" },
+              "num": { "type": "number" },
+              "float": { "type": "number" },
+              "bool": { "type": "string" }
+            }
+          }
+        }
+      },
+      "sync_mode": "full_refresh",
+      "destination_sync_mode": "overwrite"
+    }
+  ]
+}
@@ -0,0 +1,3 @@
+text      num  float  bool
+short     1    0.2    true
+long_text 33   0.0    false
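This fixture exercises pandas.read_fwf, which the client wires up below. A minimal sketch (file path assumed) matching the integration test's expectation of 4 columns and 2 data rows:

    import pandas as pd

    # read_fwf infers fixed-width column boundaries from aligned whitespace
    df = pd.read_fwf("demo.txt")
    assert list(df.columns) == ["text", "num", "float", "bool"]
    assert df.shape == (2, 4)  # 2 data rows, 4 columns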
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/metadata.yaml
@@ -10,7 +10,7 @@ data:
   connectorSubtype: file
   connectorType: source
   definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
-  dockerImageTag: 0.3.16
+  dockerImageTag: 0.3.17
   dockerRepository: airbyte/source-file
   documentationUrl: https://docs.airbyte.com/integrations/sources/file
   githubIssueLabel: source-file
@@ -172,6 +172,7 @@ def storage_scheme(self) -> str:
         """
         storage_name = self._provider["storage"].upper()
         parse_result = urlparse(self._url)
+
         if storage_name == "GCS":
             return "gs://"
         elif storage_name == "S3":
@@ -191,7 +192,7 @@ def storage_scheme(self) -> str:
         elif parse_result.scheme:
             return parse_result.scheme

-        logger.error(f"Unknown Storage provider in: {self.full_url}")
+        logger.error(f"Unknown Storage provider in: {self._url}")
         return ""

     def _open_gcs_url(self) -> object:
@@ -328,6 +329,7 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False):
             "html": pd.read_html,
             "excel": pd.read_excel,
             "excel_binary": pd.read_excel,
+            "fwf": pd.read_fwf,
             "feather": pd.read_feather,
             "parquet": pd.read_parquet,
             "orc": pd.read_orc,
@@ -354,9 +356,9 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False):
                     yield record
                     if read_sample_chunk and bytes_read >= self.CSV_CHUNK_SIZE:
                         return
-        elif self._reader_options == "excel_binary":
+        elif self._reader_format == "excel_binary":
             reader_options["engine"] = "pyxlsb"
-            yield from reader(fp, **reader_options)
+            yield reader(fp, **reader_options)
         elif self._reader_format == "excel":
            # Use openpyxl to read new-style Excel (xlsx) file; return to pandas for others
             try:
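Two fixes land in the hunk above: the branch now tests self._reader_format (comparing self._reader_options, an options mapping, against a format name was a bug), and it yields the DataFrame itself, since pd.read_excel with the pyxlsb engine returns a single DataFrame rather than an iterable of frames. A sketch of that read path (file name assumed; requires the pyxlsb package):

    import pandas as pd

    # Returns one DataFrame, hence `yield reader(...)` rather than `yield from`
    df = pd.read_excel("demo.xlsb", engine="pyxlsb")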
@@ -483,7 +485,7 @@ def streams(self, empty_schema: bool = False) -> Iterable:

     def openpyxl_chunk_reader(self, file, **kwargs):
         """Use openpyxl lazy loading feature to read excel files (xlsx only) in chunks of 500 lines at a time"""
-        work_book = load_workbook(filename=file, read_only=True)
+        work_book = load_workbook(filename=file)
         user_provided_column_names = kwargs.get("names")
         for sheetname in work_book.sheetnames:
             work_sheet = work_book[sheetname]
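For reference, the chunked-read pattern that openpyxl_chunk_reader implements looks roughly like this (a simplified sketch, not the connector's exact code; the helper name is made up, the 500-line chunk size comes from the docstring, and the first row is assumed to hold column names):

    import pandas as pd
    from openpyxl import load_workbook

    def read_xlsx_in_chunks(path, chunk_size=500):
        work_book = load_workbook(filename=path)
        for sheet_name in work_book.sheetnames:
            rows = work_book[sheet_name].iter_rows(values_only=True)
            header = next(rows)  # first row supplies column names
            chunk = []
            for row in rows:
                chunk.append(row)
                if len(chunk) == chunk_size:
                    yield pd.DataFrame(chunk, columns=header)
                    chunk = []
            if chunk:  # flush the final partial chunk
                yield pd.DataFrame(chunk, columns=header)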
@@ -16,6 +16,7 @@
     AirbyteConnectionStatus,
     AirbyteMessage,
     AirbyteRecordMessage,
+    AirbyteStreamStatus,
     ConfiguredAirbyteCatalog,
     ConnectorSpecification,
     FailureType,
@@ -24,6 +25,7 @@
 )
 from airbyte_cdk.sources import Source
 from airbyte_cdk.utils import AirbyteTracedException, is_cloud_environment
+from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message

 from .client import Client
 from .utils import LOCAL_STORAGE_NAME, dropbox_force_download
@@ -61,6 +63,7 @@ class SourceFile(Source):
     - read_json
     - read_html
     - read_excel
+    - read_fwf
     - read_feather
     - read_parquet
     - read_orc
@@ -170,14 +173,33 @@ def read(
         fields = self.selected_fields(catalog, config)
         name = client.stream_name

-        logger.info(f"Reading {name} ({client.reader.full_url})...")
+        configured_stream = catalog.streams[0]
+
+        logger.info(f"Syncing stream: {name} ({client.reader.full_url})...")
+
+        yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED)
+
+        record_counter = 0
         try:
             for row in client.read(fields=fields):
                 record = AirbyteRecordMessage(stream=name, data=row, emitted_at=int(datetime.now().timestamp()) * 1000)
+
+                record_counter += 1
+                if record_counter == 1:
+                    logger.info(f"Marking stream {name} as RUNNING")
+                    yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)
+
                 yield AirbyteMessage(type=Type.RECORD, record=record)
+
+            logger.info(f"Marking stream {name} as STOPPED")
+            yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE)
+
         except Exception as err:
             reason = f"Failed to read data of {name} at {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
             logger.error(reason)
+            logger.exception(f"Encountered an exception while reading stream {name}")
+            logger.info(f"Marking stream {name} as STOPPED")
+            yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
             raise err

     @staticmethod
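Because read() now interleaves STARTED/RUNNING/COMPLETE trace messages with records, consumers must filter by message type, which is exactly what the unit tests below are updated to do. A sketch of the pattern (source, logger, config, and catalog assumed to be in scope):

    from airbyte_protocol.models.airbyte_protocol import Type as MessageType

    messages = source.read(logger=logger, config=config, catalog=catalog)
    records = [m.record.data for m in messages if m.type == MessageType.RECORD]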
@@ -20,6 +20,7 @@
       "jsonl",
       "excel",
       "excel_binary",
+      "fwf",
       "feather",
       "parquet",
       "yaml"
@@ -7,9 +7,10 @@

 import pytest
 from airbyte_cdk.utils import AirbyteTracedException
-from pandas import read_csv, read_excel
+from pandas import read_csv, read_excel, testing
 from paramiko import SSHException
 from source_file.client import Client, URLFile
+from source_file.utils import backoff_handler
 from urllib3.exceptions import ProtocolError


@@ -34,21 +35,22 @@ def csv_format_client():


 @pytest.mark.parametrize(
-    "storage, expected_scheme",
+    "storage, expected_scheme, url",
     [
-        ("GCS", "gs://"),
-        ("S3", "s3://"),
-        ("AZBLOB", "azure://"),
-        ("HTTPS", "https://"),
-        ("SSH", "scp://"),
-        ("SCP", "scp://"),
-        ("SFTP", "sftp://"),
-        ("WEBHDFS", "webhdfs://"),
-        ("LOCAL", "file://"),
+        ("GCS", "gs://", "http://localhost"),
+        ("S3", "s3://", "http://localhost"),
+        ("AZBLOB", "azure://", "http://localhost"),
+        ("HTTPS", "https://", "http://localhost"),
+        ("SSH", "scp://", "http://localhost"),
+        ("SCP", "scp://", "http://localhost"),
+        ("SFTP", "sftp://", "http://localhost"),
+        ("WEBHDFS", "webhdfs://", "http://localhost"),
+        ("LOCAL", "file://", "http://localhost"),
+        ("WRONG", "", ""),
     ],
 )
-def test_storage_scheme(storage, expected_scheme):
-    urlfile = URLFile(provider={"storage": storage}, url="http://localhost")
+def test_storage_scheme(storage, expected_scheme, url):
+    urlfile = URLFile(provider={"storage": storage}, url=url)
     assert urlfile.storage_scheme == expected_scheme


@@ -80,8 +82,27 @@ def test_load_dataframes_xlsb(config, absolute_path, test_files):
     assert read_file.equals(expected)


-def test_load_nested_json(client, absolute_path, test_files):
-    f = f"{absolute_path}/{test_files}/formats/json/demo.json"
+@pytest.mark.parametrize("file_name, should_raise_error", [("test.xlsx", False), ("test_one_line.xlsx", True)])
+def test_load_dataframes_xlsx(config, absolute_path, test_files, file_name, should_raise_error):
+    config["format"] = "excel"
+    client = Client(**config)
+    f = f"{absolute_path}/{test_files}/{file_name}"
+    if should_raise_error:
+        with pytest.raises(AirbyteTracedException):
+            next(client.load_dataframes(fp=f))
+    else:
+        read_file = next(client.load_dataframes(fp=f))
+        expected = read_excel(f, engine="openpyxl")
+        assert read_file.equals(expected)
+
+
+@pytest.mark.parametrize("file_format, file_path", [("json", "formats/json/demo.json"),
+                                                    ("jsonl", "formats/jsonl/jsonl_nested.jsonl")])
+def test_load_nested_json(client, config, absolute_path, test_files, file_format, file_path):
+    if file_format == "jsonl":
+        config["format"] = file_format
+        client = Client(**config)
+    f = f"{absolute_path}/{test_files}/{file_path}"
     with open(f, mode="rb") as file:
         assert client.load_nested_json(fp=file)

@@ -189,3 +210,11 @@ def patched_open(self):
     assert call_count == 7

     assert sleep_mock.call_count == 5
+
+
+def test_backoff_handler(caplog):
+    details = {"tries": 1, "wait": 1}
+    backoff_handler(details)
+    expected = [('airbyte', 20, 'Caught retryable error after 1 tries. Waiting 1 seconds then retrying...')]
+
+    assert caplog.record_tuples == expected
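The new test pins down backoff_handler's log output (logger "airbyte", level 20 = INFO). A minimal sketch of a handler with that behavior (an assumption; the connector's actual implementation lives in source_file/utils.py and is not shown in this diff):

    import logging

    logger = logging.getLogger("airbyte")

    def backoff_handler(details):
        # `details` follows the backoff library's handler convention and
        # carries "tries" and "wait" keys, among others
        logger.info(
            f"Caught retryable error after {details['tries']} tries. "
            f"Waiting {details['wait']} seconds then retrying..."
        )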
@@ -21,6 +21,7 @@
     Type,
 )
 from airbyte_cdk.utils import AirbyteTracedException
+from airbyte_protocol.models.airbyte_protocol import Type as MessageType
 from source_file.source import SourceFile

 logger = logging.getLogger("airbyte")
@@ -95,7 +96,8 @@ def test_nan_to_null(absolute_path, test_files):

     source = SourceFile()
     records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-    records = [r.record.data for r in records]
+
+    records = [r.record.data for r in records if r.type == MessageType.RECORD]
     assert records == [
         {"col1": "key1", "col2": 1.11, "col3": None},
         {"col1": "key2", "col2": None, "col3": 2.22},
@@ -105,13 +107,14 @@ def test_nan_to_null(absolute_path, test_files):

     config.update({"format": "yaml", "url": f"{absolute_path}/{test_files}/formats/yaml/demo.yaml"})
     records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-    records = [r.record.data for r in records]
+    records = [r.record.data for r in records if r.type == MessageType.RECORD]
     assert records == []

     config.update({"provider": {"storage": "SSH", "user": "user", "host": "host"}})

     with pytest.raises(Exception):
-        next(source.read(logger=logger, config=config, catalog=catalog))
+        for record in source.read(logger=logger, config=config, catalog=catalog):
+            pass


def test_spec(source):
Expand Down Expand Up @@ -176,7 +179,7 @@ def test_pandas_header_not_none(absolute_path, test_files):

source = SourceFile()
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
records = [r.record.data for r in records]
records = [r.record.data for r in records if r.type == MessageType.RECORD]
assert records == [
{"text11": "text21", "text12": "text22"},
]
@@ -195,7 +198,7 @@ def test_pandas_header_none(absolute_path, test_files):

     source = SourceFile()
     records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-    records = [r.record.data for r in records]
+    records = [r.record.data for r in records if r.type == MessageType.RECORD]
     assert records == [
         {"0": "text11", "1": "text12"},
         {"0": "text21", "1": "text22"},
@@ -224,4 +227,4 @@ def test_incorrect_reader_options(absolute_path, test_files):
     ):
         catalog = get_catalog({"0": {"type": ["string", "null"]}, "1": {"type": ["string", "null"]}})
         records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-        records = [r.record.data for r in records]
+        records = [r.record.data for r in records if r.type == MessageType.RECORD]