深入剖析Java定时任务ScheduledThreadPoolExecutor
定时任务 ScheduledThreadPoolExecutor 类有两个用途:指定时间延迟后执行任务;周期性重复执行任务。
JDK 1.5 之前,主要使用Timer类来完成定时任务,但是Timer有以下缺陷:
- Timer 是单线程模式;
- 如果在执行任务期间某个 TimerTask 耗时较久,就会影响其它任务的调度;
- Timer 的任务调度是基于绝对时间的,对系统时间敏感;
- Timer 不会捕获执行 TimerTask 时所抛出的异常,由于 Timer 是单线程的,所以一旦出现异常,线程就会终止,其他任务无法执行。
于是 JDK 1.5 之后,开发者就抛弃了 Timer,开始使用ScheduledThreadPoolExecutor。先通过下面这张图感受下。

使用案例
假设我们有这样一个需求,指定时间给其他人发送消息。那么我们会将消息(包含发送时间)存储在数据库中,然后用一个定时任务,每隔 1 秒检查数据库在当前时间有没有需要发送的消息,那这个计划任务怎么完成呢?下面是一个 Demo:
/**
 * Demo: once per second, checks whether a message is due at the current time
 * and, if so, prints a timestamp followed by an announcement line.
 */
public class ThreadPool {
    // Scheduler with a single core thread; it drives the periodic check below.
    private static final ScheduledExecutorService executor =
            new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
    // SimpleDateFormat is not thread-safe; here it is only used from the one scheduler thread.
    private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        // Create a scheduled task with a fixed delay: first run after 1 second,
        // then again 1 second after each run finishes.
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // If an exception escapes run(), all subsequent executions are suppressed,
                // so contain failures of a single check here.
                try {
                    if (haveMsgAtCurrentTime()) {
                        System.out.println(df.format(new Date()));
                        System.out.println("大家注意了,我要发消息了");
                    }
                } catch (RuntimeException e) {
                    System.err.println("scheduled check failed: " + e);
                }
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    /**
     * Reports whether a message needs to be sent at the current time.
     *
     * @return always {@code true} in this demo
     */
    public static boolean haveMsgAtCurrentTime() {
        // Would query the database for messages due at the current time.
        // The implementation is omitted here; simply return true.
        return true;
    }
}
下面截取一段输出(demo 会一直运行下去):
2023-08-24 16:16:48
大家注意了,我要发消息了
2023-08-24 16:16:49
大家注意了,我要发消息了
2023-08-24 16:16:50
大家注意了,我要发消息了
2023-08-24 16:16:51
大家注意了,我要发消息了
2023-08-24 16:16:52
大家注意了,我要发消息了
2023-08-24 16:16:53
大家注意了,我要发消息了
2023-08-24 16:16:54
大家注意了,我要发消息了
2023-08-24 16:16:55
大家注意了,我要发消息了
这就是 ScheduledThreadPoolExecutor 的一个简单运用,接下来我们来看看它的实现原理。
类结构
// Excerpt of the class declaration: extends ThreadPoolExecutor and implements ScheduledExecutorService.
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
// Delegates to the parent constructor, passing corePoolSize, Integer.MAX_VALUE, 0 with
// NANOSECONDS, a fresh DelayedWorkQueue as the work queue, and the given thread factory.
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
// ... (remaining members elided)
}
ScheduledThreadPoolExecutor 继承了ThreadPoolExecutor,并实现了ScheduledExecutorService接口。线程池 ThreadPoolExecutor 在之前介绍过了,相信大家都还有印象,接下来我们来看看 ScheduledExecutorService 接口。
public interface ScheduledExecutorService extends ExecutorService {
/**
 * Schedules a Runnable task to execute after the given delay.
 *
 * @param command the task to execute
 * @param delay the delay before execution
 * @param unit the time unit of the delay
 * @return a ScheduledFuture that can be used to extract the result or cancel
 */
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
 * Schedules a Callable task to execute after the given delay.
 *
 * @param callable the task to execute
 * @param delay the delay before execution
 * @param unit the time unit of the delay
 * @return a ScheduledFuture that can be used to extract the result or cancel
 */
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
 * Schedules a Runnable task to execute first after the given initial delay,
 * and then once every period thereafter.
 *
 * @param command the task to execute
 * @param initialDelay the delay before the first execution
 * @param period the interval between successive executions
 * @param unit the time unit
 * @return a ScheduledFuture that can be used to extract the result or cancel
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
 * Schedules a Runnable task to execute first after the given initial delay,
 * and then again after waiting the given delay each time the task completes.
 *
 * @param command the task to execute
 * @param initialDelay the delay before the first execution
 * @param delay the delay after each execution finishes
 * @param unit the time unit
 * @return a ScheduledFuture that can be used to extract the result or cancel
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
ScheduledExecutorService 接口继承了 ExecutorService 接口,并增加了几个定时相关的接口方法。前两个方法用于单次调度执行任务,区别是有没有返回值。
重点介绍一下后面两个方法:
01、scheduleAtFixedRate
scheduleAtFixedRate 方法在initialDelay时长后第一次执行任务,以后每隔period时长再次执行任务。注意,period 是从任务开始执行算起的。开始执行任务后,定时器每隔 period 时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才启动任务。看下图:

02、scheduleWithFixedDelay
该方法在initialDelay时长后第一次执行任务,以后每当任务执行完成后,等待delay时长,再次执行任务。看下图。

相信大家能体会出来其中的差异。
主要方法
schedule
// Executes the task command once, after the given delay has elapsed.
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    // Per the original note, decorateTask here simply returns its second argument.
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null, triggerTime(delay,unit)));
    // Main method for delayed or periodic execution; explained later in the article.
    delayedExecute(t);
    return t;
}
我们先看看里面涉及到的几个类和接口的关系图谱:

