Skip to content

Commit

Permalink
NET-378: start work on audit flux out.
Browse files Browse the repository at this point in the history
  • Loading branch information
msqr committed May 29, 2024
1 parent 31c0b30 commit a1ccafd
Showing 1 changed file with 105 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package net.solarnetwork.flux.vernemq.webhook.service.impl;

import static net.solarnetwork.util.ObjectUtils.requireNonNullArgument;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Clock;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -87,14 +90,26 @@ public class JdbcAuditService implements AuditService {
/**
* The default value for the {@link mqttServiceName} property.
*/
public static final String DEFAULT_AUDIT_MQTT_SERVICE_NAME = "solarflux";
public static final String DEFAULT_AUDIT_MQTT_SERVICE_NAME = "solarflux-in";

/**
* The default value for the {@link mqttServiceName} property.
*
* @since 1.2
*/
public static final String DEFAULT_AUDIT_DELIVER_MQTT_SERVICE_NAME = "solarflux-out";

/**
* A regular expression that matches if a JDBC statement is a {@link CallableStatement}.
*/
public static final Pattern CALLABLE_STATEMENT_REGEX = Pattern.compile("^\\{call\\s.*\\}",
Pattern.CASE_INSENSITIVE);

/**
* The default value for the {@code deliverTopicRegex} property.
*/
public static final String DEFAULT_DELIVER_TOPIC_REGEX = "(?:user/(\\d+)/)?(?:node/(\\d+)/datum/[^/]+(/.+))?";

// CHECKSTYLE ON: LineLength

private final Logger log = LoggerFactory.getLogger(getClass());
Expand All @@ -108,6 +123,9 @@ public class JdbcAuditService implements AuditService {
private String nodeSourceIncrementSql;
private int statLogUpdateCount;

private Pattern deliverTopicRegex = Pattern.compile(DEFAULT_DELIVER_TOPIC_REGEX);
private String deliverMqttServiceName;

private WriterThread writerThread;
private long updateDelay;
private long flushDelay;
Expand All @@ -122,7 +140,8 @@ public class JdbcAuditService implements AuditService {
* if any argument is {@literal null}
*/
public JdbcAuditService(DataSource dataSource) {
this(dataSource, new ConcurrentHashMap<>(1000, 0.8f, 4), Clock.systemUTC());
this(dataSource, new ConcurrentHashMap<>(1000, 0.8f, 4),
Clock.tick(Clock.systemUTC(), Duration.ofHours(1)));
}

/**
Expand All @@ -133,27 +152,20 @@ public JdbcAuditService(DataSource dataSource) {
* @param nodeSourceCounters
* the node source counters map
* @param clock
* the clock to use
* the clock to use; the clock should tick only at the rate that counts should be
* aggregated to, e.g. {@code Clock.tick(Clock.systemUTC(), Duration.ofHours(1))}
* @throws IllegalArgumentException
* if any argument is {@literal null}
*/
public JdbcAuditService(DataSource dataSource,
ConcurrentMap<DatumId, AtomicInteger> nodeSourceCounters, Clock clock) {
super();
if (dataSource == null) {
throw new IllegalArgumentException("The dataSource argument must not be null.");
}
this.dataSource = dataSource;
if (nodeSourceCounters == null) {
throw new IllegalArgumentException("The nodeSourceCounters argument must not be null.");
}
this.nodeSourceCounters = nodeSourceCounters;
if (clock == null) {
throw new IllegalArgumentException("The clock argument must not be null.");
}
this.clock = clock;
this.dataSource = requireNonNullArgument(dataSource, "dataSource");
this.nodeSourceCounters = requireNonNullArgument(nodeSourceCounters, "nodeSourceCounters");
this.clock = requireNonNullArgument(clock, "clock");
this.updateCount = new AtomicLong();
setMqttServiceName(DEFAULT_AUDIT_MQTT_SERVICE_NAME);
setDeliverMqttServiceName(DEFAULT_AUDIT_DELIVER_MQTT_SERVICE_NAME);
setConnectionRecoveryDelay(DEFAULT_CONNECTION_RECOVERY_DELAY);
setFlushDelay(DEFAULT_FLUSH_DELAY);
setUpdateDelay(DEFAULT_UPDATE_DELAY);
Expand All @@ -174,8 +186,25 @@ public void auditPublishMessage(Actor actor, Long nodeId, String sourceId, Messa
@Override
public void auditDeliverMessage(Message message) {
final int byteCount = (message.getPayload() != null ? message.getPayload().length : 0);
if (byteCount > 0) {
// TODO
if (byteCount > 0 && message.getTopic() != null) {
Matcher m = deliverTopicRegex.matcher(message.getTopic());
if (m.matches()) {
final String userId = m.group(1);
final String nodeId = m.group(2);
final String sourceId = m.group(3);
final DatumId key;
if (nodeId != null && !nodeId.isEmpty() && sourceId != null && !sourceId.isEmpty()) {
key = DatumId.nodeId(Long.valueOf(nodeId), sourceId, clock.instant());
} else if (userId != null && !userId.isBlank()) {
key = DatumId.nodeId(Long.valueOf(userId), null, clock.instant());
} else {
// unknown topic format, ignore
return;
}
log.debug("Message on topic [{}] delivers {} bytes to key {}", message.getTopic(),
byteCount, key);
// TODO: addNodeSourceCount(key, byteCount);
}
}
}

Expand Down Expand Up @@ -269,13 +298,18 @@ private void flushNodeSourceData(PreparedStatement stmt)
}
try {
if (log.isTraceEnabled()) {
log.trace("Incrementing node {} source {} @ {} byte count by {}", key.getObjectId(),
key.getSourceId(), key.getTimestamp(), count);
if (key.getSourceId() != null) {
log.trace("Incrementing node {} source {} @ {} {} byte count by {}", key.getObjectId(),
key.getSourceId(), key.getTimestamp(), mqttServiceName, count);
} else {
log.trace("Incrementing user {} @ {} {} byte count by {}", key.getObjectId(),
key.getSourceId(), key.getTimestamp(), deliverMqttServiceName, count);
}
}
stmt.setString(1, mqttServiceName);
stmt.setString(1, key.getSourceId() != null ? mqttServiceName : deliverMqttServiceName);
stmt.setObject(2, key.getObjectId());
stmt.setString(3, key.getSourceId());
stmt.setTimestamp(4, new java.sql.Timestamp(key.getTimestamp().toEpochMilli()));
stmt.setTimestamp(4, java.sql.Timestamp.from(key.getTimestamp()));
stmt.setInt(5, count);
stmt.execute();
long currUpdateCount = updateCount.incrementAndGet();
Expand Down Expand Up @@ -345,16 +379,26 @@ public synchronized void disableWriting() {
}

/**
* Set the MQTT audit service name to use.
* Set the MQTT audit service name to use for publish events.
*
* @param mqttServiceName
* the service to set; defaults to {@link #DEFAULT_AUDIT_MQTT_SERVICE_NAME}
*/
public void setMqttServiceName(String mqttServiceName) {
if (mqttServiceName == null) {
throw new IllegalArgumentException("The mqttServiceName argument must not be null.");
}
this.mqttServiceName = mqttServiceName;
this.mqttServiceName = requireNonNullArgument(mqttServiceName, "mqttServiceName");
reconnectWriter();
}

/**
* Set the MQTT audit service name to use for deliver events.
*
* @param deliverMqttServiceName
* the service to use; defaults to {@link #DEFAULT_AUDIT_DELIVER_MQTT_SERVICE_NAME}
* @since 1.2
*/
public void setDeliverMqttServiceName(String deliverMqttServiceName) {
this.deliverMqttServiceName = requireNonNullArgument(deliverMqttServiceName,
"deliverMqttServiceName");
reconnectWriter();
}

Expand Down Expand Up @@ -421,9 +465,7 @@ public void setUpdateDelay(long updateDelay) {
* the SQL statement to use; defaults to {@link #DEFAULT_NODE_SOURCE_INCREMENT_SQL}
*/
public void setNodeSourceIncrementSql(String sql) {
if (sql == null) {
throw new IllegalArgumentException("nodeSourceIncrementSql must not be null");
}
requireNonNullArgument(sql, "sql");
if (sql.equals(nodeSourceIncrementSql)) {
return;
}
Expand All @@ -448,4 +490,38 @@ public void setStatLogUpdateCount(int statLogUpdateCount) {
this.statLogUpdateCount = statLogUpdateCount;
}

/**
* Get the deliver topic regular expression.
*
* @return the regular expression; defaults to {@link #DEFAULT_DELIVER_TOPIC_REGEX}
* @since 1.2
*/
public Pattern getDeliverTopicRegex() {
return deliverTopicRegex;
}

/**
* Set the deliver topic regular expression.
*
* <p>
* This expression is matched against the deliver request topics, and must provide the following
* matching groups:
* </p>
*
* <ol>
* <li>user ID</li>
* <li>node ID</li>
* <li>source ID</li>
* </ol>
*
* @param deliverTopicRegex
* the regular expression to use
* @throws IllegalArgumentException
* if {@code deliverTopicRegex} is {@literal null}
* @since 1.2
*/
public void setDeliverTopicRegex(Pattern deliverTopicRegex) {
this.deliverTopicRegex = requireNonNullArgument(deliverTopicRegex, "deliverTopicRegex");
}

}

0 comments on commit a1ccafd

Please sign in to comment.