Apache Pulsar extension for Mongoose
- Introduction
- Features
- Deployment
  3.1. Basic
  3.2. Docker
    3.2.1. Standalone
    3.2.2. Distributed
      3.2.2.1. Additional Node
      3.2.2.2. Entry Node
- Configuration
  4.1. Specific Options
  4.2. Tuning
- Usage
  5.1. Message Operations
    5.1.1. Create
    5.1.2. Read
      5.1.2.1. Basic
      5.1.2.2. Tail
      5.1.2.3. End-to-end Latency
  5.2. Topic Operations
    5.2.1. Create
    5.2.2. Read
    5.2.3. Update
    5.2.4. Delete
- Open Issues
- Development
  7.1. Build
  7.2. Test
    7.2.1. Automated
      7.2.1.1. Unit
      7.2.1.2. Integration
      7.2.1.3. Functional
    7.2.2. Manual
Pulsar | Mongoose |
---|---|
Message | Data Item |
Topic | Item Path or Data Item |
- Authentication: https://mongoose-issues.atlassian.net/browse/PULSAR-4
- SSL/TLS: https://mongoose-issues.atlassian.net/browse/PULSAR-1
- Item Types:
  - data: corresponds to a message
  - path: corresponds to a topic
  - token: not supported
- Supported load operations:
  - create (messages)
  - read (messages)
  - update (topics appending, TODO)
  - delete (topics, TODO)
- Storage-specific:
Java 11+ is required to build/run.
- Get the latest mongoose-base jar from the Maven repo and put it into your working directory. Note the particular version, which is referred to as BASE_VERSION below.
- Get the latest mongoose-storage-driver-coop jar from the Maven repo and put it into the ~/.mongoose/<BASE_VERSION>/ext directory.
- Get the latest mongoose-storage-driver-pulsar jar from the Maven repo and put it into the ~/.mongoose/<BASE_VERSION>/ext directory.
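
The last two steps above can be sketched as a small shell helper. This is only a minimal sketch: it assumes the extension jars have already been downloaded to local files, and both the function name `install_mongoose_ext` and the `MONGOOSE_HOME` variable are hypothetical names introduced for illustration, not part of Mongoose.

```shell
# Hypothetical helper: copy already-downloaded extension jars into the
# Mongoose ext directory for the given base version.
# Usage: install_mongoose_ext <BASE_VERSION> <jar>...
install_mongoose_ext() {
    local base_version="$1"
    shift
    # MONGOOSE_HOME is an assumption for illustration; defaults to ~/.mongoose
    local ext_dir="${MONGOOSE_HOME:-$HOME/.mongoose}/${base_version}/ext"
    local jar
    mkdir -p "$ext_dir" || return 1
    for jar in "$@"; do
        if [ ! -f "$jar" ]; then
            echo "missing jar: $jar" >&2
            return 1
        fi
        cp -f "$jar" "$ext_dir/" || return 1
    done
    # Print the target directory so the caller can verify the location
    echo "$ext_dir"
}
```

It could be called, for example, as `install_mongoose_ext <BASE_VERSION> mongoose-storage-driver-coop-*.jar mongoose-storage-driver-pulsar-*.jar` from the directory holding the downloaded jars.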
java -jar mongoose-base-<BASE_VERSION>.jar \
--storage-driver-type=pulsar \
--storage-net-node-addrs=<NODE_IP_ADDRS> \
--storage-net-node-port=6650 \
--load-batch-size=1000 \
--storage-driver-limit-concurrency=1000 \
...
docker run \
--network host \
emcmongoose/mongoose-storage-driver-pulsar \
--storage-net-node-addrs=<NODE_IP_ADDRS> \
--load-batch-size=1000 \
--storage-driver-limit-concurrency=1000 \
...
docker run \
--network host \
--expose 1099 \
emcmongoose/mongoose-storage-driver-pulsar \
--run-node
docker run \
--network host \
emcmongoose/mongoose-storage-driver-pulsar \
--load-step-node-addrs=<ADDR1,ADDR2,...> \
--storage-net-node-addrs=<NODE_IP_ADDRS> \
--load-batch-size=1000 \
--storage-driver-limit-concurrency=1000 \
...
Reference
Name | Type | Default Value | Description |
---|---|---|---|
storage-driver-create-batch-enabled | boolean | true | Producer batching |
storage-driver-create-batch-delayMicros | long integer | 1000 | Maximum publish latency (microseconds) |
storage-driver-create-compression | enum | none | Whether to compress data on message creation; no compression by default. The available compression types are defined by the Pulsar client |
storage-driver-create-timestamp | boolean | false | If enabled, records the message creation timestamp in the message's metadata. Required for end-to-end latency measurement |
storage-driver-read-tail | boolean | false | Whether to read only the latest messages of the topic (true) or to read from the beginning of the topic (false, default). Should be true for end-to-end latency measurement |
storage-net-tcpNoDelay | boolean | true | Specifies whether to disable the delay of sending successive small packets on the network |
storage-net-timeoutMilliSec | integer | 0 | Connection timeout in milliseconds. 0 means no timeout |
storage-net-node-addrs | list of strings | 127.0.0.1 | The list of the storage node IPs or hostnames to use for the load. May include port numbers |
storage-net-node-port | integer | 6650 | The common port number to access the storage nodes. May be overridden by adding the port number to the storage-net-node-addrs values, for example: "127.0.0.1:6650,127.0.0.1:6651,..." |
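
To illustrate how the common port acts as a default that an individual address can override, here is a small sketch that expands an address list according to the rule described in the table. The function `resolve_node_addrs` is hypothetical and only mimics the documented rule; it is not Mongoose's actual parsing code, and it does not handle IPv6 literals.

```shell
# Hypothetical illustration of the documented rule: an address that already
# carries a port keeps it, otherwise the common port is appended.
# Usage: resolve_node_addrs <comma-separated addrs> [common port]
resolve_node_addrs() {
    local addrs="$1"
    local port="${2:-6650}"   # 6650 is the documented default port
    local result=""
    local addr
    local old_ifs="$IFS"
    IFS=','                   # split the list on commas
    for addr in $addrs; do
        case "$addr" in
            *:*) ;;                          # port given explicitly, keep as-is
            *)   addr="${addr}:${port}" ;;   # fall back to the common port
        esac
        result="${result:+$result,}$addr"
    done
    IFS="$old_ifs"
    echo "$result"
}
```

For instance, `resolve_node_addrs "127.0.0.1,127.0.0.1:6651"` leaves the second address untouched and appends the default port to the first.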
- load-batch-size: Determines how many operations the driver will try to submit at once. Large values may increase both the throughput and the memory consumption.
- storage-driver-limit-concurrency: Determines how many operations may be in flight at any moment. Specifying 0 enables the "burst mode", in which the driver submits as many operations as possible regardless of their completion. Unlimited concurrency is useful for sustained rate measurement but may leave either Pulsar or Mongoose in an inconsistent state.
- storage-driver-threads: Determines the number of Pulsar client IO worker threads.
Example: write 1KB messages to the topic "topic1" on the Pulsar instance at the address 12.34.56.78:
docker run \
--network host \
emcmongoose/mongoose-storage-driver-pulsar \
--storage-net-node-addrs=12.34.56.78 \
--load-batch-size=1000 \
--storage-driver-limit-concurrency=1000 \
--item-data-size=1KB \
--item-output-path=topic1
Notes:
- Generally, the load-op-recycle option should be set to true to make message reading work.
- Mongoose cannot determine the end of the topic(s), so it is mandatory to specify a count or time limit.
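
Because a read without a limit never finishes, a wrapper script may want to check its arguments before launching Mongoose. The following is a minimal sketch of such a pre-flight check; the function `has_read_limit` is hypothetical, and it only recognizes the two limit options that appear in this document's examples (`--load-op-limit-count` and `--load-step-limit-time`).

```shell
# Hypothetical pre-flight check: succeed only if the argument list contains
# a count limit or a time limit (option names taken from the examples here).
has_read_limit() {
    local arg
    for arg in "$@"; do
        case "$arg" in
            --load-op-limit-count=*|--load-step-limit-time=*) return 0 ;;
        esac
    done
    return 1
}
```

A wrapper could then run something like `has_read_limit "$@" || { echo "specify a count or time limit" >&2; exit 1; }` before starting the read.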
Example, read 1M messages from the beginning of the topic "topic1":
docker run \
--network host \
emcmongoose/mongoose-storage-driver-pulsar \
--storage-net-node-addrs=12.34.56.78 \
--load-batch-size=100 \
--storage-driver-limit-concurrency=100 \
--read \
--item-input-path=topic1 \
--load-op-recycle \
--load-op-limit-count=1000000
Example, read all new messages from the topic "topic1" for 1 minute:
docker run \
--network host \
emcmongoose/mongoose-storage-driver-pulsar \
--storage-net-node-addrs=12.34.56.78 \
--load-batch-size=100 \
--storage-driver-limit-concurrency=100 \
--read \
--item-input-path=topic1 \
--storage-driver-read-tail \
--load-op-recycle \
--load-step-limit-time=1m
- Start writing messages to a topic with timestamp recording enabled. Example command:
docker run \
--network host \
emcmongoose/mongoose-storage-driver-pulsar \
--item-data-size=1KB \
--item-output-path=topic1 \
--load-batch-size=1000 \
--storage-driver-create-timestamp
- Start the tail read from the same topic:
docker run \
--network host \
--volume $PWD/log:/root/.mongoose/4.2.16/log \
emcmongoose/mongoose-storage-driver-pulsar \
--load-batch-size=1000 \
--read \
--item-input-path=topic1 \
--storage-driver-read-tail \
--load-op-recycle \
--load-step-id=e2e_test
- Check the end-to-end time data in the log/e2e_test/op.trace.csv log file. The data is in CSV format with 3 columns:
- internal message id
- message payload size
- end-to-end time in milliseconds
Note: the end-to-end time data will not be aggregated in the distributed mode.
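
Since the trace file is not aggregated, a quick summary can be computed with standard tools. The sketch below uses awk to report the count, minimum, maximum, and mean of the third column. The function name `e2e_summary` is hypothetical, and the sketch assumes the file has no header row and that the message id contains no commas.

```shell
# Summarize end-to-end latency from a 3-column CSV:
# message id, payload size, end-to-end time in milliseconds.
# Assumes no header row and no commas inside the message id.
e2e_summary() {
    awk -F',' '
        NF >= 3 {
            t = $3 + 0                       # force numeric interpretation
            if (n == 0 || t < min) min = t
            if (n == 0 || t > max) max = t
            sum += t
            n++
        }
        END {
            if (n > 0) printf "count=%d min=%s max=%s mean=%.2f\n", n, min, max, sum / n
            else print "count=0"
        }
    ' "$1"
}
```

With the volume mapping from the read command above, it would be invoked as `e2e_summary log/e2e_test/op.trace.csv`.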
TODO: https://mongoose-issues.atlassian.net/browse/PULSAR-2
TODO
TODO
TODO
TODO
Please refer to the issue tracker
./gradlew clean jar
./gradlew clean test
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
apachepulsar/pulsar:<PULSAR_VERSION> \
bin/pulsar standalone
./gradlew integrationTest
./gradlew jar
export SUITE=api.storage
TEST=<TODO_TEST_NAME> ./gradlew robotest
- Build the storage driver
- Copy the storage driver's jar file into Mongoose's ext directory:
cp -f build/libs/mongoose-storage-driver-pulsar-*.jar ~/.mongoose/<MONGOOSE_BASE_VERSION>/ext/
Note that the Pulsar storage driver depends on the Coop Storage Driver extension, so that extension's jar should also be put into the ext directory.
- Install and run Apache Pulsar:
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
apachepulsar/pulsar:<PULSAR_VERSION> \
bin/pulsar standalone
- Run Mongoose's default scenario with some specific command-line arguments:
java -jar mongoose-base-<MONGOOSE_BASE_VERSION>.jar \
--storage-driver-type=pulsar \
--storage-net-node-port=6650 \
--storage-driver-limit-concurrency=10 \
--item-output-path=topic-0