Skip to content

Commit

Permalink
enable lr
Browse files Browse the repository at this point in the history
  • Loading branch information
pshreay committed Jan 24, 2024
1 parent 3c3a267 commit 746a4bf
Show file tree
Hide file tree
Showing 19 changed files with 274 additions and 91 deletions.
Binary file added cloud/corfu/.swp
Binary file not shown.
28 changes: 28 additions & 0 deletions cloud/corfu/cluster_deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/zsh

k3d cluster delete corfu
rm -rf /tmp/k3dvol

k3d cluster create corfu \
--volume /tmp/k3dvol:/tmp/k3dvol \
-p "8082:30080@agent:0" \
--agents 2

k3d image import corfudb/corfu-server:cloud -c corfu
k3d image import corfudb2/corfu-server:cloud -c corfu

k3d image import corfudb/corfu-server:0.4.0-SNAPSHOT -c corfu
k3d image import corfudb2/corfu-server:0.4.0-SNAPSHOT -c corfu

k3d image import corfudb/corfu-client-example:latest -c corfu

helm repo add jetstack https://charts.jetstack.io
helm repo update

helm install cert-manager jetstack/cert-manager --namespace cert-manager --create-namespace --version v1.8.0 --set installCRDs=true
helm install corfu corfu --set tls.enabled=false --set tls.certificate.enabled=false --set global.replicas=3 --set image.repository=corfudb/corfu-server --set image.tag=0.4.0-SNAPSHOT
helm install corfu2 corfu --set tls.enabled=false --set tls.certificate.enabled=false --set global.replicas=3 --set image.repository=corfudb2/corfu-server --set image.tag=0.4.0-SNAPSHOT --set lr.name="log-replication2" --set nameOverride="corfu2" --set serviceAccount.name="corfu2" --set nameOverride="corfu2" --set fullnameOverride="corfu2" --set cluster.type="sink"

# helm install corfu-client corfu-client-example-helm --set tls.enabled=false

sleep 10
2 changes: 1 addition & 1 deletion cloud/corfu/corfu-client-example-helm/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
containers:
- name: corfu-client
image: corfudb/corfu-client-example:latest
imagePullPolicy: Always
imagePullPolicy: Never
command:
- "sh"
- "-c"
Expand Down
2 changes: 1 addition & 1 deletion cloud/corfu/corfu-client-example-helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ image:
registry: "docker.io"
repository: "corfudb/corfu-client-example"
tag: "latest"
pullPolicy: Always
pullPolicy: Never
corfuEndpoint: "corfu-0.corfu-headless.default.svc.cluster.local"
tls:
enabled: true
Expand Down
22 changes: 12 additions & 10 deletions cloud/corfu/corfu-client-example/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ repositories {
mavenLocal()
mavenCentral()

maven {
name = "GitHubPackages"
url = uri("https://maven.pkg.github.com/corfudb/corfudb")
// For accessing GitHub Secrets in CorfuDB repo
credentials {
username = System.getenv("PKG_USERNAME")
password = System.getenv("PUBLISH_TOKEN")
}
}
// maven {
// name = "GitHubPackages"
// url = uri("https://maven.pkg.github.com/corfudb/corfudb")
// // For accessing GitHub Secrets in CorfuDB repo
// credentials {
// username = System.getenv("PKG_USERNAME")
// password = System.getenv("PUBLISH_TOKEN")
// }
// }
}

val corfuVersion = "0.3.2-SNAPSHOT"
val corfuVersion = "0.4.0-SNAPSHOT"
val logbackVersion = "1.2.11"
val junitVersion = "5.8.2"

