From fe875ab8c6b395e3ffee6708fa6c7f551be01017 Mon Sep 17 00:00:00 2001 From: Jonathan <42983653+jonnyz32@users.noreply.github.com> Date: Mon, 21 Oct 2024 10:03:54 -0400 Subject: [PATCH] Batch delete (#169) Co-authored-by: Sanjula Ganepola <32170854+SanjulaGanepola@users.noreply.github.com> --- .../manzan/routes/event/WatchMsgEvent.java | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java index cacb34c..0470c9f 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java @@ -1,7 +1,9 @@ package com.github.theprez.manzan.routes.event; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import com.github.theprez.jcmdutils.StringUtils; import com.github.theprez.manzan.ManzanEventType; @@ -32,15 +34,27 @@ public WatchMsgEvent(final String _name, final String _session_id, final String public void configure() { from("timer://foo?synchronous=true&period=" + m_interval) .routeId("manzan_msg:" + m_name) + .process(exchange -> { + // Reset the list of ordinal positions at the start of each execution + exchange.setProperty("ordinalPositions", new ArrayList()); + }) .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG)) .setBody(constant("SeLeCt * fRoM " + m_schema + ".mAnZaNmSg wHeRe SESSION_ID = '" + m_sessionId + "' limit " + m_numToProcess)) - // .to("stream:out") .to("jdbc:jt400?outputType=StreamList") .split(body()).streaming().parallelProcessing() .setHeader("id", simple("${body[ORDINAL_POSITION]}")) .setHeader("session_id", simple("${body[SESSION_ID]}")) .setHeader("data_map", simple("${body}")) + .process(exchange -> { + Integer ordinalPosition = exchange.getIn().getHeader("id", Integer.class); + @SuppressWarnings("unchecked") + List ordinalPositions = exchange.getProperty("ordinalPositions", List.class); + + synchronized (ordinalPositions) { + ordinalPositions.add(ordinalPosition); // Accumulate ORDINAL_POSITION + } + }) .marshal().json(true) // TODO: skip this if we are applying a format .setBody(simple("${body}\n")) .process(exchange -> { @@ -48,8 +62,35 @@ public void configure() { exchange.getIn().setBody(m_formatter.format(getDataMap(exchange))); } }) - .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end() - .setBody(simple("delete fRoM " + m_schema + ".mAnZaNmSg where ORDINAL_POSITION = ${header.id} WITH NC")) - .to("jdbc:jt400").to("stream:err"); + .recipientList(constant(getRecipientList())) + .parallelProcessing() + .stopOnException() + .end() + .end() + .process(exchange -> { + // Constructing the WHERE clause for ORDINAL_POSITIONs + StringBuilder deleteQuery = new StringBuilder("DELETE FROM " + m_schema + + ".MANZANMSG WHERE SESSION_ID = '" + m_sessionId + "' AND ORDINAL_POSITION IN ("); + @SuppressWarnings("unchecked") + List ordinalPositions = exchange.getProperty("ordinalPositions", List.class); + if (ordinalPositions != null && !ordinalPositions.isEmpty()) { + synchronized (ordinalPositions) { + String positions = ordinalPositions.stream() + .map(String::valueOf) // Convert to String + .collect(Collectors.joining(",")); + deleteQuery.append(positions).append(")"); + } + + // Set the DELETE query as the body + exchange.getIn().setBody(deleteQuery.toString()); + } else { + // If no positions, set body to null, skip the DELETE + exchange.getIn().setBody(null); + } + }) + .choice() + .when(body().isNotNull()) + .to("jdbc:jt400") + .to("stream:err"); } }