fix: database download out of memory
Nolife999 committed Oct 31, 2024
1 parent d46a964 commit 915a0c9
Showing 5 changed files with 81 additions and 55 deletions.
@@ -98,9 +98,9 @@ public void exportToParquet(List<TableToRetrieve> tables, String outputDirectory
// export tables one by one to parquet
for (TableToRetrieve table : tables) {
// export table to parquet
LoggerHelper.custom(LOGGER, "Parquet started on : " + table.getTableName());
LoggerHelper.custom(LOGGER, "Parquet export start : " + table.getTableName());
exportTableToParquet(connection, table, outputDirectory);
LoggerHelper.custom(LOGGER, "Parquet ended on : " + table.getTableName());
LoggerHelper.custom(LOGGER, "Parquet export end");
}

} catch (SQLException | IOException e) {
42 changes: 30 additions & 12 deletions arc-utils/src/main/java/fr/insee/arc/utils/dao/UtilitaireDao.java
@@ -530,23 +530,43 @@ public boolean hasResults(Connection connexion, GenericPreparedStatementBuilder
*/
public void outStreamRequeteSelect(Connection connexion, GenericPreparedStatementBuilder requete, OutputStream out)
throws ArcException {

writeSelectQueryMetadataToOutputStreamAsCSV(connexion, requete, out);

exporting(connexion, "(" + requete + ")", out, true);

}

/**
* write the query metadata to the output stream in CSV format
* @param connexion
* @param requete
* @param out
* @throws ArcException
*/
public void writeSelectQueryMetadataToOutputStreamAsCSV(Connection connexion, GenericPreparedStatementBuilder requete,
OutputStream out) throws ArcException {

try (ConnectionWrapper connexionWrapper = initConnection(connexion)) {
try (PreparedStatement stmt = connexionWrapper.getConnexion()
.prepareStatement(requete.getQuery().toString())) {

// limit query output as only metadata is required
GenericPreparedStatementBuilder limit = FormatSQL.limitQuery(requete , 0);

try (PreparedStatement stmt = connexionWrapper.getConnexion()
.prepareStatement(limit.getQuery().toString())) {

// bind parameters
for (int i = 0; i < requete.getParameters().size(); i++) {
registerBindVariable(stmt, requete, i);
for (int i = 0; i < limit.getParameters().size(); i++) {
registerBindVariable(stmt, limit, i);
}

StringBuilder str = new StringBuilder();
String lineSeparator = System.lineSeparator();

// write metadata in output
try (ResultSet res = stmt.executeQuery()) {
ResultSetMetaData rsmd = res.getMetaData();

// Column names
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
str.append(rsmd.getColumnLabel(i));
@@ -563,18 +583,16 @@ public void outStreamRequeteSelect(Connection connexion, GenericPreparedStatemen
}
}
str.append(lineSeparator);

out.write(str.toString().getBytes());

} catch (SQLException | IOException e) {
throw new ArcException(ArcExceptionMessage.GUI_EXPORT_TABLE_FAILED);
}
} catch (SQLException e1) {
throw new ArcException(ArcExceptionMessage.GUI_EXPORT_TABLE_FAILED);
}

exporting(connexion, "(" + requete + ")", out, true);


}
}
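
Note: the change above is the core of the out-of-memory repair. The CSV header pass used to prepare and execute the full select just to read its metadata, which could materialize the whole result set client-side. A minimal standalone sketch of the same LIMIT-0 pattern in plain JDBC (class and method names are illustrative, not part of the commit; the ';' separator is an assumption):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public final class CsvHeaderSketch {

    // Write only the CSV header of an arbitrary select, without fetching rows.
    // Wrapping the query in LIMIT 0 makes the database plan and describe it
    // while returning an empty result set, so memory use stays flat
    // regardless of how large the underlying result is.
    static void writeCsvHeader(Connection cn, String selectQuery, OutputStream out)
            throws SQLException, IOException {
        String limited = "SELECT * FROM (" + selectQuery + ") ww LIMIT 0";
        try (PreparedStatement stmt = cn.prepareStatement(limited);
             ResultSet rs = stmt.executeQuery()) {
            ResultSetMetaData md = rs.getMetaData();
            StringBuilder header = new StringBuilder();
            for (int i = 1; i <= md.getColumnCount(); i++) {
                if (i > 1) {
                    header.append(';'); // separator is an assumption, not from the commit
                }
                header.append(md.getColumnLabel(i));
            }
            header.append(System.lineSeparator());
            out.write(header.toString().getBytes(StandardCharsets.UTF_8));
        }
    }
}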

12 changes: 12 additions & 0 deletions arc-utils/src/main/java/fr/insee/arc/utils/utils/FormatSQL.java
@@ -152,6 +152,18 @@ public static String resetTimeOutMaintenance() {
public static String tryQuery(String query) {
return "do $$ begin " + query + " exception when others then end; $$; ";
}

/**
* build a query that limits the number of rows returned by a given query
* @param query
* @param limit
* @return
*/
public static GenericPreparedStatementBuilder limitQuery(GenericPreparedStatementBuilder query, int limit)
{
GenericPreparedStatementBuilder limitQuery = new GenericPreparedStatementBuilder();
limitQuery.build(SQL.SELECT, "*", SQL.FROM, "(", query, ") ww ", SQL.LIMIT, limit );
return limitQuery;
}
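
For a query Q, the new helper produces SELECT * FROM ( Q ) ww LIMIT n. An indicative usage sketch (builder calls follow the API visible in this diff; the table name is hypothetical):

// Indicative usage of the new helper: cap an arbitrary query's output.
GenericPreparedStatementBuilder q = new GenericPreparedStatementBuilder();
q.build(SQL.SELECT, "id_source", SQL.FROM, "my_table"); // hypothetical table

GenericPreparedStatementBuilder metaOnly = FormatSQL.limitQuery(q, 0); // describe columns only
GenericPreparedStatementBuilder probe = FormatSQL.limitQuery(q, 1);    // cheap existence check
// metaOnly.getQuery() is, roughly: SELECT * FROM ( SELECT id_source FROM my_table ) ww LIMIT 0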

/**
* build a query that tries a generic query and reports the result
@@ -35,11 +35,9 @@

import fr.insee.arc.core.dataobjects.ArcPreparedStatementBuilder;
import fr.insee.arc.core.dataobjects.ColumnEnum;
import fr.insee.arc.core.dataobjects.ViewEnum;
import fr.insee.arc.core.service.global.dao.FileSystemManagement;
import fr.insee.arc.core.util.LoggerDispatcher;
import fr.insee.arc.utils.dao.ModeRequeteImpl;
import fr.insee.arc.utils.dao.SQL;
import fr.insee.arc.utils.dao.UtilitaireDao;
import fr.insee.arc.utils.database.ArcDatabase;
import fr.insee.arc.utils.dataobjects.TypeEnum;
@@ -1112,11 +1110,15 @@ public File download(VObject currentData, String dirOut, List<String> fileNames,
int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();
for (int executorConnectionId = ArcDatabase.COORDINATOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR
.getIndex() + numberOfExecutorNods; executorConnectionId++) {

ArcPreparedStatementBuilder limit = new ArcPreparedStatementBuilder();
limit.build(SQL.SELECT, "*", SQL.FROM, "(", requetes.get(i), ")", ViewEnum.ALIAS_A, SQL.LIMIT, 0 );

if (!UtilitaireDao.get(executorConnectionId).getBoolean(connection, FormatSQL.tryQueryAndReport(limit)))

// check if the query is valid on the database (data may not be found on every executor)
if (!UtilitaireDao.get(executorConnectionId).getBoolean(connection, FormatSQL.tryQueryAndReport(FormatSQL.limitQuery(requetes.get(i), 0))))
{
continue;
}

// keep in the zip only queries that return rows
if (!UtilitaireDao.get(executorConnectionId).hasResults(connection, FormatSQL.limitQuery(requetes.get(i), 1)))
{
continue;
}
@@ -1125,18 +1127,22 @@ public File download(VObject currentData, String dirOut, List<String> fileNames,
(executorConnectionId> ArcDatabase.COORDINATOR.getIndex() ? "_"+executorConnectionId : "") + ".csv");
zos.putNextEntry(entry);

LoggerHelper.custom(LOGGER, "Downloading : " + entry.getName());

LoggerHelper.custom(LOGGER, "Build entry start : " + entry.getName());
// Write to the file
UtilitaireDao.get(executorConnectionId).outStreamRequeteSelect(this.connection, requetes.get(i),
zos);
LoggerHelper.custom(LOGGER, "Build entry end");

zos.closeEntry();
}
}
}
} catch (IOException | ArcException ex) {
LoggerHelper.errorGenTextAsComment(getClass(), "download()", LOGGER, ex);
}

LoggerHelper.custom(LOGGER, "Download end");

return fOut;
}
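
The download loop now probes each node with two cheap queries before opening a zip entry: LIMIT 0 (via tryQueryAndReport) to check the query is valid on that executor, LIMIT 1 (via hasResults) to check it returns anything, and only then streams the result straight into the ZipOutputStream. A self-contained sketch of that shape (the NodeDao interface is a hypothetical stand-in for UtilitaireDao):

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public final class ProbeThenStreamSketch {

    // Hypothetical stand-in for the per-node UtilitaireDao calls used above.
    interface NodeDao {
        boolean isValid(String query);                   // ~ tryQueryAndReport(limitQuery(q, 0))
        boolean hasRows(String query);                   // ~ hasResults(limitQuery(q, 1))
        void streamCsv(String query, OutputStream out);  // ~ outStreamRequeteSelect(...)
    }

    // Probe each node cheaply, then stream straight into the zip entry:
    // at most one row is fetched before streaming begins, so nothing
    // result-set-sized is ever held in memory.
    static void downloadAll(List<NodeDao> nodes, String query, String baseName, ZipOutputStream zos)
            throws IOException {
        int nodeId = 0;
        for (NodeDao node : nodes) {
            nodeId++;
            // data may live on only some executor nodes: skip silently
            if (!node.isValid(query) || !node.hasRows(query)) {
                continue;
            }
            zos.putNextEntry(new ZipEntry(baseName + "_" + nodeId + ".csv"));
            node.streamCsv(query, zos);
            zos.closeEntry();
        }
    }
}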

@@ -280,50 +280,40 @@ public File downloadBdBAS(VObject viewFichierBAS, String dirOut,

ArcPreparedStatementBuilder requete;

requete = new ArcPreparedStatementBuilder();
requete.append("SELECT id_source FROM " + dataObjectService.getView(ViewEnum.PILOTAGE_FICHIER));
requete.append("\n WHERE phase_traitement=" + requete.quoteText(phase.toString()) + " ");
requete.append("\n AND etat_traitement=" + requete.quoteText(etat.getSqlArrayExpression()) + "::text[] ");
requete.append("\n AND date_entree=" + requete.quoteText(date) + " ");

// If files have been selected, append the list of selected files to the query
if (!viewFichierBAS.mapContentSelected().isEmpty()) {
requete.append("AND id_source IN (");
requete.append(requete.sqlListeOfValues(viewFichierBAS.mapContentSelected().get("id_source")));
requete.append(")");
}

List<String> idSources = new GenericBean(UtilitaireDao.get(0).executeRequest(null, requete))
.mapContent().get("id_source");


for (String t : tableDownload) {
if (phase.equals(TraitementPhase.MAPPING))
{
requete = new ArcPreparedStatementBuilder();
requete.append("WITH prep as ( ");
requete.append("SELECT id_source FROM " + dataObjectService.getView(ViewEnum.PILOTAGE_FICHIER));
requete.append("\n WHERE phase_traitement=" + requete.quoteText(phase.toString()) + " ");
requete.append("\n AND etat_traitement=" + requete.quoteText(etat.getSqlArrayExpression()) + "::text[] ");
requete.append("\n AND date_entree=" + requete.quoteText(date) + " ");

// If files have been selected, append the list of selected files to the query
if (!viewFichierBAS.mapContentSelected().isEmpty()) {
requete.append("\n AND id_source IN (");
requete.append(
requete.sqlListeOfValues(viewFichierBAS.mapContentSelected().get("id_source")));
requete.append(")");
}
requete.append(" ) ");
requete.append("\n SELECT * from " + t
+ " a where exists (select 1 from prep b where a.id_source=b.id_source) ");
requete.append("\n SELECT * FROM " + t + " a ");
requete.append("\n WHERE id_source IN ( ");
requete.append(requete.sqlListeOfValues(idSources));
requete.append(")");

tableauRequete.add(requete);
fileNames.add(t);

}
else
{
requete = new ArcPreparedStatementBuilder();
requete.append("SELECT id_source FROM " + dataObjectService.getView(ViewEnum.PILOTAGE_FICHIER));
requete.append("\n WHERE phase_traitement=" + requete.quoteText(phase.toString()) + " ");
requete.append("\n AND etat_traitement=" + requete.quoteText(etat.getSqlArrayExpression()) + "::text[] ");
requete.append("\n AND date_entree=" + requete.quoteText(date) + " ");

// If files have been selected, append the list of selected files to the query
if (!viewFichierBAS.mapContentSelected().isEmpty()) {
requete.append("AND id_source IN (");
requete.append(requete.sqlListeOfValues(viewFichierBAS.mapContentSelected().get("id_source")));
requete.append(")");
}

List<String> idSources = new GenericBean(UtilitaireDao.get(0).executeRequest(null, requete))
.mapContent().get("id_source");

// for each files, generate the download query
for (String idSource : idSources) {
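
This last refactor runs the pilotage selection once, then filters each table download with the resulting id_source list, instead of re-evaluating a CTE over the pilotage view for every table. A rough sketch of the before/after query shapes (view and column names follow the diff; the quoting helper is illustrative, the real code goes through the builder's sqlListeOfValues):

import java.util.List;
import java.util.stream.Collectors;

public final class InListSketch {

    // Before: each table's query re-ran the pilotage selection inside a CTE.
    static String perTableWithCte(String table, String pilotageFilter) {
        return "WITH prep AS (SELECT id_source FROM pilotage_fichier WHERE " + pilotageFilter + ") "
                + "SELECT * FROM " + table + " a "
                + "WHERE EXISTS (SELECT 1 FROM prep b WHERE a.id_source = b.id_source)";
    }

    // After: the id_source list is resolved once up front and reused as an IN list.
    static String perTableWithInList(String table, List<String> idSources) {
        String quoted = idSources.stream()
                .map(s -> "'" + s.replace("'", "''") + "'") // naive literal quoting, illustration only
                .collect(Collectors.joining(", "));
        return "SELECT * FROM " + table + " a WHERE id_source IN (" + quoted + ")";
    }
}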