Expand All @@ -29,6 +29,8 @@ dependencies {
implementation("org.corfudb:runtime:${corfuVersion}") {
exclude(group = "io.netty", module = "netty-tcnative")
}
implementation("org.corfudb:infrastructure:${corfuVersion}")
implementation("com.github.luben:zstd-jni:1.4.8-1")

testImplementation("org.junit.jupiter:junit-jupiter-engine:${junitVersion}")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package org.corfudb.cloud.runtime.example;

import com.google.common.reflect.TypeToken;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.collections.PersistentCorfuTable;
import java.util.Map;
import org.corfudb.infrastructure.logreplication.proto.Sample;
import org.corfudb.runtime.CorfuOptions;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.Table;
import org.corfudb.runtime.collections.TableOptions;
import org.corfudb.runtime.collections.TxnContext;
import org.corfudb.runtime.view.ObjectsView;
import org.corfudb.util.NodeLocator;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import java.util.concurrent.TimeUnit;

/**
* This tutorial demonstrates a simple Corfu application.
Expand All @@ -32,7 +40,7 @@ private static CorfuRuntime getRuntimeAndConnect(String host, boolean tlsEnabled

CorfuRuntime.CorfuRuntimeParameters.CorfuRuntimeParametersBuilder builder = CorfuRuntime.CorfuRuntimeParameters
.builder()
.connectionTimeout(Duration.ofSeconds(2))
.connectionTimeout(Duration.ofSeconds(20))
.layoutServers(Collections.singletonList(loc));
if (tlsEnabled) {
builder.tlsEnabled(tlsEnabled)
Expand All @@ -47,7 +55,7 @@ private static CorfuRuntime getRuntimeAndConnect(String host, boolean tlsEnabled
}

// Sample code
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
System.out.println("Start application. Got args: " + Arrays.toString(args));
// Parse the options given, using docopt.
/*
Expand Down Expand Up @@ -88,47 +96,74 @@ public static void main(String[] args) {
tlsEnabled = true;
}

CorfuRuntime runtimeSource = getRuntimeAndConnect(ip, tlsEnabled, keyStore, keyStorePassword, trustStore, trustStorePassword);
CorfuRuntime runtimeSink = getRuntimeAndConnect("corfu2-0.corfu2-headless.default.svc.cluster.local",
tlsEnabled, keyStore, keyStorePassword, trustStore, trustStorePassword);

CorfuStore corfuStoreSource = new CorfuStore(runtimeSource);
CorfuStore corfuStoreSink = new CorfuStore(runtimeSink);

CorfuRuntime runtime = getRuntimeAndConnect(ip, tlsEnabled, keyStore, keyStorePassword, trustStore, trustStorePassword);
String NAMESPACE = "test";
String streamA = "Table001";

/**
* Obviously, this application is not doing much yet,
* but you can already invoke getRuntimeAndConnect to test if you can connect to a deployed Corfu service.
*
* Above, you will need to point it to a host and port which is running the service.
* See {@link https://github.com/CorfuDB/CorfuDB} for instructions on how to deploy Corfu.
*/
Table<Sample.StringKey, Sample.IntValue, Sample.Metadata> mapA = corfuStoreSource.openTable(
NAMESPACE,
streamA,
Sample.StringKey.class,
Sample.IntValue.class,
Sample.Metadata.class,
TableOptions.builder().schemaOptions(
CorfuOptions.SchemaOptions.newBuilder()
.setIsFederated(true)
.addStreamTag(ObjectsView.LOG_REPLICATOR_STREAM_INFO.getTagName())
.build())
.build()
);

/**
* Next, we will illustrate how to declare a Java object backed by a Corfu Stream.
* A Corfu Stream is a log dedicated specifically to the history of updates of one object.
* We will instantiate a stream by giving it a name "A",
* and then instantiate an object by specifying its class
*/
Map<String, Integer> map = runtime.getObjectsView()
.build()
.setStreamName("A") // stream name
.setTypeToken(new TypeToken<PersistentCorfuTable<String, Integer>>() {})
.open(); // instantiate the object!
Table<Sample.StringKey, Sample.IntValue, Sample.Metadata> mapASink = corfuStoreSink.openTable(
NAMESPACE,
streamA,
Sample.StringKey.class,
Sample.IntValue.class,
Sample.Metadata.class,
TableOptions.builder().schemaOptions(
CorfuOptions.SchemaOptions.newBuilder()
.setIsFederated(true)
.addStreamTag(ObjectsView.LOG_REPLICATOR_STREAM_INFO.getTagName())
.build())
.build()
);

/**
* The magic has already happened! map is an in-memory view of a shared map, backed by the Corfu log.
* The application can perform put and get on this map from different application instances,
* crash and restart applications, and so on.
* The map will persist and be consistent across all applications.
*
* For example, try the following code repeatedly in a sequence, in between run/exit,
* from multiple instances, and see the different interleaving of values that result.
*/
Integer previous = map.get("a");
if (previous == null) {
System.out.println("This is the first time we were run!");
map.put("a", 1);
int totalEntries = 200;
int startIndex = 0;

int maxIndex = totalEntries + startIndex;
for (int i = startIndex; i < maxIndex; i++) {
try (TxnContext txn = corfuStoreSource.txn(NAMESPACE)) {
txn.putRecord(mapA, Sample.StringKey.newBuilder().setKey(String.valueOf(i)).build(),
Sample.IntValue.newBuilder().setValue(i).build(), null);
txn.commit();
}
}
else {
map.put("a", ++previous);
System.out.println("This is the " + previous + " time we were run!");

while (true) {
try (TxnContext txn = corfuStoreSource.txn(NAMESPACE)) {
int tableSize = txn.getTable(streamA).count();
System.out.println("Size of source table is: " + tableSize);
txn.commit();
}

try (TxnContext txn = corfuStoreSink.txn(NAMESPACE)) {
int tableSize = txn.getTable(streamA).count();

System.out.println("Size of sink table is: " + tableSize);
txn.commit();

if (tableSize == 200) {
break;
}
}
TimeUnit.SECONDS.sleep(5);
}
}
}
18 changes: 15 additions & 3 deletions cloud/corfu/corfu/config/corfu_plugin_config.properties
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
# Transport Plugin Configuration
transport_adapter_JAR_path=/app/corfu.jar
GRPC_transport_adapter_server_class_name=org.corfudb.infrastructure.logreplication.transport.sample.GRPCLogReplicationServerChannelAdapter
GRPC_transport_adapter_client_class_name=org.corfudb.infrastructure.logreplication.transport.sample.GRPCLogReplicationClientChannelAdapter
NETTY_transport_adapter_server_class_name=org.corfudb.infrastructure.logreplication.transport.sample.NettyLogReplicationServerChannelAdapter
NETTY_transport_adapter_client_class_name=org.corfudb.infrastructure.logreplication.transport.sample.NettyLogReplicationClientChannelAdapter

# Transport plugin selector
transport_plugin_selector_JAR_path=/app/corfu.jar
transport_plugin_selector_class_name=org.corfudb.infrastructure.logreplication.infrastructure.plugins.DefaultTransportPluginSelector

# Transport Plugin Configuration
transport_adapter_server_class_name=org.corfudb.infrastructure.logreplication.transport.sample.GRPCLogReplicationServerChannelAdapter
transport_adapter_client_class_name=org.corfudb.infrastructure.logreplication.transport.sample.GRPCLogReplicationClientChannelAdapter

# Stream Fetcher Plugin Configuration
stream_fetcher_plugin_JAR_path=/app/corfu.jar
stream_fetcher_plugin_class_name=org.corfudb.infrastructure.logreplication.infrastructure.plugins.DefaultLogReplicationConfigAdapter

# Topology Manager Plugin Configuration
topology_manager_adapter_JAR_path=/app/corfu.jar
topology_manager_adapter_class_name=org.corfudb.infrastructure.logreplication.infrastructure.plugins.DefaultClusterManager

# Snapshot Sync Configuration (Plugin)
snapshot_sync_plugin_JAR_path=/app/corfu.jar
snapshot_sync_plugin_class_name=org.corfudb.infrastructure.logreplication.infrastructure.plugins.DefaultSnapshotSyncPlugin
saas_endpoint=corfu:9000
local_node_id_path=/usr/share/corfu/conf/serial_number

saas_endpoint=corfu-0.corfu-headless.default.svc.cluster.local:9000
3 changes: 2 additions & 1 deletion cloud/corfu/corfu/files/init_layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def generate_layout(args):
layout_template["layoutServers"] = fqdn_list
layout_template["sequencers"] = fqdn_list
layout_template["segments"][0]["stripes"][0]["logServers"] = fqdn_list
layout_template["clusterId"] = "123e4567-e89b-12d3-a456-556642440000"
layout_template["clusterId"] = "456e4567-e89b-12d3-a456-556642440001" if args.type == "source" else "456e4567-e89b-12d3-a456-556642440002"

# print layout
print("Generated layout:")
Expand All @@ -39,6 +39,7 @@ def main():
parser.add_argument('--replica', '-r', type=int, required=True, help='The replica of Corfu cluster.')
parser.add_argument('--statefulset', type=str, default='corfu', help='Corfu statefulset name.')
parser.add_argument('--headless', type=str, default='corfu-headless', required=True, help='Corfu headless service name.')
parser.add_argument('--type', type=str, default='source', required=True, help='Source or sink.')
args = parser.parse_args()

generate_layout(args)
Expand Down
1 change: 1 addition & 0 deletions cloud/corfu/corfu/serial_number/serial_number
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
serial=B1310142-311D-B7AE-D6A7-F66DEE607871
64 changes: 64 additions & 0 deletions cloud/corfu/corfu/templates/Deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ include "corfu.service.lr" . }}
labels:
{{- include "corfu.labels" . | indent 4 }}
spec:
replicas: {{ include "corfu.replicas" . }}
serviceName: {{ include "corfu.service.lr" . }}
selector:
matchLabels:
{{- include "corfu.selectors.lr" . | nindent 6 }}
template:
metadata:
labels:
{{- include "corfu.selectors.lr" . | nindent 8 }}
spec:
containers:
- name: {{ include "corfu.service.lr" . }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.lr.port }}
protocol: TCP
env:
- name: CONFIG_FILE_PATH
value: "/usr/share/corfu/conf/corfu_replication_config.properties"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_UID
valueFrom:
fieldRef:
fieldPath: metadata.uid
VolumeMounts:
- name: log-dir
mountPath: /var/log/corfu-log-replication
- name: config-dir
mountPath: /config/corfu-log-replication
- name: lr
mountPath: /common/configs/
command:
- "sh"
- "-c"
- |
"java
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/corfu/corfu_oom.hprof
-XX:+HeapDumpOnOutOfMemoryError
-Djdk.nio.maxCachedBufferSize=1048576
-Dio.netty.recycler.maxCapacityPerThread=0
-XX:+PrintGCApplicationStoppedTime
-XX:+PrintGCApplicationConcurrentTime
-Djava.io.tmpdir=/image/corfu-server/temp
-cp /app/corfu.jar:/opt/vmware/log-replication/log-replication_deploy.jar
-Djava.io.tmpdir=/tmp org.corfudb.infrastructure.CorfuServer
--plugin=/usr/share/corfu/conf/corfu_plugin_config.properties
-d DEBUG 9010 -m"
10 changes: 9 additions & 1 deletion cloud/corfu/corfu/templates/_helpers.tpl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{{- define "corfu.name" -}}
corfu
{{ .Values.nameOverride }}
{{- end }}

