Java工具类Timer源码解析与使用

通常我们需要定时执行一些任务,这时候可以使用Timer来完成,准确来说是Timer和TimerTask两个类配合完成。Timer是一个工具类,而TimerTask是一个抽象类,实现了Runnable接口。通过继承TimerTask可以定制我们自己的任务,并将我们自己的TimerTask加入Timer中。
Timer内部会调用TimerTask的run方法来执行业务流程。

Timer类

Timer类的方法

  • public void schedule(TimerTask task, Date time)
    只执行一次,在等于或者超过某个时间之后执行
  • public void schedule(TimerTask task, Date firstTime, long period)
    执行多次,在fistTime后首次执行task,之后每隔period毫秒执行task
  • public void schedule(TimerTask task, long delay)
    执行一次,在delay毫秒后执行task
  • public void schedule(TimerTask task, long delay, long period)
    执行多次,在delay毫秒后首次执行task,之后每隔period毫秒执行task
  • public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period)
    与上面类似,有点不一样。
  • public void scheduleAtFixedRate(TimerTask task, long delay, long period)
  • public void cancel()
    终止Timer里面的所有已安排任务
  • public int purge()
    移除Timer里面的所有已取消的任务,返回取消任务的数量

    schedule 与 scheduleAtFixedRate区别

  • schedule按照上一次实际执行完成的时间点进行计算
  • scheduleAtFixedRate按照上一次开始的时间点进行计算,需要考虑同步问题

TimerTask类

  • public void cancel()
    取消当前TimerTask里的任务
  • public long scheduledExecutionTime()
    返回此任务的最近实际执行的计划执行时间。

一个例子

MyTimerTask.java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import java.text.SimpleDateFormat;
import java.util.TimerTask;

public class MyTimerTask extends TimerTask {
private String name;
public MyTimerTask() { }
public MyTimerTask(String name) {
this.name = name;
}
@Override
public void run() { // 具体业务代码可以在run方法内实现,Timer在触发任务的时候会调用这个方法
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sf.format(System.currentTimeMillis()) + ": " +name); // 简单输出一下
}
}

Main.java代码

1
2
3
4
5
6
7
8
9
10
11
12
import java.util.Timer;
import java.util.TimerTask;

public class Main {

public static void main(String[] args) {
Timer timer = new Timer(); // 新建Timer实例
TimerTask task = new MyTimerTask("Task Test"); // 新建任务
// public void schedule(TimerTask task, long delay, long period)
timer.schedule(task, 2000L, 1000L); // 添加任务
}
}

输出结果

1
2
3
4
"F:\Program Files\Java\jdk1.8.0_172\bin\java.exe" ...
2018-05-05 01:00:09: Task Test
2018-05-05 01:00:10: Task Test
2018-05-05 01:00:11: Task Test

源码解析(JDK 8u172)

TimerTask类源码

TimerTask类其实很简单,里面只有两个函数有具体代码。一个是cancel()方法,一个是scheduledExecutionTime()方法。
TimerTask类中有一个属性是period,这个属性是用来表示重复任务的周期,以毫秒为单位。但是这个属性不一定是正数,也可能是负数。当是正数的时候表示以固定的速率执行,负数是固定延迟执行。这个比较难理解,其实就是说正数的时候不受到任务的执行时间影响,而负数的时候是任务完成了才开始计算这个周期。Timer中的scheduleAtFixedRate是正数模式,schedule方法是负数模式。0的时候表示非重复任务。
另外一个比较重要的是nextExecutionTime这个属性,他是代表了下一次执行的时间,格式是类似System.currentTimeMillis()的返回值。Timer也是根据这个属性来对TimerTask建堆的。
下面是删除了注释的TimerTask类源码,其实也就这么一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package java.util;
public abstract class TimerTask implements Runnable {
final Object lock = new Object();
int state = VIRGIN;
static final int VIRGIN = 0;
static final int SCHEDULED = 1;
static final int EXECUTED = 2;
static final int CANCELLED = 3;
long nextExecutionTime;
long period = 0;

protected TimerTask() {
}

public abstract void run();

public boolean cancel() {
synchronized(lock) {
boolean result = (state == SCHEDULED);
state = CANCELLED;
return result;
}
}

public long scheduledExecutionTime() {
synchronized(lock) {
return (period < 0 ? nextExecutionTime + period
: nextExecutionTime - period);
}
}
}

Timer类源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package java.util;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
public class Timer {
// 任务队列,使用最小堆实现
private final TaskQueue queue = new TaskQueue();
// 内部维护的线程
private final TimerThread thread = new TimerThread(queue);

private final Object threadReaper = new Object() {
protected void finalize() throws Throwable {
synchronized(queue) {
thread.newTasksMayBeScheduled = false;
queue.notify(); // In case queue is empty.
}
}
};


private final static AtomicInteger nextSerialNumber = new AtomicInteger(0);
private static int serialNumber() {
return nextSerialNumber.getAndIncrement();
}

// 构造方法
public Timer() {
this("Timer-" + serialNumber());
}

// 构造方法, 守护方式执行
public Timer(boolean isDaemon) {
this("Timer-" + serialNumber(), isDaemon);
}

public Timer(String name) {
thread.setName(name);
thread.start();
}

public Timer(String name, boolean isDaemon) {
thread.setName(name);
thread.setDaemon(isDaemon);
thread.start();
}

public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
sched(task, System.currentTimeMillis()+delay, 0);
}

