C#基于时间轮调度实现延迟任务详解

发布时间:2023-06-29

  在很多.net开发体系中开发者在面对调度作业需求的时候一般会选择三方开源成熟的作业调度框架来满足业务需求,比如Hangfire、Quartz.NET这样的框架。但是有些时候可能我们只是需要一个简易的延迟任务,这个时候引入这些框架就费力不讨好了。

  最简单的粗暴的办法当然是:

  

  

?

  

1

  

2

  

3

  

4

  

5

  

6

  

Task.Run(async () =>

  

{

  

//延迟xx毫秒

  

await Task.Delay(time);

  

//业务执行

  

});

  

  

  当时作为一个开发者,有时候还是希望使用更优雅的、可复用的一体化方案,比如可以实现一个简易的时间轮来完成基于内存的非核心重要业务的延迟调度。什么是时间轮呢,其实就是一个环形数组,每一个数组有一个插槽代表对应时刻的任务,数组的值是一个任务队列,假设我们有一个基于60秒的延迟时间轮,也就是说我们的任务会在不超过60秒(超过的情况增加分钟插槽,下面会讲)的情况下执行,那么如何实现?下面我们将定义一段代码来实现这个简单的需求

  话不多说,撸代码,首先我们需要定义一个时间轮的Model类用于承载我们的延迟任务和任务处理器。简单定义如下:

  

  

?

  

1

  

2

  

3

  

4

  

5

  

publicclassWheelTask<T>

  

{

  

publicT Data { get; set; }

  

publicFunc<T, Task> Handle { get; set; }

  

}

  

  

  定义很简单,就是一个入参T代表要执行的任务所需要的入参,然后就是任务的具体处理器Handle。接着我们来定义时间轮本轮的核心代码:

  可以看到时间轮其实核心就两个东西,一个是毫秒计时器,一个是数组插槽,这里数组插槽我们使用了字典来实现,key值分别对应0到59秒。每一个插槽的value对应一个任务队列。当添加一个新任务的时候,输入需要延迟的秒数,就会将任务插入到延迟多少秒对应的插槽内,当计时器启动的时候,每一跳刚好1秒,那么就会对插槽计数+1,然后去寻找当前插槽是否有任务,有的话就会调用ExecuteTask执行该插槽下的所有任务。

  

  

?

  

1

  

2

  

3

  

4

  

5

  

6

  

7

  

8

  

9

  

10

  

11

  

12

  

13

  

14

  

15

  

16

  

17

  

18

  

19

  

20

  

21

  

22

  

23

  

24

  

25

  

26

  

27

  

28

  

29

  

30

  

31

  

32

  

33

  

34

  

35

  

36

  

37

  

38

  

39

  

40

  

41

  

publicclassTimeWheel<T>

  

{

  

intsecondSlot = 0;

  

DateTime wheelTime { get{ returnnewDateTime(1, 1, 1, 0, 0, secondSlot); } }

  

Dictionary<int, ConcurrentQueue<WheelTask<T>>> secondTaskQueue;

  

publicvoidStart()

  

{

  

newTimer(Callback, null, 0, 1000);

  

secondTaskQueue = newDictionary<int, ConcurrentQueue<WheelTask<T>>>();

  

Enumerable.Range(0, 60).ToList().ForEach(x =>

  

{

  

secondTaskQueue.Add(x, newConcurrentQueue<WheelTask<T>>());

  

});

  

}

  

publicasync Task AddTaskAsync(intsecond, T data, Func<T, Task> handler)

  

{

  

var handTime = wheelTime.AddSeconds(second);

  

if(handTime.Second != wheelTime.Second)

  

secondTaskQueue[handTime.Second].Enqueue(newWheelTask<T>(data, handler));

  

else

  

await handler(data);

  

}

  

async voidCallback(objecto)

  

{

  

if(secondSlot != 59)

  

secondSlot++;

  

else

  

{

  

secondSlot = 0;

  

}

  

if(secondTaskQueue[secondSlot].Any())

  

await ExecuteTask();

  

}

  

async Task ExecuteTask()

  

{

  

if(secondTaskQueue[secondSlot].Any())

  

while(secondTaskQueue[secondSlot].Any())

  

if(secondTaskQueue[secondSlot].TryDequeue(outWheelTask<T> task))

  

await task.Handle(task.Data);

  

}

  

}

  

  

  接下来就是如果我需要大于60秒的情况如何处理呢。其实就是增加分钟插槽数组,举个例子我有一个任务需要2分40秒后执行,那么当我 插入到时间轮的时候我先插入到分钟插槽,当计时器每过去60秒,分钟插槽值+1,当分钟插槽对应有任务的时候就将这些任务从分钟插槽里弹出再入队到秒插槽中,这样一个任务会先进入插槽值=2(假设从0开始计算)的分钟插槽,计时器运行120秒后分钟值从0累加到2,2插槽的任务弹出到插槽值=40的秒插槽里,当计时器再运行40秒,刚好就可以执行这个延迟2分40秒的任务。话不多说,上代码:

  首先我们将任务WheelTask增加一个Second属性,用于当任务从分钟插槽弹出来时需要知道自己入队哪个秒插槽

  

  

