From b8e6b058bb53dba398d6d91da4a65c5969e527bd Mon Sep 17 00:00:00 2001 From: winebarrel Date: Sun, 13 Oct 2024 20:24:56 +0900 Subject: [PATCH 1/6] Split schema.sql --- Makefile | 1 + schema.sql | 11 ----------- schema_test.sql | 10 ++++++++++ 3 files changed, 11 insertions(+), 11 deletions(-) create mode 100644 schema_test.sql diff --git a/Makefile b/Makefile index 5de3724..c97edc6 100644 --- a/Makefile +++ b/Makefile @@ -18,3 +18,4 @@ db: .PHONY: table table: psql -U qgtest -h localhost -d qgtest -f schema.sql + psql -U qgtest -h localhost -d qgtest -f schema_test.sql diff --git a/schema.sql b/schema.sql index f4b080c..f98cb83 100644 --- a/schema.sql +++ b/schema.sql @@ -15,14 +15,3 @@ CREATE TABLE que_jobs ); COMMENT ON TABLE que_jobs IS '3'; - -DROP TABLE IF EXISTS job_test; - -CREATE TABLE job_test -( - job_id bigserial NOT NULL, - name text NOT NULL, - queue text NOT NULL, - - CONSTRAINT job_test_pkey PRIMARY KEY (job_id) -); diff --git a/schema_test.sql b/schema_test.sql new file mode 100644 index 0000000..f83d1d4 --- /dev/null +++ b/schema_test.sql @@ -0,0 +1,10 @@ +DROP TABLE IF EXISTS job_test; + +CREATE TABLE job_test +( + job_id bigserial NOT NULL, + name text NOT NULL, + queue text NOT NULL, + + CONSTRAINT job_test_pkey PRIMARY KEY (job_id) +); From a118b70b648264e5a081e1264528ba441dd80de1 Mon Sep 17 00:00:00 2001 From: winebarrel Date: Sun, 13 Oct 2024 20:26:49 +0900 Subject: [PATCH 2/6] Rename job_test.queue to job_test.value --- schema_test.sql | 2 +- workerpool_test.go | 64 +++++++++++++++++++++++----------------------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/schema_test.sql b/schema_test.sql index f83d1d4..366a8e3 100644 --- a/schema_test.sql +++ b/schema_test.sql @@ -4,7 +4,7 @@ CREATE TABLE job_test ( job_id bigserial NOT NULL, name text NOT NULL, - queue text NOT NULL, + value text NOT NULL, CONSTRAINT job_test_pkey PRIMARY KEY (job_id) ); diff --git a/workerpool_test.go b/workerpool_test.go index cae4831..4ae1cd1 100644 --- a/workerpool_test.go +++ b/workerpool_test.go @@ -26,25 +26,25 @@ func TestWorkerPool(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) return err }, } @@ -72,7 +72,7 @@ func TestWorkerPool(t *testing.T) { wg.Wait() pool.Shutdown() - rs, err := checkDB.Query("SELECT name, queue FROM job_test") + rs, err := checkDB.Query("SELECT name, value FROM job_test") if err != nil { t.Fatal(err) } @@ -80,12 +80,12 @@ func TestWorkerPool(t *testing.T) { rows := []string{} for rs.Next() { - var name, queue string - err = rs.Scan(&name, &queue) + var name, value string + err = rs.Scan(&name, &value) if err != nil { t.Fatal(err) } - rows = append(rows, name+","+queue) + rows = append(rows, name+","+value) } sort.Strings(rows) @@ -128,25 +128,25 @@ func TestWorkerPoolMultiQueue(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) return err }, } @@ -164,25 +164,25 @@ func TestWorkerPoolMultiQueue(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queueName2) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queueName2) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queueName2) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queueName2) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queueName2) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queueName2) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queueName2) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queueName2) return err }, } @@ -217,7 +217,7 @@ func TestWorkerPoolMultiQueue(t *testing.T) { pool1.Shutdown() pool2.Shutdown() - rs, err := checkDB.Query("SELECT name, queue FROM job_test") + rs, err := checkDB.Query("SELECT name, value FROM job_test") if err != nil { t.Fatal(err) } @@ -225,12 +225,12 @@ func TestWorkerPoolMultiQueue(t *testing.T) { rows := []string{} for rs.Next() { - var name, queue string - err = rs.Scan(&name, &queue) + var name, value string + err = rs.Scan(&name, &value) if err != nil { t.Fatal(err) } - rows = append(rows, name+","+queue) + rows = append(rows, name+","+value) } sort.Strings(rows) @@ -277,25 +277,25 @@ func TestWorkerPoolMultiDB(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) return err }, } @@ -320,25 +320,25 @@ func TestWorkerPoolMultiDB(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queue2Name) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queue2Name) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queue2Name) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queue2Name) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queue2Name) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queue2Name) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queue2Name) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queue2Name) return err }, } @@ -380,7 +380,7 @@ func TestWorkerPoolMultiDB(t *testing.T) { pool1.Shutdown() pool2.Shutdown() - rs, err := checkDB.Query("SELECT name, queue FROM job_test") + rs, err := checkDB.Query("SELECT name, value FROM job_test") if err != nil { t.Fatal(err) } @@ -388,12 +388,12 @@ func TestWorkerPoolMultiDB(t *testing.T) { rows := []string{} for rs.Next() { - var name, queue string - err = rs.Scan(&name, &queue) + var name, value string + err = rs.Scan(&name, &value) if err != nil { t.Fatal(err) } - rows = append(rows, name+","+queue) + rows = append(rows, name+","+value) } sort.Strings(rows) From 042e1daa6afdde922bca1a13a18c4fd004184f30 Mon Sep 17 00:00:00 2001 From: winebarrel Date: Sun, 13 Oct 2024 20:45:03 +0900 Subject: [PATCH 3/6] Add TestWorkerPoolSingleQueueMultiDB --- workerpool_test.go | 130 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/workerpool_test.go b/workerpool_test.go index 4ae1cd1..200e964 100644 --- a/workerpool_test.go +++ b/workerpool_test.go @@ -2,10 +2,12 @@ package qg_test import ( "database/sql" + "fmt" "sort" "strings" "sync" "testing" + "time" "github.com/kanmu/qg/v5" ) @@ -419,3 +421,131 @@ func TestWorkerPoolMultiDB(t *testing.T) { t.Errorf("unexpected que_jobs count: %d", queJobsCount) } } + +func TestWorkerPoolSingleQueueMultiDB(t *testing.T) { + var pool1, pool2 *qg.WorkerPool + var wg sync.WaitGroup + theOneQueue := "the-one-queue" + + wm := qg.WorkMap{ + "job1": func(j *qg.Job) error { + defer wg.Done() + tx := j.Tx() + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job1") + time.Sleep(300 * time.Millisecond) + return err + }, + "job2": func(j *qg.Job) error { + defer wg.Done() + tx := j.Tx() + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job2") + time.Sleep(300 * time.Millisecond) + return err + }, + "job3": func(j *qg.Job) error { + defer wg.Done() + tx := j.Tx() + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job3") + time.Sleep(300 * time.Millisecond) + return err + }, + "job4": func(j *qg.Job) error { + defer wg.Done() + tx := j.Tx() + time.Sleep(300 * time.Millisecond) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job4") + return err + }, + } + + // thread1 + { + connector, err := qg.GetConnector("localhost", 5432, "qgtest", "", "qgtest") + + if err != nil { + t.Fatal(err) + } + + db := sql.OpenDB(connector) + client := qg.MustNewClient(db) + pool1 = qg.NewWorkerPool(client, wm, 4) + pool1.Queue = theOneQueue + } + + // thread2 + { + connector, err := qg.GetConnector("localhost", 5432, "qgtest", "", "qgtest") + + if err != nil { + t.Fatal(err) + } + + db := sql.OpenDB(connector) + client := qg.MustNewClient(db) + pool2 = qg.NewWorkerPool(client, wm, 4) + pool2.Queue = theOneQueue + } + + checkDB, err := sql.Open("pgx", "postgres://qgtest@localhost:5432/qgtest") + if err != nil { + t.Fatal(err) + } + _, err = checkDB.Exec("TRUNCATE TABLE que_jobs, job_test") + if err != nil { + t.Fatal(err) + } + + connector, err := qg.GetConnector("localhost", 5432, "qgtest", "", "qgtest") + + if err != nil { + t.Fatal(err) + } + + db := sql.OpenDB(connector) + client := qg.MustNewClient(db) + + for range 2 { + for _, jobType := range []string{"job1", "job2", "job3", "job4"} { + client.Enqueue(&qg.Job{Type: jobType, Queue: theOneQueue}) + wg.Add(1) + } + } + + pool1.Start() + pool2.Start() + wg.Wait() + pool1.Shutdown() + pool2.Shutdown() + + rs, err := checkDB.Query("SELECT name, value, COUNT(*) as count FROM job_test GROUP BY name, value") + if err != nil { + t.Fatal(err) + } + + rows := []string{} + + for rs.Next() { + var name, _value string + var count int + err = rs.Scan(&name, &_value, &count) + if err != nil { + t.Fatal(err) + } + rows = append(rows, fmt.Sprintf("%s,%d", name, count)) + } + + sort.Strings(rows) + + if strings.Join(rows, " ") != "job1,1 job1,1 job2,1 job2,1 job3,1 job3,1 job4,1 job4,1" { + t.Errorf("unexpected result: %v", rows) + } + + var queJobsCount int + err = checkDB.QueryRow("SELECT COUNT(*) FROM que_jobs").Scan(&queJobsCount) + if err != nil { + t.Fatal(err) + } + if queJobsCount != 0 { + t.Errorf("unexpected que_jobs count: %d", queJobsCount) + } +} From c34d2a2ae6d397dea399ba073881d0e1f2f970cc Mon Sep 17 00:00:00 2001 From: winebarrel Date: Sun, 13 Oct 2024 21:12:53 +0900 Subject: [PATCH 4/6] Update golangci/golangci-lint-action --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4c74779..b5088b1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,7 +34,7 @@ jobs: cache: false - name: Lint - uses: golangci/golangci-lint-action@v4 + uses: golangci/golangci-lint-action@v6 - name: Prepare test database run: make db From c530fdc05a7d330d1a5b849a19907167e250c930 Mon Sep 17 00:00:00 2001 From: winebarrel Date: Sun, 13 Oct 2024 22:04:24 +0900 Subject: [PATCH 5/6] Add unique pid check --- workerpool_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/workerpool_test.go b/workerpool_test.go index 200e964..e64929d 100644 --- a/workerpool_test.go +++ b/workerpool_test.go @@ -3,6 +3,7 @@ package qg_test import ( "database/sql" "fmt" + "slices" "sort" "strings" "sync" @@ -523,23 +524,31 @@ func TestWorkerPoolSingleQueueMultiDB(t *testing.T) { } rows := []string{} + pids := []string{} for rs.Next() { - var name, _value string + var name, value string var count int - err = rs.Scan(&name, &_value, &count) + err = rs.Scan(&name, &value, &count) if err != nil { t.Fatal(err) } rows = append(rows, fmt.Sprintf("%s,%d", name, count)) + pids = append(pids, value) } sort.Strings(rows) + sort.Strings(pids) + pids = slices.Compact(pids) if strings.Join(rows, " ") != "job1,1 job1,1 job2,1 job2,1 job3,1 job3,1 job4,1 job4,1" { t.Errorf("unexpected result: %v", rows) } + if len(pids) != 8 { + t.Errorf("unexpected unique pid count: %v", len(pids)) + } + var queJobsCount int err = checkDB.QueryRow("SELECT COUNT(*) FROM que_jobs").Scan(&queJobsCount) if err != nil { From fc68afeb91ba552a5786351c631f0b810c0da9b8 Mon Sep 17 00:00:00 2001 From: Genki Sugawara Date: Mon, 14 Oct 2024 10:45:05 +0900 Subject: [PATCH 6/6] chore: Fix test table column type: bigserial -> bigint --- schema_test.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/schema_test.sql b/schema_test.sql index 366a8e3..817c79a 100644 --- a/schema_test.sql +++ b/schema_test.sql @@ -2,9 +2,9 @@ DROP TABLE IF EXISTS job_test; CREATE TABLE job_test ( - job_id bigserial NOT NULL, - name text NOT NULL, - value text NOT NULL, + job_id bigint NOT NULL, + name text NOT NULL, + value text NOT NULL, CONSTRAINT job_test_pkey PRIMARY KEY (job_id) );