Get a DynamicPartitionsDefinition updated within a schedule definition before the RunRequests are fired #20508
-
Hello everyone. Issue: I'm struggling to get a DynamicPartitionsDefinition updated within the body of a schedule function before the resulting RunRequests are fired (as it stands, the runs fail right after they spawn because the requested partition keys are missing from the DynamicPartitionsDefinition object). What I'm trying to achieve (within a schedule function body): add the new partition keys to the DynamicPartitionsDefinition, then fire the corresponding RunRequests.
Why it is needed: we have a bunch of files accumulating (on purpose) daily on a remote shared folder, and we need to trigger, at given times (multiple times a day), an ETL workflow that assesses what is on the shared folder and runs a set of independent transformations on each file. Other Dagster resources possibly related to this kind of situation: …
Toy example: ("bare": without SensorResult or .build_add_request() call) from dagster import (
asset,
Config,
DynamicPartitionsDefinition,
Definitions,
define_asset_job,
schedule,
RunRequest,
)
part_def = DynamicPartitionsDefinition(name="files")
class FileToLoad(Config):
file: str
@asset(partitions_def=part_def)
def get_file(config: FileToLoad):
return config.file
asset_job = define_asset_job(
name="new_file_job",
selection=[get_file]
)
@schedule(job=asset_job, cron_schedule="0 14 * * *")
def schedule_new_files():
files = ["test1.pos", "test2.txt", "test3.pos"]
run_requests = [
RunRequest(
run_key=f"{file}",
partition_key=file[0:5],
run_config={
"ops": {
"get_file": {
"config": {"file": f"{file}"}
}
}
}
)
for file in files
]
return run_requests
defs = Definitions(
assets=[get_file],
jobs=[asset_job],
schedules=[schedule_new_files],
) |
Replies: 4 comments · 1 reply
-
Hi - I think you would need a sensor to add and process the dynamic partition requests. Returning a SensorResult with only the parameter dynamic_partitions_requests will add the partitions without launching a run. You could also add a SkipReason within the SensorResult to state the reason why the sensor is not returning a RunRequest, but it's not mandatory. See the code below.
I'd recommend adding a minimum_interval_seconds to your sensor if you do so, to make sure it's not continuously running and evaluating your files. See the documentation here.
A schedule can evaluate dynamic partitions, but the partition keys should be added before evaluation. Currently, we don't have something similar to SensorResult for schedules.

from dagster import SensorEvaluationContext, SensorResult, sensor

@sensor(job=asset_job)
def file_sensor(context: SensorEvaluationContext):
    filenames = ["test1.pos", "test2.pos", "test3.pos"]
    # Only request partition keys that do not exist yet on the instance.
    new_partitions = [
        filename
        for filename in filenames
        if not part_def.has_partition_key(
            filename, dynamic_partitions_store=context.instance
        )
    ]
    # dynamic_partitions_requests alone adds the partitions without launching a run.
    return SensorResult(
        dynamic_partitions_requests=[
            part_def.build_add_request(new_partitions)
        ],
    )
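For reference, a minimal sketch of the minimum_interval_seconds recommendation above (minimum_interval_seconds is a standard @sensor parameter; the 300-second value and the throttled_file_sensor name are made-up examples):

@sensor(job=asset_job, minimum_interval_seconds=300)  # evaluate at most every 5 minutes
def throttled_file_sensor(context: SensorEvaluationContext):
    ...  # same partition-detection logic as file_sensor above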
@schedule(job=asset_job, cron_schedule="0 14 * * *")
def schedule_new_files(context):
    partition_keys = part_def.get_partition_keys(dynamic_partitions_store=context.instance)
    run_requests = [
        RunRequest(
            run_key=f"{partition_key}",
            partition_key=partition_key,
            run_config={
                "ops": {
                    "get_file": {
                        "config": {"file": f"{partition_key}"}
                    }
                }
            },
        )
        for partition_key in partition_keys
    ]
    return run_requests
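To check the two pieces fit together locally, a minimal sketch using Dagster's testing utilities (build_schedule_context and DagsterInstance.ephemeral are standard Dagster APIs; the seeded keys "test1"/"test2" are made-up examples, and this assumes the ephemeral instance's storage supports dynamic partitions - a persisted test instance works too):

from dagster import DagsterInstance, build_schedule_context

instance = DagsterInstance.ephemeral()
# Simulate what the sensor would have done: register the partition keys first.
instance.add_dynamic_partitions("files", ["test1", "test2"])
context = build_schedule_context(instance=instance)
# Direct invocation of the schedule returns the RunRequests it built.
for run_request in schedule_new_files(context):
    print(run_request.partition_key)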
-
I would also be interested in something like this. We are using a schedule just because it's more natural, and we often want a specific time (and don't want to slam our counterparty's server). But we are using dynamic partitions because what data is available varies.
-
Could this not be done with …?
-
@mmutso-boku yup, that's the solution I ended up finding. The AI pointed me at this issue, haha, which is maybe a little old.
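A minimal sketch of that approach, assuming the replies refer to registering the partitions straight from the schedule body via context.instance.add_dynamic_partitions (add_dynamic_partitions and has_dynamic_partition are available on DagsterInstance in recent Dagster versions); the file list is the same toy data as above:

from dagster import RunRequest, ScheduleEvaluationContext, schedule

@schedule(job=asset_job, cron_schedule="0 14 * * *")
def schedule_new_files(context: ScheduleEvaluationContext):
    files = ["test1.pos", "test2.txt", "test3.pos"]  # e.g. listed from the shared folder
    # Register any missing keys *before* returning the RunRequests,
    # so the runs no longer fail on missing partitions.
    new_keys = [
        file[0:5]
        for file in files
        if not context.instance.has_dynamic_partition("files", file[0:5])
    ]
    context.instance.add_dynamic_partitions("files", new_keys)
    return [
        RunRequest(
            run_key=file,
            partition_key=file[0:5],
            run_config={"ops": {"get_file": {"config": {"file": file}}}},
        )
        for file in files
    ]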