Skip to content

Commit

Permalink
Make it run on more than one node (#39)
Browse files Browse the repository at this point in the history
* poc working

Signed-off-by: Matteo Collina <[email protected]>

* running on postgresql

Signed-off-by: Matteo Collina <[email protected]>

* election working

Signed-off-by: Matteo Collina <[email protected]>

* working election

Signed-off-by: Matteo Collina <[email protected]>

* add pg to gh actions

Signed-off-by: Matteo Collina <[email protected]>

* test neon

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* update to latest plt

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

---------

Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina authored Mar 16, 2023
1 parent bd5090d commit e259350
Show file tree
Hide file tree
Showing 24 changed files with 814 additions and 279 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ jobs:
with:
node-version: ${{ matrix.node-version }}

- uses: ikalnytskyi/action-setup-postgres@v4

- name: Install
run: |
npm ci
Expand Down
32 changes: 30 additions & 2 deletions .github/workflows/platformatic-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ on:
- 'docs/**'
- '**.md'

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: "${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}"
cancel-in-progress: true

jobs:
build_and_deploy:
runs-on: ubuntu-latest
Expand All @@ -15,16 +20,39 @@ jobs:
uses: actions/checkout@v3
- name: npm install --omit=dev
run: npm install --omit=dev
- name: Get PR number
id: get_pull_number
run: |
pull_sha=$(jq --raw-output .pull_request.base.sha "$GITHUB_EVENT_PATH")
echo "pull_sha=${pull_sha}" >> $GITHUB_OUTPUT
echo $pull_sha
- uses: neondatabase/delete-branch-by-name-action@main
with:
project_id: ${{ secrets.NEON_PROJECT_ID }}
branch_name: ${{ steps.get_pull_number.outputs.pull_sha }}
api_key: ${{ secrets.NEON_API_KEY }}
- run: sleep 10
- uses: neondatabase/create-branch-action@v2
with:
project_id: ${{ secrets.NEON_PROJECT_ID }}
branch_name: ${{ steps.get_pull_number.outputs.pull_sha }}
api_key: ${{ secrets.NEON_API_KEY }}
username: ${{ secrets.DBUSER }}
Password: ${{ secrets.DBPASSWORD }}
id: create-branch
- name: Get DATABASE_URL
run: echo DATABASE_URL=${{ steps.create-branch.outputs.db_url}}/neondb
- name: Deploy project
uses: platformatic/onestep@latest
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
platformatic_api_key: ${{ secrets.PLATFORMATIC_API_KEY }}
platformatic_config_path: ./platformatic.db.json
env:
DATABASE_URL: sqlite://./db.sqlite
PLT_SERVER_LOGGER_LEVEL: info
PORT: 3042
PLT_SERVER_HOSTNAME: 127.0.0.1
PLT_ADMIN_SECRET: ${{ secrets.PLT_ADMIN_SECRET }}

PLT_LOCK: ${{ vars.PLT_LOCK }}
PLT_LEADER_POLL: ${{ vars.PLT_LEADER_POLL }}
DATABASE_URL: ${{ steps.create-branch.outputs.db_url}}/neondb
1 change: 1 addition & 0 deletions .taprc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
jobs: 1
25 changes: 25 additions & 0 deletions bully-poc/bully.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import createConnectionPool, { sql } from '@databases/pg'
import { setTimeout as sleep } from 'timers/promises'

// 673 - "outbox" charCodeAt summed up
let n = 0
for (const c of 'outbox') {
n += c.charCodeAt(0)
}

const db = createConnectionPool(
'postgres://postgres:[email protected]:5432/postgres'
)

console.log(process.pid)

while (true) {
const results = await db.query(sql`
SELECT pg_try_advisory_lock(${n}) as result;
`)

console.log(results)
await sleep(1000)
}

// await db.dispose()
17 changes: 17 additions & 0 deletions bully-poc/listen.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import createConnectionPool, { sql } from '@databases/pg'
import { on } from 'events'

const db = createConnectionPool(
'postgres://postgres:[email protected]:5432/postgres'
)

console.log(process.pid)

await db.task(async (t) => {
await t.query(sql`
LISTEN "messages";
`)
for await (const notification of on(t._driver.client, 'notification')) {
console.log(notification)
}
})
15 changes: 15 additions & 0 deletions bully-poc/notify.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import createConnectionPool, { sql } from '@databases/pg'
import { setTimeout as sleep } from 'timers/promises'

const db = createConnectionPool(
'postgres://postgres:[email protected]:5432/postgres'
)

console.log(process.pid)

const results = await db.query(sql`
NOTIFY "messages";
`)

console.log(results)
await sleep(6000)
8 changes: 8 additions & 0 deletions docker-compose-apple-silicon.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: "3.3"
services:
postgresql:
ports:
- "5432:5432"
image: "arm64v8/postgres:15-alpine"
environment:
- POSTGRES_PASSWORD=postgres
13 changes: 13 additions & 0 deletions global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ import { Queue } from './types/Queue'
import { Message } from './types/Message'
import { Cron } from './types/Cron'

declare module 'fastify' {
interface FastifyInstance {
getSchema(schemaId: 'Queue' | 'Message' | 'Cron'): {
'$id': string,
title: string,
description: string,
type: string,
properties: object,
required: string[]
};
}
}

declare module '@platformatic/sql-mapper' {
interface Entities {
queue: Entity<Queue>,
Expand Down
2 changes: 1 addition & 1 deletion lib/cron.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module.exports = fp(async function (app) {
throw _err
}

const next = interval.next().getTime()
const next = interval.next()
const cron = await original({
...args,
tx
Expand Down
61 changes: 38 additions & 23 deletions lib/executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,51 @@ class Executor {
constructor (app) {
this.app = app
this.timer = null
this.execute = this.execute.bind(this)
this.execute = () => {
/* c8 ignore next 3 */
this._execute().catch((err) => {
app.log.error({ err })
})
}
}

async execute () {
async _execute () {
const app = this.app
const now = Date.now()
const now = new Date()
const { db, sql } = app.platformatic

/* Write a join sql query between messages and queue on queue_id column */
const messages = await db.query(sql`
SELECT queues.callback_url AS callbackUrl,
queues.method as method,
messages.body AS body,
messages.headers AS headers,
messages.retries AS retries,
queues.headers AS queueHeaders,
queues.max_retries AS maxRetries,
queues.dead_letter_queue_id AS deadLetterQueueId,
messages.\`when\` AS \`when\`,
messages.cron_id AS cronId,
messages.id AS id
SELECT queues.callback_url AS "callbackUrl",
queues.method as "method",
messages.body AS "body",
messages.headers AS "headers",
messages.retries AS "retries",
queues.headers AS "queueHeaders",
queues.max_retries AS "maxRetries",
queues.dead_letter_queue_id AS "deadLetterQueueId",
messages."when" AS "when",
messages.cron_id AS "cronId",
messages.id AS "id"
FROM messages
INNER JOIN queues ON messages.queue_id = queues.id
WHERE messages.sent_at IS NULL
AND messages.\`when\` <= ${now}
AND messages.failed = false
AND messages."when" <= ${now}
LIMIT 10
`)

await Promise.allSettled(messages.map(async (message) => {
const res = await Promise.allSettled(messages.map(async (message) => {
const { callbackUrl, method, body, maxRetries, deadLetterQueueId } = message
// We must JSON.parse(message.headers) because SQLite store JSON
// as strings.
const headers = {
...(message.queueHeaders ? JSON.parse(message.queueHeaders) : {}),
...(message.headers ? JSON.parse(message.headers) : {})
...message.queueHeaders,
...message.headers
}
headers['content-type'] ||= 'application/json'

Expand All @@ -59,7 +65,7 @@ class Executor {
await app.platformatic.entities.message.save({
input: {
id: message.id,
sentAt: new Date().getTime()
sentAt: new Date()
},
tx
})
Expand All @@ -71,7 +77,7 @@ class Executor {
await app.platformatic.entities.message.save({
input: {
id: message.id,
sentAt: new Date().getTime(),
sentAt: new Date(),
failed: true
},
tx
Expand All @@ -91,7 +97,7 @@ class Executor {
const newItem = {
id: message.id,
retries: backoff.retries,
when: new Date(Date.now() + backoff.waitFor).getTime()
when: new Date(Date.now() + backoff.waitFor)
}
await app.platformatic.entities.message.save({ input: newItem, tx })
}
Expand All @@ -101,7 +107,7 @@ class Executor {
if (message.cronId) {
const [cron] = await app.platformatic.entities.cron.find({ where: { id: { eq: message.cronId } }, tx })
const interval = cronParser.parseExpression(cron.schedule)
const next = interval.next().getTime()
const next = interval.next()
await app.platformatic.entities.message.save({
input: {
queueId: cron.queueId,
Expand All @@ -116,6 +122,13 @@ class Executor {
})
}))

/* c8 ignore next 4 */
for (const r of res) {
if (r.status === 'rejected') {
app.log.error({ err: r.reason }, 'error while executing message')
}
}

const [next] = await app.platformatic.entities.message.find({
where: {
sentAt: {
Expand All @@ -129,7 +142,9 @@ class Executor {
})

if (next) {
const delay = next.when - now
const whenTime = new Date(next.when).getTime()
const nowTime = new Date(now).getTime()
const delay = whenTime - nowTime
clearTimeout(this.timer)
this.timer = setTimeout(this.execute, delay)
this.nextTime = now + delay
Expand Down
2 changes: 1 addition & 1 deletion migrations/001.do.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

/* create a queues table */
CREATE TABLE queues (
id INTEGER PRIMARY KEY,
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
Expand Down
4 changes: 2 additions & 2 deletions migrations/002.do.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* creates an items table */
CREATE TABLE items (
id INTEGER PRIMARY KEY,
id SERIAL PRIMARY KEY,
queue_id INTEGER NOT NULL REFERENCES queues(id),
`when` TIMESTAMP NOT NULL,
"when" TIMESTAMP NOT NULL,
/* add some validations, this should be an URL */
callback_url VARCHAR(2048) NOT NULL,
/* add some validations, this should be an enum */
Expand Down
2 changes: 1 addition & 1 deletion migrations/005.do.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* create crons table */
CREATE TABLE crons (
id INTEGER PRIMARY KEY,
id SERIAL PRIMARY KEY,
queue_id INTEGER NOT NULL REFERENCES queues(id),

schedule VARCHAR(255) NOT NULL,
Expand Down
Loading

0 comments on commit e259350

Please sign in to comment.