Delayed 接口
// Extends Comparable, which means objects of this type support ordering.
public interface Delayed extends Comparable<Delayed> {
// Returns the remaining delay of this object, in the given unit.
long getDelay(TimeUnit unit);
}
Delayed接口很简单,继承了Comparable接口,表示对象是可以比较排序的。
ScheduledFuture 接口
// ä»
ä»
ç»§æ¿äºDelayedåFutureæ¥å£ï¼èªå·±æ²¡æä»»ä½ä»£ç
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
RunnableScheduledFuture 接口
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> { Â Â
// æ¯å¦æ¯å¨æä»»å¡ï¼å¨æä»»å¡å¯è¢«è°åº¦è¿è¡å¤æ¬¡ï¼é卿任å¡åªè¢«è¿è¡ä¸æ¬¡ Â
boolean isPeriodic();
}
ScheduledFutureTask 类
回到schedule方法中,它创建了一个ScheduledFutureTask对象,由上面的关系图可知,ScheduledFutureTask直接或者间接实现了很多接口,一起看看ScheduledFutureTask里面的实现方法吧。
构造方法
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    // Invoke the constructor of the parent class FutureTask.
    super(r, result);
    // time is when the task is next due to run.
    this.time = ns;
    // Period of the task: positive means fixed rate, negative means fixed delay,
    // 0 means the task is not periodic.
    this.period = period;
    // Sequence number of the task.
    this.sequenceNumber = sequencer.getAndIncrement();
}
Delayed 接口的实现
// Implements Delayed.getDelay: returns the time remaining until the task is due,
// i.e. (time - now()) in nanoseconds converted to the requested unit.
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
Comparable 接口的实现
// Comparable.compareTo: orders two tasks relative to each other.
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
// Negative: this task is due earlier than other, so it sorts before other in the delay queue.
if (diff < 0)
return -1;
// Positive: this task is due later than other, so it sorts after other in the delay queue.
else if (diff > 0)
return 1;
// Same due time: fall back to the sequence numbers; the smaller number sorts first,
// the larger one sorts later.
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// other is not a ScheduledFutureTask: compare by the values returned from getDelay.
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
setNextRunTime
// 任塿§è¡å®åï¼è®¾ç½®ä¸æ¬¡æ§è¡çæ¶é´
private void setNextRunTime() {
long p = period;
// p > 0ï¼è¯´ææ¯åºå®éçè¿è¡çä»»å¡
// å¨åæ¥ä»»å¡å¼å§æ§è¡æ¶é´çåºç¡ä¸å ä¸på³å¯
if (p > 0)
time += p;
// p < 0ï¼è¯´ææ¯åºå®æ¶å»¶è¿è¡çä»»å¡ï¼
// 䏿¬¡æ§è¡æ¶é´å¨å½åæ¶é´(任塿§è¡å®æçæ¶é´)çåºç¡ä¸å ä¸-pçæ¶é´
else
time = triggerTime(-p);
}
Runnable 接口实现
public void run() {
boolean periodic = isPeriodic();
// 妿å½åç¶æä¸ä¸è½æ§è¡ä»»å¡ï¼ååæ¶ä»»å¡
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 䏿¯å¨ææ§ä»»å¡ï¼æ§è¡ä¸æ¬¡ä»»å¡å³å¯ï¼è°ç¨ç¶ç±»çrunæ¹æ³
else if (!periodic)
ScheduledFutureTask.super.run();
// æ¯å¨ææ§ä»»å¡ï¼è°ç¨FutureTaskçrunAndResetæ¹æ³ï¼æ¹æ³æ§è¡å®æå
// éæ°è®¾ç½®ä»»å¡ä¸ä¸æ¬¡æ§è¡çæ¶é´ï¼å¹¶å°è¯¥ä»»å¡éæ°å
¥éï¼çå¾
忬¡è¢«è°åº¦
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
总结一下 run 方法的执行过程:
- 如果当前线程池运行状态下不能执行任务,那么就取消该任务,然后直接返回,否则执行步骤 2;
- 如果不是周期性任务,调用 FutureTask 中的 run 方法执行,会设置执行结果,然后直接返回,否则执行步骤 3;
- 如果是周期性任务,调用 FutureTask 中的 runAndReset 方法执行,不会设置执行结果,然后直接返回,否则执行步骤 4 和步骤 5;
- 计算下次执行该任务的具体时间;
- 重复执行任务。
runAndReset方法是为任务多次执行而设计的。runAndReset方法执行完任务后不会设置任务的执行结果,也不会去更新任务的状态,而是维持任务的状态为初始状态(NEW状态),这也是该方法和 FutureTask run方法的区别。
scheduleAtFixedRate
我们看一下代码:
// Note: both fixed-rate and fixed-delay scheduling take a Runnable,
// which means these periodic tasks have no return value.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // Create a task with an initial delay and a fixed period (positive period value).
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // outerTask is the task that will be re-enqueued for later runs.
    sft.outerTask = t;
    // Explained later in the article.
    delayedExecute(t);
    return t;
}
scheduleAtFixedRate这个方法和schedule类似,不同点是scheduleAtFixedRate方法内部创建的是ScheduledFutureTask,带有初始延时和固定周期的任务。
scheduleWithFixedDelay
scheduleWithFixedDelay也是通过ScheduledFutureTask体现的,唯一不同的地方在于创建的ScheduledFutureTask不同。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // Create a task with an initial delay and a fixed delay between runs;
    // the delay is passed negated, which marks the task as fixed-delay.
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // outerTask is the task that will be re-enqueued for later runs.
    sft.outerTask = t;
    // Explained later in the article.
    delayedExecute(t);
    return t;
}
delayedExecute
前面讲到的schedule、scheduleAtFixedRate和scheduleWithFixedDelay最后都调用了delayedExecute方法,该方法是定时任务执行的主要方法。 一起来看看源码:
private void delayedExecute(RunnableScheduledFuture<?> task) {
// çº¿ç¨æ± å·²ç»å
³éï¼è°ç¨æç»æ§è¡å¤çå¨å¤ç
if (isShutdown())
reject(task);
else {
// å°ä»»å¡å å
¥å°çå¾
éå
super.getQueue().add(task);
// çº¿ç¨æ± å·²ç»å
³éï¼ä¸å½åç¶æä¸è½è¿è¡è¯¥ä»»å¡ï¼å°è¯¥ä»»å¡ä»çå¾
éåç§»é¤å¹¶åæ¶è¯¥ä»»å¡
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// å¢å ä¸ä¸ªworkerï¼å°±ç®corePoolSize=0ä¹è¦å¢å ä¸ä¸ªworker
ensurePrestart();
}
}
delayedExecute方法的逻辑也很简单,主要就是将任务添加到等待队列,然后调用ensurePrestart方法。
// Makes sure at least one worker thread exists to service the queue.
void ensurePrestart() {
// Current number of workers, decoded from the ctl state word.
int wc = workerCountOf(ctl.get());
// Fewer workers than corePoolSize: add one with the second argument set to true
// (presumably "count against the core size" - confirm against addWorker).
if (wc < corePoolSize)
addWorker(null, true);
// No workers at all (reached only when wc >= corePoolSize, i.e. corePoolSize is 0):
// still add one, with the second argument set to false.
else if (wc == 0)
addWorker(null, false);
}
ensurePrestart方法主要是调用了addWorker 方法,线程池中的工作线程就是通过该方法来启动并执行任务的。相信大家都还有印象。
对于ScheduledThreadPoolExecutor,worker添加到线程池后会在等待队列上等待获取任务,这点是和ThreadPoolExecutor是一致的。但是 worker 是怎么从等待队列取定时任务的呢?
DelayedWorkQueue
ScheduledThreadPoolExecutor使用了DelayedWorkQueue 来保存等待的任务。
该等待队列的队首应该保存的是最近将要执行的任务,所以worker只关心队首任务,如果队首任务的开始执行时间还未到,worker 也应该继续等待。
DelayedWorkQueue 是一个无界优先队列,使用数组存储,底层使用堆结构来实现优先队列的功能。

