Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ezvz committed Feb 5, 2024
1 parent 208829a commit ce74263
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 141 deletions.
23 changes: 22 additions & 1 deletion api/py/ai/chronon/repo/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

# Command-line argument templates, combined per mode further down in this file.
# The {name} fields are placeholders; presumably they are filled in with
# str.format before the job is launched -- TODO confirm against run().
# Each template ends with a trailing space so that templates can be
# concatenated directly with "+", as done below.
ONLINE_ARGS = "--online-jar={online_jar} --online-class={online_class} "
OFFLINE_ARGS = "--conf-path={conf_path} --end-date={ds} "
# Extra argument used by "sample" mode: the directory for the sampled data.
SAMPLING_ARGS = "--output-dir={output_dir} "
# Composite templates built from the pieces above.
ONLINE_WRITE_ARGS = "--conf-path={conf_path} " + ONLINE_ARGS
ONLINE_OFFLINE_WRITE_ARGS = OFFLINE_ARGS + ONLINE_ARGS
ONLINE_MODES = [
Expand All @@ -51,6 +52,7 @@
"log-flattener",
"metadata-export",
"label-join",
"sample"
]
MODES_USING_EMBEDDED = ["metadata-upload", "fetch", "local-streaming"]

Expand All @@ -74,6 +76,7 @@
"metadata-export": OFFLINE_ARGS,
"label-join": OFFLINE_ARGS,
"streaming-client": ONLINE_WRITE_ARGS,
"sample": OFFLINE_ARGS + SAMPLING_ARGS,
"info": "",
}

Expand All @@ -87,6 +90,7 @@
"analyze": "analyze",
"metadata-export": "metadata-export",
"streaming-client": "group-by-streaming",
"sample": "sample"
},
"joins": {
"backfill": "join",
Expand All @@ -100,6 +104,7 @@
"log-flattener": "log-flattener",
"metadata-export": "metadata-export",
"label-join": "label-join",
"sample": "sample"
},
"staging_queries": {
"backfill": "staging-query-backfill",
Expand Down Expand Up @@ -344,6 +349,7 @@ def __init__(self, args, jar_path):
self.sub_help = args.sub_help
self.mode = args.mode
self.online_jar = args.online_jar
self.output_dir = args.output_dir
valid_jar = args.online_jar and os.path.exists(args.online_jar)
# fetch online jar if necessary
if (self.mode in ONLINE_MODES) and (not args.sub_help) and not valid_jar:
Expand Down Expand Up @@ -395,6 +401,7 @@ def run(self):
ds=self.ds,
online_jar=self.online_jar,
online_class=self.online_class,
output_dir=self.output_dir
)
final_args = base_args + " " + str(self.args)
if self.mode == "info":
Expand Down Expand Up @@ -466,6 +473,15 @@ def run(self):
)
check_call(command)

if self.mode == "sample":
# After the local data sampling completes, sample mode also runs the job itself locally
print("Sampling complete. Running {} in local mode".format(self.conf))
self.mode = "backfill"
# Make sure you set `--master "${SPARK_JOB_MODE:-yarn}"` in your spark_submit script for this to work as intended
os.environ["SPARK_JOB_MODE"] = "local[*]"
self.run()



def set_defaults(parser):
"""Set default values based on environment"""
Expand All @@ -490,9 +506,9 @@ def set_defaults(parser):
chronon_jar=os.environ.get("CHRONON_DRIVER_JAR"),
list_apps="python3 " + os.path.join(chronon_repo_path, "scripts/yarn_list.py"),
render_info=os.path.join(chronon_repo_path, RENDER_INFO_DEFAULT_SCRIPT),
output_dir=os.environ.get("CHRONON_LOCAL_DATA_DIR")
)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Submit various kinds of chronon jobs")
parser.add_argument(
Expand Down Expand Up @@ -564,6 +580,11 @@ def set_defaults(parser):
help="Path to script rendering additional information of the given config. "
+ "Only applicable when mode is set to info",
)
parser.add_argument(
"--output-dir",
help="Path to local directory to store sampled data for in-memory runs. "
+ "Only applicable when mode is set to sample",
)
set_defaults(parser)
pre_parse_args, _ = parser.parse_known_args()
# We do a pre-parse to extract conf, mode, etc., set environment variables, and then re-parse the default values.
Expand Down
Loading

0 comments on commit ce74263

Please sign in to comment.