Skip to content

Commit

Permalink
Batch delete (#169)
Browse files Browse the repository at this point in the history
Co-authored-by: Sanjula Ganepola <[email protected]>
  • Loading branch information
jonnyz32 and SanjulaGanepola authored Oct 21, 2024
1 parent 5033c1d commit fe875ab
Showing 1 changed file with 45 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.github.theprez.manzan.routes.event;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
Expand Down Expand Up @@ -32,24 +34,63 @@ public WatchMsgEvent(final String _name, final String _session_id, final String
public void configure() {
from("timer://foo?synchronous=true&period=" + m_interval)
.routeId("manzan_msg:" + m_name)
.process(exchange -> {
// Reset the list of ordinal positions at the start of each execution
exchange.setProperty("ordinalPositions", new ArrayList<Integer>());
})
.setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG))
.setBody(constant("SeLeCt * fRoM " + m_schema + ".mAnZaNmSg wHeRe SESSION_ID = '" + m_sessionId
+ "' limit " + m_numToProcess))
// .to("stream:out")
.to("jdbc:jt400?outputType=StreamList")
.split(body()).streaming().parallelProcessing()
.setHeader("id", simple("${body[ORDINAL_POSITION]}"))
.setHeader("session_id", simple("${body[SESSION_ID]}"))
.setHeader("data_map", simple("${body}"))
.process(exchange -> {
Integer ordinalPosition = exchange.getIn().getHeader("id", Integer.class);
@SuppressWarnings("unchecked")
List<Integer> ordinalPositions = exchange.getProperty("ordinalPositions", List.class);

synchronized (ordinalPositions) {
ordinalPositions.add(ordinalPosition); // Accumulate ORDINAL_POSITION
}
})
.marshal().json(true) // TODO: skip this if we are applying a format
.setBody(simple("${body}\n"))
.process(exchange -> {
if (null != m_formatter) {
exchange.getIn().setBody(m_formatter.format(getDataMap(exchange)));
}
})
.recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end()
.setBody(simple("delete fRoM " + m_schema + ".mAnZaNmSg where ORDINAL_POSITION = ${header.id} WITH NC"))
.to("jdbc:jt400").to("stream:err");
.recipientList(constant(getRecipientList()))
.parallelProcessing()
.stopOnException()
.end()
.end()
.process(exchange -> {
// Constructing the WHERE clause for ORDINAL_POSITIONs
StringBuilder deleteQuery = new StringBuilder("DELETE FROM " + m_schema
+ ".MANZANMSG WHERE SESSION_ID = '" + m_sessionId + "' AND ORDINAL_POSITION IN (");
@SuppressWarnings("unchecked")
List<Integer> ordinalPositions = exchange.getProperty("ordinalPositions", List.class);
if (ordinalPositions != null && !ordinalPositions.isEmpty()) {
synchronized (ordinalPositions) {
String positions = ordinalPositions.stream()
.map(String::valueOf) // Convert to String
.collect(Collectors.joining(","));
deleteQuery.append(positions).append(")");
}

// Set the DELETE query as the body
exchange.getIn().setBody(deleteQuery.toString());
} else {
// If no positions, set body to null, skip the DELETE
exchange.getIn().setBody(null);
}
})
.choice()
.when(body().isNotNull())
.to("jdbc:jt400")
.to("stream:err");
}
}

0 comments on commit fe875ab

Please sign in to comment.