Skip to content

Commit

Permalink
Merge pull request #6718 from planetarium/feature/action-tx-pipeline
Browse files Browse the repository at this point in the history
fix action pipeline
  • Loading branch information
jonny-jeahyunchoi authored Jan 16, 2025
2 parents d47ab08 + f1a16b0 commit c3fbe8c
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 53 deletions.
38 changes: 38 additions & 0 deletions nekoyume/Assets/_Scripts/ApiClient/ArenaServiceManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,44 @@ await Client.GetSeasonsAvailableopponentsAsync(seasonId, Game.Game.instance.Agen
}
}

public async Task<bool> PostSeasonsAvailableOpponentsAsync(int seasonId, string avatarAddress)
{
if (!IsInitialized)
{
throw new InvalidOperationException("[ArenaServiceManager] Called before initialization");
}

try
{
await UniTask.SwitchToMainThread();
string jwt = CreateJwt(Game.Game.instance.Agent.PrivateKey, avatarAddress);
bool response = false;

await Client.PostSeasonsAvailableopponentsAsync(seasonId, jwt,
on201Created: result =>
{
response = true;
},
onError: error =>
{
NcDebug.LogError($"[ArenaServiceManager] Failed to post available opponents | " +
$"SeasonId: {seasonId} | " +
$"AvatarAddress: {avatarAddress ?? "null"} | " +
$"Error: {error}");
});

return response;
}
catch (Exception e)
{
NcDebug.LogError($"[ArenaServiceManager] Exception while posting available opponents | " +
$"SeasonId: {seasonId} | " +
$"AvatarAddress: {avatarAddress ?? "null"} | " +
$"Error: {e.Message}");
throw;
}
}