?

  

1

  

2

  

3

  

4

  

5

  

6

  

publicclassWheelTask<T>

  

{

  

...

  

publicintSecond { get; set; }

  

...

  

}

  

  

  接着我们再重新定义时间轮的逻辑增加分钟插槽值以及插槽队列的部分

  

  

?

  

1

  

2

  

3

  

4

  

5

  

6

  

7

  

8

  

9

  

10

  

11

  

12

  

13

  

14

  

15

  

16

  

17

  

18

  

publicclassTimeWheel<T>

  

{

  

intminuteSlot, secondSlot = 0;

  

DateTime wheelTime { get{ returnnewDateTime(1, 1, 1, 0, minuteSlot, secondSlot); } }

  

Dictionary<int, ConcurrentQueue<WheelTask<T>>> minuteTaskQueue, secondTaskQueue;

  

publicvoidStart()

  

{

  

newTimer(Callback, null, 0, 1000);、

  

minuteTaskQueue = newDictionary<int, ConcurrentQueue<WheelTask<T>>>();

  

secondTaskQueue = newDictionary<int, ConcurrentQueue<WheelTask<T>>>();

  

Enumerable.Range(0, 60).ToList().ForEach(x =>

  

{

  

minuteTaskQueue.Add(x, newConcurrentQueue<WheelTask<T>>());

  

secondTaskQueue.Add(x, newConcurrentQueue<WheelTask<T>>());

  

});

  

}

  

...

  

}

  

  

  同样的在添加任务的AddTaskAsync函数中我们需要增加分钟,代码改为这样,当大于1分钟的任务会入队到分钟插槽中,小于1分钟的会按原逻辑直接入队到秒插槽中:

  

  

?

  

1

  

2

  

3

  

4

  

5

  

6

  

7

  

8

  

9

  

10

  

11

  

12

  

13

  

publicasync Task AddTaskAsync(intminute, intsecond, T data, Func<T, Task> handler)

  

{

  

var handTime = wheelTime.AddMinutes(minute).AddSeconds(second);

  

if(handTime.Minute != wheelTime.Minute)

  

minuteTaskQueue[handTime.Minute].Enqueue(newWheelTask<T>(handTime.Second, data, handler));

  

else

  

{

  

if(handTime.Second != wheelTime.Second)

  

secondTaskQueue[handTime.Second].Enqueue(newWheelTask<T>(data, handler));

  

else

  

await handler(data);

  

}

  

}

  

  

  最后的部分就是计时器的callback以及任务执行的部分:

  

  

?

  

1

  

2

  

3

  

4

  

5

  

6

  

7

  

8

  

9

  

10

  

11

  

12

  

13

  

14

  

15

  

16

  

17

  

18

  

19

  

20

  

21

  

22

  

23

  

24

  

25

  

26

  

27

  

28

  

29

  

30

  

async voidCallback(objecto)

  

{

  

boolminuteExecuteTask = false;

  

if(secondSlot != 59)

  

secondSlot++;

  

else

  

{

  

secondSlot = 0;

  

minuteExecuteTask = true;

  

if(minuteSlot != 59)

  

minuteSlot++;

  

else

  

{

  

minuteSlot = 0;

  

}

  

}

  

if(minuteExecuteTask secondTaskQueue[secondSlot].Any())

  

await ExecuteTask(minuteExecuteTask);

  

}

  

async Task ExecuteTask(boolminuteExecuteTask)

  

{

  

if(minuteExecuteTask)

  

while(minuteTaskQueue[minuteSlot].Any())

  

if(minuteTaskQueue[minuteSlot].TryDequeue(outWheelTask<T> task))

  

secondTaskQueue[task.Second].Enqueue(task);

  

if(secondTaskQueue[secondSlot].Any())

  

while(secondTaskQueue[secondSlot].Any())

  

if(secondTaskQueue[secondSlot].TryDequeue(outWheelTask<T> task))

  

await task.Handle(task.Data);

  

}

  

  

  基本上基于分钟+秒的时间轮延迟任务核心功能就这些。

注册即送1000元现金券