From 915a0c934eae0d472cbff6ca47ad47a06a69090d Mon Sep 17 00:00:00 2001 From: Manuel Soulier Date: Thu, 31 Oct 2024 04:30:19 +0100 Subject: [PATCH] fix: database download out of memory --- .../service/p6export/parquet/ParquetDao.java | 4 +- .../fr/insee/arc/utils/dao/UtilitaireDao.java | 42 ++++++++++----- .../fr/insee/arc/utils/utils/FormatSQL.java | 12 +++++ .../arc/web/gui/all/util/VObjectService.java | 24 +++++---- .../arc/web/gui/pilotage/dao/PilotageDao.java | 54 ++++++++----------- 5 files changed, 81 insertions(+), 55 deletions(-) diff --git a/arc-core/src/main/java/fr/insee/arc/core/service/p6export/parquet/ParquetDao.java b/arc-core/src/main/java/fr/insee/arc/core/service/p6export/parquet/ParquetDao.java index ab2efaab4..4e3f1e309 100644 --- a/arc-core/src/main/java/fr/insee/arc/core/service/p6export/parquet/ParquetDao.java +++ b/arc-core/src/main/java/fr/insee/arc/core/service/p6export/parquet/ParquetDao.java @@ -98,9 +98,9 @@ public void exportToParquet(List tables, String outputDirectory // export tables one by one to parquet for (TableToRetrieve table : tables) { // export table to parquet - LoggerHelper.custom(LOGGER, "Parquet started on : " + table.getTableName()); + LoggerHelper.custom(LOGGER, "Parquet export start : " + table.getTableName()); exportTableToParquet(connection, table, outputDirectory); - LoggerHelper.custom(LOGGER, "Parquet ended on : " + table.getTableName()); + LoggerHelper.custom(LOGGER, "Parquet export end"); } } catch (SQLException | IOException e) { diff --git a/arc-utils/src/main/java/fr/insee/arc/utils/dao/UtilitaireDao.java b/arc-utils/src/main/java/fr/insee/arc/utils/dao/UtilitaireDao.java index fb34320e8..4e598bb40 100644 --- a/arc-utils/src/main/java/fr/insee/arc/utils/dao/UtilitaireDao.java +++ b/arc-utils/src/main/java/fr/insee/arc/utils/dao/UtilitaireDao.java @@ -530,23 +530,43 @@ public boolean hasResults(Connection connexion, GenericPreparedStatementBuilder */ public void outStreamRequeteSelect(Connection connexion, 
GenericPreparedStatementBuilder requete, OutputStream out) throws ArcException { + + writeSelectQueryMetadataToOutputStreamAsCSV(connexion, requete, out); + + exporting(connexion, "(" + requete + ")", out, true); + + } + + /** + * write query metadata to the output stream in CSV format + * @param connexion + * @param requete + * @param out + * @throws ArcException + */ + public void writeSelectQueryMetadataToOutputStreamAsCSV(Connection connexion, GenericPreparedStatementBuilder requete, + OutputStream out) throws ArcException { try (ConnectionWrapper connexionWrapper = initConnection(connexion)) { - try (PreparedStatement stmt = connexionWrapper.getConnexion() - .prepareStatement(requete.getQuery().toString())) { + // limit query output as only metadata is required + GenericPreparedStatementBuilder limit = FormatSQL.limitQuery(requete , 0); + + try (PreparedStatement stmt = connexionWrapper.getConnexion() + .prepareStatement(limit.getQuery().toString())) { + // bind parameters - for (int i = 0; i < requete.getParameters().size(); i++) { - registerBindVariable(stmt, requete, i); + for (int i = 0; i < limit.getParameters().size(); i++) { + registerBindVariable(stmt, limit, i); } - + StringBuilder str = new StringBuilder(); String lineSeparator = System.lineSeparator(); - + // write metadata in output try (ResultSet res = stmt.executeQuery()) { ResultSetMetaData rsmd = res.getMetaData(); - + // Noms des colonnes for (int i = 1; i <= rsmd.getColumnCount(); i++) { str.append(rsmd.getColumnLabel(i)); @@ -563,18 +583,16 @@ public void outStreamRequeteSelect(Connection connexion, GenericPreparedStatemen } } str.append(lineSeparator); - + out.write(str.toString().getBytes()); - + } catch (SQLException | IOException e) { throw new ArcException(ArcExceptionMessage.GUI_EXPORT_TABLE_FAILED); } } catch (SQLException e1) { throw new ArcException(ArcExceptionMessage.GUI_EXPORT_TABLE_FAILED); } - - exporting(connexion, "(" + requete + ")", out, true); - + } } diff --git 
a/arc-utils/src/main/java/fr/insee/arc/utils/utils/FormatSQL.java b/arc-utils/src/main/java/fr/insee/arc/utils/utils/FormatSQL.java index 28feaf54b..eb3235f79 100644 --- a/arc-utils/src/main/java/fr/insee/arc/utils/utils/FormatSQL.java +++ b/arc-utils/src/main/java/fr/insee/arc/utils/utils/FormatSQL.java @@ -152,6 +152,18 @@ public static String resetTimeOutMaintenance() { public static String tryQuery(String query) { return "do $$ begin " + query + " exception when others then end; $$; "; } + + /** + * build a query to limit the number of rows returned by a given query + * @param query + * @return + */ + public static GenericPreparedStatementBuilder limitQuery(GenericPreparedStatementBuilder query, int limit) + { + GenericPreparedStatementBuilder limitQuery = new GenericPreparedStatementBuilder(); + limitQuery.build(SQL.SELECT, "*", SQL.FROM, "(", query, ") ww ", SQL.LIMIT, limit ); + return limitQuery; + } /** * build a query that try a generic query and report the result diff --git a/arc-web/src/main/java/fr/insee/arc/web/gui/all/util/VObjectService.java b/arc-web/src/main/java/fr/insee/arc/web/gui/all/util/VObjectService.java index 998dcb027..c68e0beb0 100644 --- a/arc-web/src/main/java/fr/insee/arc/web/gui/all/util/VObjectService.java +++ b/arc-web/src/main/java/fr/insee/arc/web/gui/all/util/VObjectService.java @@ -35,11 +35,9 @@ import fr.insee.arc.core.dataobjects.ArcPreparedStatementBuilder; import fr.insee.arc.core.dataobjects.ColumnEnum; -import fr.insee.arc.core.dataobjects.ViewEnum; import fr.insee.arc.core.service.global.dao.FileSystemManagement; import fr.insee.arc.core.util.LoggerDispatcher; import fr.insee.arc.utils.dao.ModeRequeteImpl; -import fr.insee.arc.utils.dao.SQL; import fr.insee.arc.utils.dao.UtilitaireDao; import fr.insee.arc.utils.database.ArcDatabase; import fr.insee.arc.utils.dataobjects.TypeEnum; @@ -1112,11 +1110,15 @@ public File download(VObject currentData, String dirOut, List fileNames, int numberOfExecutorNods = 
ArcDatabase.numberOfExecutorNods(); for (int executorConnectionId = ArcDatabase.COORDINATOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR .getIndex() + numberOfExecutorNods; executorConnectionId++) { - - ArcPreparedStatementBuilder limit = new ArcPreparedStatementBuilder(); - limit.build(SQL.SELECT, "*", SQL.FROM, "(", requetes.get(i), ")", ViewEnum.ALIAS_A, SQL.LIMIT, 0 ); - - if (!UtilitaireDao.get(executorConnectionId).getBoolean(connection, FormatSQL.tryQueryAndReport(limit))) + + // check if query is valid on database (data may not be found on every executor) + if (!UtilitaireDao.get(executorConnectionId).getBoolean(connection, FormatSQL.tryQueryAndReport(FormatSQL.limitQuery(requetes.get(i), 0)))) + { + continue; + } + + // keep in zip only queries that return rows + if (!UtilitaireDao.get(executorConnectionId).hasResults(connection, FormatSQL.limitQuery(requetes.get(i), 1))) { continue; } @@ -1125,11 +1127,12 @@ public File download(VObject currentData, String dirOut, List fileNames, (executorConnectionId> ArcDatabase.COORDINATOR.getIndex() ? 
"_"+executorConnectionId : "") + ".csv"); zos.putNextEntry(entry); - LoggerHelper.custom(LOGGER, "Downloading : " + entry.getName()); - + LoggerHelper.custom(LOGGER, "Build entry start : " + entry.getName()); // Ecriture dans le fichier UtilitaireDao.get(executorConnectionId).outStreamRequeteSelect(this.connection, requetes.get(i), zos); + LoggerHelper.custom(LOGGER, "Build entry end"); + zos.closeEntry(); } } @@ -1137,6 +1140,9 @@ public File download(VObject currentData, String dirOut, List fileNames, } catch (IOException | ArcException ex) { LoggerHelper.errorGenTextAsComment(getClass(), "download()", LOGGER, ex); } + + LoggerHelper.custom(LOGGER, "Download end"); + return fOut; } diff --git a/arc-web/src/main/java/fr/insee/arc/web/gui/pilotage/dao/PilotageDao.java b/arc-web/src/main/java/fr/insee/arc/web/gui/pilotage/dao/PilotageDao.java index 673fd26c6..64aabd235 100644 --- a/arc-web/src/main/java/fr/insee/arc/web/gui/pilotage/dao/PilotageDao.java +++ b/arc-web/src/main/java/fr/insee/arc/web/gui/pilotage/dao/PilotageDao.java @@ -280,27 +280,32 @@ public File downloadBdBAS(VObject viewFichierBAS, String dirOut, ArcPreparedStatementBuilder requete; + requete = new ArcPreparedStatementBuilder(); + requete.append("SELECT id_source FROM " + dataObjectService.getView(ViewEnum.PILOTAGE_FICHIER)); + requete.append("\n WHERE phase_traitement=" + requete.quoteText(phase.toString()) + " "); + requete.append("\n AND etat_traitement=" + requete.quoteText(etat.getSqlArrayExpression()) + "::text[] "); + requete.append("\n AND date_entree=" + requete.quoteText(date) + " "); + + // Si des fichiers ont été selectionnés, on ajoute a la requete la liste des + // fichiers + if (!viewFichierBAS.mapContentSelected().isEmpty()) { + requete.append("AND id_source IN ("); + requete.append(requete.sqlListeOfValues(viewFichierBAS.mapContentSelected().get("id_source"))); + requete.append(")"); + } + + List idSources = new GenericBean(UtilitaireDao.get(0).executeRequest(null, requete)) + 
.mapContent().get("id_source"); + + for (String t : tableDownload) { if (phase.equals(TraitementPhase.MAPPING)) { requete = new ArcPreparedStatementBuilder(); - requete.append("WITH prep as ( "); - requete.append("SELECT id_source FROM " + dataObjectService.getView(ViewEnum.PILOTAGE_FICHIER)); - requete.append("\n WHERE phase_traitement=" + requete.quoteText(phase.toString()) + " "); - requete.append("\n AND etat_traitement=" + requete.quoteText(etat.getSqlArrayExpression()) + "::text[] "); - requete.append("\n AND date_entree=" + requete.quoteText(date) + " "); - - // Si des fichiers ont été selectionnés, on ajoute a la requete la liste des - // fichiers - if (!viewFichierBAS.mapContentSelected().isEmpty()) { - requete.append("\n AND id_source IN ("); - requete.append( - requete.sqlListeOfValues(viewFichierBAS.mapContentSelected().get("id_source"))); - requete.append(")"); - } - requete.append(" ) "); - requete.append("\n SELECT * from " + t - + " a where exists (select 1 from prep b where a.id_source=b.id_source) "); + requete.append("\n SELECT * FROM " + t + " a "); + requete.append("\n WHERE id_source IN ( "); + requete.append(requete.sqlListeOfValues(idSources)); + requete.append(")"); tableauRequete.add(requete); fileNames.add(t); @@ -308,22 +313,7 @@ public File downloadBdBAS(VObject viewFichierBAS, String dirOut, } else { - requete = new ArcPreparedStatementBuilder(); - requete.append("SELECT id_source FROM " + dataObjectService.getView(ViewEnum.PILOTAGE_FICHIER)); - requete.append("\n WHERE phase_traitement=" + requete.quoteText(phase.toString()) + " "); - requete.append("\n AND etat_traitement=" + requete.quoteText(etat.getSqlArrayExpression()) + "::text[] "); - requete.append("\n AND date_entree=" + requete.quoteText(date) + " "); - - // Si des fichiers ont été selectionnés, on ajoute a la requete la liste des - // fichiers - if (!viewFichierBAS.mapContentSelected().isEmpty()) { - requete.append("AND id_source IN ("); - 
requete.append(requete.sqlListeOfValues(viewFichierBAS.mapContentSelected().get("id_source"))); - requete.append(")"); - } - List idSources = new GenericBean(UtilitaireDao.get(0).executeRequest(null, requete)) - .mapContent().get("id_source"); // for each files, generate the download query for (String idSource : idSources) {