public async Task<ArenaParticipantModel> GetSeasonsLeaderboardParticipantAsync(int seasonId, string avatarAddress)
{
if (!IsInitialized)
Expand Down
57 changes: 30 additions & 27 deletions nekoyume/Assets/_Scripts/Blockchain/ActionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

namespace Nekoyume.Blockchain
{
using System.Threading.Tasks;
using Nekoyume.ApiClient;
using UniRx;
using static ArenaServiceClient;
Expand All @@ -58,7 +59,7 @@ public class ActionManager : IDisposable

public static ActionManager Instance => Game.Game.instance.ActionManager;

private List<Tuple<ActionBase,System.Action<TxId>>> _cachedPostProcessedActions = new();
private List<Tuple<ActionBase, System.Action<TxId>>> _cachedPostProcessedActions = new();

public static bool IsLastBattleActionId(Guid actionId)
{
Expand Down Expand Up @@ -125,12 +126,12 @@ public bool TryPopActionEnqueuedDateTime(Guid actionId, out DateTime enqueuedDat
return true;
}

private void ProcessAction<T>(T actionBase) where T : ActionBase
private void ProcessAction<T>(T actionBase, Func<TxId, Task<bool>> onTxIdReceived = null) where T : ActionBase
{
var actionType = actionBase.GetActionTypeAttribute();
NcDebug.Log($"[{nameof(ActionManager)}] {nameof(ProcessAction)}() called. \"{actionType.TypeIdentifier}\"");

_agent.EnqueueAction(actionBase);
_agent.EnqueueAction(actionBase, onTxIdReceived);

if (actionBase is GameAction gameAction)
{
Expand Down Expand Up @@ -934,7 +935,7 @@ int round
.DoOnError(e => { Game.Game.BackToMainAsync(HandleException(action.Id, e)).Forget(); });
}

public IObservable<ActionEvaluation<Action.Arena.Battle>> BattleArena(
public void BattleArena(
Address enemyAvatarAddress,
List<Guid> costumes,
List<Guid> equipments,
Expand Down Expand Up @@ -976,43 +977,45 @@ BattleTokenResponse token
evt.AddCustomAttribute("avatar-address", States.Instance.AgentState.address.ToString());
AirbridgeUnity.TrackEvent(evt);

ProcessAction(action);

_cachedPostProcessedActions.Add(Tuple.Create<ActionBase, Action<TxId>>(action, (txId) => {
ProcessAction(action, (txId) =>
{
// todo: 아레나 서비스
// 타입변경되면 수정해야함
// tx나 액션 보내는 시점에따라 추가변경필요할수있음.
var task = ApiClients.Instance.Arenaservicemanager.PostSeasonsBattleRequestAsync(txId.ToString(), token.BattleLogId, RxProps.CurrentArenaSeasonId, States.Instance.CurrentAvatarState.address.ToHex());
task.ContinueWith(t =>
return task.ContinueWith(t =>
{
if (t.IsFaulted)
{
// 오류 처리
NcDebug.LogError($"[ActionManager] 아레나 서비스 요청 실패: {t.Exception?.Message}");
Game.Game.BackToMainAsync(t.Exception).Forget();
return false;
}
_lastBattleActionId = action.Id;
_agent.ActionRenderer.EveryRender<Action.Arena.Battle>()
.Timeout(ActionTimeout)
.Where(eval => eval.Action.Id.Equals(action.Id))
.First()
.ObserveOnMainThread()
.DoOnError(e =>
{
if (_lastBattleActionId == action.Id)
{
_lastBattleActionId = null;
}

Game.Game.BackToMainAsync(HandleException(action.Id, e)).Forget();
}).Finally(() => Analyzer.Instance.FinishTrace(sentryTrace)).Subscribe();

return true;
});
}));

_lastBattleActionId = action.Id;
return _agent.ActionRenderer.EveryRender<Action.Arena.Battle>()
.Timeout(ActionTimeout)
.Where(eval => eval.Action.Id.Equals(action.Id))
.First()
.ObserveOnMainThread()
.DoOnError(e =>
{
if (_lastBattleActionId == action.Id)
{
_lastBattleActionId = null;
}

Game.Game.BackToMainAsync(HandleException(action.Id, e)).Forget();
}).Finally(() => Analyzer.Instance.FinishTrace(sentryTrace));
});
}
catch (Exception e)
{
Game.Game.BackToMainAsync(e).Forget();
return null;
return;
}
}

Expand Down Expand Up @@ -1346,7 +1349,7 @@ public IObservable<ActionEvaluation<Synthesize>> Synthesize(
.First()
.ObserveOnMainThread()
.DoOnError(e => HandleException(action.Id, e))
.Finally(() => { });
.Finally(() => { });
}

public IObservable<ActionEvaluation<UnlockEquipmentRecipe>> UnlockEquipmentRecipe(
Expand Down
2 changes: 1 addition & 1 deletion nekoyume/Assets/_Scripts/Blockchain/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void Dispose()
disposed = true;
}

public void EnqueueAction(ActionBase actionBase)
public void EnqueueAction(ActionBase actionBase, Func<TxId, Task<bool>> onTxIdReceived = null)
{
NcDebug.LogFormat("Enqueue GameAction: {0}", actionBase);
_queuedActions.Enqueue(actionBase);
Expand Down
2 changes: 1 addition & 1 deletion nekoyume/Assets/_Scripts/Blockchain/IAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ IEnumerator Initialize(
Action<bool> callback
);

void EnqueueAction(ActionBase actionBase);
void EnqueueAction(ActionBase actionBase, Func<TxId, Task<bool>> onTxIdReceived = null);

IValue GetState(Address accountAddress, Address address);
IValue GetState(HashDigest<SHA256> stateRootHash, Address accountAddress, Address address);
Expand Down
50 changes: 28 additions & 22 deletions nekoyume/Assets/_Scripts/Blockchain/RPCAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class RPCAgent : MonoBehaviour, IAgent, IActionEvaluationHubReceiver
{
private const int RpcConnectionRetryCount = 6;
private const float TxProcessInterval = 1.0f;
private readonly ConcurrentQueue<ActionBase> _queuedActions = new();
private readonly ConcurrentQueue<(ActionBase, Func<TxId, Task<bool>>)> _queuedActions = new();

private readonly TransactionMap _transactions = new(20);

Expand Down Expand Up @@ -98,7 +98,7 @@ public class RPCAgent : MonoBehaviour, IAgent, IActionEvaluationHubReceiver

public readonly Subject<(RPCAgent, int retryCount)> OnRetryAttempt = new();

public readonly Subject<bool> OnTxStageEnded = new ();
public readonly Subject<bool> OnTxStageEnded = new();

public BlockHash BlockTipHash { get; private set; }

Expand Down Expand Up @@ -672,12 +672,12 @@ public void SendException(Exception exc)
{
}

public void EnqueueAction(ActionBase actionBase)
public void EnqueueAction(ActionBase actionBase, Func<TxId, Task<bool>> onTxIdReceived = null)
{
_queuedActions.Enqueue(actionBase);
_queuedActions.Enqueue((actionBase, onTxIdReceived));
}

#region Mono
#region Mono

private void Awake()
{
Expand Down Expand Up @@ -762,7 +762,7 @@ private async void OnDestroy()
cancellationTokenSource?.Dispose();
}

#endregion
#endregion

private IEnumerator CoJoin(Action<bool> callback)
{
Expand Down Expand Up @@ -852,7 +852,7 @@ private IEnumerator CoTxProcessor()
}

NcDebug.Log($"[RPCAgent] CoTxProcessor()... before MakeTransaction.({++i})");
var task = Task.Run(async () => { await MakeTransaction(new List<ActionBase> { action }); });
var task = Task.Run(async () => { await MakeTransaction(action); });
yield return new WaitUntil(() => task.IsCompleted);
NcDebug.Log("[RPCAgent] CoTxProcessor()... after MakeTransaction." +
$" task completed({task.IsCompleted})");
Expand All @@ -870,45 +870,51 @@ private IEnumerator CoTxProcessor()
}
}

private async Task MakeTransaction(List<ActionBase> actions)
private async Task MakeTransaction((ActionBase, Func<TxId, Task<bool>>) action)
{
var nonce = await GetNonceAsync();
var gasLimit = actions.Any(a => a is ITransferAsset or ITransferAssets) ? 4L : 1L;
var gasLimit = action.Item1 is ITransferAsset or ITransferAssets ? 4L : 1L;
var tx = NCTx.Create(
nonce,
PrivateKey,
_genesis?.Hash,
actions.Select(action => action.PlainValue),
new List<IValue> { action.Item1.PlainValue },
FungibleAssetValue.Parse(Currencies.Mead, "0.00001"),
gasLimit
);

var actionsText = string.Join(", ", actions.Select(action =>
if (action.Item2 is not null)
{
if (action is GameAction gameAction)
bool callBackResult = await action.Item2(tx.Id);
if (!callBackResult)
{
return $"{action.GetActionTypeAttribute().TypeIdentifier}" +
$"({gameAction.Id.ToString()})";
NcDebug.LogError($"[RPCAgent] MakeTransaction()... callBackResult is false. TxId: {tx.Id}");
return;
}
}

var actionsText = action.Item1.GetActionTypeAttribute().TypeIdentifier.ToString();
Guid gameActionId = Guid.Empty;
if (action.Item1 is GameAction gameAction)
{
actionsText = $"{action.Item1.GetActionTypeAttribute().TypeIdentifier}" +
$"({gameAction.Id.ToString()})";
gameActionId = gameAction.Id;
}

return action.GetActionTypeAttribute().TypeIdentifier.ToString();
}));
NcDebug.Log("[RPCAgent] MakeTransaction()... w/" +
$" nonce={nonce}" +
$" PrivateKeyAddr={PrivateKey.Address.ToString()}" +
$" GenesisBlockHash={_genesis?.Hash}" +
$" TxId={tx.Id}" +
$" Actions=[{actionsText}]");

_onMakeTransactionSubject.OnNext((tx, actions));
_onMakeTransactionSubject.OnNext((tx, new List<ActionBase> { action.Item1 }));
var result = await _service.PutTransaction(tx.Serialize());
OnTxStageEnded.OnNext(result);
foreach (var action in actions)
if (gameActionId != Guid.Empty)
{
if (action is GameAction gameAction)
{
_transactions.TryAdd(gameAction.Id, tx.Id);
}
_transactions.TryAdd(gameActionId, tx.Id);
}
}

Expand Down
3 changes: 1 addition & 2 deletions nekoyume/Assets/_Scripts/UI/Widget/ArenaBattlePreparation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ private void SendBattleArenaAction(int ticket = TicketCountToUse)
runeInfos,
_roundData.ChampionshipId,
_roundData.Round,
token)
.Subscribe();
token);
}
catch (Exception e)
{
Expand Down
13 changes: 13 additions & 0 deletions nekoyume/Assets/_Scripts/UI/Widget/ArenaBoard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,5 +241,18 @@ private void UpdateScrolls()

return (scrollData, 0);
}

private async UniTask RefreshArenaBoardAsync()
{
// todo: 갱신가능한지 확인후 필요한 경우에만 갱신하도록 추가해야함.
// 로딩관련 처리 추가해야함.
await RxProps.ArenaPostCurrentSeasonsParticipantsAsync();
await RxProps.ArenaInformationOrderedWithScore.UpdateAsync(Game.Game.instance.Agent.BlockTipStateRootHash);
}

public void RefreshArenaBoard()
{
RefreshArenaBoardAsync().Forget();
}
}
}

0 comments on commit c3fbe8c

Please sign in to comment.