diff --git a/processor/processor_test.go b/processor/processor_test.go index 7e9f79fcfeb..83f86a973af 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -1212,11 +1212,29 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledNoUT), + WorkspaceId: sampleWorkspaceID, + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, ), - EventCount: 2, + EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabledNoUT), WorkspaceId: sampleWorkspaceID, @@ -1256,12 +1274,46 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayload, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + WorkspaceId: sampleWorkspaceID, + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayload, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + WorkspaceId: sampleWorkspaceID, + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayload, ), - EventCount: 3, + EventCount: 1, Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), WorkspaceId: sampleWorkspaceID, }, @@ -1383,11 +1435,29 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledNoUT), + WorkspaceId: sampleWorkspaceID, + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, ), - EventCount: 2, + EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabledNoUT), WorkspaceId: sampleWorkspaceID, @@ -1427,12 +1497,46 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayload, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + WorkspaceId: sampleWorkspaceID, + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayload, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + WorkspaceId: sampleWorkspaceID, + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayload, ), - EventCount: 3, + EventCount: 1, Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), WorkspaceId: sampleWorkspaceID, }, @@ -1581,11 +1685,28 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, + createMessagePayloadWithoutSources, // should be stored + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, // should be stored ), - EventCount: 2, + EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabledNoUT), }, @@ -1622,12 +1743,44 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayload, // shouldn't be stored to archivedb + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayload, // shouldn't be stored to archivedb + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayload, // shouldn't be stored to archivedb ), - EventCount: 3, + EventCount: 1, Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), }, } @@ -1738,11 +1891,28 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, + createMessagePayloadWithoutSources, // should be stored + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, // should be stored ), - EventCount: 2, + EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabledNoUT), }, @@ -1779,12 +1949,44 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayload, // shouldn't be stored to archivedb + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayload, // shouldn't be stored to archivedb + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayload, // shouldn't be stored to archivedb ), - EventCount: 3, + EventCount: 1, Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), }, } @@ -1838,7 +2040,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { // this message should not be delivered to destination A "message-2": { id: "2", - jobid: 1010, + jobid: 1011, originalTimestamp: "2000-02-02T01:23:45", expectedOriginalTimestamp: "2000-02-02T01:23:45.000Z", expectedReceivedAt: "2001-01-02T02:23:45.000Z", @@ -1857,7 +2059,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { // this message should be delivered to all destinations (default All value) "message-4": { id: "4", - jobid: 2010, + jobid: 2011, originalTimestamp: "2000-04-02T02:23:15.000Z", // missing sentAt expectedOriginalTimestamp: "2000-04-02T02:23:15.000Z", expectedReceivedAt: "2002-01-02T02:23:45.000Z", @@ -1866,7 +2068,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { // this message should not be delivered to any destination "message-5": { id: "5", - jobid: 2010, + jobid: 2012, expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{"All": false}, }, @@ -1895,11 +2097,28 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDTransient), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyTransient, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, ), - EventCount: 2, + EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDTransient), }, @@ -1936,12 +2155,44 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayload, + ), + EventCount: 1, + Parameters: createBatchParameters(SourceIDTransient), + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyTransient, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayload, + ), + EventCount: 1, + Parameters: createBatchParameters(SourceIDTransient), + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyTransient, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayload, ), - EventCount: 3, + EventCount: 1, Parameters: createBatchParameters(SourceIDTransient), }, } @@ -2018,7 +2269,7 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() // this message should not be delivered to destination A "message-2": { id: "2", - jobid: 1010, + jobid: 1011, originalTimestamp: "2000-02-02T01:23:45", expectedOriginalTimestamp: "2000-02-02T01:23:45.000Z", expectedReceivedAt: "2001-01-02T02:23:45.000Z", @@ -2039,7 +2290,7 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() // this message should be delivered to all destinations (default All value) "message-4": { id: "4", - jobid: 2010, + jobid: 2011, originalTimestamp: "2000-04-02T02:23:15.000Z", // missing sentAt expectedOriginalTimestamp: "2000-04-02T02:23:15.000Z", expectedReceivedAt: "2002-01-02T02:23:45.000Z", @@ -2049,7 +2300,7 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() // this message should not be delivered to any destination "message-5": { id: "5", - jobid: 2010, + jobid: 2012, expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{"All": false}, params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, @@ -2079,6 +2330,22 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, createMessagePayloadWithoutSources, + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, ), @@ -2119,12 +2386,44 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayloadWithoutSources, ), - EventCount: 3, + EventCount: 1, Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), }, } @@ -2226,7 +2525,7 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() TaskRunID: "task_run_id_1", SourceID: "enabled-source-no-ut", }, - rsources.Stats{Out: 1}, + rsources.Stats{Out: 3}, ).Times(1).Return(nil) c.mockArchivalDB.EXPECT(). @@ -2262,6 +2561,9 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() { SourceID: SourceIDEnabledNoUT, }, + { + SourceID: SourceIDEnabledNoUT, + }, { SourceID: SourceIDEnabled, }, @@ -2271,6 +2573,12 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() { SourceID: SourceIDEnabledNoUT, }, + { + SourceID: SourceIDEnabledNoUT, + }, + { + SourceID: SourceIDEnabledNoUT, + }, } Setup(processor, c, false, false, false) processor.trackedUsersReporter = c.mockTrackedUsersReporter @@ -2302,7 +2610,7 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() // this message should not be delivered to destination A "message-2": { id: "2", - jobid: 1010, + jobid: 1011, originalTimestamp: "2000-02-02T01:23:45", expectedOriginalTimestamp: "2000-02-02T01:23:45.000Z", expectedReceivedAt: "2001-01-02T02:23:45.000Z", @@ -2323,7 +2631,7 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() // this message should be delivered to all destinations (default All value) "message-4": { id: "4", - jobid: 2010, + jobid: 2011, originalTimestamp: "2000-04-02T02:23:15.000Z", // missing sentAt expectedOriginalTimestamp: "2000-04-02T02:23:15.000Z", expectedReceivedAt: "2002-01-02T02:23:45.000Z", @@ -2333,7 +2641,7 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() // this message should not be delivered to any destination "message-5": { id: "5", - jobid: 2010, + jobid: 2012, expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{"All": false}, params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, @@ -2363,10 +2671,26 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, createMessagePayloadWithoutSources, + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, ), - EventCount: 2, + EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabledNoUT), }, @@ -2403,12 +2727,44 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayloadWithoutSources, ), - EventCount: 3, + EventCount: 1, Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), }, } @@ -2501,6 +2857,18 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() rsources.Stats{In: 2, Failed: 2}, ).Times(1).Return(nil) + c.MockRsourcesService.EXPECT(). + IncrementStats( + gomock.Any(), + gomock.Any(), + "job_run_id_1", + rsources.JobTargetKey{ + TaskRunID: "task_run_id_1", + SourceID: "enabled-source-no-ut", + }, + rsources.Stats{Out: 3}, + ).Times(1).Return(nil) + c.MockRsourcesService.EXPECT(). IncrementStats( gomock.Any(), @@ -2546,6 +2914,9 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() { SourceID: SourceIDEnabledNoUT, }, + { + SourceID: SourceIDEnabledNoUT, + }, { SourceID: SourceIDEnabled, }, @@ -2555,6 +2926,12 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() { SourceID: SourceIDEnabledNoUT, }, + { + SourceID: SourceIDEnabledNoUT, + }, + { + SourceID: SourceIDEnabledNoUT, + }, } Setup(processor, c, false, true, false) processor.trackedUsersReporter = c.mockTrackedUsersReporter @@ -2723,7 +3100,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should not be delivered to destination A "message-2": { id: "2", - jobid: 1010, + jobid: 1011, originalTimestamp: "2000-02-02T01:23:45", expectedOriginalTimestamp: "2000-02-02T01:23:45.000Z", expectedReceivedAt: "2001-01-02T02:23:45.000Z", @@ -2744,7 +3121,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should be delivered to all destinations (default All value) "message-4": { id: "4", - jobid: 2010, + jobid: 2011, originalTimestamp: "2000-04-02T02:23:15.000Z", // missing sentAt expectedOriginalTimestamp: "2000-04-02T02:23:15.000Z", expectedReceivedAt: "2002-01-02T02:23:45.000Z", @@ -2754,7 +3131,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should not be delivered to any destination "message-5": { id: "5", - jobid: 2010, + jobid: 2012, expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{"All": false}, params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, @@ -2771,11 +3148,28 @@ var _ = Describe("Processor", Ordered, func() { EventPayload: nil, EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, - Parameters: createBatchParameters(SourceIDEnabled), + Parameters: createBatchParameters(SourceIDEnabled), + }, + { + UUID: uuid.New(), + JobID: 1010, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ + messages["message-1"], + }, createMessagePayloadWithoutSources, + ), + EventCount: 2, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledNoUT), }, { UUID: uuid.New(), - JobID: 1010, + JobID: 1011, CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), CustomVal: gatewayCustomVal[0], @@ -2783,7 +3177,6 @@ var _ = Describe("Processor", Ordered, func() { WriteKeyEnabledNoUT, "2001-01-02T02:23:45.000Z", []mockEventData{ - messages["message-1"], messages["message-2"], }, createMessagePayloadWithoutSources, ), @@ -2824,12 +3217,44 @@ var _ = Describe("Processor", Ordered, func() { "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayloadWithoutSources, ), - EventCount: 3, + EventCount: 1, Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), }, } @@ -2929,7 +3354,7 @@ var _ = Describe("Processor", Ordered, func() { TaskRunID: "task_run_id_1", SourceID: "enabled-source-no-ut", }, - rsources.Stats{Out: 1}, + rsources.Stats{Out: 3}, ).Times(1).Return(nil) c.mockArchivalDB.EXPECT(). @@ -2977,7 +3402,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should not be delivered to destination B "message-2": { id: "2", - jobid: 1010, + jobid: 1011, originalTimestamp: "2000-02-02T01:23:45", expectedOriginalTimestamp: "2000-02-02T01:23:45.000Z", expectedReceivedAt: "2001-01-02T02:23:45.000Z", @@ -2996,7 +3421,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should be delivered to all destinations (default All value) "message-4": { id: "4", - jobid: 2010, + jobid: 2011, originalTimestamp: "2000-04-02T02:23:15.000Z", // missing sentAt expectedOriginalTimestamp: "2000-04-02T02:23:15.000Z", expectedReceivedAt: "2002-01-02T02:23:45.000Z", @@ -3005,7 +3430,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should not be delivered to any destination "message-5": { id: "5", - jobid: 2010, + jobid: 2012, expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{"All": false}, }, @@ -3034,6 +3459,23 @@ var _ = Describe("Processor", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledOnlyUT), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledOnlyUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, @@ -3073,7 +3515,39 @@ var _ = Describe("Processor", Ordered, func() { "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParameters(SourceIDEnabledOnlyUT), + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledOnlyUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParameters(SourceIDEnabledOnlyUT), + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledOnlyUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayloadWithoutSources, @@ -3232,17 +3706,34 @@ var _ = Describe("Processor", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-some-id-2"], - messages["message-some-id-1"], }, createMessagePayloadWithSameMessageId, ), - EventCount: 2, + EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabled), }, { UUID: uuid.New(), JobID: 2010, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabled, + "2001-01-02T02:23:45.000Z", + []mockEventData{ + messages["message-some-id-1"], + }, + createMessagePayloadWithSameMessageId, + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabled), + }, + { + UUID: uuid.New(), + JobID: 3010, CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), CustomVal: gatewayCustomVal[0], @@ -3343,11 +3834,28 @@ var _ = Describe("Processor", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-some-id-2"], + }, + createMessagePayloadWithSameMessageId, + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabled), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabled, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-some-id-1"], }, createMessagePayloadWithSameMessageId, ), - EventCount: 2, + EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabled), }, @@ -3375,6 +3883,7 @@ var _ = Describe("Processor", Ordered, func() { callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1) c.MockDedup.EXPECT().GetBatch(gomock.Any()).Return(map[dedupTypes.KeyValue]bool{ {Key: "message-some-id", JobID: 1010, WorkspaceID: ""}: true, + {Key: "message-some-id", JobID: 1011, WorkspaceID: ""}: false, {Key: "message-some-id", JobID: 2010, WorkspaceID: ""}: true, }, nil).After(callUnprocessed).AnyTimes() c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1) @@ -3385,7 +3894,7 @@ var _ = Describe("Processor", Ordered, func() { c.mockRouterJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { _ = f(jobsdb.EmptyStoreSafeTx()) }).Return(nil).Times(1) - callStoreRouter := c.mockRouterJobsDB.EXPECT().StoreInTx(gomock.Any(), gomock.Any(), gomock.Len(3)).Times(1) + callStoreRouter := c.mockRouterJobsDB.EXPECT().StoreInTx(gomock.Any(), gomock.Any(), gomock.Len(2)).Times(1) c.mockArchivalDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).AnyTimes().Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { _ = f(jobsdb.EmptyStoreSafeTx()) @@ -3428,7 +3937,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should not be delivered to destination A "message-2": { id: "2", - jobid: 1010, + jobid: 1011, originalTimestamp: "2000-02-02T01:23:45", expectedOriginalTimestamp: "2000-02-02T01:23:45.000Z", expectedReceivedAt: "2001-01-02T02:23:45.000Z", @@ -3449,7 +3958,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should be delivered to all destinations (default All value) "message-4": { id: "4", - jobid: 2010, + jobid: 2011, originalTimestamp: "2000-04-02T02:23:15.000Z", // missing sentAt expectedOriginalTimestamp: "2000-04-02T02:23:15.000Z", expectedReceivedAt: "2002-01-02T02:23:45.000Z", @@ -3459,7 +3968,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should not be delivered to any destination "message-5": { id: "5", - jobid: 2010, + jobid: 2012, expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{"All": false}, params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, @@ -3489,6 +3998,22 @@ var _ = Describe("Processor", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, createMessagePayloadWithoutSources, + ), + EventCount: 2, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, ), @@ -3529,7 +4054,39 @@ var _ = Describe("Processor", Ordered, func() { "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 3, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 3, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayloadWithoutSources, @@ -3634,7 +4191,7 @@ var _ = Describe("Processor", Ordered, func() { TaskRunID: "task_run_id_1", SourceID: "enabled-source-no-ut", }, - rsources.Stats{Out: 1}, + rsources.Stats{Out: 3}, ).Times(1).Return(nil) c.mockArchivalDB.EXPECT(). @@ -3682,7 +4239,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should not be delivered to destination B "message-2": { id: "2", - jobid: 1010, + jobid: 1011, originalTimestamp: "2000-02-02T01:23:45", expectedOriginalTimestamp: "2000-02-02T01:23:45.000Z", expectedReceivedAt: "2001-01-02T02:23:45.000Z", @@ -3701,7 +4258,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should be delivered to all destinations (default All value) "message-4": { id: "4", - jobid: 2010, + jobid: 2011, originalTimestamp: "2000-04-02T02:23:15.000Z", // missing sentAt expectedOriginalTimestamp: "2000-04-02T02:23:15.000Z", expectedReceivedAt: "2002-01-02T02:23:45.000Z", @@ -3710,7 +4267,7 @@ var _ = Describe("Processor", Ordered, func() { // this message should not be delivered to any destination "message-5": { id: "5", - jobid: 2010, + jobid: 2012, expectedReceivedAt: "2002-01-02T02:23:45.000Z", integrations: map[string]bool{"All": false}, }, @@ -3739,6 +4296,23 @@ var _ = Describe("Processor", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledOnlyUT), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledOnlyUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayloadWithoutSources, @@ -3778,7 +4352,39 @@ var _ = Describe("Processor", Ordered, func() { "2002-01-02T02:23:45.000Z", []mockEventData{ messages["message-3"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParameters(SourceIDEnabledOnlyUT), + }, + { + UUID: uuid.New(), + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledOnlyUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-4"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 1, + Parameters: createBatchParameters(SourceIDEnabledOnlyUT), + }, + { + UUID: uuid.New(), + JobID: 2012, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledOnlyUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ messages["message-5"], }, createMessagePayloadWithoutSources, @@ -4050,7 +4656,7 @@ var _ = Describe("Processor", Ordered, func() { "message-1": { id: "1", jobid: 1010, - originalTimestamp: "2000-01-02T01:23:45", + originalTimestamp: "2000-01-02T01:23:45.000Z", expectedOriginalTimestamp: "2000-01-02T01:23:45.000Z", sentAt: "2000-01-02 01:23", expectedSentAt: "2000-01-02T01:23:00.000Z", @@ -4060,7 +4666,7 @@ var _ = Describe("Processor", Ordered, func() { "message-2": { id: "2", jobid: 1011, - originalTimestamp: "2000-01-02T01:23:45", + originalTimestamp: "2000-01-02T01:23:45.000Z", expectedOriginalTimestamp: "2000-01-02T01:23:45.000Z", sentAt: "2000-01-02 01:23", expectedSentAt: "2000-01-02T01:23:00.000Z", @@ -4081,6 +4687,22 @@ var _ = Describe("Processor", Ordered, func() { "2001-01-02T02:23:45.000Z", []mockEventData{ messages["message-1"], + }, + createMessagePayload, + ), + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabled), + }, + { + UUID: uuid.New(), + JobID: 1011, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabled, + "2001-01-02T02:23:45.000Z", + []mockEventData{ messages["message-2"], }, createMessagePayload, @@ -4924,9 +5546,21 @@ var _ = Describe("Processor", Ordered, func() { ReceivedAt: time.Now(), } eventsByMessageID := map[string]types.SingularEventWithReceivedAtWithPayloadFunc{ - "msg1": {SingularEventWithReceivedAt: singularEventWithReceivedAt1, PayloadFunc: nil}, - "msg2": {SingularEventWithReceivedAt: singularEventWithReceivedAt2, PayloadFunc: nil}, - "msg3": {SingularEventWithReceivedAt: singularEventWithReceivedAt3, PayloadFunc: nil}, + "msg1": {SingularEventWithReceivedAt: singularEventWithReceivedAt1, PayloadFunc: func() json.RawMessage { + b, err := json.Marshal(event1) + Expect(err).To(BeNil()) + return b + }}, + "msg2": {SingularEventWithReceivedAt: singularEventWithReceivedAt2, PayloadFunc: func() json.RawMessage { + b, err := json.Marshal(event2) + Expect(err).To(BeNil()) + return b + }}, + "msg3": {SingularEventWithReceivedAt: singularEventWithReceivedAt3, PayloadFunc: func() json.RawMessage { + b, err := json.Marshal(event3) + Expect(err).To(BeNil()) + return b + }}, } metadata1 := commonMetadata metadata1.MessageID = "msg1"