Skip to content

Commit

Permalink
RavenDB-22983 - add compressed bjro support to document &drop Transac…
Browse files Browse the repository at this point in the history
…tionForgetAboutEnumerator & ForgetAbout
  • Loading branch information
garayx committed Nov 12, 2024
1 parent 4ff452d commit ec49035
Show file tree
Hide file tree
Showing 24 changed files with 138 additions and 141 deletions.
2 changes: 0 additions & 2 deletions src/Raven.Server/Documents/CollectionRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ protected async Task<IOperationResult> ExecuteOperation(string collectionName, l

startAfterId = document.Id;
ids.Enqueue(document.Id);

context.Transaction.ForgetAbout(document);
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions src/Raven.Server/Documents/Document.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,32 @@

namespace Raven.Server.Documents
{
public sealed unsafe class CompressedBlittableJsonReaderObject : BlittableJsonReaderObject
{
public DocumentsOperationContext DocsContext;
private long StorageId;
public CompressedBlittableJsonReaderObject(byte* mem, int size, DocumentsOperationContext context, long storageId, UnmanagedWriteBuffer buffer = default(UnmanagedWriteBuffer))
: base(mem, size, context, buffer)
{
DocsContext = context;
StorageId = storageId;
}

public override void Dispose()
{
//if (DocsContext == null)
// return;
if (DocsContext.Transaction == null)
return;
if (DocsContext.Transaction.InnerTransaction == null)
return;

DocsContext.Transaction.InnerTransaction.ForgetAbout(StorageId);

base.Dispose();
}
}

public class TimeSeriesStream
{
public IEnumerable<DynamicJsonValue> TimeSeries;
Expand Down
61 changes: 57 additions & 4 deletions src/Raven.Server/Documents/DocumentsStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1564,28 +1564,81 @@ public void AssertMetadataKey(string id, BlittableJsonReaderObject document, Doc
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static Document ParseDocument(JsonOperationContext context, ref TableValueReader tvr, DocumentFields fields)
private static Document ParseDocument(DocumentsOperationContext context, ref TableValueReader tvr, DocumentFields fields)
{
if (fields == DocumentFields.All)
{
if (context.Transaction.InnerTransaction.IsCompressed(tvr.Id) == false)
return ParseRawDataDocument(context, ref tvr, fields);

return new Document
{
StorageId = tvr.Id,
LowerId = TableValueToString(context, (int)DocumentsTable.LowerId, ref tvr),
Id = TableValueToId(context, (int)DocumentsTable.Id, ref tvr),
Etag = TableValueToEtag((int)DocumentsTable.Etag, ref tvr),
Data = new BlittableJsonReaderObject(tvr.Read((int)DocumentsTable.Data, out int size), size, context),
Data = new CompressedBlittableJsonReaderObject(tvr.Read((int)DocumentsTable.Data, out int size), size, context, tvr.Id),
ChangeVector = TableValueToChangeVector(context, (int)DocumentsTable.ChangeVector, ref tvr),
LastModified = TableValueToDateTime((int)DocumentsTable.LastModified, ref tvr),
Flags = TableValueToFlags((int)DocumentsTable.Flags, ref tvr),
TransactionMarker = TableValueToShort((int)DocumentsTable.TransactionMarker, nameof(DocumentsTable.TransactionMarker), ref tvr),
};
}

if (context.Transaction.InnerTransaction.IsCompressed(tvr.Id) == false)
return ParseRawDataDocument(context, ref tvr, fields);

return ParseDocumentPartial(context, ref tvr, fields);
}

private static Document ParseDocumentPartial(JsonOperationContext context, ref TableValueReader tvr, DocumentFields fields)
private static Document ParseDocumentPartial(DocumentsOperationContext context, ref TableValueReader tvr, DocumentFields fields)
{
var result = new Document();

if (fields.Contain(DocumentFields.LowerId))
result.LowerId = TableValueToString(context, (int)DocumentsTable.LowerId, ref tvr);

if (fields.Contain(DocumentFields.Id))
result.Id = TableValueToId(context, (int)DocumentsTable.Id, ref tvr);

if (fields.Contain(DocumentFields.Data))
result.Data = new CompressedBlittableJsonReaderObject(tvr.Read((int)DocumentsTable.Data, out int size), size, context, tvr.Id);

if (fields.Contain(DocumentFields.ChangeVector))
result.ChangeVector = TableValueToChangeVector(context, (int)DocumentsTable.ChangeVector, ref tvr);

result.Etag = TableValueToEtag((int)DocumentsTable.Etag, ref tvr);
result.LastModified = TableValueToDateTime((int)DocumentsTable.LastModified, ref tvr);
result.Flags = TableValueToFlags((int)DocumentsTable.Flags, ref tvr);
result.StorageId = tvr.Id;
result.TransactionMarker = TableValueToShort((int)DocumentsTable.TransactionMarker, nameof(DocumentsTable.TransactionMarker), ref tvr);

return result;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static Document ParseRawDataDocument(JsonOperationContext context, ref TableValueReader tvr, DocumentFields fields)
{
if (fields == DocumentFields.All)
{
return new Document
{
StorageId = tvr.Id,
LowerId = TableValueToString(context, (int)DocumentsTable.LowerId, ref tvr),
Id = TableValueToId(context, (int)DocumentsTable.Id, ref tvr),
Etag = TableValueToEtag((int)DocumentsTable.Etag, ref tvr),
Data = new BlittableJsonReaderObject(tvr.Read((int)DocumentsTable.Data, out int size/*, out var compressed*/), size, context/*, compressed*/),
ChangeVector = TableValueToChangeVector(context, (int)DocumentsTable.ChangeVector, ref tvr),
LastModified = TableValueToDateTime((int)DocumentsTable.LastModified, ref tvr),
Flags = TableValueToFlags((int)DocumentsTable.Flags, ref tvr),
TransactionMarker = TableValueToShort((int)DocumentsTable.TransactionMarker, nameof(DocumentsTable.TransactionMarker), ref tvr),
};
}

return ParseRawDataDocumentPartial(context, ref tvr, fields);
}

private static Document ParseRawDataDocumentPartial(JsonOperationContext context, ref TableValueReader tvr, DocumentFields fields)
{
var result = new Document();

Expand Down Expand Up @@ -1616,7 +1669,7 @@ public static Document ParseRawDataSectionDocumentWithValidation(JsonOperationCo
if (size > expectedSize || size <= 0)
throw new ArgumentException("Data size is invalid, possible corruption when parsing BlittableJsonReaderObject", nameof(size));

return ParseDocument(context, ref tvr, DocumentFields.All);
return ParseRawDataDocument(context, ref tvr, DocumentFields.All);
}

public static Tombstone TableValueToTombstone(JsonOperationContext context, ref TableValueReader tvr)
Expand Down
6 changes: 3 additions & 3 deletions src/Raven.Server/Documents/DocumentsTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ public void AddToCache(string collectionName, CollectionName name)
_collectionCache.Add(collectionName, name);
}

public void ForgetAbout(Document doc)
public bool IsCompressed(Document doc)
{
if (doc == null)
return;
InnerTransaction.ForgetAbout(doc.StorageId);
return false;
return InnerTransaction.IsCompressed(doc.StorageId);
}

internal void CheckIfShouldDeleteAttachmentStream(Slice hash)
Expand Down
5 changes: 0 additions & 5 deletions src/Raven.Server/Documents/Expiration/ExpirationStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ private Queue<DocumentExpirationInfo> GetDocuments(ExpiredDocumentsOptions optio

expired.Enqueue(new DocumentExpirationInfo(ticksAsSlice, clonedId, document.Id));
totalCount++;
options.Context.Transaction.ForgetAbout(document);
}
}
catch (DocumentConflictException)
Expand Down Expand Up @@ -273,8 +272,6 @@ public int DeleteDocumentsExpiration(DocumentsOperationContext context, Queue<Do
_database.DocumentsStorage.Delete(context, documentInfo.LowerId, documentInfo.Id, expectedChangeVector: null);
}
}

context.Transaction.ForgetAbout(doc);
}
}
catch (DocumentConflictException)
Expand Down Expand Up @@ -348,8 +345,6 @@ public int RefreshDocuments(DocumentsOperationContext context, Queue<ExpirationS
}
}
}

