From c012d50770dc4b6e0f599509643406e0d35de34d Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 22 Jan 2025 17:12:09 -0800 Subject: [PATCH 1/2] WIP --- .../commands/start/ZillabaseStartCommand.java | 7 +- .../migrations/000001__create_events.sql | 77 ++----------------- .../migrations/000003__create_handlers.sql | 71 +++++++++++++++++ ... => 000004__create_unique_id_function.sql} | 0 ...ties.sql => 000005__create_activities.sql} | 0 ...ql => 000006__create_payment_requests.sql} | 0 ...00007__create_payment_risk_assessment.sql} | 0 7 files changed, 78 insertions(+), 77 deletions(-) create mode 100644 examples/streampay/zillabase/migrations/000003__create_handlers.sql rename examples/streampay/zillabase/migrations/{000003__create_unique_id_function.sql => 000004__create_unique_id_function.sql} (100%) rename examples/streampay/zillabase/migrations/{000004__create_activities.sql => 000005__create_activities.sql} (100%) rename examples/streampay/zillabase/migrations/{000005__create_payment_requests.sql => 000006__create_payment_requests.sql} (100%) rename examples/streampay/zillabase/migrations/{000006__create_payment_risk_assessment.sql => 000007__create_payment_risk_assessment.sql} (100%) diff --git a/cli/src/main/java/io/aklivity/zillabase/cli/internal/commands/start/ZillabaseStartCommand.java b/cli/src/main/java/io/aklivity/zillabase/cli/internal/commands/start/ZillabaseStartCommand.java index a15939de..e45d3f8f 100644 --- a/cli/src/main/java/io/aklivity/zillabase/cli/internal/commands/start/ZillabaseStartCommand.java +++ b/cli/src/main/java/io/aklivity/zillabase/cli/internal/commands/start/ZillabaseStartCommand.java @@ -362,9 +362,6 @@ private void processInitSql( CREATE TABLE zb_catalog.zfunctions( name VARCHAR PRIMARY KEY, sql VARCHAR); - CREATE TABLE zb_catalog.zstreams( - name VARCHAR PRIMARY KEY, - sql VARCHAR); """); } } @@ -941,7 +938,7 @@ private void extractedHeaders( { for (KafkaTopicSchemaRecord record : records) { - if (record.name.endsWith("_replies_sink")) + if (record.name.endsWith("_replies")) { ZillaBindingOptionsConfig.KafkaTopicConfig topicConfig = new ZillaBindingOptionsConfig.KafkaTopicConfig(); @@ -1462,7 +1459,7 @@ private String generateKafkaAsyncApiSpecs( operation.setMessages(Collections.singletonList(reference)); if (name.endsWith("_commands")) { - String replyTopic = name.replace("_commands", "_replies_sink"); + String replyTopic = name.replace("_commands", "_replies"); OperationReply reply = new OperationReply(); reference = new Reference("#/channels/%s".formatted(replyTopic)); reply.setChannel(reference); diff --git a/examples/streampay/zillabase/migrations/000001__create_events.sql b/examples/streampay/zillabase/migrations/000001__create_events.sql index 7df1e6aa..0295a704 100644 --- a/examples/streampay/zillabase/migrations/000001__create_events.sql +++ b/examples/streampay/zillabase/migrations/000001__create_events.sql @@ -1,79 +1,12 @@ -- create_events -CREATE ZFUNCTION send_payment_handler( - user_id VARCHAR, - request_id VARCHAR, - amount DOUBLE PRECISION, - notes VARCHAR) -RETURNS TABLE( - event VARCHAR, - user_id VARCHAR, - request_id VARCHAR, - amount DOUBLE PRECISION, - notes VARCHAR) -LANGUAGE SQL AS $$ - SELECT - 'PaymentSent' AS event, - args.user_id, - args.request_id, - args.amount, - args.notes; - $$; - -CREATE ZFUNCTION request_payment_handler( - user_id VARCHAR, - request_id VARCHAR, - amount DOUBLE PRECISION, - notes VARCHAR) -RETURNS TABLE( - event VARCHAR, - user_id VARCHAR, - request_id VARCHAR, - amount DOUBLE PRECISION, - notes VARCHAR) -LANGUAGE SQL AS $$ - SELECT - 'PaymentRequested' AS event, - args.user_id, - args.request_id, - args.amount, - args.notes; - $$; - -CREATE ZFUNCTION reject_payment_handler( - user_id VARCHAR, - request_id VARCHAR, - amount DOUBLE PRECISION, - notes VARCHAR) -RETURNS TABLE( - event VARCHAR, - user_id VARCHAR, - request_id VARCHAR, - amount DOUBLE PRECISION, - notes VARCHAR) -LANGUAGE SQL AS $$ - SELECT - 'PaymentRejected' AS event, - args.user_id, - args.request_id, - args.amount, - args.notes; - $$; - -CREATE ZSTREAM streampay_events( +CREATE TABLE streampay_events ( + correlation_id VARCHAR PRIMARY KEY, + owner_id VARCHAR, + created_at TIMESTAMPTZ, event VARCHAR, user_id VARCHAR, request_id VARCHAR, amount DOUBLE PRECISION, - notes VARCHAR, - owner_id VARCHAR GENERATED ALWAYS AS IDENTITY, - created_at TIMESTAMP GENERATED ALWAYS AS NOW -) -WITH ( - DISPATCH_ON = 'command', - HANDLERS = ( - 'SendPayment' TO 'send_payment_handler', - 'RequestPayment' TO 'request_payment_handler', - 'RejectPayment' TO 'reject_payment_handler' - ) + notes VARCHAR ); diff --git a/examples/streampay/zillabase/migrations/000003__create_handlers.sql b/examples/streampay/zillabase/migrations/000003__create_handlers.sql new file mode 100644 index 00000000..36d5373a --- /dev/null +++ b/examples/streampay/zillabase/migrations/000003__create_handlers.sql @@ -0,0 +1,71 @@ +-- create_handlers + +CREATE ZFUNCTION streampay_send_payment( + user_id VARCHAR, + request_id VARCHAR, + amount DOUBLE PRECISION, + notes VARCHAR) +RETURNS TABLE( + event VARCHAR, + user_id VARCHAR, + request_id VARCHAR, + amount DOUBLE PRECISION, + notes VARCHAR) +LANGUAGE SQL AS $$ + SELECT + 'PaymentSent' AS event, + args.user_id, + args.request_id, + args.amount, + args.notes; +$$ +WITH( + EVENTS = 'streampay_events' +); + +CREATE ZFUNCTION streampay_request_payment( + user_id VARCHAR, + request_id VARCHAR, + amount DOUBLE PRECISION, + notes VARCHAR) +RETURNS TABLE( + event VARCHAR, + user_id VARCHAR, + request_id VARCHAR, + amount DOUBLE PRECISION, + notes VARCHAR) +LANGUAGE SQL AS $$ + SELECT + 'PaymentRequested' AS event, + args.user_id, + args.request_id, + args.amount, + args.notes; +$$ +WITH( + EVENTS = 'streampay_events' +); + +CREATE ZFUNCTION streampay_reject_payment( + user_id VARCHAR, + request_id VARCHAR, + amount DOUBLE PRECISION, + notes VARCHAR) +RETURNS TABLE( + event VARCHAR, + user_id VARCHAR, + request_id VARCHAR, + amount DOUBLE PRECISION, + notes VARCHAR) +LANGUAGE SQL AS $$ + SELECT + 'PaymentRejected' AS event, + args.user_id, + args.request_id, + args.amount, + args.notes; +$$ +WITH( + EVENTS = 'streampay_events' +); + diff --git a/examples/streampay/zillabase/migrations/000003__create_unique_id_function.sql b/examples/streampay/zillabase/migrations/000004__create_unique_id_function.sql similarity index 100% rename from examples/streampay/zillabase/migrations/000003__create_unique_id_function.sql rename to examples/streampay/zillabase/migrations/000004__create_unique_id_function.sql diff --git a/examples/streampay/zillabase/migrations/000004__create_activities.sql b/examples/streampay/zillabase/migrations/000005__create_activities.sql similarity index 100% rename from examples/streampay/zillabase/migrations/000004__create_activities.sql rename to examples/streampay/zillabase/migrations/000005__create_activities.sql diff --git a/examples/streampay/zillabase/migrations/000005__create_payment_requests.sql b/examples/streampay/zillabase/migrations/000006__create_payment_requests.sql similarity index 100% rename from examples/streampay/zillabase/migrations/000005__create_payment_requests.sql rename to examples/streampay/zillabase/migrations/000006__create_payment_requests.sql diff --git a/examples/streampay/zillabase/migrations/000006__create_payment_risk_assessment.sql b/examples/streampay/zillabase/migrations/000007__create_payment_risk_assessment.sql similarity index 100% rename from examples/streampay/zillabase/migrations/000006__create_payment_risk_assessment.sql rename to examples/streampay/zillabase/migrations/000007__create_payment_risk_assessment.sql From ae71259e81e21961bd83a063b047232b30e7469c Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 28 Jan 2025 09:42:22 -0800 Subject: [PATCH 2/2] Fix endpoints --- .../aklivity/zillabase/cli/config/ZillabaseZillaConfig.java | 2 +- examples/streampay/src/components/PayOrRequestForm.vue | 6 ++---- examples/streampay/src/pages/RequestPage.vue | 6 ++---- .../zillabase/migrations/000005__create_activities.sql | 2 +- .../migrations/000006__create_payment_requests.sql | 2 +- 5 files changed, 7 insertions(+), 11 deletions(-) diff --git a/cli/src/main/java/io/aklivity/zillabase/cli/config/ZillabaseZillaConfig.java b/cli/src/main/java/io/aklivity/zillabase/cli/config/ZillabaseZillaConfig.java index 3b78e345..ac5eb4e7 100644 --- a/cli/src/main/java/io/aklivity/zillabase/cli/config/ZillabaseZillaConfig.java +++ b/cli/src/main/java/io/aklivity/zillabase/cli/config/ZillabaseZillaConfig.java @@ -18,7 +18,7 @@ public class ZillabaseZillaConfig { - public static final String DEFAULT_ZILLA_TAG = "0.9.121"; + public static final String DEFAULT_ZILLA_TAG = "0.9.122"; private static final List DEFAULT_PORT_LIST = List.of( new ZillabaseZillaPortConfig() diff --git a/examples/streampay/src/components/PayOrRequestForm.vue b/examples/streampay/src/components/PayOrRequestForm.vue index 53409045..4917403a 100644 --- a/examples/streampay/src/components/PayOrRequestForm.vue +++ b/examples/streampay/src/components/PayOrRequestForm.vue @@ -60,8 +60,7 @@ async function onPay() { if (balance.value - amount.value > 0) { const accessToken = keycloak.token; const authorization = { Authorization: `Bearer ${accessToken}` }; - api.post('/streampay_events_commands', { - command: 'SendPayment', + api.post('/streampay_send_payment_commands', { user_id: userOption.value?.value, request_id: '', amount: +amount.value, @@ -96,8 +95,7 @@ async function onPay() { async function onRequest() { const accessToken = keycloak.token; const authorization = { Authorization: `Bearer ${accessToken}` }; - api.post('/streampay_commands', { - command: 'RequestPayment', + api.post('/streampay_request_payment_commands', { user_id: userOption.value?.value, request_id: '', amount: +amount.value, diff --git a/examples/streampay/src/pages/RequestPage.vue b/examples/streampay/src/pages/RequestPage.vue index 747f4895..5ce23157 100644 --- a/examples/streampay/src/pages/RequestPage.vue +++ b/examples/streampay/src/pages/RequestPage.vue @@ -115,8 +115,7 @@ async function readRequests() { } function pay(request: any) { - api.post('/streampay_commands', { - command: 'SendPayment', + api.post('/streampay_send_payment_commands', { user_id: request.from_user_id, request_id: request?.id || '', amount: request.amount, @@ -141,8 +140,7 @@ function pay(request: any) { } function reject(request: any) { - api.post('/streampay_commands', { - command: 'RejectRequest', + api.post('/streampay_reject_payment_commands', { user_id: request.from_user_id, request_id: request?.id || '', amount: request.amount, diff --git a/examples/streampay/zillabase/migrations/000005__create_activities.sql b/examples/streampay/zillabase/migrations/000005__create_activities.sql index 6fad1e2a..2dfa5bee 100644 --- a/examples/streampay/zillabase/migrations/000005__create_activities.sql +++ b/examples/streampay/zillabase/migrations/000005__create_activities.sql @@ -20,7 +20,7 @@ CREATE ZVIEW streampay_activities AS SELECT generate_unique_id()::varchar AS id, 'PaymentReceived' AS eventName, - sc.user_id AS from_user_id, + sc.owner_id AS from_user_id, fu.username AS from_username, sc.user_id AS to_user_id, tu.username AS to_username, diff --git a/examples/streampay/zillabase/migrations/000006__create_payment_requests.sql b/examples/streampay/zillabase/migrations/000006__create_payment_requests.sql index 23b168e4..a9eadeec 100644 --- a/examples/streampay/zillabase/migrations/000006__create_payment_requests.sql +++ b/examples/streampay/zillabase/migrations/000006__create_payment_requests.sql @@ -1,6 +1,6 @@ -- create_payment_requests -CREATE VIEW request_payments AS +CREATE MATERIALIZED VIEW request_payments AS SELECT CASE WHEN sc.request_id IS NULL OR sc.request_id = '' THEN generate_unique_id()::varchar