Skip to content

Commit

Permalink
RavenDB-23249 - add support for time series indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
grisha-kotler committed Jan 16, 2025
1 parent 2464309 commit 0c1aec5
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ public MapReduceTimeSeriesIndex(MapReduceIndexDefinition definition, AbstractSta
foreach (var collection in _compiled.ReferencedCollections)
{
foreach (var referencedCollection in collection.Value)
{
_referencedCollections.Add(referencedCollection.Name);

if (referencedCollection.Name == Constants.Documents.Collections.AllDocumentsCollection)
HandleAllDocs = true;
}
}
}

Expand Down Expand Up @@ -143,7 +148,7 @@ protected override void HandleDocumentChange(DocumentChange change)
return;
}

if (_referencedCollections.Contains(change.CollectionName))
if (HandleAllDocs || _referencedCollections.Contains(change.CollectionName))
{
_mre.Set();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ private MapTimeSeriesIndex(MapIndexDefinition definition, AbstractStaticIndexBas
foreach (var collection in _compiled.ReferencedCollections)
{
foreach (var referencedCollection in collection.Value)
{
_referencedCollections.Add(referencedCollection.Name);

if (referencedCollection.Name == Constants.Documents.Collections.AllDocumentsCollection)
HandleAllDocs = true;
}
}
}

Expand Down Expand Up @@ -83,7 +88,7 @@ protected override void HandleDocumentChange(DocumentChange change)
return;
}

if (_referencedCollections.Contains(change.CollectionName))
if (HandleAllDocs || _referencedCollections.Contains(change.CollectionName))
{
_mre.Set();
}
Expand Down Expand Up @@ -160,6 +165,9 @@ public override bool WorksOnAnyCollection(HashSet<string> collections)
if (_referencedCollections == null)
return false;

if (HandleAllDocs)
return true;

return _referencedCollections.Overlaps(collections);
}

Expand Down
190 changes: 190 additions & 0 deletions test/SlowTests/Issues/RavenDB-23249.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Raven.Client.Documents;
using Raven.Client.Documents.Indexes;
using Raven.Client.Documents.Indexes.Counters;
using Raven.Client.Documents.Indexes.TimeSeries;
using Tests.Infrastructure;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -258,6 +259,144 @@ public async Task CanIndexAllDocumentsReferencesMapReduceCountersIndex()
}
}

[RavenFact(RavenTestCategory.Indexes)]
public async Task CanIndexAllDocumentsReferencesMapTimeSeriesIndex()
{
using (var store = GetDocumentStore())
{
var company = new Company
{
Id = CommonName,
Name = CompanyName1
};

await new MapTimeSeriesIndex().ExecuteAsync(store);

using (var session = store.OpenAsyncSession())
{
await session.StoreAsync(company);
await session.SaveChangesAsync();
}

using (var bulk = store.BulkInsert())
{
var baseDate = DateTime.UtcNow;

for (var i = 0; i < EmployeesCount; i++)
{
var employee = new Employee();
await bulk.StoreAsync(employee);

using (var ts = bulk.TimeSeriesFor(employee.Id, CommonName))
{
await ts.AppendAsync(baseDate, 1, company.Id);
}
}
}

await Indexes.WaitForIndexingAsync(store, timeout: TimeSpan.FromMinutes(3));

using (var session = store.OpenAsyncSession())
{
var count = await session.Query<MapTimeSeriesIndex.Result, MapTimeSeriesIndex>()
.Where(x => x.CompanyName == CompanyName1).CountAsync();

Assert.Equal(EmployeesCount, count);
}

WaitForUserToContinueTheTest(store);

using (var session = store.OpenAsyncSession())
{
session.Advanced.WaitForIndexesAfterSaveChanges();

company.Name = CompanyName2;
await session.StoreAsync(company, company.Id);
await session.SaveChangesAsync();
}

using (var session = store.OpenAsyncSession())
{
var count = await session.Query<MapTimeSeriesIndex.Result, MapTimeSeriesIndex>()
.Where(x => x.CompanyName == CompanyName1).CountAsync();

Assert.Equal(0, count);

count = await session.Query<MapTimeSeriesIndex.Result, MapTimeSeriesIndex>()
.Where(x => x.CompanyName == CompanyName2).CountAsync();

Assert.Equal(EmployeesCount, count);
}
}
}

[RavenFact(RavenTestCategory.Indexes)]
public async Task CanIndexAllDocumentsReferencesMapReduceTimeSeriesIndex()
{
using (var store = GetDocumentStore())
{
var company = new Company
{
Name = CompanyName1
};

await new MapReduceTimeSeriesIndex().ExecuteAsync(store);

using (var session = store.OpenAsyncSession())
{
await session.StoreAsync(company);
await session.SaveChangesAsync();
}

using (var bulk = store.BulkInsert())
{
var baseDate = DateTime.UtcNow;

for (var i = 0; i < EmployeesCount; i++)
{
var employee = new Employee();
await bulk.StoreAsync(employee);

using (var ts = bulk.TimeSeriesFor(employee.Id, CommonName))
{
await ts.AppendAsync(baseDate, 1, company.Id);
}
}
}

await Indexes.WaitForIndexingAsync(store, timeout: TimeSpan.FromMinutes(3));

WaitForUserToContinueTheTest(store);

using (var session = store.OpenAsyncSession())
{
var result = await session.Query<MapReduceTimeSeriesIndex.Result, MapReduceTimeSeriesIndex>().ToListAsync();

Assert.Equal(1, result.Count);
Assert.Equal(CompanyName1, result[0].CompanyName);
Assert.Equal(EmployeesCount, result[0].Count);
}

using (var session = store.OpenAsyncSession())
{
session.Advanced.WaitForIndexesAfterSaveChanges();

company.Name = CompanyName2;
await session.StoreAsync(company, company.Id);
await session.SaveChangesAsync();
}

using (var session = store.OpenAsyncSession())
{
var result = await session.Query<MapReduceTimeSeriesIndex.Result, MapReduceTimeSeriesIndex>().ToListAsync();

Assert.Equal(1, result.Count);
Assert.Equal(CompanyName2, result[0].CompanyName);
Assert.Equal(EmployeesCount, result[0].Count);
}
}
}

private class Company
{
public string Id { get; set; }
Expand Down Expand Up @@ -366,4 +505,55 @@ group r by r.CompanyName into g
};
}
}

private class MapTimeSeriesIndex : AbstractTimeSeriesIndexCreationTask<Employee>
{
public class Result
{
public string CompanyName { get; set; }
}

public MapTimeSeriesIndex()
{
AddMap(
CommonName,
timeSeries => from ts in timeSeries
from entry in ts.Entries
select new Result
{
CompanyName = LoadDocument<Company>(entry.Tag, Constants.Documents.Collections.AllDocumentsCollection).Name
});
}
}

private class MapReduceTimeSeriesIndex : AbstractTimeSeriesIndexCreationTask<Employee, MapReduceTimeSeriesIndex.Result>
{
public class Result
{
public string CompanyName { get; set; }

public int Count { get; set; }
}

public MapReduceTimeSeriesIndex()
{
AddMap(
CommonName,
timeSeries => from ts in timeSeries
from entry in ts.Entries
select new Result
{
CompanyName = LoadDocument<Company>(entry.Tag, Constants.Documents.Collections.AllDocumentsCollection).Name,
Count = 1
});

Reduce = results => from r in results
group r by r.CompanyName into g
select new Result
{
CompanyName = g.Key,
Count = g.Sum(x => x.Count)
};
}
}
}

0 comments on commit 0c1aec5

Please sign in to comment.