Formalized Multi-cluster and Leafnode topologies
Signed-off-by: R.I.Pienaar <[email protected]>
ripienaar committed Oct 19, 2023
1 parent 3d7e81b commit 72774e3
Showing 2 changed files with 164 additions and 29 deletions.
193 changes: 164 additions & 29 deletions adr/ADR-8.md
@@ -9,11 +9,13 @@

## Release History

| Revision | Date | Description |
|----------|------------|-----------------------------------------------------|
| 1 | 2021-12-15 | Initial stable release of version 1.0 specification |
| 2 | 2023-10-16 | Document NATS Server 2.10 sourced buckets |
| 2 | 2023-10-16 | Document read replica mirrors buckets |
| 2 | 2023-10-16 | Document consistency guarantees |
| 3 | 2023-10-19 | Formalize initial bucket topologies |


## Context
@@ -51,6 +53,7 @@ additional behaviors will come during the 1.x cycle.
* Merged buckets using NATS Server 2.10 subject transforms
* Read replicas facilitated by Stream Mirrors
* Replica auto discovery for mirror based replicas
* Formalized Multi-cluster and Leafnode Topologies

### 1.2

@@ -394,40 +397,179 @@ is clearly indicated as a language specific extension. Names should be language

The default behavior with no options set is to send all the `last_per_subject` values, including delete/purge operations.

#### Multi-Cluster and Leafnode topologies

A bucket, being backed by a Stream, lives in one Cluster only. To make buckets available elsewhere we have to use
JetStream Sources and Mirrors.

In KV we call these `Topologies`. Adding *Topology Buckets* requires using different APIs than the main Bucket API,
allowing us to codify patterns and options that we support at a higher level than the underlying Stream options.

For example, we want to be able to expose a single boolean that says an Aggregate is read-only which would potentially
influence numerous options in the Stream Configuration.
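
As a purely illustrative sketch (not the actual implementation), a single `Writable: false` flag might expand into several settings on the backing Stream; the mapping below is an assumption and uses field names from the `nats.go` `jetstream` package:

```go
package kvsketch

import "github.com/nats-io/nats.go/jetstream"

// applyReadOnly shows how one KV level boolean could fan out into several
// stream level options; the exact mapping here is an assumption.
func applyReadOnly(cfg *jetstream.StreamConfig, writable bool) {
	if writable {
		return
	}

	cfg.Subjects = nil    // no listen subjects, data only arrives via sources or mirrors
	cfg.DenyDelete = true // clients may not delete messages
	cfg.DenyPurge = true  // clients may not purge the stream
}
```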

![KV Topologies](images/0008-topologies.png)

To communicate the intent better than the word Source does, we will use `Aggregate` in KV terms:

**Mirror**: Copy of exactly 1 other bucket. Used primarily for scaling out the `Get()` operations (see the usage sketch after this list).

* It is always Read-Only
* It can hold a filtered subset of keys
* Replicas are automatically picked using an RTT-nearest algorithm without any configuration
* Additional replicas can be added and removed at run-time without any re-configuration of already running KV clients
* Writes and Watchers are transparently sent to the origin bucket
* Can replicate buckets from other accounts and domains
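
As a usage sketch of the transparency described above, assuming the `nats.go` `jetstream` KV API: the client opens the origin bucket by name and never references a Mirror directly, so replica selection needs no client side code.

```go
package main

import (
	"context"
	"fmt"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Drain()

	js, _ := jetstream.New(nc)
	ctx := context.Background()

	// The client opens the origin bucket as usual and never names a Mirror.
	kv, _ := js.KeyValue(ctx, "ORDERS")

	// Gets may be served by the RTT-nearest Mirror via direct gets.
	entry, _ := kv.Get(ctx, "NEW.123")
	fmt.Printf("%s => %s\n", entry.Key(), entry.Value())

	// Puts and Watchers are transparently routed to the origin bucket.
	_, _ = kv.Put(ctx, "NEW.123", []byte("updated"))
}
```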

**Aggregate**: A `Source` that combines one or more buckets into 1 new bucket. Used to provide a full local copy of
other buckets that supports watchers and gets locally in edge scenarios (a usage sketch follows the list below).

* Requires being accessed specifically by its name used in a `KeyValue()` call
* Can be read-only or read-write
* It can hold a subset of keys from the origin buckets to limit data exposure or size
* Can host watchers
* Writes are not transparently sent to the origin Bucket as with Replicas; they either fail (default) or succeed and
  modify the Aggregate (opt-in)
* Can combine buckets from multiple other accounts and domains into a single Aggregate
* Additional Sources can be added after initially creating the Aggregate
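
A usage sketch for an Aggregate at the edge, again assuming the `nats.go` `jetstream` KV API; the Aggregate is addressed by its own bucket name and serves gets and watchers locally:

```go
package main

import (
	"context"
	"fmt"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Drain()

	js, _ := jetstream.New(nc)
	ctx := context.Background()

	// The Aggregate is accessed by its own name, not the origin bucket name.
	kv, _ := js.KeyValue(ctx, "NEW_ORDERS")

	// Watchers run locally against the Aggregate.
	watcher, _ := kv.Watch(ctx, "NEW.>")
	defer watcher.Stop()

	for entry := range watcher.Updates() {
		if entry == nil { // nil marks the end of the initial values
			break
		}
		fmt.Println(entry.Key())
	}

	// Writes fail unless the Aggregate was created as writable.
	if _, err := kv.Put(ctx, "NEW.123", []byte("x")); err != nil {
		fmt.Println("aggregate is read-only:", err)
	}
}
```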

Experiments:

These items we will add in future iterations of the Topology concept:

* Existing Sources can be removed from an Aggregate. Optionally, but by default, the data belonging to the removed
  Source is purged from the bucket
* Watchers could be supported against a Replica and would support auto-discovery of the nearest replica, but this
  would limit the ability to add and remove Replicas at runtime

*Implementation Note*: While this says Domains are supported, we might decide not to implement support for them at
this point as we know we will revisit the concept of a domain. The existing domain based mirrors that are supported
in KeyValueConfig will be deprecated but supported for the foreseeable future for those requiring domain support.

#### Creation of Aggregates

Since NATS Server 2.10 we support transforming messages as a stream configuration item. This allows us to source one
bucket from another and rewrite the keys in the new bucket to have the correct name.

We will model this using a few API functions and specific structures:

```go
// KVAggregateConfig configures an Aggregate
//
// This one is quite complex because Aggregates are buckets in their own right and so inevitably
// need to have all the options that are in buckets today (minus the deprecated ones).
type KVAggregateConfig struct {
	Bucket       string
	Writable     bool
	Description  string
	Replicas     int
	MaxValueSize int32
	History      uint8
	TTL          time.Duration
	MaxBytes     int64
	Storage      KVStorageType // a new kv specific storage struct, for now identical to normal one
	Placement    *KVPlacement  // a new kv specific placement struct, for now identical to normal one
	RePublish    *KVRePublish  // a new kv specific republish struct, for now identical to normal one
	Origins      []*KVAggregateOrigin
}

type KVAggregateOrigin struct {
	Stream   string   // note this is Stream and not Bucket since the origin may be a mirror which may not be a bucket
	Bucket   string   // in the case where we are aggregating from a mirror, we need to know the bucket name to construct mappings
	Keys     []string // optional filter, defaults to >
	External *ExternalStream
}

// CreateAggregate creates a new read-only Aggregate bucket with one or more sources
func CreateAggregate(ctx context.Context, cfg KVAggregateConfig) (KeyValue, error)

// AddAggregateOrigin updates the bucket by adding a new origin, errors if the bucket is not an Aggregate
func AddAggregateOrigin(ctx context.Context, bucket KeyValue, cfg KVAggregateOrigin) error
```

To copy the keys `NEW.>` from bucket `ORDERS` into `NEW_ORDERS`:

```go
bucket, _ := CreateAggregate(ctx, KVAggregateConfig{
	Bucket:   "NEW_ORDERS",
	Writable: false,
	Origins: []*KVAggregateOrigin{
		{
			Stream: "KV_ORDERS",
			Keys:   []string{"NEW.>"},
		},
	},
})
```
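
Continuing the example above, an additional origin can later be attached with the `AddAggregateOrigin` function sketched earlier; the `SHIPPED.>` filter is purely hypothetical:

```go
// Attach a further origin to the existing Aggregate at run-time.
err := AddAggregateOrigin(ctx, bucket, KVAggregateOrigin{
	Stream: "KV_ORDERS",
	Keys:   []string{"SHIPPED.>"},
})
if err != nil {
	// the call errors when the target bucket is not an Aggregate
	panic(err)
}
```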

We create the new stream with the following partial config, the rest as per any other KV bucket:

```json
"subjects": []string{},
"deny_delete": true,
"deny_purge": true,
"sources": [
{
"name": "KV_ORDERS",
"filter_subject": "$KV.ORDERS.NEW.>",
"subject_transforms": [
{
"src": "$KV.ORDERS.>",
"src": "$KV.ORDERS.NEW.>",
"dest": "$KV.NEW_ORDERS.>"
}
]
}
],
```
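
For reference, a rough sketch of the same partial configuration expressed with the `nats.go` `jetstream` package; the field names come from that package and the mapping is illustrative only:

```go
package kvsketch

import "github.com/nats-io/nats.go/jetstream"

// newOrdersAggregateStream sketches the stream backing the NEW_ORDERS Aggregate,
// the remaining fields would be filled in as for any other KV bucket.
func newOrdersAggregateStream() jetstream.StreamConfig {
	return jetstream.StreamConfig{
		Name:       "KV_NEW_ORDERS",
		Subjects:   nil, // read-only: no listen subjects
		DenyDelete: true,
		DenyPurge:  true,
		Sources: []*jetstream.StreamSource{
			{
				Name:          "KV_ORDERS",
				FilterSubject: "$KV.ORDERS.NEW.>",
				SubjectTransforms: []jetstream.SubjectTransformConfig{
					{Source: "$KV.ORDERS.NEW.>", Destination: "$KV.NEW_ORDERS.>"},
				},
			},
		},
	}
}
```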

When the Aggregate is writable, configure the stream as normal and just add the sources.

This results in all messages from the `ORDERS` keys `NEW.>` being copied into `NEW_ORDERS` and the subjects rewritten
on write to the new bucket so that an unmodified KV client on `NEW_ORDERS` just works.
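
A short sketch of such an unmodified client, continuing the earlier examples (`js` and `ctx` as before):

```go
// An ordinary KV client pointed at the Aggregate; it needs no knowledge of
// ORDERS or of the subject transforms feeding NEW_ORDERS.
kv, _ := js.KeyValue(ctx, "NEW_ORDERS")
entry, _ := kv.Get(ctx, "NEW.123")
fmt.Printf("%s => %s\n", entry.Key(), entry.Value())
```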

#### Creation of Mirrors

Replicas can be built using the standard mirror feature by setting `mirror_direct` to true as long as the origin bucket
also has `allow_direct`. When adding a mirror it should be confirmed that the origin bucket has `allow_direct` set.
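
A sketch of that confirmation step, assuming the `nats.go` `jetstream` package is used to inspect the origin stream (`js` and `ctx` as in the earlier examples):

```go
// Verify the origin bucket's stream allows direct gets before mirroring it.
stream, err := js.Stream(ctx, "KV_ORDERS")
if err != nil {
	panic(err)
}
if !stream.CachedInfo().Config.AllowDirect {
	panic("origin bucket ORDERS does not have allow_direct set")
}
```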

We will model this using a few API functions and specific structures:

```go
type KVMirrorConfig struct {
	Name        string
	Description string
	Replicas    int
	History     uint8
	TTL         time.Duration
	MaxBytes    int64
	Storage     StorageType
	Placement   *Placement
	Keys        []string // if empty no filters on the mirror
}

type KVMirrorOrigin struct {
	Stream   string   // note this is Stream and not Bucket since the origin may be a mirror which may not be a bucket
	Keys     []string // optional filter, defaults to >
	External *ExternalStream
}

// CreateMirror creates a new read-only Mirror bucket from an origin bucket
func CreateMirror(ctx context.Context, origin KVMirrorOrigin, cfg KVMirrorConfig) error
```

These mirrors are not called `Bucket` and may not have the `KV_` name prefix, as they are not buckets and cannot be
used as buckets without significant changes in how a KV client constructs its key names. We have done this in the
leafnode mode and decided it's not a good pattern.

When creating a replica of `ORDERS` named `MIRROR_ORDERS_NYC` we do:

```go
origin := KVMirrorOrigin{Stream: "KV_ORDERS"}

err := CreateMirror(ctx, origin, KVMirrorConfig{
	Name: "MIRROR_ORDERS_NYC",
	// ...
})
```
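
Under the covers the replica would be a mirror stream with direct gets enabled; a rough sketch of the relevant parts using the `nats.go` `jetstream` package, where the exact option mapping is an assumption:

```go
package kvsketch

import "github.com/nats-io/nats.go/jetstream"

// mirrorOrdersNYC sketches the stream backing the MIRROR_ORDERS_NYC replica.
func mirrorOrdersNYC() jetstream.StreamConfig {
	return jetstream.StreamConfig{
		Name:         "MIRROR_ORDERS_NYC", // no KV_ prefix, it is not a bucket
		MirrorDirect: true,                // serve direct gets on behalf of the origin
		DenyDelete:   true,
		DenyPurge:    true,
		Mirror: &jetstream.StreamSource{
			Name:          "KV_ORDERS",
			FilterSubject: "$KV.ORDERS.>", // narrow this to replicate only a subset of keys
		},
	}
}
```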

When a direct read is done the response will be from the RTT-nearest mirror. With a mirror added, the `nats` command
can be used to verify that an alternative location is set:
@@ -437,38 +579,31 @@
```
$ nats s info KV_ORDERS
...
State:
Alternates: MIRROR_ORDERS_NYC: Cluster: nyc Domain: hub
KV_ORDERS: Cluster: lon Domain: hub
```

Here we see an RTT-sorted list of alternatives; `MIRROR_ORDERS_NYC` is the nearest to this client.

When doing a direct get the headers will confirm the mirror served the request:

```
$ nats req '$JS.API.DIRECT.GET.KV_ORDERS.$KV.ORDERS.NEW.123' ''
13:26:06 Sending request on "JS.API.DIRECT.GET.KV_ORDERS.$KV.ORDERS.NEW.123"
13:26:06 Received with rtt 1.319085ms
13:26:06 Nats-Stream: MIRROR_ORDERS_NYC
13:26:06 Nats-Subject: $KV.ORDERS.NEW.123
13:26:06 Nats-Sequence: 12
13:26:06 Nats-Time-Stamp: 2023-10-16T12:54:19.409051084Z
{......}
```

As mirrors support subject filters, these replicas can hold region-specific keys.
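
For example, a hypothetical region-filtered replica could be declared through the API sketched above; the `NYC.>` key space is only an illustration:

```go
// A replica that only carries keys under NYC.>, for serving that region.
err := CreateMirror(ctx, KVMirrorOrigin{Stream: "KV_ORDERS"}, KVMirrorConfig{
	Name: "MIRROR_ORDERS_NYC",
	Keys: []string{"NYC.>"},
})
if err != nil {
	panic(err)
}
```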

As this is a `Mirror` this stream does not listen on a subject and so the only way to get data into it is via the origin
bucket. We should also set the options to deny deletes and purges.

#### API Design notes

Binary file added adr/images/0008-topologies.png