可以转换成如下的数组:

在这种结构中,可以发现有如下特性。假设,索引值从 0 开始,子节点的索引值为 k,父节点的索引值为 p,则:
- 一个节点的左子节点的索引为:k = p * 2 + 1;
- 一个节点的右子节点的索引为:k = (p + 1) * 2;
- 一个节点的父节点的索引为:p = (k - 1) / 2。
我们来看看 DelayedWorkQueue 的声明和成员变量:
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// éååå§å®¹é
private static final int INITIAL_CAPACITY = 16;
// æ°ç»ç¨æ¥åå¨å®æ¶ä»»å¡ï¼éè¿æ°ç»å®ç°å æåº
private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
// å½åå¨éé¦çå¾
ç线ç¨
private Thread leader = null;
// éåçè§å¨ï¼ç¨äºleader线ç¨
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
// å
¶ä»ä»£ç ï¼ç¥
}
当一个线程成为 leader,它只需等待队首任务的 delay 时间即可,其他线程会无条件等待。leader 取到任务返回前要通知其他线程,直到有线程成为新的 leader。每当队首的定时任务被其他更早需要执行的任务替换,leader 就设置为 null,其他等待的线程(被当前 leader 通知)和当前的 leader 重新竞争成为 leader。
所有线程都会有三种身份中的一种:leader、follower,以及一个干活中的状态:processor。它的基本原则是,永远最多只有一个 leader。所有 follower 都在等待成为 leader。线程池启动时会自动产生一个 Leader 负责等待网络 IO 事件,当有一个事件产生时,Leader 线程首先通知一个 Follower 线程将其提拔为新的 Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入 Follower 线程等待队列,等待下次成为 Leader。这种方法可以增强 CPU 高速缓存相似性,及消除动态内存分配和线程间的数据交换。
同时,定义了 ReentrantLock 锁 lock 和 Condition available 用于控制和通知下一个线程竞争成为 leader。
当一个新的任务成为队首,或者需要有新的线程成为 leader 时,available 监视器上的线程将会被通知,然后竞争成为 leader 线程。有些类似于生产者-消费者模式。
DelayedWorkQueue 是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O(logN)。
接下来看看DelayedWorkQueue中几个比较重要的方法。
take
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// åå é¡¶çä»»å¡ï¼å é¡¶æ¯æè¿è¦æ§è¡çä»»å¡
RunnableScheduledFuture first = queue[0];
// å 顶为空ï¼çº¿ç¨è¦å¨æ¡ä»¶availableä¸çå¾
if (first == null)
available.await();
else {
// å é¡¶ä»»å¡è¿è¦å¤é¿æ¶é´æè½æ§è¡
long delay = first.getDelay(TimeUnit.NANOSECONDS);
// å é¡¶ä»»å¡å·²ç»å¯ä»¥æ§è¡äºï¼finishPollä¼éæ°è°æ´å ï¼ä½¿å
¶æ»¡è¶³æå°å ç¹æ§ï¼è¯¥æ¹æ³è®¾ç½®ä»»å¡å¨
// å ä¸çindex为-1å¹¶è¿å该任å¡
if (delay <= 0)
return finishPoll(first);
// 妿leaderä¸ä¸ºç©ºï¼è¯´æå·²ç»æçº¿ç¨æä¸ºleaderå¹¶çå¾
å é¡¶ä»»å¡
// å°è¾¾æ§è¡æ¶é´ï¼æ¤æ¶ï¼å
¶ä»çº¿ç¨é½éè¦å¨availableæ¡ä»¶ä¸çå¾
else if (leader != null)
available.await();
else {
// leader为空ï¼å½åçº¿ç¨æä¸ºæ°çleader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// å½å线ç¨å·²ç»æä¸ºleaderäºï¼åªéè¦çå¾
å é¡¶ä»»å¡å°è¾¾æ§è¡æ¶é´å³å¯
available.awaitNanos(delay);
} finally {
// è¿åå é¡¶å
ç´ ä¹åå°leader设置为空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// éç¥å
¶ä»å¨availableæ¡ä»¶çå¾
ç线ç¨ï¼è¿äºçº¿ç¨å¯ä»¥å»ç«äºæä¸ºæ°çleader
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
take方法是什么时候调用的呢?
在讲解线程池的时候,我们介绍了getTask方法,工作线程会循环从workQueue中取任务。但计划任务却不同,因为一旦getTask方法取出了任务就开始执行了,而这时可能还没有到执行时间,所以在take方法中,要保证只有到指定的执行时间,任务才可以被取走。
总结一下流程:
- 如果堆顶元素为空,在 available 上等待。
- 如果堆顶任务的执行时间已到,将堆顶元素替换为堆的最后一个元素并调整堆使其满足最小堆特性,同时设置任务在堆中索引为-1,返回该任务。
- 如果 leader 不为空,说明已经有线程成为 leader 了,其他线程都要在 available 监视器上等待。
- 如果 leader 为空,当前线程成为新的 leader,并等待直到堆顶任务执行时间到达。
- take 方法返回之前,将 leader 设置为空,并通知其他线程。
再来说一下 leader 的作用,这里的 leader 是为了减少不必要的定时等待,当一个线程成为 leader 时,它只等待下一个节点的时间间隔,但其它线程无限期等待。 leader 线程必须在take()或poll()返回之前 signal 其它线程,除非其他线程成为了 leader。
举例来说,如果没有 leader,那么在执行 take 时,都要执行available.awaitNanos(delay),假设当前线程执行了该段代码,这时还没有 signal,第二个线程也执行了该段代码,则第二个线程也要被阻塞。
但只有一个线程返回队首任务,其他的线程在awaitNanos(delay)之后,继续执行 for 循环,因为队首任务已经被返回了,所以这个时候的 for 循环拿到的队首任务是新的,又需要重新判断时间,又要继续阻塞。
所以,为了不让多个线程频繁的做无用的定时等待,这里增加了 leader,如果 leader 不为空,则说明队列中第一个节点已经在等待出队,这时其它的线程会一直阻塞,减少了无用的阻塞(注意,在finally中调用了signal()来唤醒一个线程,而不是signalAll())。
offer
该方法往队列插入一个值,返回是否成功插入。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// éåå
ç´ å·²ç»å¤§äºçäºæ°ç»çé¿åº¦ï¼éè¦æ©å®¹ï¼æ°å ç容鿝忥å 容éç1.5å
if (i >= queue.length)
grow();
// å ä¸å
ç´ å¢å 1
size = i + 1;
// è°æ´å
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// è°æ´å ï¼ä½¿ç满足æå°å ï¼æ¯è¾å¤§å°çæ¹å¼å°±æ¯ä¸ææå°çcompareToæ¹æ³
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
// éç¥å
¶ä»å¨availableæ¡ä»¶ä¸çå¾
ç线ç¨ï¼è¿äºçº¿ç¨å¯ä»¥ç«äºæä¸ºæ°çleader
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
offer 方法实现了向延迟队列插入一个任务的操作,并保证整个队列仍然满足最小堆的性质。
最小堆(Min Heap)是一个完全二叉树,其中每一个父节点的值都小于或等于其子节点的值。换句话说,在最小堆中,根节点(即树的顶部)是所有节点中的最小值。
前面我们也提到过最小堆。我们来看一下用于调整堆的 siftUp 方法。
// Moves key up from position k until its parent no longer compares greater than it,
// then stores key at the final position.
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// Index of the parent node.
int parent = (k - 1) >>> 1;
// Fetch the parent node.
RunnableScheduledFuture<?> e = queue[parent];
// key does not sort before its parent (compareTo >= 0): nothing more to reorder.
if (key.compareTo(e) >= 0)
break;
// key.compareTo(e) < 0: key sorts before its parent,
// so move the parent down into slot k.
queue[k] = e;
// Record the parent's new index k.
setIndex(e, k);
k = parent;
}
// Put key into its final sorted position.
queue[k] = key;
setIndex(key, k);
}
代码很好理解,就是循环的根据key节点与它的父节点来判断,如果key节点的执行时间小于父节点,则将两个节点交换,使执行时间靠前的节点排列在队列的前面。
假设新入队的节点的延迟时间(调用getDelay()方法获得)是5,执行过程如下:
1、先将新的节点添加到数组的尾部,这时新节点的索引k为7;

