forked from BruceEckel/OnJava8-Examples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPriorityBlockingQueueDemo.java
More file actions
139 lines (135 loc) · 3.53 KB
/
PriorityBlockingQueueDemo.java
File metadata and controls
139 lines (135 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// lowlevel/PriorityBlockingQueueDemo.java
// (c)2017 MindView LLC: see Copyright.txt
// We make no guarantees that this code is fit for any purpose.
// Visit http://OnJava8.com for more book information.
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import onjava.Nap;
/**
 * A queue element ordered by an integer priority. The natural ordering is
 * reversed with respect to the numeric value: an instance with a larger
 * priority compares as "less than" one with a smaller priority, so it comes
 * out of a priority queue first.
 *
 * Each instance gets a unique id from a shared counter and is appended to a
 * shared creation log that displaySequence() prints.
 */
class Prioritized implements Comparable<Prioritized> {
  // Hands out ids; atomic so concurrent constructions get distinct values.
  private static final AtomicInteger counter = new AtomicInteger();
  // Every instance created so far, in the order the constructor added it.
  private static final List<Prioritized> sequence =
    new CopyOnWriteArrayList<>();

  private final int id = counter.getAndIncrement();
  private final int priority;

  /**
   * Creates an element with the given priority and records it in the
   * shared creation log.
   *
   * @param priority ordering key; larger values sort first
   */
  Prioritized(int priority) {
    this.priority = priority;
    sequence.add(this);
  }

  /**
   * Orders by descending priority.
   *
   * @param arg the element to compare against
   * @return -1 if this priority is larger, 1 if it is smaller, 0 if equal
   */
  @Override
  public int compareTo(Prioritized arg) {
    // Arguments are swapped on purpose to get the descending order.
    return Integer.compare(arg.priority, priority);
  }

  /** Renders as "[priority] Prioritized id". */
  @Override
  public String toString() {
    return String.format(
      "[%d] Prioritized %d", priority, id);
  }

  /**
   * Prints every recorded instance as "(id:priority)" to standard output,
   * five entries per line. Works on a snapshot of the creation log taken
   * when the method starts.
   */
  public void displaySequence() {
    Prioritized[] snapshot = sequence.toArray(new Prioritized[0]);
    for (int i = 0; i < snapshot.length; i++) {
      Prioritized entry = snapshot[i];
      System.out.printf("(%d:%d)", entry.id, entry.priority);
      // Break the line after every fifth entry.
      if ((i + 1) % 5 == 0) {
        System.out.println();
      }
    }
  }

  /**
   * Marker element with priority -1, used by the demo's producers to
   * signal that they are done.
   */
  public static class EndSentinel extends Prioritized {
    EndSentinel() {
      super(-1);
    }
  }
}
/**
 * Adds ten Prioritized items with random priorities in [0, 20) to a queue,
 * pausing before each insertion, and then adds one
 * Prioritized.EndSentinel to mark that this producer has finished.
 */
class Producer implements Runnable {
  // Each new Producer takes the current seed and advances it by 10,
  // so every producer gets a different but repeatable random sequence.
  private static final AtomicInteger seed =
    new AtomicInteger(47);
  private final SplittableRandom rand =
    new SplittableRandom(seed.getAndAdd(10));
  private final Queue<Prioritized> queue;

  /**
   * @param q the queue that receives the generated items
   */
  Producer(Queue<Prioritized> q) {
    queue = q;
  }

  /**
   * Generates the items and enqueues them one at a time, followed by the
   * end sentinel.
   */
  @Override
  public void run() {
    rand.ints(10, 0, 20)
      .mapToObj(Prioritized::new)
      .forEach(p -> {
        // The pause is part of the demo's behavior, so it lives in the
        // terminal operation rather than in peek(), which is intended
        // mainly for debugging.
        // Nap presumably sleeps for the given number of seconds
        // (here under 0.1) -- confirm against onjava.Nap.
        new Nap(rand.nextDouble() / 10);
        queue.add(p);
      });
    queue.add(new Prioritized.EndSentinel());
  }
}
/**
 * Takes Prioritized items from a PriorityBlockingQueue and prints each one.
 * It stops at the first Prioritized.EndSentinel it takes, after printing
 * the creation sequence via displaySequence().
 */
class Consumer implements Runnable {
  private final PriorityBlockingQueue<Prioritized> q;
  // Fixed seed, so the pause lengths repeat from run to run.
  private final SplittableRandom rand =
    new SplittableRandom(47);

  /**
   * @param q the queue to take items from
   */
  Consumer(PriorityBlockingQueue<Prioritized> q) {
    this.q = q;
  }

  /**
   * Blocks on the queue, printing each item as it arrives, until an end
   * sentinel is taken.
   *
   * @throws RuntimeException wrapping the InterruptedException if the
   *     thread is interrupted while waiting on the queue
   */
  @Override
  public void run() {
    try {
      while (true) {
        Prioritized pt = q.take();
        System.out.println(pt);
        if (pt instanceof Prioritized.EndSentinel) {
          pt.displaySequence();
          return;
        }
        // Nap presumably sleeps for the given number of seconds
        // (here under 0.1) -- confirm against onjava.Nap.
        new Nap(rand.nextDouble() / 10);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status so code further up the stack can
      // still observe that this thread was interrupted.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
/**
 * Demo entry point: three Producers and one Consumer share a single
 * PriorityBlockingQueue.
 */
public class PriorityBlockingQueueDemo {
  /**
   * Starts three producers and one consumer with
   * CompletableFuture.runAsync and waits only for the consumer to finish.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    PriorityBlockingQueue<Prioritized> queue =
      new PriorityBlockingQueue<>();
    // Producers are started but not waited on.
    for (int started = 0; started < 3; started++) {
      CompletableFuture.runAsync(new Producer(queue));
    }
    CompletableFuture<Void> consumerDone =
      CompletableFuture.runAsync(new Consumer(queue));
    // The consumer returns once it has taken an end sentinel.
    consumerDone.join();
  }
}
/* Output:
[15] Prioritized 2
[17] Prioritized 1
[17] Prioritized 5
[16] Prioritized 6
[14] Prioritized 9
[12] Prioritized 0
[11] Prioritized 4
[11] Prioritized 12
[13] Prioritized 13
[12] Prioritized 16
[14] Prioritized 18
[15] Prioritized 23
[18] Prioritized 26
[16] Prioritized 29
[12] Prioritized 17
[11] Prioritized 30
[11] Prioritized 24
[10] Prioritized 15
[10] Prioritized 22
[8] Prioritized 25
[8] Prioritized 11
[8] Prioritized 10
[6] Prioritized 31
[3] Prioritized 7
[2] Prioritized 20
[1] Prioritized 3
[0] Prioritized 19
[0] Prioritized 8
[0] Prioritized 14
[0] Prioritized 21
[-1] Prioritized 28
(0:12)(2:15)(1:17)(3:1)(4:11)
(5:17)(6:16)(7:3)(8:0)(9:14)
(10:8)(11:8)(12:11)(13:13)(14:0)
(15:10)(16:12)(17:12)(18:14)(19:0)
(20:2)(21:0)(22:10)(23:15)(24:11)
(25:8)(26:18)(27:-1)(28:-1)(29:16)
(30:11)(31:6)(32:-1)
*/