Skip to content

Commit

Permalink
MessageBatchGetRequest reform based on adr
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 5, 2024
1 parent e0df8c0 commit ed87573
Show file tree
Hide file tree
Showing 2 changed files with 402 additions and 520 deletions.
270 changes: 74 additions & 196 deletions src/main/java/io/nats/client/api/MessageBatchGetRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
package io.nats.client.api;

import io.nats.client.support.JsonSerializable;
import io.nats.client.support.Validator;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static io.nats.client.support.ApiConstants.*;
Expand All @@ -30,23 +28,82 @@
public class MessageBatchGetRequest implements JsonSerializable {

private final int batch;
private final String nextBySubject;
private final int maxBytes;
private final long minSequence;
private final ZonedDateTime startTime;
private final String nextBySubject;
private final List<String> multiLastBySubjects;
private final long upToSequence;
private final ZonedDateTime upToTime;

MessageBatchGetRequest(Builder b) {
this.batch = b.batch;
this.maxBytes = b.maxBytes;
this.minSequence = b.minSequence;
this.startTime = b.startTime;
this.nextBySubject = b.nextBySubject;
this.multiLastBySubjects = b.multiLastBySubjects;
this.upToSequence = b.upToSequence;
this.upToTime = b.upToTime;
// batch constructor
private MessageBatchGetRequest(String subject,
int batch,
int maxBytes,
long minSequence,
ZonedDateTime startTime)
{
Validator.required(subject, "Subject");
Validator.validateGtZero(batch, "Batch");

this.nextBySubject = subject;
this.batch = batch;
this.maxBytes = maxBytes;
this.startTime = startTime;
this.multiLastBySubjects = null;
this.upToSequence = -1;
this.upToTime = null;

this.minSequence = startTime == null && minSequence < 1 ? 1 : minSequence;
}

public static MessageBatchGetRequest batch(String subject, int batch) {
return new MessageBatchGetRequest(subject, batch, -1, -1, null);
}

public static MessageBatchGetRequest batch(String subject, int batch, long minSequence) {
return new MessageBatchGetRequest(subject, batch, -1, minSequence, null);
}

public static MessageBatchGetRequest batch(String subject, int batch, ZonedDateTime startTime) {
return new MessageBatchGetRequest(subject, batch, -1, -1, startTime);
}

public static MessageBatchGetRequest batchBytes(String subject, int batch, int maxBytes) {
return new MessageBatchGetRequest(subject, batch, maxBytes, -1, null);
}

public static MessageBatchGetRequest batchBytes(String subject, int batch, int maxBytes, long minSequence) {
return new MessageBatchGetRequest(subject, batch, maxBytes, minSequence, null);
}

public static MessageBatchGetRequest batchBytes(String subject, int batch, int maxBytes, ZonedDateTime startTime) {
return new MessageBatchGetRequest(subject, batch, maxBytes, -1, startTime);
}

// multi for constructor
private MessageBatchGetRequest(List<String> subjects, long upToSequence, ZonedDateTime upToTime) {
Validator.required(subjects, "Subjects");
batch = -1;
nextBySubject = null;
this.maxBytes = -1;
this.minSequence = -1;
this.startTime = null;
this.multiLastBySubjects = subjects;
this.upToSequence = upToSequence;
this.upToTime = upToTime;
}

public static MessageBatchGetRequest multiLastForSubjects(List<String> subjects) {
return new MessageBatchGetRequest(subjects, -1, null);
}

public static MessageBatchGetRequest multiLastForSubjects(List<String> subjects, long upToSequence) {
return new MessageBatchGetRequest(subjects, upToSequence, null);
}

public static MessageBatchGetRequest multiLastForSubjects(List<String> subjects, ZonedDateTime upToTime) {
return new MessageBatchGetRequest(subjects, -1, upToTime);
}

/**
Expand Down Expand Up @@ -123,195 +180,16 @@ public String toJson() {
addField(sb, BATCH, batch);
addField(sb, MAX_BYTES, maxBytes);
addField(sb, START_TIME, startTime);
addField(sb, SEQ, minSequence);
addField(sb, NEXT_BY_SUBJECT, nextBySubject);
addStrings(sb, MULTI_LAST, multiLastBySubjects);
addField(sb, UP_TO_SEQ, upToSequence);
addField(sb, UP_TO_TIME, upToTime);

// THIS IS A WORKAROUND https://github.com/nats-io/nats-server/issues/6026
if (minSequence < 1) {
if (maxBytes > 0) {
addField(sb, SEQ, 1);
}
}
else {
addField(sb, SEQ, minSequence);
}
return endJson(sb).toString();
}

/**
* Creates a builder for the request.
* @return Builder
*/
public static Builder builder() {
return new Builder();
}

/**
* Creates a builder for the request.
* @param req the {@link MessageBatchGetRequest}
* @return Builder
*/
public static Builder builder(MessageBatchGetRequest req) {
return req == null ? new Builder() : new Builder(req);
}

/**
* {@link MessageBatchGetRequest} is created using a Builder. The builder supports chaining and will
* create a default set of options if no methods are calls.
* <p>{@code MessageBatchGetRequest.builder().build()} will create a default {@link MessageBatchGetRequest}.
*/
public static class Builder {
private int batch = -1;
private int maxBytes = -1;
private long minSequence = -1;
private ZonedDateTime startTime = null;
private String nextBySubject = null;
private List<String> multiLastBySubjects = new ArrayList<>();
private long upToSequence = -1;
private ZonedDateTime upToTime = null;

/**
* Construct the builder
*/
public Builder() {
}

/**
* Construct the builder and initialize values with the existing {@link MessageBatchGetRequest}
* @param req the {@link MessageBatchGetRequest} to clone
*/
public Builder(MessageBatchGetRequest req) {
if (req != null) {
this.batch = req.batch;
this.maxBytes = req.maxBytes;
this.minSequence = req.minSequence;
this.startTime = req.startTime;
this.nextBySubject = req.nextBySubject;
this.multiLastBySubjects = req.multiLastBySubjects;
this.upToSequence = req.upToSequence;
this.upToTime = req.upToTime;
}
}

/**
* Set the maximum amount of messages to be returned for this request.
* @param batch the batch size
* @return Builder
*/
public Builder batch(int batch) {
this.batch = batch < 1 ? -1 : batch;
return this;
}

/**
* Maximum amount of returned bytes for this request.
* Limits the amount of returned messages to not exceed this.
* @param maxBytes the maximum bytes
* @return Builder
*/
public Builder maxBytes(int maxBytes) {
this.maxBytes = maxBytes < 1 ? -1 : maxBytes;
return this;
}

/**
* Minimum sequence for returned messages.
* All returned messages will have a sequence equal to or higher than this.
* @param sequence the minimum message sequence
* @return Builder
*/
public Builder minSequence(long sequence) {
this.minSequence = sequence < 1 ? -1 : sequence;
return this;
}

/**
* Minimum start time for returned messages.
* All returned messages will have a start time equal to or higher than this.
* @param startTime the minimum message start time
* @return Builder
*/
public Builder startTime(ZonedDateTime startTime) {
this.startTime = startTime;
return this;
}

/**
* Subject used to filter messages that should be returned.
* @param nextBySubject the subject to filter
* @return Builder
*/
public Builder nextBySubject(String nextBySubject) {
if (!multiLastBySubjects.isEmpty()) {
throw new IllegalArgumentException("nextBySubject cannot be used when multiLastBySubjects is used.");
}
this.nextBySubject = nextBySubject;
return this;
}

/**
* Subjects filter used, these can include wildcards.
* Will get the last messages matching the subjects.
* @param subjects the subjects to get the last messages for
* @return Builder
*/
public Builder multiLastBySubjects(String... subjects) {
if (nextBySubject != null) {
throw new IllegalArgumentException("multiLastBySubjects cannot be used when nextBySubject is used.");
}
this.multiLastBySubjects.clear();
if (subjects != null && subjects.length > 0) {
this.multiLastBySubjects.addAll(Arrays.asList(subjects));
}
return this;
}

/**
* Subjects filter used, these can include wildcards.
* Will get the last messages matching the subjects.
* @param subjects the subjects to get the last messages for
* @return Builder
*/
public Builder multiLastBySubjects(Collection<String> subjects) {
if (nextBySubject != null) {
throw new IllegalArgumentException("multiLastBySubjects cannot be used when nextBySubject is used.");
}
this.multiLastBySubjects.clear();
if (subjects != null && !subjects.isEmpty()) {
this.multiLastBySubjects.addAll(subjects);
}
return this;
}

/**
* Only return messages up to this sequence.
* If not set, will be last sequence for the stream.
* @param upToSequence the maximum message sequence to return results for
* @return Builder
*/
public Builder upToSequence(long upToSequence) {
this.upToSequence = upToSequence < 1 ? -1 : upToSequence;
return this;
}

/**
* Only return messages up to this time.
* @param upToTime the maximum message time to return results for
* @return Builder
*/
public Builder upToTime(ZonedDateTime upToTime) {
this.upToTime = upToTime;
return this;
}

/**
* Build the {@link MessageBatchGetRequest}.
* @return MessageBatchGetRequest
*/
public MessageBatchGetRequest build() {
return new MessageBatchGetRequest(this);
}
@Override
public String toString() {
return toJson();
}
}
Loading

0 comments on commit ed87573

Please sign in to comment.