2、计算新父节点的索引:parent = (k - 1) >>> 1,parent = 3,那么queue[3]的时间间隔值为8,因为 5 < 8 ,将执行queue[7] = queue[3];

3、这时将k设置为3,继续循环,再次计算parent为1,queue[1]的时间间隔为3,因为 5 > 3 ,这时退出循环,最终k为3;

可见,每次新增节点时,只是根据父节点来判断,而不会影响兄弟节点。
小结
ScheduledThreadPoolExecutor是一个定时任务的线程池,它的主要作用是周期性的执行任务。它的实现原理是通过DelayedWorkQueue来保存等待的任务,DelayedWorkQueue是一个无界优先队列,使用数组存储,底层使用堆结构来实现优先队列的功能。
编辑:沉默王二,原文内容来源于朋友小七萤火虫开源的这个仓库:深入浅出 Java 多线程,强烈推荐。其他参考链接如下:
æ¨èé 读ï¼è¯»è ä¸åç 11 ç§å»¶è¿ä»»å¡çå®ç°æ¹å¼
GitHub 上标星 10000+ 的开源知识库《二哥的 Java 进阶之路》第二份 PDF 《并发编程小册》终于来了!包括线程的基本概念和使用方法、Java的内存模型、synchronized、volatile、CAS、AQS、ReentrantLock、线程池、并发容器、ThreadLocal、生产者消费者模型等面试和开发必须掌握的内容,共计 15 万余字,200+张手绘图,可以说是通俗易懂、风趣幽默……详情戳:太赞了,二哥的并发编程进阶之路.pdf
加入二哥的编程星球,在星球的第二个置顶帖「知识图谱」里就可以获取 PDF 版本。

