Skip to content

Commit

Permalink
Merge pull request #442 from JetBrains/memory-leak-usov
Browse files Browse the repository at this point in the history
Fix memory leak
  • Loading branch information
Iliya-usov authored Oct 19, 2023
2 parents 7634b15 + d229513 commit c5b0aa0
Show file tree
Hide file tree
Showing 15 changed files with 52 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,15 @@ sealed class Lifetime {
* Creates an intersection of some lifetimes: new lifetime that terminate when either one terminates.
* Created lifetime inherits the smallest [terminationTimeoutKind]
*/
fun intersect(lifetime1: Lifetime, lifetime2: Lifetime): Lifetime = defineIntersection(lifetime1, lifetime2).lifetime
fun intersect(lifetime1: Lifetime, lifetime2: Lifetime): Lifetime {
if (lifetime1 === lifetime2 || lifetime2.isEternal)
return lifetime1

if (lifetime1.isEternal)
return lifetime2

return defineIntersection(lifetime1, lifetime2).lifetime
}

/**
* Creates an intersection of some lifetimes: new lifetime that terminate when either one terminates.
Expand Down Expand Up @@ -642,7 +650,8 @@ operator fun Lifetime.plusAssign(action : () -> Unit) = onTermination(action)
* Creates an intersection of some lifetimes: new lifetime that terminate when either one terminates.
* Created lifetime inherits the smallest [terminationTimeoutKind]
*/
fun Lifetime.intersect(lifetime: Lifetime): LifetimeDefinition = Lifetime.defineIntersection(this, lifetime)
fun Lifetime.intersect(lifetime: Lifetime): Lifetime = Lifetime.intersect(this, lifetime)
fun Lifetime.defineIntersection(lifetime: Lifetime): LifetimeDefinition = Lifetime.defineIntersection(this, lifetime)

inline fun <T> Lifetime.view(viewable: IViewable<T>, crossinline handler: Lifetime.(T) -> Unit) {
viewable.view(this) { lt, value -> lt.handler(value) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class WriteOnceProperty<T : Any> : IOptProperty<T> {
if (def.isNotAlive || lifetime.isNotAlive) return

val nestedDef = def.intersect(lifetime)
super.advise(nestedDef.lifetime, handler)
super.advise(nestedDef, handler)
}

override fun fire(value: T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class MessageBroker(queueMessages: Boolean = false) : IPrintable {
private val messageContext: ProtocolContexts.MessageContext
) : IRdWireableDispatchHelper {

override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) {
doDispatch(lifetime.intersect(this.lifetime), scheduler ?: protocol.scheduler, action)
override fun dispatch(scheduler: IScheduler?, action: () -> Unit) {
doDispatch(lifetime, scheduler ?: protocol.scheduler, action)
}

private fun doDispatch(lifetime: Lifetime, scheduler: IScheduler, action: () -> Unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ interface IRdWireableDispatchHelper {
val rdId: RdId
val lifetime: Lifetime

fun dispatch(lifetime: Lifetime = this.lifetime, scheduler: IScheduler? = null, action: () -> Unit)
fun dispatch(scheduler: IScheduler? = null, action: () -> Unit) = dispatch(lifetime, scheduler, action)
fun dispatch(scheduler: IScheduler? = null, action: () -> Unit)
}


Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ class AsyncRdMap<K : Any, V : Any> private constructor(
override val lifetime: Lifetime
get() = dispatchHelper.lifetime

override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(lifetime, SynchronousScheduler, action)
override fun dispatch(scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(SynchronousScheduler, action)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ class AsyncRdSet<T : Any> private constructor(
override val lifetime: Lifetime
get() = dispatchHelper.lifetime

override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(lifetime, SynchronousScheduler, action)
override fun dispatch(scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(SynchronousScheduler, action)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class CallSiteWiredRdTask<TReq, TRes>(
} else if (resultFromWire is RdTaskResult.Cancelled)
sendCancellation()

dispatchHelper.dispatch(outerLifetime, wireScheduler) {
dispatchHelper.dispatch(wireScheduler) {
if (!result.setIfEmpty(resultFromWire))
RdReactiveBase.logReceived.trace { "call `${call.location}` (${call.rdid}) response was dropped, task result is: ${result.valueOrNull}" }
}
Expand Down Expand Up @@ -192,7 +192,7 @@ class EndpointWiredRdTask<TReq, TRes>(
RdReactiveBase.logReceived.trace { "received cancellation" }
buffer.readVoid() //nothing just a void value

dispatchHelper.dispatch(lifetime, wireScheduler) {
dispatchHelper.dispatch(wireScheduler) {
val success = result.setIfEmpty(RdTaskResult.Cancelled())
val wireScheduler = call.protocol?.scheduler
if (success || wireScheduler == null)
Expand Down Expand Up @@ -365,10 +365,14 @@ class RdCall<TReq, TRes>(internal val requestSzr: ISerializer<TReq> = Polymorphi

val taskId = proto.identity.next(RdId.Null)
val bindLifetime = bindLifetime
val taskLifetime = lifetime.intersect(bindLifetime)

val task = CallSiteWiredRdTask(taskLifetime, this, taskId, scheduler ?: proto.scheduler)
taskLifetime.executeIfAlive {
val intersectedDef = lifetime.defineIntersection(bindLifetime)
val task = CallSiteWiredRdTask(intersectedDef.lifetime, this, taskId, scheduler ?: proto.scheduler)
task.result.advise(intersectedDef.lifetime) {
if (it !is RdTaskResult.Success || !it.value.isBindable()) {
intersectedDef.terminate(true)
}
}
intersectedDef.lifetime.executeIfAlive {
proto.wire.send(rdid) { buffer ->
logSend.trace { "call `$location`::($rdid) send${sync.condstr {" SYNC"}} request '$taskId' : ${request.printToString()} " }
taskId.write(buffer)
Expand Down
11 changes: 3 additions & 8 deletions rd-net/RdFramework/Base/IRdBindable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,19 @@ public interface IRdWireableDispatchHelper
RdId RdId { get; }
Lifetime Lifetime { get; }

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action);
public void Dispatch(IScheduler? scheduler, Action action);
}

public static class RdWireableDispatchHelperEx
{
public static void Dispatch(this IRdWireableDispatchHelper helper, Lifetime lifetime, Action action)
{
helper.Dispatch(lifetime, null, action);
}

public static void Dispatch(this IRdWireableDispatchHelper helper, IScheduler? scheduler, Action action)
{
helper.Dispatch(helper.Lifetime, scheduler, action);
helper.Dispatch(scheduler, action);
}

public static void Dispatch(this IRdWireableDispatchHelper helper, Action action)
{
helper.Dispatch(helper.Lifetime, null, action);
helper.Dispatch(null, action);
}
}

Expand Down
2 changes: 1 addition & 1 deletion rd-net/RdFramework/Impl/AsyncRdMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public DelegatingDispatchHelper(IRdWireableDispatchHelper dispatchHelper)
myDispatchHelper = dispatchHelper;
}

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action)
public void Dispatch(IScheduler? scheduler, Action action)
{
myDispatchHelper.Dispatch(SynchronousScheduler.Instance, action);
}
Expand Down
2 changes: 1 addition & 1 deletion rd-net/RdFramework/Impl/AsyncRdSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public DelegatingDispatchHelper(IRdWireableDispatchHelper dispatchHelper)
myDispatchHelper = dispatchHelper;
}

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action)
public void Dispatch(IScheduler? scheduler, Action action)
{
myDispatchHelper.Dispatch(SynchronousScheduler.Instance, action);
}
Expand Down
4 changes: 2 additions & 2 deletions rd-net/RdFramework/Impl/MessageBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ internal RdWireableDispatchHelper(Lifetime lifetime, ILog log, IRdWireable wirea
myMessageContext = messageContext;
}

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action)
public void Dispatch(IScheduler? scheduler, Action action)
{
DoDispatch(lifetime.Intersect(Lifetime), scheduler ?? myProtocol.Scheduler, action);
DoDispatch(Lifetime, scheduler ?? myProtocol.Scheduler, action);
}

private void DoDispatch(Lifetime lifetime, IScheduler scheduler, Action action)
Expand Down
16 changes: 12 additions & 4 deletions rd-net/RdFramework/Tasks/RdCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,18 @@ private IRdTask<TRes> StartInternal(Lifetime requestLifetime, TReq request, ISch

var taskId = proto.Identities.Next(RdId.Nil);

var taskLifetime = Lifetime.Intersect(requestLifetime, myBindLifetime);
var task = new WiredRdTask<TReq, TRes>.CallSite(taskLifetime, this, taskId, scheduler ?? proto.Scheduler);

using var cookie = taskLifetime.UsingExecuteIfAlive();
var intersectedDef = Lifetime.DefineIntersection(requestLifetime, myBindLifetime);
var task = new WiredRdTask<TReq,TRes>.CallSite(intersectedDef.Lifetime, this, taskId, scheduler ?? proto.Scheduler);
task.Result.Advise(intersectedDef.Lifetime, result =>
{
if (result.Status != RdTaskStatus.Success || !result.Result.IsBindable())
{
intersectedDef.AllowTerminationUnderExecution = true;
intersectedDef.Terminate();
}
});

using var cookie = intersectedDef.UsingExecuteIfAlive();
if (cookie.Succeed)
{
proto.Wire.Send(RdId, (writer) =>
Expand Down
4 changes: 2 additions & 2 deletions rd-net/RdFramework/Tasks/WiredRdTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
SendCancellation();


dispatchHelper.Dispatch(myOuterLifetime, WireScheduler, () =>
dispatchHelper.Dispatch(WireScheduler, () =>
{
if (!ResultInternal.SetIfEmpty(taskResult))
Trace(RdReactiveBase.ourLogReceived, "response from wire was rejected because task already has result");
Expand Down Expand Up @@ -203,7 +203,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
Trace(RdReactiveBase.ourLogReceived, "received cancellation");
reader.ReadVoid(); //nothing just a void value

dispatchHelper.Dispatch(Lifetime, WireScheduler, () =>
dispatchHelper.Dispatch(WireScheduler, () =>
{
var success = ResultInternal.SetIfEmpty(RdTaskResult<TRes>.Cancelled());
var protocolScheduler = myCall.TryGetProto()?.Scheduler;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a RdGen v1.11.
// This code was generated by a RdGen v1.12.
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a RdGen v1.11.
// This code was generated by a RdGen v1.12.
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
Expand Down

0 comments on commit c5b0aa0

Please sign in to comment.