1 package cz.cuni.amis.utils.future;
2
3 import java.util.concurrent.CountDownLatch;
4 import java.util.concurrent.Future;
5 import java.util.concurrent.TimeUnit;
6 import java.util.concurrent.TimeoutException;
7
8 import cz.cuni.amis.utils.exception.PogamutException;
9 import cz.cuni.amis.utils.exception.PogamutInterruptedException;
10 import cz.cuni.amis.utils.listener.Listeners;
11 import cz.cuni.amis.utils.listener.Listeners.ListenerNotifier;
12
13
14
15
16
17
18
19
20
21
22 public abstract class FutureWithListeners<RESULT> implements IFutureWithListeners<RESULT> {
23
24
25
26
27 protected Object mutex = new Object();
28
29
30
31
32 protected Listeners<IFutureListener<RESULT>> listeners = new Listeners<IFutureListener<RESULT>>();
33
34
35
36
37 private FutureStatus oldStatus = null;
38
39
40
41
42 private FutureStatus newStatus = null;
43
44
45
46
47 private ListenerNotifier<IFutureListener<RESULT>> notifier = new ListenerNotifier<IFutureListener<RESULT>>() {
48
49 @Override
50 public Object getEvent() {
51 return newStatus;
52 }
53
54 @Override
55 public void notify(IFutureListener<RESULT> listener) {
56 listener.futureEvent(FutureWithListeners.this, oldStatus, newStatus);
57 }
58 };
59
60
61
62
63
64 private FutureStatus status = FutureStatus.FUTURE_IS_BEING_COMPUTED;
65
66
67
68
69 private RESULT result = null;
70
71
72
73
74
75 protected CountDownLatch latch = null;
76
77
78
79
80
81 private Exception exception;
82
83
84
85
86
87 @Override
88 public FutureStatus getStatus() {
89 return status;
90 }
91
92
93
94
95
96
97 @Override
98 public void addFutureListener(IFutureListener<RESULT> listener) {
99 listeners.addStrongListener(listener);
100 }
101
102
103
104
105
106 @Override
107 public void removeFutureListener(IFutureListener<RESULT> listener) {
108 listeners.removeListener(listener);
109 }
110
111
112
113
114
115
116 @Override
117 public boolean isListening(IFutureListener<RESULT> listener) {
118 return listeners.isListening(listener);
119 }
120
121
122
123
124
125
126
127
128
129 @Override
130 public void setResult(RESULT result) {
131 synchronized(mutex) {
132 if (status != FutureStatus.FUTURE_IS_BEING_COMPUTED) {
133 throw new PogamutException("Future is not being computed anymore - can't set result.", this);
134 }
135 this.result = result;
136 switchStatus(FutureStatus.FUTURE_IS_READY);
137 if (latch != null) {
138 while (latch.getCount() > 0) latch.countDown();
139 } else {
140 latch = new CountDownLatch(0);
141 }
142 listeners.clearListeners();
143 }
144 }
145
146
147
148
149
150
151
152
153
154 @Override
155 public void computationException(Exception e) {
156 synchronized(mutex) {
157 if (status != FutureStatus.FUTURE_IS_BEING_COMPUTED) {
158 throw new PogamutException("Future is not being computed anymore - can't process computation exception.", e);
159 }
160 this.exception = e;
161 switchStatus(FutureStatus.COMPUTATION_EXCEPTION);
162 this.result = null;
163 if (latch != null) {
164 while (latch.getCount() > 0) latch.countDown();
165 } else {
166 latch = new CountDownLatch(0);
167 }
168 }
169 }
170
171
172
173
174
175
176
177 protected void switchStatus(FutureStatus newStatus) {
178 synchronized(mutex){
179 if (newStatus == status) return;
180 oldStatus = status;
181 this.newStatus = newStatus;
182 status = newStatus;
183 listeners.notify(notifier);
184 }
185 }
186
187
188
189
190
191 protected CountDownLatch createLatch() {
192 return new CountDownLatch(1);
193 }
194
195
196
197
198
199
200
201 protected boolean cancelComputation(boolean mayInterruptIfRunning) {
202 return false;
203 }
204
205 @Override
206 public final boolean cancel(boolean mayInterruptIfRunning) {
207 synchronized(mutex) {
208 if (cancelComputation(mayInterruptIfRunning)) {
209 switchStatus(FutureStatus.CANCELED);
210 return true;
211 } else {
212 return false;
213 }
214 }
215 }
216
217 @Override
218 public RESULT get() {
219 if (status == FutureStatus.FUTURE_IS_READY) return result;
220 if (status == FutureStatus.FUTURE_IS_BEING_COMPUTED) {
221 synchronized(mutex) {
222 if (status == FutureStatus.FUTURE_IS_READY) return result;
223 if (status == FutureStatus.FUTURE_IS_BEING_COMPUTED) {
224 latch = createLatch();
225 }
226 }
227 try {
228 latch.await();
229 } catch (InterruptedException e) {
230 throw new PogamutInterruptedException("Interrupted while awaiting furure result.", e, this);
231 }
232 if (status == FutureStatus.FUTURE_IS_READY) return result;
233 }
234 return null;
235 }
236
237
238
239
240
241
242
243
244
245
246
247 @Override
248 public RESULT get(long timeout, TimeUnit unit) {
249 if (status == FutureStatus.FUTURE_IS_READY) return result;
250 if (status == FutureStatus.FUTURE_IS_BEING_COMPUTED) {
251 synchronized(mutex) {
252 if (status == FutureStatus.FUTURE_IS_READY) return result;
253 if (status == FutureStatus.FUTURE_IS_BEING_COMPUTED) {
254 latch = createLatch();
255 }
256 }
257 try {
258 latch.await(timeout, unit);
259 } catch (InterruptedException e) {
260 throw new PogamutInterruptedException("Interrupted while awaiting future result.", e, this);
261 }
262 if (status == FutureStatus.FUTURE_IS_READY) return result;
263 }
264 return null;
265 }
266
267 @Override
268 public boolean isCancelled() {
269 return status == FutureStatus.CANCELED;
270 }
271
272 @Override
273 public boolean isDone() {
274 return status != FutureStatus.FUTURE_IS_BEING_COMPUTED;
275 }
276
277
278
279
280
281 @Override
282 public Exception getException() {
283 return exception;
284 }
285
286 }