diff --git a/premise/metals.py b/premise/metals.py index 69e70b3b..764595ae 100644 --- a/premise/metals.py +++ b/premise/metals.py @@ -57,6 +57,7 @@ def load_mining_shares_mapping(): df = pd.read_excel(filepath, sheet_name="Shares_mapping") return df + def load_activities_mapping(): """ Load mapping for the ecoinvent exchanges to be updated by the new metal intensities @@ -69,7 +70,7 @@ def load_activities_mapping(): # Define a function to replicate rows based on the generated activity sets def extend_dataframe(df, mapping): - """" + """ " Extend a DataFrame by duplicating rows based on a mapping dictionary. Parameters: @@ -288,39 +289,50 @@ def update_metal_use( matching_rows["Element"] == metal, "final_technology" ].iloc[0] - ### GET METAL MARKET LOCATION - locations = ['World', 'GLO', 'RoW'] - dataset_metal = None # Initialize dataset_metal outside the loop + locations = ["World", "GLO", "RoW"] + dataset_metal = ( + None # Initialize dataset_metal outside the loop + ) # Check if metal_activity is a string and not nan - if isinstance(metal_activity, str) and not pd.isna(metal_activity): + if isinstance(metal_activity, str) and not pd.isna( + metal_activity + ): for location in locations: try: dataset_metal = ws.get_one( self.database, - ws.equals('name', metal_activity), - ws.equals('location', location) + ws.equals("name", metal_activity), + ws.equals("location", location), ) - if dataset_metal: # If we found the dataset, break out of the loop + if ( + dataset_metal + ): # If we found the dataset, break out of the loop break except Exception as e: - print(f"Failed to get dataset for location '{location}': {e}") + print( + f"Failed to get dataset for location '{location}': {e}" + ) # If dataset_metal is still None, try without specifying the location if dataset_metal is None: try: dataset_metal = ws.get_one( self.database, - ws.contains('name', metal_activity) + ws.contains("name", metal_activity), ) except Exception as e: - print(f"Failed to get dataset without specifying location: {e}") + print( + f"Failed to get dataset without specifying location: {e}" + ) else: print(f"Invalid metal activity: '{metal_activity}'") if dataset_metal is None: - print(f"Dataset for metal activity '{metal_activity}' not found in any location.") + print( + f"Dataset for metal activity '{metal_activity}' not found in any location." + ) dataset_demand = ws.get_many( self.database, @@ -333,7 +345,6 @@ def update_metal_use( ) if dataset_metal is not None: - exchange_to_delete = { "product": metal_reference_product, "name": metal_activity, @@ -343,28 +354,40 @@ def update_metal_use( to_remove = [] condition_met = False - for exc in act['exchanges']: - if all(key in exc for key in ['product', 'name', 'type']): - if (exc["product"] == exchange_to_delete["product"] and - exc["name"] == exchange_to_delete["name"] and - exc["type"] == exchange_to_delete["type"]): + for exc in act["exchanges"]: + if all( + key in exc + for key in ["product", "name", "type"] + ): + if ( + exc["product"] + == exchange_to_delete["product"] + and exc["name"] + == exchange_to_delete["name"] + and exc["type"] + == exchange_to_delete["type"] + ): print( - f"Exchange for {metal_reference_product} was already present. It's been updated") + f"Exchange for {metal_reference_product} was already present. It's been updated" + ) condition_met = True to_remove.append(exc) old_amount = exc["amount"] else: - if exc["type"] == 'biosphere': + if exc["type"] == "biosphere": pass else: - print(f"Missing one or more keys in exchange: {exc}") - + print( + f"Missing one or more keys in exchange: {exc}" + ) for item in to_remove: try: - act['exchanges'].remove(item) + act["exchanges"].remove(item) except ValueError: - print(f"Item {item} was already removed or not found in the list") + print( + f"Item {item} was already removed or not found in the list" + ) exchange = { "uncertainty type": 0, @@ -382,17 +405,24 @@ def update_metal_use( if metal not in dataset["log parameters"]: if condition_met: - dataset["log parameters"][f"{metal} old amount, for {act['name']}"] = old_amount - dataset["log parameters"][f"{metal} new amount for {act['name']}"] = result + dataset["log parameters"][ + f"{metal} old amount, for {act['name']}" + ] = old_amount + dataset["log parameters"][ + f"{metal} new amount for {act['name']}" + ] = result else: dataset["log parameters"][ - f"{metal} old amount, for {act['name']}"] = 0 + f"{metal} old amount, for {act['name']}" + ] = 0 dataset["log parameters"][ - f"{metal} new amount for {act['name']}"] = result - + f"{metal} new amount for {act['name']}" + ] = result else: - print(f"Could not find dataset_metal for {ds_name} with technology {technology}") + print( + f"Could not find dataset_metal for {ds_name} with technology {technology}" + ) else: print(