public void schedule(TimerTask task, Date time) {
sched(task, time.getTime(), 0);
}

public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}

public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}

public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, period);
}

public void scheduleAtFixedRate(TimerTask task, Date firstTime,
long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), period);
}

private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");

// Constrain value of period sufficiently to prevent numeric
// overflow while still being effectively infinitely large.
if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;

synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");

synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}

queue.add(task);
if (queue.getMin() == task)
queue.notify();
}
}

// 这个cancel与TimerTask中的不一样,这该是取消所有的任务
public void cancel() {
synchronized(queue) {
thread.newTasksMayBeScheduled = false;
queue.clear(); // 清空队列里面的内容
queue.notify(); // In case queue was already empty.
}
}

// 清除已经cancel的任务
public int purge() {
int result = 0;

synchronized(queue) {
for (int i = queue.size(); i > 0; i--) {
if (queue.get(i).state == TimerTask.CANCELLED) {
queue.quickRemove(i); // 这个quickRemove不调整堆,所以接下来要重新建堆
result++;
}
}

if (result != 0)
queue.heapify(); // 调整堆,其实也是重新建堆
}

return result;
}
}

// Timer的任务线程类
class TimerThread extends Thread {
boolean newTasksMayBeScheduled = true;
private TaskQueue queue; // 任务队列
TimerThread(TaskQueue queue) {
this.queue = queue;
}
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}

// 线程循环
private void mainLoop() {
while (true) {
try {
TimerTask task; // 当前的任务
boolean taskFired; // 任务激活标志
synchronized(queue) {
// 如果队列是空的,就使用wait等待任务
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // 如果收到notify后仍然是空的,直接退出循环

// 如果队列不为空,往下执行
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin(); // 移除已经cancel的队列
continue;
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // 0是不重复任务
queue.removeMin(); // 这个removeMin是有维护堆的
task.state = TimerTask.EXECUTED;
} else { // 重复的任务
queue.rescheduleMin(
task.period<0 ? currentTime - task.period /* schedule模式 */
: executionTime + task.period); // AtFixedRate模式
}
}
}
if (!taskFired) // 如果发现有任务,但是最近的任务没有到执行时间,进入线程等待
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
}


class TaskQueue {
private TimerTask[] queue = new TimerTask[128];

/**
* The number of tasks in the priority queue. (The tasks are stored in
* queue[1] up to queue[size]).
*/
private int size = 0;

int size() {
return size;
}


void add(TimerTask task) {
// Grow backing store if necessary
if (size + 1 == queue.length) // 如果发现位置不够用,就扩充数组
queue = Arrays.copyOf(queue, 2*queue.length);

queue[++size] = task; // 将任务加入堆的最后
fixUp(size); // 堆向上调整,维护堆
}

TimerTask getMin() {
return queue[1];
}

TimerTask get(int i) {
return queue[i];
}

void removeMin() {
queue[1] = queue[size];
queue[size--] = null; // Drop extra reference to prevent memory leak
fixDown(1); // 堆向下调整,维护堆
}

void quickRemove(int i) {
assert i <= size;

queue[i] = queue[size];
queue[size--] = null; // Drop extra ref to prevent memory leak
}


void rescheduleMin(long newTime) {
queue[1].nextExecutionTime = newTime;
fixDown(1);
}

boolean isEmpty() {
return size==0;
}

void clear() {
// Null out task references to prevent memory leak
for (int i=1; i<=size; i++)
queue[i] = null;

size = 0;
}


private void fixUp(int k) {
while (k > 1) {
int j = k >> 1; // j是父结点,k是子结点
if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}

private void fixDown(int k) {
int j;
while ((j = k << 1) <= size && j > 0) { // 这里j和j+1是子结点,k是父结点
if (j < size &&
queue[j].nextExecutionTime > queue[j+1].nextExecutionTime) // 找出两个子结点中最小的
j++; // j indexes smallest kid
if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime) // 判断是否需要调整堆
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}

void heapify() { // 建堆
for (int i = size/2; i >= 1; i--)
fixDown(i);
}
}

总结

对于一些比较简单的任务可以用Timer和TimerTask实现任务队列,但是对于一些耗时的任务这个Timer就不能胜任了。在Timer的内部实现中,始终是单线程的方式执行,任务的耗时实际上是会影响下个任务的开始执行时间的,即便有scheduleAtFixedRate仍然不能很好的解决这个问题。

坚持原创技术分享,您的支持将鼓励我继续创作!