From b3dfac1364175a1fd016ecb01c0e7bb26e8405bb Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 26 Feb 2024 16:22:11 -0800 Subject: [PATCH] add comment --- api/py/ai/chronon/repo/run.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/api/py/ai/chronon/repo/run.py b/api/py/ai/chronon/repo/run.py index d70885a7f..57d111e21 100755 --- a/api/py/ai/chronon/repo/run.py +++ b/api/py/ai/chronon/repo/run.py @@ -184,7 +184,7 @@ def download_only_once(url, path): @retry_decorator(retries=3, backoff=50) def download_jar(version, jar_type="uber", release_tag=None, spark_version="2.4.0"): assert ( - spark_version in SUPPORTED_SPARK + spark_version in SUPPORTED_SPARK ), f"Received unsupported spark version {spark_version}. Supported spark versions are {SUPPORTED_SPARK}" scala_version = SCALA_VERSION_FOR_SPARK[spark_version] maven_url_prefix = os.environ.get("CHRONON_MAVEN_MIRROR_PREFIX", None) @@ -274,6 +274,8 @@ def set_runtime_env(args): raise e if not team: team = "default" + # context is the environment in which the job is running, which is provided from the args, + # default to be dev. context = args.env logging.info( f"Context: {context} -- conf_type: {conf_type} -- team: {team}" @@ -301,6 +303,8 @@ def set_runtime_env(args): environment["team_env"] = ( teams_json[team].get(context, {}).get(effective_mode, {}) ) + # If the job is running in dev environment but no dev environment is defined in teams.json, + # use production environment. environment["production_team_env"] = ( teams_json[team].get("production", {}).get(effective_mode, {}) ) @@ -320,9 +324,9 @@ def set_runtime_env(args): [ k for k in [ - "chronon", - conf_type, - args.mode.replace("-", "_") if args.mode else None, + "chronon", + conf_type, + args.mode.replace("-", "_") if args.mode else None, ] if k is not None ] @@ -373,7 +377,7 @@ def __init__(self, args, jar_path): raise e possible_modes = list(ROUTES[self.conf_type].keys()) + UNIVERSAL_ROUTES assert ( - args.mode in possible_modes + args.mode in possible_modes ), "Invalid mode:{} for conf:{} of type:{}, please choose from {}".format( args.mode, self.conf, self.conf_type, possible_modes ) @@ -449,7 +453,7 @@ def run(self): ) if self.mode == "streaming": assert ( - len(filtered_apps) == 1 + len(filtered_apps) == 1 ), "More than one found, please kill them all" print("All good. No need to start a new app.") return @@ -601,7 +605,7 @@ def set_defaults(parser): parser.add_argument( "--online-jar", help="Jar containing Online KvStore & Deserializer Impl. " - + "Used for streaming and metadata-upload mode.", + + "Used for streaming and metadata-upload mode.", ) parser.add_argument( "--online-class", @@ -618,7 +622,7 @@ def set_defaults(parser): parser.add_argument( "--online-jar-fetch", help="Path to script that can pull online jar. " - + "This will run only when a file doesn't exist at location specified by online_jar", + + "This will run only when a file doesn't exist at location specified by online_jar", ) parser.add_argument( "--sub-help", @@ -643,7 +647,7 @@ def set_defaults(parser): parser.add_argument( "--render-info", help="Path to script rendering additional information of the given config. " - + "Only applicable when mode is set to info", + + "Only applicable when mode is set to info", ) set_defaults(parser) pre_parse_args, _ = parser.parse_known_args()