Skip to content

Commit

Permalink
数据处理前,借助埋点标量记录待处理数据的数量
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Apr 16, 2024
1 parent c4984b7 commit b6f6574
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 28 deletions.
6 changes: 3 additions & 3 deletions AntJob.Data/Entity/作业.Biz.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public override void Valid(Boolean isNew)
private Int32 GetDefaultIdle()
{
// 定时调度,取步进加一分钟
if (Mode == JobModes.Alarm) return Step + 600;
if (Mode == JobModes.Time) return Step + 600;

return 3600;
}
Expand Down Expand Up @@ -204,7 +204,7 @@ public Boolean IsReady()
switch (Mode)
{
case JobModes.Data:
case JobModes.Alarm:
case JobModes.Time:
return Start.Year > 2000 && Step > 0;
case JobModes.Message:
return Topic.IsNullOrEmpty();
Expand Down Expand Up @@ -399,7 +399,7 @@ public Boolean TrySplit(DateTime start, Int32 step, out DateTime end)
if (End.Year > 2000 && end > End) end = End;

// 时间片必须严格要求按照步进大小分片,除非有合适的End
if (Mode != JobModes.Alarm)
if (Mode != JobModes.Time)
{
if (end > now) return false;
}
Expand Down
10 changes: 9 additions & 1 deletion AntJob.Extensions/DataHandler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections;
using AntJob.Data;
using NewLife;
using NewLife.Log;
using XCode;
using XCode.Configuration;

Expand Down Expand Up @@ -100,6 +101,7 @@ public override Boolean Start()
protected override void OnProcess(JobContext ctx)
{
var prov = Provider;
var span = DefaultSpan.Current;
var row = 0;
while (true)
{
Expand All @@ -111,7 +113,11 @@ protected override void OnProcess(JobContext ctx)
var data = Fetch(ctx, ref row);

var list = data as IList;
if (list != null) ctx.Total += list.Count;
if (list != null)
{
ctx.Total += list.Count;
if (span != null) span.Value = ctx.Total;
}
ctx.Data = data;

if (data == null || list != null && list.Count == 0) break;
Expand Down Expand Up @@ -181,7 +187,9 @@ protected override Int32 Execute(JobContext ctx)
{
var count = 0;
foreach (var item in ctx.Data as IEnumerable)
{
if (ProcessItem(ctx, item as IEntity)) count++;
}

return count;
}
Expand Down
2 changes: 1 addition & 1 deletion AntJob.Server/Services/JobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public ITask[] Acquire(App app, AcquireModel model, String ip)
list.AddRange(jb.AcquireMessage(model.Topic, server, ip, pid, model.Count - list.Count, _cacheProvider.Cache));
break;
case JobModes.Data:
case JobModes.Alarm:
case JobModes.Time:
//case JobModes.CSharp:
//case JobModes.Sql:
default:
Expand Down
2 changes: 1 addition & 1 deletion AntJob.Web/Areas/Ant/Views/Job/_List_Data.cshtml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<td>@entity.AppName</td>
}
<td class="text-center" title="@entity.Mode.GetDescription()">
@if (entity.Mode == JobModes.Alarm)
@if (entity.Mode == JobModes.Time)
{
<b>@st</b>
}
Expand Down
2 changes: 1 addition & 1 deletion AntJob/Data/JobModes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public enum JobModes

/// <summary>定时调度</summary>
[Description("定时调度")]
Alarm = 2,
Time = 2,

/// <summary>消息调度</summary>
[Description("消息调度")]
Expand Down
17 changes: 11 additions & 6 deletions AntJob/Handler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using AntJob.Data;
using AntJob.Providers;
using NewLife;
using NewLife.Collections;
using NewLife.Data;
using NewLife.Log;

namespace AntJob;
Expand All @@ -14,7 +14,7 @@ namespace AntJob;
///
/// 定时调度只要当前时间达到时间片开头就可以跑,数据调度要求达到时间片末尾才可以跑。
/// </remarks>
public abstract class Handler
public abstract class Handler : IExtend
{
#region 属性
/// <summary>名称</summary>
Expand All @@ -33,7 +33,7 @@ public abstract class Handler
public Boolean Active { get; private set; }

/// <summary>调度模式</summary>
public virtual JobModes Mode { get; set; } = JobModes.Alarm;
public virtual JobModes Mode { get; set; } = JobModes.Time;

private volatile Int32 _Busy;
/// <summary>正在处理中的任务数</summary>
Expand All @@ -44,11 +44,14 @@ public abstract class Handler
#endregion

#region 索引器
private readonly IDictionary<String, Object> _Items = new NullableDictionary<String, Object>(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<String, Object> _Items = [];

Check failure on line 47 in AntJob/Handler.cs

View workflow job for this annotation

GitHub Actions / build-publish

Invalid expression term '['

Check failure on line 47 in AntJob/Handler.cs

View workflow job for this annotation

GitHub Actions / build-publish

Syntax error; value expected

Check failure on line 47 in AntJob/Handler.cs

View workflow job for this annotation

GitHub Actions / build-publish

Invalid expression term '['

Check failure on line 47 in AntJob/Handler.cs

View workflow job for this annotation

GitHub Actions / build-publish

Syntax error; value expected

Check failure on line 47 in AntJob/Handler.cs

View workflow job for this annotation

GitHub Actions / build-publish

Invalid expression term '['

Check failure on line 47 in AntJob/Handler.cs

View workflow job for this annotation

GitHub Actions / build-publish

Syntax error; value expected

Check failure on line 47 in AntJob/Handler.cs

View workflow job for this annotation

GitHub Actions / build-publish

Invalid expression term '['

Check failure on line 47 in AntJob/Handler.cs

View workflow job for this annotation

GitHub Actions / build-publish

Syntax error; value expected
/// <summary>扩展数据</summary>
IDictionary<String, Object> IExtend.Items => _Items;

/// <summary>用户数据</summary>
/// <param name="item"></param>
/// <returns></returns>
public Object this[String item] { get => _Items[item]; set => _Items[item] = value; }
public Object this[String item] { get => _Items.TryGetValue(item, out var obj) ? obj : null; set => _Items[item] = value; }
#endregion

#region 构造
Expand All @@ -64,7 +67,7 @@ public Handler()
Start = new DateTime(now.Year, now.Month, 1),
Step = 30,
Offset = 15,
Mode = JobModes.Alarm,
Mode = JobModes.Time,
};

// 默认并发数为核心数
Expand Down Expand Up @@ -150,6 +153,8 @@ public void Process(ITask task)
try
{
OnProcess(ctx);

if (span != null) span.Value = ctx.Total;
}
catch (Exception ex)
{
Expand Down
17 changes: 4 additions & 13 deletions AntJob/Handlers/MessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections;
using AntJob.Data;
using NewLife;
using NewLife.Log;
using NewLife.Serialization;

namespace AntJob.Handlers;
Expand Down Expand Up @@ -58,18 +59,11 @@ protected override void OnProcess(JobContext ctx)
var ss = ctx.Task.Data.ToJsonEntity<String[]>();
if (ss == null || ss.Length == 0) return;

//// 消息作业特殊优待字符串,不需要再次Json解码
//if (typeof(TModel) == typeof(String))
//{
ctx.Total = ss.Length;
ctx.Data = ss;
//}
//else
//{
// var ms = ss.Select(e => e.ToJsonEntity<TModel>()).ToList();
// ctx.Total = ms.Count;
// ctx.Data = ms;
//}

var span = DefaultSpan.Current;
if (span != null) span.Value = ctx.Total;

Execute(ctx);
}
Expand All @@ -82,9 +76,6 @@ protected override Int32 Execute(JobContext ctx)
var count = 0;
foreach (String item in ctx.Data as IEnumerable)
{
//ctx.Key = item as String;
//ctx.Entity = item;

if (ProcessItem(ctx, item)) count++;
}

Expand Down
4 changes: 2 additions & 2 deletions AntJob/Providers/FileJobProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public override ITask[] Acquire(IJob job, String topic, Int32 count)
if (job.End.Year > 2000 && end > job.End) end = job.End;

// 时间片必须严格要求按照步进大小分片,除非有合适的End
if (job.Mode != JobModes.Alarm)
if (job.Mode != JobModes.Time)
{
if (end > now) break;
}
Expand Down Expand Up @@ -166,7 +166,7 @@ public override void Finish(JobContext ctx)
var n = 0;
if (set.End > set.Start) n = (Int32)(set.End - set.Start).TotalSeconds;
var msg = $"{ctx.Handler.Name} 处理{ctx.Total:n0} 行,区间({set.Start} + {n}, {set.End:HH:mm:ss})";
if (ctx.Handler.Mode == JobModes.Alarm)
if (ctx.Handler.Mode == JobModes.Time)
msg += $",耗时{ctx.Cost:n0}ms";
else
msg += $",速度{ctx.Speed:n0}tps,耗时{ctx.Cost:n0}ms";
Expand Down

0 comments on commit b6f6574

Please sign in to comment.