{{- define "corfu.fullname" -}}
Expand Down Expand Up @@ -30,6 +30,14 @@ type: {{ .Values.type | default "config" | quote }}
app.kubernetes.io/name: {{ include "corfu.fullname" . }}
{{- end }}

{{- define "corfu.service.lr" -}}
{{- .Values.lr.name }}
{{- end }}

{{- define "corfu.selectors.lr" -}}
app.kubernetes.io/name: {{ include "corfu.service.lr" . }}
{{- end }}

{{/*
If replicas tag is defined in its own helm chart values.yaml
it will always override the global value. If not, we will use the global value.
Expand Down
3 changes: 2 additions & 1 deletion cloud/corfu/corfu/templates/certificate.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{{- if .Values.tls.certificate.enabled }}
{{- $dns := printf "*.corfu-headless.%s.svc.cluster.local" .Release.Namespace }}
{{- $dns := printf "*.%s-headless.%s.svc.cluster.local" .Values.nameOverride .Release.Namespace }}
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
Expand All @@ -25,6 +25,7 @@ spec:
- client auth
dnsNames:
- corfu
- corfu2
- "{{ $dns }}"
issuerRef:
name: {{ .Values.tls.certificate.issuer.name }}
Expand Down
Loading

0 comments on commit 746a4bf

Please sign in to comment.