context.Transaction.ForgetAbout(doc);
}

refreshCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public async Task StreamDocsGet()
initialState.Skip = new Reference<long>();
}

var documentsEnumerator = new TransactionForgetAboutDocumentEnumerator(new PulsedTransactionEnumerator<Document, DocsStreamingIterationState>(context, state =>
var documentsEnumerator = new PulsedTransactionEnumerator<Document, DocsStreamingIterationState>(context, state =>
{
if (string.IsNullOrEmpty(state.StartsWith) == false)
{
Expand All @@ -61,11 +61,12 @@ public async Task StreamDocsGet()
}

if (state.LastIteratedEtag != null)
return Database.DocumentsStorage.GetDocumentsInReverseEtagOrderFrom(context, state.LastIteratedEtag.Value, state.Take, skip: 1); // we seek to LastIteratedEtag but skip 1 item because we iterated it already
return Database.DocumentsStorage.GetDocumentsInReverseEtagOrderFrom(context, state.LastIteratedEtag.Value, state.Take,
skip: 1); // we seek to LastIteratedEtag but skip 1 item because we iterated it already

return Database.DocumentsStorage.GetDocumentsInReverseEtagOrder(context, state.Start, state.Take);
},
initialState), context);
initialState);

using (var token = CreateHttpRequestBoundOperationToken())
await using (var writer = GetLoadDocumentsResultsWriter(format, context, ResponseBodyStream()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public bool MoveNext(DocumentsOperationContext ctx, out IEnumerable resultsOfCur
{
using (_documentReadStats.Start())
{
ctx.Transaction.ForgetAbout(_results[0]);
_results[0]?.Dispose();

var moveNext = _itemsEnumerator.MoveNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ public bool MoveNext(DocumentsOperationContext ctx, out IEnumerable resultsOfCur
{
using (_documentReadStats?.Start())
{
if(Current is DocumentIndexItem di && di.Item is Document d)
ctx.Transaction.ForgetAbout(d);

Current?.Dispose();
etag = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,6 @@ private IEnumerable<IndexItem> GetItemsFromCollectionThatReference(QueryOperatio

void DisposeItem()
{
if (item.Item is Document doc)
queryContext.Documents.Transaction.ForgetAbout(doc);

item.Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,6 @@ private void CountDocumentsInEnumerator(bool countQuery)
_skippedResults.Value++;
}
}

_context.Transaction.ForgetAbout(doc);
}
}

Expand Down Expand Up @@ -511,7 +509,6 @@ void ReleaseDocument()
if (document != null)
{
document.Dispose();
_context.Transaction.ForgetAbout(document);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private async ValueTask ExecuteCollectionQueryAsync(QueryResultServerSide<Docume
}
else
{
enumerator = new TransactionForgetAboutDocumentEnumerator(new PulsedTransactionEnumerator<Document, CollectionQueryResultsIterationState>(context.Documents,
enumerator = new PulsedTransactionEnumerator<Document, CollectionQueryResultsIterationState>(context.Documents,
state =>
{
query.Start = state.Start;
Expand All @@ -176,7 +176,7 @@ private async ValueTask ExecuteCollectionQueryAsync(QueryResultServerSide<Docume
{
Start = query.Start,
Take = query.PageSize
}), context.Documents);
});
}

IncludeCountersCommand includeCountersCommand = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ public override async ValueTask AddResultAsync(Document result, CancellationToke
using (result)
await GetWriter().AddResultAsync(result, token).ConfigureAwait(false);

_context.Transaction.InnerTransaction.ForgetAbout(result.StorageId);

GetToken().Delay();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ public ForgetAboutDecompressionBuffer(Document doc, DocumentsOperationContext co

public void Dispose()
{
_context.Transaction.ForgetAbout(_doc);
// TODO: egor
//_context.Transaction.ForgetAbout(_doc);
}
}
}
Expand Down
13 changes: 5 additions & 8 deletions src/Raven.Server/Documents/Revisions/RevisionsStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ private IEnumerable<Document> GetRevisionsForConflict(

if (state.ShouldDelete(revision) == false)
{
context.Transaction.ForgetAbout(revision);
//context.Transaction.ForgetAbout(revision);
revision.Dispose();
result.Skip++;
continue;
Expand Down Expand Up @@ -1275,8 +1275,6 @@ private long GetConflictRevisionsCount(
{
if (revision.Flags.Contain(DocumentFlags.Conflicted) || revision.Flags.Contain(DocumentFlags.Resolved))
conflictCount++;

context.Transaction.ForgetAbout(revision);
}
}

Expand Down Expand Up @@ -1306,7 +1304,6 @@ private IEnumerable<Document> GetAllRevisions(DocumentsOperationContext context,

if (skipForceCreated && revision.Flags.Contain(DocumentFlags.ForceCreated))
{
context.Transaction.ForgetAbout(revision);
revision.Dispose();
result.Skip++;
continue;
Expand Down Expand Up @@ -2616,7 +2613,7 @@ private bool LastRevision(DocumentsOperationContext context, string collection,
}
}

internal static unsafe Document TableValueToRevision(JsonOperationContext context, ref TableValueReader tvr, DocumentFields fields = DocumentFields.All)
internal static unsafe Document TableValueToRevision(DocumentsOperationContext context, ref TableValueReader tvr, DocumentFields fields = DocumentFields.All)
{
if (fields == DocumentFields.All)
{
Expand All @@ -2630,14 +2627,14 @@ internal static unsafe Document TableValueToRevision(JsonOperationContext contex
Flags = TableValueToFlags((int)RevisionsTable.Flags, ref tvr),
TransactionMarker = TableValueToShort((int)RevisionsTable.TransactionMarker, nameof(RevisionsTable.TransactionMarker), ref tvr),
ChangeVector = TableValueToChangeVector(context, (int)RevisionsTable.ChangeVector, ref tvr),
Data = new BlittableJsonReaderObject(tvr.Read((int)RevisionsTable.Document, out var size), size, context)
Data = new CompressedBlittableJsonReaderObject(tvr.Read((int)RevisionsTable.Document, out var size), size, context, tvr.Id)
};
}

return ParseRevisionPartial(context, ref tvr, fields);
}

private static unsafe Document ParseRevisionPartial(JsonOperationContext context, ref TableValueReader tvr, DocumentFields fields)
private static unsafe Document ParseRevisionPartial(DocumentsOperationContext context, ref TableValueReader tvr, DocumentFields fields)
{
var result = new Document();

Expand All @@ -2648,7 +2645,7 @@ private static unsafe Document ParseRevisionPartial(JsonOperationContext context
result.Id = TableValueToId(context, (int)RevisionsTable.Id, ref tvr);

if (fields.Contain(DocumentFields.Data))
result.Data = new BlittableJsonReaderObject(tvr.Read((int)RevisionsTable.Document, out var size), size, context);
result.Data = new CompressedBlittableJsonReaderObject(tvr.Read((int)RevisionsTable.Document, out var size), size, context, tvr.Id);

if (fields.Contain(DocumentFields.ChangeVector))
result.ChangeVector = TableValueToChangeVector(context, (int)RevisionsTable.ChangeVector, ref tvr);
Expand Down
20 changes: 10 additions & 10 deletions src/Raven.Server/Documents/Subscriptions/SubscriptionFetcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,20 @@ public RevisionSubscriptionFetcher(DocumentDatabase database, SubscriptionConnec
{
return Collection switch
{
Constants.Documents.Collections.AllDocumentsCollection => new TransactionForgetAboutCurrentPreviousRevisionEnumerator(
Constants.Documents.Collections.AllDocumentsCollection =>
Database.DocumentsStorage.RevisionsStorage.GetCurrentAndPreviousRevisionsForSubscriptionsFrom(DocsContext, StartEtag + 1, 0, long.MaxValue)
.GetEnumerator(), DocsContext),
_ => new TransactionForgetAboutCurrentPreviousRevisionEnumerator(
.GetEnumerator(),
_ =>
Database.DocumentsStorage.RevisionsStorage
.GetCurrentAndPreviousRevisionsForSubscriptionsFrom(DocsContext, new CollectionName(Collection), StartEtag + 1, long.MaxValue)
.GetEnumerator(), DocsContext)
.GetEnumerator()
};
}

protected override IEnumerator<(Document Previous, Document Current)> FetchFromResend()
{
return new TransactionForgetAboutCurrentPreviousRevisionEnumerator(SubscriptionConnectionsState.GetRevisionsFromResend(Database, ClusterContext, DocsContext, Active)
.GetEnumerator(), DocsContext);
return SubscriptionConnectionsState.GetRevisionsFromResend(Database, ClusterContext, DocsContext, Active)
.GetEnumerator();
}
}

Expand All @@ -131,16 +131,16 @@ protected override IEnumerator<Document> FetchByEtag()
return Collection switch
{
Constants.Documents.Collections.AllDocumentsCollection =>
new TransactionForgetAboutDocumentEnumerator(Database.DocumentsStorage.GetDocumentsFrom(DocsContext, StartEtag + 1, 0, long.MaxValue)
.GetEnumerator(), DocsContext),
Database.DocumentsStorage.GetDocumentsFrom(DocsContext, StartEtag + 1, 0, long.MaxValue)
.GetEnumerator(),
_ =>
new TransactionForgetAboutDocumentEnumerator(Database.DocumentsStorage.GetDocumentsFrom(
Database.DocumentsStorage.GetDocumentsFrom(
DocsContext,
Collection,
StartEtag + 1,
0,
long.MaxValue)
.GetEnumerator(), DocsContext)
.GetEnumerator()
};
}

Expand Down
Loading

0 comments on commit ec49035

Please sign in to comment.