Skip to content

Commit

Permalink
support uniqueness for specific args via struct tags
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Sep 19, 2024
1 parent 8ba8546 commit 37c75c3
Show file tree
Hide file tree
Showing 12 changed files with 431 additions and 170 deletions.
1 change: 1 addition & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
}

insertParams := &riverdriver.JobInsertFastParams{
Args: args,
CreatedAt: createdAt,
EncodedArgs: json.RawMessage(encodedArgs),
Kind: args.Kind(),
Expand Down
36 changes: 36 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4048,6 +4048,7 @@ func Test_Client_SubscribeConfig(t *testing.T) {
)
for i := 0; i < numJobsToInsert; i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
Args: &JobArgs{},
EncodedArgs: []byte(`{}`),
Kind: kind,
MaxAttempts: rivercommon.MaxAttemptsDefault,
Expand Down Expand Up @@ -5073,6 +5074,14 @@ func TestClient_JobTimeout(t *testing.T) {
}
}

type JobArgsStaticKind struct {
kind string
}

func (a JobArgsStaticKind) Kind() string {
return a.kind
}

func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -5203,6 +5212,33 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
require.Equal(t, internalUniqueOpts.StateBitmask(), params.UniqueStates)
})

t.Run("UniqueOptsWithPartialArgs", func(t *testing.T) {
t.Parallel()

uniqueOpts := UniqueOpts{ByArgs: true}

type PartialArgs struct {
JobArgsStaticKind
Included bool `json:"included" unique:"true"`
Excluded bool `json:"excluded"`
}

args := PartialArgs{
JobArgsStaticKind: JobArgsStaticKind{kind: "partialArgs"},
Included: true,
Excluded: true,
}

params, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, &InsertOpts{UniqueOpts: uniqueOpts})
require.NoError(t, err)
internalUniqueOpts := &dbunique.UniqueOpts{ByArgs: true}

expectedKey := dbunique.UniqueKey(archetype.Time, internalUniqueOpts, params)

require.Equal(t, expectedKey, params.UniqueKey)
require.Equal(t, internalUniqueOpts.StateBitmask(), params.UniqueStates)
})

t.Run("PriorityIsLimitedTo4", func(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ require (
github.com/riverqueue/river/rivertype v0.11.4
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.9.0
github.com/tidwall/gjson v1.17.3
github.com/tidwall/sjson v1.2.5
go.uber.org/goleak v1.3.0
golang.org/x/sync v0.8.0
golang.org/x/text v0.18.0
Expand All @@ -26,6 +28,8 @@ require (
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94=
github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
Expand Down
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
Expand Down
54 changes: 44 additions & 10 deletions internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package dbunique

import (
"crypto/sha256"
"slices"
"strings"
"time"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
"github.com/tidwall/sjson"
)

// When a job has specified unique options, but has not set the ByState
Expand Down Expand Up @@ -80,8 +80,50 @@ func buildUniqueKeyString(timeGen baseservice.TimeGenerator, uniqueOpts *UniqueO
}

if uniqueOpts.ByArgs {
encodedArgsForUnique := params.EncodedArgs

{
// Get unique JSON keys from the JobArgs struct:
uniqueFields, err := getSortedUniqueFieldsCached(params.Args)
if err != nil {
// Instead of returning an error, we can just proceed using the entire
// encoded args.
//
// TODO: log or how will we find out about this?
goto AppendUniqueArgs
}

if len(uniqueFields) == 0 {
// Use all encoded args if no unique fields are specified.
goto AppendUniqueArgs
}

// Extract unique values from the EncodedArgs JSON
uniqueValues := extractUniqueValues(params.EncodedArgs, uniqueFields)

// Assemble the JSON object using bytes.Buffer
// Better to overallocate a bit than to allocate multiple times, so just
// assume we'll cap out at the length of the full encoded args.
sortedJSONWithOnlyUniqueValues := make([]byte, 0, len(params.EncodedArgs))

sjsonOpts := &sjson.Options{ReplaceInPlace: true}
for i, key := range uniqueFields {
if uniqueValues[i] == "undefined" {
continue
}
sortedJSONWithOnlyUniqueValues, err = sjson.SetRawBytesOptions(sortedJSONWithOnlyUniqueValues, key, []byte(uniqueValues[i]), sjsonOpts)
if err != nil {
// Should not happen unless key was invalid
goto AppendUniqueArgs
}
}

encodedArgsForUnique = sortedJSONWithOnlyUniqueValues
}

AppendUniqueArgs:
sb.WriteString("&args=")
sb.Write(params.EncodedArgs)
sb.Write(encodedArgsForUnique)
}

if uniqueOpts.ByPeriod != time.Duration(0) {
Expand All @@ -93,14 +135,6 @@ func buildUniqueKeyString(timeGen baseservice.TimeGenerator, uniqueOpts *UniqueO
sb.WriteString("&queue=" + params.Queue)
}

stateSet := defaultUniqueStatesStrings
if len(uniqueOpts.ByState) > 0 {
stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) })
slices.Sort(stateSet)
}

sb.WriteString("&state=" + strings.Join(stateSet, ","))

return sb.String()
}

Expand Down
Loading

0 comments on commit 37c75c3

Please sign in to comment.