Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new zfunction format #171

Merged
merged 2 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

public class ZillabaseZillaConfig
{
public static final String DEFAULT_ZILLA_TAG = "0.9.121";
public static final String DEFAULT_ZILLA_TAG = "0.9.122";

private static final List<ZillabaseZillaPortConfig> DEFAULT_PORT_LIST = List.of(
new ZillabaseZillaPortConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,6 @@ private void processInitSql(
CREATE TABLE zb_catalog.zfunctions(
name VARCHAR PRIMARY KEY,
sql VARCHAR);
CREATE TABLE zb_catalog.zstreams(
name VARCHAR PRIMARY KEY,
sql VARCHAR);
""");
}
}
Expand Down Expand Up @@ -941,7 +938,7 @@ private void extractedHeaders(
{
for (KafkaTopicSchemaRecord record : records)
{
if (record.name.endsWith("_replies_sink"))
if (record.name.endsWith("_replies"))
{
ZillaBindingOptionsConfig.KafkaTopicConfig topicConfig =
new ZillaBindingOptionsConfig.KafkaTopicConfig();
Expand Down Expand Up @@ -1462,7 +1459,7 @@ private String generateKafkaAsyncApiSpecs(
operation.setMessages(Collections.singletonList(reference));
if (name.endsWith("_commands"))
{
String replyTopic = name.replace("_commands", "_replies_sink");
String replyTopic = name.replace("_commands", "_replies");
OperationReply reply = new OperationReply();
reference = new Reference("#/channels/%s".formatted(replyTopic));
reply.setChannel(reference);
Expand Down
6 changes: 2 additions & 4 deletions examples/streampay/src/components/PayOrRequestForm.vue
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ async function onPay() {
if (balance.value - amount.value > 0) {
const accessToken = keycloak.token;
const authorization = { Authorization: `Bearer ${accessToken}` };
api.post('/streampay_events_commands', {
command: 'SendPayment',
api.post('/streampay_send_payment_commands', {
user_id: userOption.value?.value,
request_id: '',
amount: +amount.value,
Expand Down Expand Up @@ -96,8 +95,7 @@ async function onPay() {
async function onRequest() {
const accessToken = keycloak.token;
const authorization = { Authorization: `Bearer ${accessToken}` };
api.post('/streampay_commands', {
command: 'RequestPayment',
api.post('/streampay_request_payment_commands', {
user_id: userOption.value?.value,
request_id: '',
amount: +amount.value,
Expand Down
6 changes: 2 additions & 4 deletions examples/streampay/src/pages/RequestPage.vue
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ async function readRequests() {
}

function pay(request: any) {
api.post('/streampay_commands', {
command: 'SendPayment',
api.post('/streampay_send_payment_commands', {
user_id: request.from_user_id,
request_id: request?.id || '',
amount: request.amount,
Expand All @@ -141,8 +140,7 @@ function pay(request: any) {
}

function reject(request: any) {
api.post('/streampay_commands', {
command: 'RejectRequest',
api.post('/streampay_reject_payment_commands', {
user_id: request.from_user_id,
request_id: request?.id || '',
amount: request.amount,
Expand Down
77 changes: 5 additions & 72 deletions examples/streampay/zillabase/migrations/000001__create_events.sql
Original file line number Diff line number Diff line change
@@ -1,79 +1,12 @@
-- create_events

CREATE ZFUNCTION send_payment_handler(
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
RETURNS TABLE(
event VARCHAR,
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
LANGUAGE SQL AS $$
SELECT
'PaymentSent' AS event,
args.user_id,
args.request_id,
args.amount,
args.notes;
$$;

CREATE ZFUNCTION request_payment_handler(
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
RETURNS TABLE(
event VARCHAR,
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
LANGUAGE SQL AS $$
SELECT
'PaymentRequested' AS event,
args.user_id,
args.request_id,
args.amount,
args.notes;
$$;

CREATE ZFUNCTION reject_payment_handler(
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
RETURNS TABLE(
event VARCHAR,
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
LANGUAGE SQL AS $$
SELECT
'PaymentRejected' AS event,
args.user_id,
args.request_id,
args.amount,
args.notes;
$$;

CREATE ZSTREAM streampay_events(
CREATE TABLE streampay_events (
correlation_id VARCHAR PRIMARY KEY,
owner_id VARCHAR,
created_at TIMESTAMPTZ,
event VARCHAR,
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR,
owner_id VARCHAR GENERATED ALWAYS AS IDENTITY,
created_at TIMESTAMP GENERATED ALWAYS AS NOW
)
WITH (
DISPATCH_ON = 'command',
HANDLERS = (
'SendPayment' TO 'send_payment_handler',
'RequestPayment' TO 'request_payment_handler',
'RejectPayment' TO 'reject_payment_handler'
)
notes VARCHAR
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
-- create_handlers

CREATE ZFUNCTION streampay_send_payment(
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
RETURNS TABLE(
event VARCHAR,
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
LANGUAGE SQL AS $$
SELECT
'PaymentSent' AS event,
args.user_id,
args.request_id,
args.amount,
args.notes;
$$
WITH(
EVENTS = 'streampay_events'
);

CREATE ZFUNCTION streampay_request_payment(
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
RETURNS TABLE(
event VARCHAR,
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
LANGUAGE SQL AS $$
SELECT
'PaymentRequested' AS event,
args.user_id,
args.request_id,
args.amount,
args.notes;
$$
WITH(
EVENTS = 'streampay_events'
);

CREATE ZFUNCTION streampay_reject_payment(
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
RETURNS TABLE(
event VARCHAR,
user_id VARCHAR,
request_id VARCHAR,
amount DOUBLE PRECISION,
notes VARCHAR)
LANGUAGE SQL AS $$
SELECT
'PaymentRejected' AS event,
args.user_id,
args.request_id,
args.amount,
args.notes;
$$
WITH(
EVENTS = 'streampay_events'
);

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ CREATE ZVIEW streampay_activities AS
SELECT
generate_unique_id()::varchar AS id,
'PaymentReceived' AS eventName,
sc.user_id AS from_user_id,
sc.owner_id AS from_user_id,
fu.username AS from_username,
sc.user_id AS to_user_id,
tu.username AS to_username,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- create_payment_requests

CREATE VIEW request_payments AS
CREATE MATERIALIZED VIEW request_payments AS
SELECT
CASE
WHEN sc.request_id IS NULL OR sc.request_id = '' THEN generate_unique_id()::varchar
Expand Down