Skip to content

Commit

Permalink
Add TestWorkerPoolSingleQueueMultiDB
Browse files Browse the repository at this point in the history
  • Loading branch information
winebarrel committed Oct 13, 2024
1 parent a118b70 commit 042e1da
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package qg_test

import (
"database/sql"
"fmt"
"sort"
"strings"
"sync"
"testing"
"time"

"github.com/kanmu/qg/v5"
)
Expand Down Expand Up @@ -419,3 +421,131 @@ func TestWorkerPoolMultiDB(t *testing.T) {
t.Errorf("unexpected que_jobs count: %d", queJobsCount)
}
}

func TestWorkerPoolSingleQueueMultiDB(t *testing.T) {
var pool1, pool2 *qg.WorkerPool
var wg sync.WaitGroup
theOneQueue := "the-one-queue"

wm := qg.WorkMap{
"job1": func(j *qg.Job) error {
defer wg.Done()
tx := j.Tx()
_, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job1")
time.Sleep(300 * time.Millisecond)
return err
},
"job2": func(j *qg.Job) error {
defer wg.Done()
tx := j.Tx()
_, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job2")
time.Sleep(300 * time.Millisecond)
return err
},
"job3": func(j *qg.Job) error {
defer wg.Done()
tx := j.Tx()
_, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job3")
time.Sleep(300 * time.Millisecond)
return err
},
"job4": func(j *qg.Job) error {
defer wg.Done()
tx := j.Tx()
time.Sleep(300 * time.Millisecond)
_, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job4")
return err
},
}

// thread1
{
connector, err := qg.GetConnector("localhost", 5432, "qgtest", "", "qgtest")

if err != nil {
t.Fatal(err)
}

db := sql.OpenDB(connector)
client := qg.MustNewClient(db)
pool1 = qg.NewWorkerPool(client, wm, 4)
pool1.Queue = theOneQueue
}

// thread2
{
connector, err := qg.GetConnector("localhost", 5432, "qgtest", "", "qgtest")

if err != nil {
t.Fatal(err)
}

db := sql.OpenDB(connector)
client := qg.MustNewClient(db)
pool2 = qg.NewWorkerPool(client, wm, 4)
pool2.Queue = theOneQueue
}

checkDB, err := sql.Open("pgx", "postgres://qgtest@localhost:5432/qgtest")
if err != nil {
t.Fatal(err)
}
_, err = checkDB.Exec("TRUNCATE TABLE que_jobs, job_test")
if err != nil {
t.Fatal(err)
}

connector, err := qg.GetConnector("localhost", 5432, "qgtest", "", "qgtest")

if err != nil {
t.Fatal(err)
}

db := sql.OpenDB(connector)
client := qg.MustNewClient(db)

for range 2 {
for _, jobType := range []string{"job1", "job2", "job3", "job4"} {
client.Enqueue(&qg.Job{Type: jobType, Queue: theOneQueue})
wg.Add(1)
}
}

pool1.Start()
pool2.Start()
wg.Wait()
pool1.Shutdown()
pool2.Shutdown()

rs, err := checkDB.Query("SELECT name, value, COUNT(*) as count FROM job_test GROUP BY name, value")
if err != nil {
t.Fatal(err)
}

rows := []string{}

for rs.Next() {
var name, _value string
var count int
err = rs.Scan(&name, &_value, &count)
if err != nil {
t.Fatal(err)
}
rows = append(rows, fmt.Sprintf("%s,%d", name, count))
}

sort.Strings(rows)

if strings.Join(rows, " ") != "job1,1 job1,1 job2,1 job2,1 job3,1 job3,1 job4,1 job4,1" {
t.Errorf("unexpected result: %v", rows)
}

var queJobsCount int
err = checkDB.QueryRow("SELECT COUNT(*) FROM que_jobs").Scan(&queJobsCount)
if err != nil {
t.Fatal(err)
}
if queJobsCount != 0 {
t.Errorf("unexpected que_jobs count: %d", queJobsCount)
}
}

0 comments on commit 042e1da

Please sign in to comment.