I noticed that make_tf_dataset with shuffle_row_groups=True generates a non-deterministic order of rows starting in epoch 2, even if I fix the random seed. I am using workers_count=1 to remove randomness from parallel processing (which is addressed separately in #796). I was able to reproduce the problem when using make_batch_reader directly.
Example code to reproduce the problem

1. Create a Spark DataFrame with 10 rows and use `repartition` to force multiple data files
2. Create a Petastorm converter
3. Set `petastorm_converter.file_urls = list(sorted(petastorm_converter.file_urls))` to fix the order of the input files. Note: this should probably be done somewhere in the Petastorm library, because without it I saw additional randomness from the order in which Spark lists the data files
4. Create a `Reader` with `workers_count=1, seed=1, shuffle_rows=False, shuffle_row_groups=True, num_epochs=2`
5. Read all batches from the reader into a new Pandas DataFrame
6. Repeat steps 4 and 5
7. Compare the resulting DataFrames

I get differently ordered results about 50% of the time on my system, so re-running steps 4-7 a few times should eventually reproduce the problem.
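The batch collection and comparison in steps 5-7 can be sketched without Spark or Petastorm. In the sketch below, `Batch` and `collect` are hypothetical stand-ins for the namedtuple batches a Petastorm reader yields; the sample data is made up for illustration:

```python
from collections import namedtuple
import pandas as pd

# Hypothetical stand-in for the namedtuple batches a reader yields
Batch = namedtuple('Batch', ['id'])

def collect(batches):
    """Steps 5/7: concatenate all batches into a single Pandas DataFrame."""
    return pd.concat([pd.DataFrame(b._asdict()) for b in batches],
                     ignore_index=True)

# Two runs over the same data, but with the row groups in a different order
run_a = collect([Batch(id=[0, 1]), Batch(id=[2, 3])])
run_b = collect([Batch(id=[2, 3]), Batch(id=[0, 1])])

# DataFrame.equals is order-sensitive, so this comparison detects the bug
print(run_a.equals(run_b))  # False: same rows, different order
```

An order-insensitive comparison (e.g. sorting both frames first) would miss the bug, which is why the row order itself is compared here.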
Root cause

I was able to trace the root cause to the _ventilate() method of the ConcurrentVentilator class, specifically this part. The problem is that the shuffle operation at the beginning of an epoch happens before the sleep-and-continue path that is taken when the queue has no room.
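The consequence can be simulated without Petastorm at all. The sketch below (all names hypothetical) mimics the buggy loop: while the ventilator waits at item 0 for queue room, each pass through the loop body re-shuffles the items, so the order actually used for the epoch depends on the timing-dependent number of wait iterations:

```python
import random

def buggy_epoch_order(num_wait_loops, seed=0, num_items=3):
    """Simulate the buggy _ventilate() loop at an epoch boundary: the shuffle
    runs on every pass while the queue-full check forces a 'continue'."""
    rng = random.Random(seed)
    items = list(range(num_items))
    for _ in range(num_wait_loops):
        rng.shuffle(items)  # shuffle, then discover the queue is full
    rng.shuffle(items)      # the shuffle that finally takes effect
    return items

# Deterministic seed, yet the epoch order varies with the wait count
orders = {tuple(buggy_epoch_order(n)) for n in range(10)}
print(len(orders) > 1)  # more than one distinct order despite a fixed seed
```

Because the wait count depends on how quickly the workers drain the queue, the resulting order is effectively nondeterministic even with a fixed seed.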
Example code to directly reproduce the problem with ConcurrentVentilator
This code snippet implements a subclass of ConcurrentVentilator that overrides the _ventilate() method. The logic is an exact copy of the original method, with additional print statements inside the loop to show what is going on.
```python
from petastorm.workers_pool.ventilator import ConcurrentVentilator
from petastorm.workers_pool.thread_pool import ThreadPool
from petastorm.workers_pool.worker_base import WorkerBase
from time import sleep


class DebugConcurrentVentilator(ConcurrentVentilator):
    def _ventilate(self):
        while True:
            # Stop condition is when no iterations are remaining or there are no items to ventilate
            if self.completed():
                break

            # If we are ventilating the first item, we check if we would like to randomize the item order
            if self._current_item_to_ventilate == 0 and self._randomize_item_order:
                print(f"_current_item_to_ventilate: {self._current_item_to_ventilate} - shuffling")
                self._random_state.shuffle(self._items_to_ventilate)

            # Block until queue has room, but use continue to allow for checking if stop has been called
            if self._ventilated_items_count - self._processed_items_count >= self._max_ventilation_queue_size:
                sleep(self._ventilation_interval)
                print(f"_current_item_to_ventilate: {self._current_item_to_ventilate} - waiting for room in the queue")
                continue

            item_to_ventilate = self._items_to_ventilate[self._current_item_to_ventilate]
            print(f"_current_item_to_ventilate: {self._current_item_to_ventilate} - ventilating")
            self._ventilate_fn(**item_to_ventilate)
            self._current_item_to_ventilate += 1
            self._ventilated_items_count += 1

            if self._current_item_to_ventilate >= len(self._items_to_ventilate):
                self._current_item_to_ventilate = 0
                # If iterations was set to None, that means we will iterate until stop is called
                if self._iterations_remaining is not None:
                    self._iterations_remaining -= 1


items = [{'index': i} for i in range(3)]
pool = ThreadPool(1)
ventilator = DebugConcurrentVentilator(
    ventilate_fn=pool.ventilate,
    items_to_ventilate=items,
    iterations=2,
    randomize_item_order=True,
    random_seed=0,
    max_ventilation_queue_size=1)


class TestWorker(WorkerBase):
    def __init__(self, worker_id, publish_func, args):
        super().__init__(worker_id, publish_func, args)

    def process(self, index):
        sleep(0.1)
        ventilator.processed_item()


pool.start(TestWorker, ventilator=ventilator)
```
Example output
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - ventilating
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - ventilating
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - ventilating
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - waiting for room in the queue
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - waiting for room in the queue
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - waiting for room in the queue
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - waiting for room in the queue
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - waiting for room in the queue
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - waiting for room in the queue
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - ventilating
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - ventilating
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - ventilating
Note that at the beginning of the second iteration there are 7 shuffle operations before the first item is eventually ventilated.
Proposed fix
This can be fixed easily by first checking whether the queue has room, and shuffling and ventilating only if it does:
```python
class FixedConcurrentVentilator(ConcurrentVentilator):
    def _ventilate(self):
        while True:
            # Stop condition is when no iterations are remaining or there are no items to ventilate
            if self.completed():
                break

            # Block until queue has room, but use continue to allow for checking if stop has been called
            if self._ventilated_items_count - self._processed_items_count >= self._max_ventilation_queue_size:
                sleep(self._ventilation_interval)
                print(f"_current_item_to_ventilate: {self._current_item_to_ventilate} - waiting for room in the queue")
                continue

            # If we are ventilating the first item, we check if we would like to randomize the item order
            if self._current_item_to_ventilate == 0 and self._randomize_item_order:
                print(f"_current_item_to_ventilate: {self._current_item_to_ventilate} - shuffling")
                self._random_state.shuffle(self._items_to_ventilate)

            item_to_ventilate = self._items_to_ventilate[self._current_item_to_ventilate]
            print(f"_current_item_to_ventilate: {self._current_item_to_ventilate} - ventilating")
            self._ventilate_fn(**item_to_ventilate)
            self._current_item_to_ventilate += 1
            self._ventilated_items_count += 1

            if self._current_item_to_ventilate >= len(self._items_to_ventilate):
                self._current_item_to_ventilate = 0
                # If iterations was set to None, that means we will iterate until stop is called
                if self._iterations_remaining is not None:
                    self._iterations_remaining -= 1
```
Example output of running the same example code as above but with FixedConcurrentVentilator instead of DebugConcurrentVentilator:
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - ventilating
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - ventilating
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - ventilating
_current_item_to_ventilate: 0 - waiting for room in the queue
_current_item_to_ventilate: 0 - waiting for room in the queue
_current_item_to_ventilate: 0 - waiting for room in the queue
_current_item_to_ventilate: 0 - shuffling
_current_item_to_ventilate: 0 - ventilating
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - waiting for room in the queue
_current_item_to_ventilate: 1 - ventilating
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - waiting for room in the queue
_current_item_to_ventilate: 2 - ventilating
Note that there is exactly one shuffle operation at the beginning of each iteration.
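The determinism of the fixed loop can be illustrated with a small standalone simulation (all names hypothetical): because the shuffle now happens only after the queue check has passed, the epoch order depends on the seed alone, not on how often the ventilator had to wait:

```python
import random

def fixed_epoch_order(num_wait_loops, seed=0, num_items=3):
    """Simulate the fixed _ventilate() loop: waiting for queue room happens
    first and does not touch the RNG, so the wait count is irrelevant."""
    rng = random.Random(seed)
    items = list(range(num_items))
    for _ in range(num_wait_loops):
        pass  # queue full: sleep and continue, without shuffling
    rng.shuffle(items)  # exactly one shuffle per epoch
    return items

# Same seed gives the same epoch order regardless of worker timing
print(fixed_epoch_order(0) == fixed_epoch_order(100))  # True
```

With the queue check moved ahead of the shuffle, the RNG is consumed exactly once per epoch, which is what makes a fixed seed reproducible across runs.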