Composite API Demo #670

Open · wants to merge 19 commits into base: main
1 change: 0 additions & 1 deletion .gitignore
@@ -47,5 +47,4 @@ coverage.xml
.cci
.sfdx
/src.orig
/src
myvenv
6 changes: 6 additions & 0 deletions snowfakery/api.py
@@ -32,6 +32,8 @@
"jpg": "snowfakery.output_streams.ImageOutputStream",
"ps": "snowfakery.output_streams.ImageOutputStream",
"dot": "snowfakery.output_streams.GraphvizOutputStream",
"datapack": "snowfakery.experimental.DataPack",
"apex": "snowfakery.experimental.DataPack.ApexDataPack",
"json": "snowfakery.output_streams.JSONOutputStream",
"txt": "snowfakery.output_streams.DebugOutputStream",
"csv": "snowfakery.output_streams.CSVOutputStream",
@@ -251,6 +253,10 @@ def _get_output_streams(dburls, output_files, output_format, output_folder):

if output_stream_cls.uses_folder:
output_streams.append(output_stream_cls(output_folder))
elif output_folder and str(output_folder) != "." and not output_files:
raise exc.DataGenError(
"--output-folder can only be used with --output-file=<something> or --output-format=csv"
)

if output_files:
for f in output_files:
8 changes: 0 additions & 8 deletions snowfakery/cli.py
@@ -276,14 +276,6 @@ def validate_options(
"Sorry, you need to pick --dburl or --output-file "
"because they are mutually exclusive."
)
if (
output_folder
and str(output_folder) != "."
and not (output_files or output_format == "csv")
):
raise click.ClickException(
"--output-folder can only be used with --output-file=<something> or --output-format=csv"
)

if target_number and reps:
raise click.ClickException(
2 changes: 2 additions & 0 deletions snowfakery/data_generator_runtime.py
@@ -421,6 +421,8 @@ def loop_over_templates_until_finished(self, continuing):
self.iteration_count += 1
continuing = True
self.globals.reset_slots()
# let the output stream know that the recipe was finished
self.output_stream.complete_recipe()
self.row_history.reset_locals()

def loop_over_templates_once(self, statement_list, continuing: bool):
210 changes: 210 additions & 0 deletions snowfakery/experimental/DataPack.py
@@ -0,0 +1,210 @@
# Use this experimental OutputStream like this:

# snowfakery --output-format snowfakery.experimental.DataPack recipe.yml > composite.json
#
# Once you have the file, you can make it accessible to Salesforce by uploading it
# to some form of server, e.g. a GitHub gist, Heroku, etc.
#
# Then you can use Anonymous Apex like the script in `LoadCompositeAPIData.apex`
# to load it into any org, e.g.:

# sfdx force:apex:execute -f ./examples/salesforce/LoadCompositeAPIData.apex -u Snowfakery__qa
# or
# cci task run execute_anon --path examples/salesforce/LoadCompositeAPIData.apex --org qa
#
# Note that Salesforce will complain if the dataset has more than 500 rows.
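#
# As a rough sketch (illustrative only; the Account values below are made up),
# each payload produced here is a Composite Graph request shaped like:
#
# {"graphs": [{"graphId": "graph",
#              "compositeRequest": [
#                  {"method": "POST",
#                   "referenceId": "Account_1",
#                   "url": "/services/data/v50.0/sobjects/Account/",
#                   "body": {"Name": "Acme"}}]}]}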

# TODO: Add tests

import json
from logging import warning
from io import StringIO
import typing as T
import datetime
from pathlib import Path
from tempfile import TemporaryDirectory

from snowfakery.output_streams import FileOutputStream, OutputStream

MAX_BATCH_SIZE = 500 # https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_graph_limits.htm


class SalesforceCompositeAPIOutput(FileOutputStream):
"""Output stream that generates records for Salesforce's Composite API"""

encoders: T.Mapping[type, T.Callable] = {
**FileOutputStream.encoders,
datetime.date: str,
datetime.datetime: str,
bool: bool,
}
is_text = True

def __init__(self, file, **kwargs):
assert file
super().__init__(file, **kwargs)
self.rows = []

def write_single_row(self, tablename: str, row: T.Dict) -> None:
row_without_id = row.copy()
del row_without_id["id"]
_sf_update_key = row_without_id.pop("_sf_update_key", None)
if _sf_update_key:
method = "PATCH"
url = f"/services/data/v50.0/sobjects/{tablename}/{_sf_update_key}/{row_without_id[_sf_update_key]}"
else:
method = "POST"
url = f"/services/data/v50.0/sobjects/{tablename}/"

values = {
"method": method,
"referenceId": f"{tablename}_{row['id']}",
"url": url,
"body": row_without_id,
}
self.rows.append(values)

def flatten(
self,
sourcetable: str,
fieldname: str,
source_row_dict,
target_object_row,
) -> T.Union[str, int]:
target_reference = f"{target_object_row._tablename}_{target_object_row.id}"
return "@{%s.id}" % target_reference

def close(self, **kwargs) -> T.Optional[T.Sequence[str]]:
# NOTE: Could improve loading performance by breaking graphs up
# to allow server-side parallelization, but that would risk locking issues
assert self.rows
data = {"graphs": [{"graphId": "graph", "compositeRequest": self.rows}]}
self.write(json.dumps(data, indent=2))
return super().close()


class Folder(OutputStream):
uses_folder = True

def __init__(self, output_folder, **kwargs):
super().__init__(None, **kwargs)
self.target_path = Path(output_folder)
if not Path.exists(self.target_path):
Path.mkdir(self.target_path, exist_ok=True) # pragma: no cover
self.recipe_sets = [[]]
self.current_batch = []
self.filenum = 0
self.filenames = []

def write_row(
self, tablename: str, row_with_references: T.Dict, *args, **kwargs
) -> None:
self.recipe_sets[-1].append((tablename, row_with_references))

def write_single_row(self, tablename: str, row: T.Dict, *args, **kwargs) -> None:
raise NotImplementedError(
"Shouldn't be called. write_row should be called instead"
)

def close(self, **kwargs) -> T.Optional[T.Sequence[str]]:
self.flush_sets()
self.flush_batch()
table_metadata = [{"url": str(filename)} for filename in self.filenames]
metadata = {
"@context": "http://www.w3.org/ns/csvw",
"tables": table_metadata,
}
metadata_filename = self.target_path / "csvw_metadata.json"
with open(metadata_filename, "w") as f:
json.dump(metadata, f, indent=2)
return [f"Created {self.target_path}"]

def complete_recipe(self, *args):
self.flush_sets()
self.recipe_sets.append([])

def flush_sets(self):
while self.recipe_sets:
next_set = self.recipe_sets.pop(0)
assert len(next_set) <= MAX_BATCH_SIZE
if len(self.current_batch) + len(next_set) > MAX_BATCH_SIZE:
self.flush_batch()
self.current_batch.extend(next_set)

def flush_batch(self):
self.filenum += 1
filename = Path(self.target_path) / f"{self.filenum}.composite.json"

with open(filename, "w") as open_file, SalesforceCompositeAPIOutput(
open_file
) as out:
self.filenames.append(filename)
assert self.current_batch
for tablename, row in self.current_batch:
out.write_row(tablename, row)

self.current_batch = []


class DataPack(FileOutputStream):
def __init__(self, file, **kwargs):
super().__init__(file, **kwargs)
warning("DataPack is an experimental data format")
self.tempdir = TemporaryDirectory()
self.folder_os = Folder(self.tempdir.name)

def write_row(
self, tablename: str, row_with_references: T.Dict, *args, **kwargs
) -> None:
self.folder_os.write_row(tablename, row_with_references)

def write_single_row(self, tablename: str, row: T.Dict, *args, **kwargs) -> None:
raise NotImplementedError(
"Shouldn't be called. write_row should be called instead"
)

def complete_recipe(self, *args):
self.folder_os.complete_recipe()

def close(self):
self.folder_os.close()
data = self.organize_bundle()
self.write(json.dumps(data, indent=2))
self.tempdir.cleanup()
return super().close()

def organize_bundle(self):
files = Path(self.tempdir.name).glob("*.composite.json")
data = [file.read_text() for file in files]
assert data
return {"datapack_format": 1, "data": data}


class ApexDataPack(FileOutputStream):
"""Wrap in Anon Apex but note that the amount of data you can load
this way is very limited due to limitations of the REST API (used by CCI)
and SOAP API (used by sfdx)"""

def __init__(self, file, **kwargs):
super().__init__(file, **kwargs)
self.data = StringIO()
self.datapack = DataPack(self.data)

def write_row(
self, tablename: str, row_with_references: T.Dict, *args, **kwargs
) -> None:
self.datapack.write_row(tablename, row_with_references)

def write_single_row(self, tablename: str, row: T.Dict, *args, **kwargs) -> None:
raise NotImplementedError(
"Shouldn't be called. write_row should be called instead"
)

def complete_recipe(self, *args):
self.datapack.complete_recipe()

def close(self):
self.datapack.close()
quoted_data = repr(self.data.getvalue())
self.write(f"String json_data = {quoted_data};\n")
self.write("LoadCompositeAPIData.loadBundledJsonSet(json_data);\n")
5 changes: 5 additions & 0 deletions snowfakery/output_streams.py
@@ -136,6 +136,11 @@ def __enter__(self, *args):
def __exit__(self, *args):
self.close()

def complete_recipe(self, *args):
"""Let the output stream know that a complete recipe
set was generated."""
pass
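# Subclasses that batch rows per recipe iteration (e.g. the experimental
# snowfakery.experimental.DataPack.Folder stream in this PR) override this hook.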


class SmartStream:
"""Common code for managing stream/file opening/closing
69 changes: 69 additions & 0 deletions tests/test_DataPack.py
@@ -0,0 +1,69 @@
from io import StringIO
from unittest.mock import patch

from snowfakery.data_generator import generate
from snowfakery.data_generator_runtime import StoppingCriteria
from snowfakery.experimental.DataPack import (
DataPack,
ApexDataPack,
SalesforceCompositeAPIOutput,
)
import json

## Fill this out when it isn't experimental anymore


class TestSalesforceCompositeAPIOutput:
@patch("snowfakery.experimental.DataPack.MAX_BATCH_SIZE", 10)
def test_composite(self):
out = StringIO()
output_stream = DataPack(out)
with open("examples/basic-salesforce.recipe.yml") as f:
generate(
f, {}, output_stream, stopping_criteria=StoppingCriteria("Account", 15)
)
output_stream.close()
data = json.loads(out.getvalue())
assert data["datapack_format"] == 1
assert len(data["data"]) == 8
single_payload = json.loads(data["data"][0])
print(single_payload)
assert single_payload["graphs"][0]["compositeRequest"][0]["method"] == "POST"

def test_reference(self):
out = StringIO()
output_stream = SalesforceCompositeAPIOutput(out)
with open("examples/basic-salesforce.recipe.yml") as f:
generate(f, {}, output_stream)
output_stream.close()
print(out.getvalue())
data = json.loads(out.getvalue())
assert (
data["graphs"][0]["compositeRequest"][-1]["body"]["AccountId"]
== "@{Account_2.id}"
)

@patch("snowfakery.experimental.DataPack.MAX_BATCH_SIZE", 50)
def test_composite_upsert(self):
out = StringIO()
output_stream = DataPack(out)
with open("tests/upsert-2.yml") as f:
generate(
f, {}, output_stream, stopping_criteria=StoppingCriteria("Account", 50)
)
output_stream.close()
data = json.loads(out.getvalue())
assert data["datapack_format"] == 1
single_payload = json.loads(data["data"][1])
assert single_payload["graphs"][0]["compositeRequest"][-1]["method"] == "PATCH"

def test_apex(self):
out = StringIO()
output_stream = ApexDataPack(out)
with open("examples/basic-salesforce.recipe.yml") as f:
generate(
f, {}, output_stream, stopping_criteria=StoppingCriteria("Account", 50)
)
output_stream.close()
out = out.getvalue()
assert out.startswith("String json_data")