diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4c74779..b5088b1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,7 +34,7 @@ jobs: cache: false - name: Lint - uses: golangci/golangci-lint-action@v4 + uses: golangci/golangci-lint-action@v6 - name: Prepare test database run: make db diff --git a/Makefile b/Makefile index 5de3724..c97edc6 100644 --- a/Makefile +++ b/Makefile @@ -18,3 +18,4 @@ db: .PHONY: table table: psql -U qgtest -h localhost -d qgtest -f schema.sql + psql -U qgtest -h localhost -d qgtest -f schema_test.sql diff --git a/schema.sql b/schema.sql index f4b080c..f98cb83 100644 --- a/schema.sql +++ b/schema.sql @@ -15,14 +15,3 @@ CREATE TABLE que_jobs ); COMMENT ON TABLE que_jobs IS '3'; - -DROP TABLE IF EXISTS job_test; - -CREATE TABLE job_test -( - job_id bigserial NOT NULL, - name text NOT NULL, - queue text NOT NULL, - - CONSTRAINT job_test_pkey PRIMARY KEY (job_id) -); diff --git a/schema_test.sql b/schema_test.sql new file mode 100644 index 0000000..817c79a --- /dev/null +++ b/schema_test.sql @@ -0,0 +1,10 @@ +DROP TABLE IF EXISTS job_test; + +CREATE TABLE job_test +( + job_id bigint NOT NULL, + name text NOT NULL, + value text NOT NULL, + + CONSTRAINT job_test_pkey PRIMARY KEY (job_id) +); diff --git a/workerpool_test.go b/workerpool_test.go index cae4831..e64929d 100644 --- a/workerpool_test.go +++ b/workerpool_test.go @@ -2,10 +2,13 @@ package qg_test import ( "database/sql" + "fmt" + "slices" "sort" "strings" "sync" "testing" + "time" "github.com/kanmu/qg/v5" ) @@ -26,25 +29,25 @@ func TestWorkerPool(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) return err }, } @@ -72,7 +75,7 @@ func TestWorkerPool(t *testing.T) { wg.Wait() pool.Shutdown() - rs, err := checkDB.Query("SELECT name, queue FROM job_test") + rs, err := checkDB.Query("SELECT name, value FROM job_test") if err != nil { t.Fatal(err) } @@ -80,12 +83,12 @@ func TestWorkerPool(t *testing.T) { rows := []string{} for rs.Next() { - var name, queue string - err = rs.Scan(&name, &queue) + var name, value string + err = rs.Scan(&name, &value) if err != nil { t.Fatal(err) } - rows = append(rows, name+","+queue) + rows = append(rows, name+","+value) } sort.Strings(rows) @@ -128,25 +131,25 @@ func TestWorkerPoolMultiQueue(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) return err }, } @@ -164,25 +167,25 @@ func TestWorkerPoolMultiQueue(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queueName2) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queueName2) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queueName2) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queueName2) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queueName2) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queueName2) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queueName2) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queueName2) return err }, } @@ -217,7 +220,7 @@ func TestWorkerPoolMultiQueue(t *testing.T) { pool1.Shutdown() pool2.Shutdown() - rs, err := checkDB.Query("SELECT name, queue FROM job_test") + rs, err := checkDB.Query("SELECT name, value FROM job_test") if err != nil { t.Fatal(err) } @@ -225,12 +228,12 @@ func TestWorkerPoolMultiQueue(t *testing.T) { rows := []string{} for rs.Next() { - var name, queue string - err = rs.Scan(&name, &queue) + var name, value string + err = rs.Scan(&name, &value) if err != nil { t.Fatal(err) } - rows = append(rows, name+","+queue) + rows = append(rows, name+","+value) } sort.Strings(rows) @@ -277,25 +280,25 @@ func TestWorkerPoolMultiDB(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queueName1) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queueName1) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queueName1) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queueName1) return err }, } @@ -320,25 +323,25 @@ func TestWorkerPoolMultiDB(t *testing.T) { "job1": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job1", queue2Name) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job1", queue2Name) return err }, "job2": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job2", queue2Name) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job2", queue2Name) return err }, "job3": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job3", queue2Name) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job3", queue2Name) return err }, "job4": func(j *qg.Job) error { defer wg.Done() tx := j.Tx() - _, err := tx.Exec("INSERT INTO job_test (job_id, name, queue) VALUES ($1, $2, $3)", j.ID, "job4", queue2Name) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, $3)", j.ID, "job4", queue2Name) return err }, } @@ -380,7 +383,7 @@ func TestWorkerPoolMultiDB(t *testing.T) { pool1.Shutdown() pool2.Shutdown() - rs, err := checkDB.Query("SELECT name, queue FROM job_test") + rs, err := checkDB.Query("SELECT name, value FROM job_test") if err != nil { t.Fatal(err) } @@ -388,12 +391,12 @@ func TestWorkerPoolMultiDB(t *testing.T) { rows := []string{} for rs.Next() { - var name, queue string - err = rs.Scan(&name, &queue) + var name, value string + err = rs.Scan(&name, &value) if err != nil { t.Fatal(err) } - rows = append(rows, name+","+queue) + rows = append(rows, name+","+value) } sort.Strings(rows) @@ -419,3 +422,139 @@ func TestWorkerPoolMultiDB(t *testing.T) { t.Errorf("unexpected que_jobs count: %d", queJobsCount) } } + +func TestWorkerPoolSingleQueueMultiDB(t *testing.T) { + var pool1, pool2 *qg.WorkerPool + var wg sync.WaitGroup + theOneQueue := "the-one-queue" + + wm := qg.WorkMap{ + "job1": func(j *qg.Job) error { + defer wg.Done() + tx := j.Tx() + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job1") + time.Sleep(300 * time.Millisecond) + return err + }, + "job2": func(j *qg.Job) error { + defer wg.Done() + tx := j.Tx() + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job2") + time.Sleep(300 * time.Millisecond) + return err + }, + "job3": func(j *qg.Job) error { + defer wg.Done() + tx := j.Tx() + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job3") + time.Sleep(300 * time.Millisecond) + return err + }, + "job4": func(j *qg.Job) error { + defer wg.Done() + tx := j.Tx() + time.Sleep(300 * time.Millisecond) + _, err := tx.Exec("INSERT INTO job_test (job_id, name, value) VALUES ($1, $2, pg_backend_pid())", j.ID, "job4") + return err + }, + } + + // thread1 + { + connector, err := qg.GetConnector("localhost", 5432, "qgtest", "", "qgtest") + + if err != nil { + t.Fatal(err) + } + + db := sql.OpenDB(connector) + client := qg.MustNewClient(db) + pool1 = qg.NewWorkerPool(client, wm, 4) + pool1.Queue = theOneQueue + } + + // thread2 + { + connector, err := qg.GetConnector("localhost", 5432, "qgtest", "", "qgtest") + + if err != nil { + t.Fatal(err) + } + + db := sql.OpenDB(connector) + client := qg.MustNewClient(db) + pool2 = qg.NewWorkerPool(client, wm, 4) + pool2.Queue = theOneQueue + } + + checkDB, err := sql.Open("pgx", "postgres://qgtest@localhost:5432/qgtest") + if err != nil { + t.Fatal(err) + } + _, err = checkDB.Exec("TRUNCATE TABLE que_jobs, job_test") + if err != nil { + t.Fatal(err) + } + + connector, err := qg.GetConnector("localhost", 5432, "qgtest", "", "qgtest") + + if err != nil { + t.Fatal(err) + } + + db := sql.OpenDB(connector) + client := qg.MustNewClient(db) + + for range 2 { + for _, jobType := range []string{"job1", "job2", "job3", "job4"} { + client.Enqueue(&qg.Job{Type: jobType, Queue: theOneQueue}) + wg.Add(1) + } + } + + pool1.Start() + pool2.Start() + wg.Wait() + pool1.Shutdown() + pool2.Shutdown() + + rs, err := checkDB.Query("SELECT name, value, COUNT(*) as count FROM job_test GROUP BY name, value") + if err != nil { + t.Fatal(err) + } + + rows := []string{} + pids := []string{} + + for rs.Next() { + var name, value string + var count int + err = rs.Scan(&name, &value, &count) + if err != nil { + t.Fatal(err) + } + rows = append(rows, fmt.Sprintf("%s,%d", name, count)) + pids = append(pids, value) + } + + sort.Strings(rows) + sort.Strings(pids) + pids = slices.Compact(pids) + + if strings.Join(rows, " ") != "job1,1 job1,1 job2,1 job2,1 job3,1 job3,1 job4,1 job4,1" { + t.Errorf("unexpected result: %v", rows) + } + + if len(pids) != 8 { + t.Errorf("unexpected unique pid count: %v", len(pids)) + } + + var queJobsCount int + err = checkDB.QueryRow("SELECT COUNT(*) FROM que_jobs").Scan(&queJobsCount) + if err != nil { + t.Fatal(err) + } + if queJobsCount != 0 { + t.Errorf("unexpected que_jobs count: %d", queJobsCount) + } +}