
[Feature #600]: Use multiprocessing to speed up the parsing #601

Open
wants to merge 23 commits into develop

Conversation

AlexandraImbrisca
Contributor

Follow-up to the previous PR:

  • Using a ProcessPoolExecutor with 3 worker processes speeds up the execution significantly (a minimal sketch of the pattern follows below)
  • Depending on the operating system and technical specifications, we obtain a time decrease between 49.68% and 70.03% relative to the previously optimized algorithm. In combination with the other improvements, this adds up to a 76.56% decrease from the initial, non-optimized implementation
  • Leveraging the standard multiprocessing functionality and carefully ordering the files leads to a safe optimization across all tested environments
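For illustration, a minimal sketch of the pattern (the file location, ordering key, and the worker shown here are placeholders, not the exact project code):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

def process_xml_file(xml_path: Path) -> None:
    """Placeholder worker: parse one XML file and write it to the database."""
    print(f"Processing file '{xml_path.name}'...")

if __name__ == "__main__":
    # Ordering the files (largest first) lets the long-running tasks start early.
    files = sorted(Path("xml_dump").glob("*.xml"),
                   key=lambda p: p.stat().st_size, reverse=True)
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_xml_file, f) for f in files]
        for future in as_completed(futures):
            future.result()  # re-raises any worker exception in the parent
```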

If this is not done inside the `if __name__ == "__main__":` guard, it will be re-executed in every new process on macOS/Windows
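A tiny, self-contained illustration of that point (my own example, not project code): on macOS/Windows the default spawn start method re-imports the main module in every worker, so any unguarded top-level code runs once per process.

```python
import os
from concurrent.futures import ProcessPoolExecutor

# With the spawn start method this line is printed by the parent AND by
# every worker process, because each worker re-imports this module.
print(f"module imported in process {os.getpid()}")

def work(x: int) -> int:
    return x + 1

if __name__ == "__main__":  # without this guard, spawn would recurse into pool creation
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(work, [1, 2, 3])))
```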
Since the processing is now asynchronous, this print might confuse users
@nesnoj
Collaborator

nesnoj commented Jan 27, 2025

Thank you @AlexandraImbrisca for the implementation and for sending the detailed report, which reads coherently!
Is this PR ready for review?

What I stumbled across so far:

  • The CPU count is hard-coded but should be configurable. Ideally via CLI, but we do not have one, so maybe an environment variable could do the job. And I'm not sure whether >1 is an appropriate default; multiprocessing could also be promoted as an opt-in feature. To be kept in mind: the base process uses 100 MB and each process about 1 GB. A standard office PC is equipped with ~8 GB, so 3 processes might be ok. Alternatively, we could set a default of 1 and add a message like "Your system supports multiple CPU cores, you can increase the processing speed by setting env var ..."
    What do you think @AlexandraImbrisca @FlorianK13 ?
  • I tested with different CPU counts (Ryzen 7, Linux, SQLite DB, only "solar"). Concerning the processing speed increase, my results are somewhat in line with yours:
| Cores | Time in s (SQLite) |
|-------|--------------------|
| 3     | 394.8              |
| 4     | 316.6              |
| 5     | 292.4              |
| 6     | 280.8              |
| 8     | 279.5              |
| 10    | 280.6              |
| 12    | 276.4              |

The speed stalls somewhere from 5 cores onward. I can imagine this drop in the speed increase is caused by a) the write concurrency, b) other processes running on my laptop, or c) the number of parallel processes decreasing once most of the tasks are done?

  • Could you please explain why you chose the parameters in create_efficient_engine() the way you did? And are they optimal for any number of CPUs?
  • With 12 cores I occasionally(!) get the following error message:
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) duplicate column name: InAnspruchGenommeneAckerflaeche
[SQL: ALTER TABLE solar_extended ADD "InAnspruchGenommeneAckerflaeche" VARCHAR NULL;]

(The column InAnspruchGenommeneAckerflaeche does not exist in our data model, which isn't a problem - it is added automatically - but there seems to be an issue with that under multiprocessing)

  • PostgreSQL is crashing here: sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) invalid dsn: invalid connection option "timeout". The timeout argument in connect_args does not seem to be understood by Postgres (see the sketch after this list).
  • The docs need to be updated
  • Changelog entry is missing
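Regarding the timeout issue, a possible dialect-aware fix, sketched under the assumption that one helper builds the engine for both backends (the helper name follows the PR's create_efficient_engine; create_engine and make_url are SQLAlchemy APIs): sqlite3 accepts a timeout argument, while psycopg2/libpq expects connect_timeout.

```python
from sqlalchemy import create_engine
from sqlalchemy.engine import make_url

def create_efficient_engine(connection_url: str):
    """Sketch: pass only connect_args that the target DBAPI understands."""
    if make_url(connection_url).get_backend_name() == "sqlite":
        # sqlite3.connect() takes `timeout` (seconds to wait on a locked DB).
        connect_args = {"timeout": 30}
    else:
        # psycopg2/libpq calls the equivalent option `connect_timeout`.
        connect_args = {"connect_timeout": 30}
    return create_engine(connection_url, connect_args=connect_args)
```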

@AlexandraImbrisca
Contributor Author

AlexandraImbrisca commented Jan 27, 2025

Thanks a lot for the detailed review and suggestions @nesnoj!

  • Default number of processes: I like the suggestion of defaulting to 1 and adding a note! Since we are just introducing this feature, it might be helpful to make people aware of it and ask them to report any potential issues. How about we add that explanatory message and a link to the issues page to report any possible bugs / negative experiences?
  • Thanks for testing! From what I have read, the general suggestion is to use cpu_count - 1, so I totally agree that increasing the number of cores relative to the system makes sense. I think the performance stalls because of sqlite (sqlite is not designed for write concurrency) :(
  • The choice of parameters: sure, I'll leave some comments!
  • Error when using 12 cores: oh, interesting! Out of curiosity, is the exception caught or does the program terminate?
  • PostgreSQL: I unfortunately tested mostly on SQLite :( I'll find a solution for this bug and test a bit more on PostgreSQL
  • Docs & changelog updates: sure thing! I'll create another commit for these updates

@nesnoj
Collaborator

nesnoj commented Jan 28, 2025

Hey @AlexandraImbrisca !

> • Default number of processes: I like the suggestion of defaulting to 1 and adding a note! Since we are just introducing this feature, it might be helpful to make people aware of it and ask them to report any potential issues. How about we add that explanatory message and a link to the issues page to report any possible bugs / negative experiences?

Sounds good to me.
What do you think @FlorianK13 ?

> I think the performance stalls because of sqlite (sqlite is not designed for write concurrency) :(

An alternative way could be to create separate SQLite DBs and finally merge them (e.g. via ATTACH, sketched below). Dunno if this is a viable option...
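If anyone wants to explore that idea, SQLite can combine databases via ATTACH; a rough, purely illustrative sketch (it assumes the table already exists in the target DB with the same schema):

```python
import sqlite3

def merge_sqlite_dbs(target_db: str, partial_dbs: list[str], table: str) -> None:
    """Illustrative only: copy rows from per-worker DBs into one target DB."""
    # autocommit mode; ATTACH/DETACH must run outside a transaction
    conn = sqlite3.connect(target_db, isolation_level=None)
    try:
        for i, part in enumerate(partial_dbs):
            conn.execute(f"ATTACH DATABASE ? AS part{i}", (part,))
            conn.execute(f"INSERT INTO {table} SELECT * FROM part{i}.{table}")
            conn.execute(f"DETACH DATABASE part{i}")
    finally:
        conn.close()

# e.g. merge_sqlite_dbs("open-mastr.db", ["worker_0.db", "worker_1.db"], "solar_extended")
```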

> • Error when using 12 cores: oh, interesting! Out of curiosity, is the exception caught or does the program terminate?

It terminates :(

@FlorianK13
Member

> • Default number of processes: I like the suggestion of defaulting to 1 and adding a note! Since we are just introducing this feature, it might be helpful to make people aware of it and ask them to report any potential issues. How about we add that explanatory message and a link to the issues page to report any possible bugs / negative experiences?

Sounds good to me as well!

@AlexandraImbrisca AlexandraImbrisca changed the title Use multiprocessing to speed up the parsing [Feature #600]: Use multiprocessing to speed up the parsing Jan 28, 2025
@AlexandraImbrisca
Contributor Author

Awesome, thanks a lot both! A few updates from my side:

  • I introduced 2 new environment variables: one for using the recommended number of processes, one to set up a custom number of processes. I think that 2 variables are necessary since people might not be aware of what number of processes would perform the best, but it would be nice to allow them to customize it
  • @nesnoj I think the "duplicate column name" exception occurs because of a race condition (i.e., 2 processes trying to add the same column at the same time). Please correct me if I'm wrong, but I think we can safely ignore this error, since once the missing columns have been added, our purpose is achieved 🤔 I added some more error handling (sketched below). Could you please let me know if you are still able to reproduce this issue?
  • I fixed the PostgreSQL issue and generally tested more for PostgreSQL
  • I updated the documentation and added a message to promote this feature

About merging the DBs: that might work, but it could get quite messy with many processes (i.e., we could end up with 10+ temporary DBs), and we would have to make sure that we clean everything up eventually 🤔 Using temporary tables performed better than I expected (source)
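The error handling mentioned above, approximated as a sketch (not the exact patch): treat a failed ALTER TABLE as benign when a concurrent worker has already added the column.

```python
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, ProgrammingError

def add_missing_column(engine, table_name: str, column_name: str) -> None:
    """Sketch: adding a column is racy across processes, so ignore the
    failure when another worker won the race and the column now exists."""
    try:
        with engine.begin() as conn:
            conn.execute(text(f'ALTER TABLE {table_name} ADD "{column_name}" VARCHAR NULL'))
    except (OperationalError, ProgrammingError):
        # SQLite raises OperationalError ("duplicate column name");
        # PostgreSQL raises ProgrammingError. Either way the column exists.
        pass
```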

@nesnoj
Collaborator

nesnoj commented Jan 29, 2025

Thx for the quick update!

> • I introduced 2 new environment variables: one for using the recommended number of processes, one to set up a custom number of processes. I think that 2 variables are necessary since people might not be aware of what number of processes would perform best, but it would be nice to allow them to customize it

I'll get back to this later

> • @nesnoj I think the "duplicate column name" exception occurs because of a race condition (i.e., 2 processes trying to add the same column at the same time). Please correct me if I'm wrong, but I think we can safely ignore this error, since once the missing columns have been added, our purpose is achieved 🤔 I added some more error handling. Could you please let me know if you are still able to reproduce this issue?
> • I fixed the PostgreSQL issue and generally tested more for PostgreSQL

The column issue seems to be solved, but now I keep getting an error in PostgreSQL with the privileges, see below for the full log. The user has all privileges for the DB (superuser) and the tables are created, but no data is written. I think it is not related to the actual privileges but to the implementation, though I wasn't able to track it down further right now.
Does it work properly at your end?

> • I updated the documentation and added a message to promote this feature

> About merging the DBs: that might work, but it could get quite messy with many processes (i.e., we could end up with 10+ temporary DBs), and we would have to make sure that we clean everything up eventually 🤔 Using temporary tables performed better than I expected (source)

Great that you already did some testing in the past! The write-temp-and-merge strategy was just a quick thought; it probably comes with other consequences I cannot estimate and would also require more testing. I'm also fine with the current implementation but open for discussion ;).

Full postgres traceback:
Processing file 'AnlagenEegSolar_48.xml'...
Processing file 'EinheitenSolar_48.xml'...

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 146, in __init__
    self._dbapi_connection = engine.raw_connection()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3298, in raw_connection
    return self.pool.connect()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 449, in connect
    return _ConnectionFairy._checkout(self)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 1263, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 712, in checkout
    rec = pool._do_get()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 179, in _do_get
    with util.safe_reraise():
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 177, in _do_get
    return self._create_connection()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 390, in _create_connection
    return _ConnectionRecord(self)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 674, in __init__
    self.__connect()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 900, in __connect
    with util.safe_reraise():
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 896, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 646, in connect
    return dialect.connect(*cargs, **cparams)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 622, in connect
    return self.loaded_dbapi.connect(*cargs, **cparams)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "mastr"
connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "mastr"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/concurrent/futures/process.py", line 246, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/home/nesnoj/git-repos/OpenEnergyPlatform/open-MaStR/open-MaStR_546_parsing_speed/open_mastr/xml_download/utils_write_to_database.py", line 103, in process_xml_file
    create_database_table(engine, xml_table_name)
  File "/home/nesnoj/git-repos/OpenEnergyPlatform/open-MaStR/open-MaStR_546_parsing_speed/open_mastr/xml_download/utils_write_to_database.py", line 215, in create_database_table
    orm_class.__table__.drop(engine, checkfirst=True)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/sql/schema.py", line 1299, in drop
    bind._run_ddl_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3248, in _run_ddl_visitor
    with self.begin() as conn:
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3238, in begin
    with self.connect() as conn:
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3274, in connect
    return self._connection_cls(self)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 148, in __init__
    Connection._handle_dbapi_exception_noconnection(
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2439, in _handle_dbapi_exception_noconnection
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 146, in __init__
    self._dbapi_connection = engine.raw_connection()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3298, in raw_connection
    return self.pool.connect()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 449, in connect
    return _ConnectionFairy._checkout(self)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 1263, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 712, in checkout
    rec = pool._do_get()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 179, in _do_get
    with util.safe_reraise():
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 177, in _do_get
    return self._create_connection()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 390, in _create_connection
    return _ConnectionRecord(self)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 674, in __init__
    self.__connect()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 900, in __connect
    with util.safe_reraise():
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 896, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 646, in connect
    return dialect.connect(*cargs, **cparams)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 622, in connect
    return self.loaded_dbapi.connect(*cargs, **cparams)
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "mastr"
connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "mastr"

(Background on this error at: https://sqlalche.me/e/20/e3q8)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/nesnoj/git-repos/OpenEnergyPlatform/open-MaStR/open-MaStR_546_parsing_speed/testing.py", line 17, in <module>
    db.download(data="solar")# solar
  File "/home/nesnoj/git-repos/OpenEnergyPlatform/open-MaStR/open-MaStR_546_parsing_speed/open_mastr/mastr.py", line 244, in download
    write_mastr_xml_to_database(
  File "/home/nesnoj/git-repos/OpenEnergyPlatform/open-MaStR/open-MaStR_546_parsing_speed/open_mastr/xml_download/utils_write_to_database.py", line 65, in write_mastr_xml_to_database
    future.result()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/home/nesnoj/miniconda3/envs/py310_open_mastr_546/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "mastr"
connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "mastr"

(Background on this error at: https://sqlalche.me/e/20/e3q8)

Process finished with exit code 1

@AlexandraImbrisca
Contributor Author

Thanks a bunch for finding this bug! I was using an unauthenticated database and didn't realise that this could be an issue. The connection_url obfuscates the password, so I updated the code to set the password properly. Could you please try again and let me know if you see the same issue?

Ensure correct type of NUMBER_OF_PROCESSES and add error handling for non-numeric types
Collaborator

@nesnoj nesnoj left a comment


These two small things needed a fix, so I patched them.
Now it works fine with psql, thank you!

variable."""
if "NUMBER_OF_PROCESSES" in os.environ:
number_of_processes = os.environ.get("NUMBER_OF_PROCESSES")
if number_of_processes >= cpu_count():
Collaborator


One more thing I forgot to mention in my previous posts:
The env var NUMBER_OF_PROCESSES is a string, which makes the comparison against cpu_count() fail (a sketch of a fix follows below).
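In the same spirit as the patch commit above, a sketch of the corrected lookup (the helper name and message wording are mine): cast the env var to int and fall back on non-numeric values.

```python
import os
from multiprocessing import cpu_count

def get_number_of_processes(default: int = 1) -> int:
    """Sketch: environment variables are strings, so cast before comparing."""
    raw = os.environ.get("NUMBER_OF_PROCESSES")
    if raw is None:
        return default
    try:
        number_of_processes = int(raw)
    except ValueError:
        print(f"NUMBER_OF_PROCESSES={raw!r} is not a number, falling back to {default}.")
        return default
    # Never exceed the number of available cores.
    return min(number_of_processes, cpu_count())
```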

"""Process a single xml file and write it to the database."""
# If set, the connection url obfuscates the password. We must replace the masked password with the actual password.
if password:
connection_url = re.sub(r"://[^:]+:\*+@", f"://{password}@", connection_url)
Collaborator

@nesnoj nesnoj Jan 31, 2025


With this regex the password is not supplied at all.

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (127.0.0.1), port 5432 failed: fe_sendauth: no password supplied

It deletes the username and uses the password as the username.
Also, this solution does not allow colons in the username (a regex-free sketch follows below).
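A regex-free alternative, assuming SQLAlchemy >= 1.4 (where URL objects are immutable and expose set() and render_as_string()); a sketch, not the final patch:

```python
from sqlalchemy.engine import make_url

def restore_password(connection_url: str, password: str) -> str:
    """Sketch: rebuild the URL with the real password instead of patching
    the obfuscated string - this also tolerates colons in the username."""
    url = make_url(connection_url)
    return url.set(password=password).render_as_string(hide_password=False)
```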
