1 package cz.cuni.amis.utils.flag;
2
3 import java.io.IOException;
4 import java.io.ObjectInputStream;
5 import java.io.Serializable;
6 import java.util.LinkedList;
7 import java.util.List;
8 import java.util.concurrent.Semaphore;
9 import java.util.concurrent.TimeUnit;
10
11 import cz.cuni.amis.utils.exception.PogamutException;
12 import cz.cuni.amis.utils.exception.PogamutIOException;
13 import cz.cuni.amis.utils.exception.PogamutInterruptedException;
14 import cz.cuni.amis.utils.listener.Listeners;
15
16 /**
17 * This class may be used to create an observable value (you may attach change-listeners to it).
18 * <p><p>
19 * This is flag class which is designed for Boolean or Integer types (but
20 * it should work with other types as well as long as they have equals() implemented
21 * correctly).
22 * <p><p>
23 * It allows you to store the state of flag and register listeners on the flag.
24 * <p><p>
25 * Note that the implementation is:
26 * <ol>
27 * <li>thread-safe (truly),</li>
28 * <li>recursion-safe (meaning that the flag may be changed from within the listener it notifies of the flag changes - such events are put into the queue and processed in correct sequence),</li>
29 * <li>setters/getters are non-blocking (or they blocks for finite small amount of time ~ few synchronized statements used, which can't block each other for greater period of time).</li>
30 * </ol>
31 * <p><p>
 * Also note that there can't be a truly correct implementation of the flag that always returns
 * the right value - if you heavily use the flag from, let's say, tens of threads, then there may
 * be glitches in the value returned by getFlag() (but for most usages this value
 * will be correct 99.99999% of the time!).
36 * <p><p>
37 * Note that the implementation of notifying about flag-change is
38 * strictly time-ordered. Every flag-change event is fully processed before another is raised/received.
39 * There is a possibility that a listener on the flag change will attempt to change the flag again (mentioned recursion-safe).
 * In that case the processing of the new flag change is postponed until the previous event has been fully processed
 * (i.e., all listeners have been notified about it).
42 * <p><p>
43 * Last piece of magic - if you want to change the flag value in-sync (meaning that you need 100% safe reading of the flag value), instantiate Flag.DoInSync<T> class
44 * and submit it via inSync() method - {@link DoInSync#execute(Object)} method will be executed in synchronized state so no one can change the flag value
45 * while you're inside this method.
46 *
47 * @author Jimmy
48 *
49 * @param <T> type of the flag
50 */
51 public class Flag<T> implements IFlag<T>, Serializable {
52
53 /**
54 * Usage of this abstract class is as simple as it could be ... all you have to do is to instantiate
55 * it (using anonymous objects).<p>
56 * <p>
57 * Example:<p><p>
58 * <code>
59 * Flag<Integer> flag = new Flag<Integer>(10);<p>
60 * flag.inSync(
61 * new Flag.DoInSync<Integer>(flag) {<p>
62 * public abstract void execute(Integer flagValue) {<p>
63 * setFlag(flagValue+1);<p>
64 * }<p>
65 * }
66 * );<p>
67 * </code>
68 * <p><p>
69 * No need to do anything else! The class will submit itself to the flag upon construction.
70 * <p><p>
71 * Use it to create correct counters (or use directly FlagInteger class).
72 *
73 * @author Jimmy
74 *
75 * @param <T>
76 */
77 public static abstract class DoInSync<T> {
78
79 Flag<T> flag;
80
81 public DoInSync() {
82 }
83
84 void setFlagInstance(Flag<T> flag) {
85 this.flag = flag;
86 }
87
88 /**
89 * Tells you whether you operate over immutable flag (can't call setFlag() then) or not.
90 * @return
91 */
92 protected boolean isImmutable() {
93 return flag instanceof ImmutableFlag;
94 }
95
96 /**
97 * Set value in sync.
98 */
99 protected void setFlag(T value) {
100 if (flag instanceof ImmutableFlag) throw new UnsupportedOperationException("trying to set flag of the immutable flag!");
101 flag.value = value;
102 flag.notifier.setValue(value);
103 flag.listeners.notify(flag.notifier);
104 }
105
106 protected T getFlag() {
107 return flag.getFlag();
108 }
109
110 /**
111 * @param flag
112 */
113 public abstract void execute(T flagValue);
114
115 }
116
117 class SetInSync extends DoInSync<T> {
118
119 T newValue;
120
121 public SetInSync(T newValue) {
122 this.newValue = newValue;
123 }
124
125 @Override
126 public void execute(T flagValue) {
127 if (
128 (newValue == null && flagValue != null)
129 ||
130 (newValue != null && !newValue.equals(flagValue))
131 ) {
132 setFlag(newValue);
133 }
134 }
135
136 }
137
/**
 * Listeners registered on this flag; they are notified from {@link DoInSync#setFlag(Object)}.
 * Transient - re-created in readObject().
 */
transient Listeners<FlagListener<T>> listeners = new Listeners<FlagListener<T>>();

/**
 * Current value of the flag (the only non-transient field).
 * <p><p>
 * Do not read directly - always use getFlag() method.
 */
T value;

/**
 * Notifier passed to {@link #listeners}; it is loaded with the new value before every notification.
 */
transient FlagListener.FlagListenerNotifier<T> notifier = new FlagListener.FlagListenerNotifier<T>();

/**
 * Mutex that we synchronize on when the result of getFlag() should be changed
 * (outermost lock of inSyncInner() and getFlag()).
 */
transient Object setMutex = new Object();

/**
 * Whether the set method is frozen (see freeze() / defreeze()); written under {@link #setFreezedMutex}.
 */
transient boolean setFreezed = false;

/**
 * Mutex guarding writes of {@link #setFreezed}.
 */
transient Object setFreezedMutex = new Object();

/**
 * Acquired at the start of freeze() and released at the end of defreeze().
 */
transient Semaphore setFreezedSemaphore = new Semaphore(1);

/**
 * If availablePermits() == 1, the queue is not being processed by the method processCommandQueue()
 */
transient Semaphore commandQueueProcessing = new Semaphore(1);

/**
 * Mutex held while {@link #commandQueueProcessing} is queried / acquired / released by
 * inSyncInner(), processCommandQueue() and defreeze().
 */
transient Object commandQueueProcessingMutex = new Object();

/**
 * Commands waiting to be executed by processCommandQueue(); the list instance is also used
 * as the monitor guarding its own modifications.
 */
transient List<DoInSync<T>> commandQueue = new LinkedList<DoInSync<T>>();

/** Immutable version of this flag, lazily created by getImmutable(). */
transient ImmutableFlag<T> immutableWrapper = null;
172
/**
 * Creates a flag whose initial value is 'null'.
 */
public Flag() {
    this(null);
}
179
/**
 * Creates a flag holding 'initialValue'.
 *
 * @param initialValue value the flag starts with (may be null)
 */
public Flag(T initialValue) {
    this.value = initialValue;
}
187
188 /**
189 * Method honoring the de-serialization process, it correctly initializes all
190 * that needs to be.
191 *
192 * @return
193 */
194 private void readObject(ObjectInputStream ois) {
195 try {
196 ois.defaultReadObject();
197 } catch (IOException e) {
198 throw new PogamutIOException("Could not read Flag", e);
199 } catch (ClassNotFoundException e) {
200 throw new PogamutException("Could not deserialize Flag", e);
201 }
202 this.listeners = new Listeners<FlagListener<T>>();
203 this.notifier = new FlagListener.FlagListenerNotifier<T>();
204 this.setMutex = new Object();
205 this.setFreezed = false;
206 this.setFreezedMutex = new Object();
207 this.setFreezedSemaphore = new Semaphore(1);
208 this.commandQueueProcessing = new Semaphore(1);
209 this.commandQueueProcessingMutex = new Object();
210 this.commandQueue = new LinkedList<DoInSync<T>>();
211 this.immutableWrapper = null;
212 }
213
/**
 * Executes the queued commands one by one until the queue is found empty, then releases
 * the commandQueueProcessing semaphore and returns.
 * <p><p>
 * Unsynchronized!
 * <p><p>
 * The commandQueueProcessing semaphore must be acquired before calling! It will be released by this method.
 * <p><p>
 * NOTE(review): there is no try/finally around command.execute() - if a command (or a listener
 * it notifies) throws, the semaphore is not released by this method.
 */
private void processCommandQueue() {
    // check the events list size, do we have more events to process?
    // note that the implementation is tricky ... try to think it over before modifying
    while(true) {
        DoInSync<T> command = null;
        // take the head of the queue (if any) while holding the queue monitor
        synchronized(commandQueue) {
            if (commandQueue.size() != 0) {
                command = commandQueue.get(0);
                commandQueue.remove(0);
            }
        }
        // execute outside of any lock, so the command / listeners may enqueue further commands
        if (command != null) {
            command.setFlagInstance(this);
            command.execute(value);
        }
        // inSyncInner() enqueues while holding this mutex, so the emptiness check and the
        // release of the semaphore can't interleave with an enqueue
        synchronized(commandQueueProcessingMutex) {
            if (commandQueue.size() == 0) {
                commandQueueProcessing.release();
                return;
            }
        }
    }
}
242
243 /**
244 * Add a command that will be executed in-sync with other changes (you may be sure that no other changes
245 * are taking place right now).
246 * <p><p>
247 * This is also used by the setFlag() method.
248 * @param command
249 * @param addAsFirst if true the command will be added as a first to execute
250 */
251 protected void inSyncInner(DoInSync<T> command, boolean addAsFirst) {
252 // we're going to change the result of getValue() method, synchronized on it
253 synchronized(setMutex) {
254 // we will modify the setFlagValues
255 synchronized(commandQueue) {
256 // we're going to query setFlagProcessing
257 synchronized(commandQueueProcessingMutex) {
258 // we're going to query the setFreezed flag
259 synchronized(setFreezedMutex) {
260 // insert command into the queue
261 if (addAsFirst) commandQueue.add(0, command);
262 else commandQueue.add(command);
263
264 // is the set method freezed?
265 if (setFreezed) return;
266
267 // is the processSetFlagQueue running?
268 if (commandQueueProcessing.availablePermits() <= 0) return;
269
270 // we're not freezed nor the setFlag is running
271 try {
272 // acquire processing semaphore
273 commandQueueProcessing.acquire();
274 } catch (InterruptedException e) {
275 //throw new RuntimeException("could not happen...");
276 return;
277 }
278 }
279 }
280 }
281 }
282 // there can never ever be two threads in this place - we've ruled them out in the
283 // previous synchronized statement
284 processCommandQueue();
285 }
286
287 /**
288 * Add a command (to the end of the queue) that will be executed in-sync with other changes (you may be sure that no other changes
289 * are taking place right now).
290 * <p><p>
291 * This is also used by the setFlag() method.
292 * @param command
293 */
294 public void inSync(DoInSync<T> command) {
295 inSyncInner(command, false);
296 }
297
298 /**
299 * Changes the flag and informs all listeners.
300 *
301 * @param newValue
302 * @throws InterruptedRuntimeException if interrupted during the await on the freeze latch
303 */
304 public void setFlag(T newValue) {
305 inSyncInner(new SetInSync(newValue), false);
306 }
307
308 /**
309 * Whether the flag-change has been frozen, i.e., setFlag() won't change the flag value
310 * immediately by will wait till {@link Flag#defreeze()} is called.
311 */
312 public boolean isFrozen() {
313 return setFreezed;
314 }
315
/**
 * This method will freeze the processing of the setFlag() method. Method is synchronized.
 * <p><p>
 * It waits until all setFlag() pending requests are resolved and then returns.
 * <p><p>
 * It may be used for the synchronized registration of the listeners (if you really care
 * for the value of the flag before creating the listener). Or it may be used to obtain
 * a true value of the flag in the moment of the method call (as it waits for all the listeners
 * to execute).
 * <p>
 * In one of these cases, do this:<p>
 * <ol>
 * <li>flag.freeze()</li>
 * <li>examine the flag value and/or register new listeners</li>
 * <li>flag.defreeze() // DO NOT FORGET THIS!</li>
 * </ol>
 * <p>
 * Example: you have a flag that is counting how many alive threads you have, those threads may
 * be created concurrently ... without this synchronizing method you wouldn't be able to correctly
 * read the number of threads before incrementing the flag.
 * <p><p>
 * Beware of deadlocks when using this method, watch out:<p>
 * a) infinite recursion during the setFlag() (listeners are changing the flag value repeatedly)<p>
 * b) incorrect sequences of the freeze() / defreeze() calls
 * <p><p>
 * Of course you may simulate this behavior with simple synchronized() statement, but
 * this isn't always feasible as it blocks all other threads while accessing this flag,
 * note that this flag implementation promotes non-blocking methods.
 * <p><p>
 * NOTE(review): if the thread is interrupted during the second wait, the exception is thrown
 * with setFreezed still true and setFreezedSemaphore still held - confirm whether callers are
 * expected to call defreeze() in that case.
 *
 * @throws PogamutInterruptedException if the thread is interrupted while waiting
 */
public void freeze() {
    // only one freeze may be active at a time - the permit is returned by defreeze()
    try {
        setFreezedSemaphore.acquire();
    } catch (InterruptedException e) {
        throw new PogamutInterruptedException("wait on the freeze semapthore has been interrupter", e, this);
    }
    // from now on inSyncInner() only queues the commands
    synchronized(setFreezedMutex) {
        setFreezed = true;
    }
    try {
        commandQueueProcessing.acquire(); // make sure the queue has been fully processed
        commandQueueProcessing.release();
    } catch (InterruptedException e) {
        throw new PogamutInterruptedException("interrupted during the wait for the setFlag() to finish it's work", e, this);
    }
}
361
362 /**
363 * Method is synchronized. See {@link Flag#freeze()} for info.
364 */
365 public void defreeze() {
366 synchronized(commandQueueProcessingMutex) {
367 try {
368 commandQueueProcessing.acquire();
369 } catch (InterruptedException e) {
370 throw new PogamutInterruptedException("interrupted during acquiring setFlagProcessing", e, this);
371 }
372 }
373 synchronized(setFreezedMutex) {
374 if (!setFreezed) throw new PogamutException("flag has been defreezed twice", this);
375 setFreezed = false;
376 }
377 processCommandQueue();
378 setFreezedSemaphore.release();
379 }
380
381 /**
382 * Pauses the thread till the flag change to another value.
383 * @return flag value that woke up the thread
384 * @throws PogamutInterrputedException
385 */
386 public T waitForChange() throws PogamutInterruptedException {
387 return new WaitForFlagChange<T>(this).await();
388 }
389
390 /**
391 * Pauses the thread till the flag change to another value or timeout.
392 * <p><p>
393 * Returns null if times out, otherwise returns value that woke up the thread.
394 * @param timeoutMillis
395 * @param oneOfTheValue
396 * @return null (timeout) or value that woke up the thread
397 * @throws PogamutInterruptedException
398 */
399 public T waitForChange(long timeoutMillis) throws PogamutInterruptedException {
400 return new WaitForFlagChange<T>(this).await(timeoutMillis, TimeUnit.MILLISECONDS);
401 }
402
403 /**
404 * Pauses the thread till the flag is set from the outside to one of specified values.
405 * @param oneOfTheValue
406 * @return flag value that woke up the thread
407 * @throws PogamutInterruptedException
408 */
409 public T waitFor(T... oneOfTheValue) throws PogamutInterruptedException {
410 return new WaitForFlagChange<T>(this, oneOfTheValue).await();
411 }
412
413 /**
414 * Pauses the thread till the flag is set from the outside to one of specified values or times out.
415 * <p><p>
416 * Returns null if times out, otherwise returns value that woke up the thread.
417 * @param timeoutMillis
418 * @param oneOfTheValue
419 * @return null (timeout) or value that woke up the thread
420 * @throws PogamutInterruptedException
421 */
422 public T waitFor(long timeoutMillis, T... oneOfTheValue) throws PogamutInterruptedException {
423 return new WaitForFlagChange<T>(this, oneOfTheValue).await(timeoutMillis, TimeUnit.MILLISECONDS);
424 }
425
426 /**
427 * Returns the value of the flag.
428 * <p><p>
429 * Note that if the flag contains any set-flag pending requests queue it will return the last
430 * value from this queue.
431 * <p><p>
432 * This has a big advantage for the multi-thread heavy-listener oriented designs.
433 * <p>
434 * Every time a listener is informed about the flag change it receives a new value of the flag
435 * but additionally it may query the flag for the last value there will be set into it.
436 * <p>
437 * Note that if you use the Flag sparingly this mechanism won't affect you in 99.99999% of time.
438 * <p><p>
439 * Warning - this method won't return truly a correct value if you will use inSync() method because
440 * this time we won't be able to obtain the concrete value of the flag after the DoInSync command
441 * will be carried out - instead we return the first value we are aware of. Again this won't
442 * affect you in any way (... but you should know such behavior exist ;-))
443 *
444 * @return value of the flag
445 */
446 public T getFlag() {
447 synchronized(setMutex) {
448 synchronized(commandQueue) {
449 if (commandQueue.size() != 0) {
450 for (int i = commandQueue.size()-1; i >= 0; --i) {
451 DoInSync<T> command = commandQueue.get(i);
452 if (command instanceof Flag.SetInSync) return ((SetInSync) command).newValue;
453 }
454 }
455 return value;
456 }
457 }
458 }
459
460 /**
461 * Tells whether the flag is set to 'one of the values' passed.
462 * @param oneOfTheValue
463 * @return
464 */
465 public boolean isOne(T... oneOfTheValue) {
466 T value = getFlag();
467 for (T one : oneOfTheValue) {
468 if (value.equals(one)) return true;
469 }
470 return false;
471 }
472
473 /**
474 * Tells whether the flag is not set to anz of 'one of the values' passed.
475 * @param oneOfTheValue
476 * @return
477 */
478 public boolean isNone(T... oneOfTheValue) {
479 T value = getFlag();
480 for (T one : oneOfTheValue) {
481 if (value.equals(one)) return false;
482 }
483 return true;
484 }
485
486 /**
487 * @return Immutable version of this flag, setFlag(T) method of such
488 * a flag will raise an exception.
489 */
490 public ImmutableFlag<T> getImmutable() {
491 if (immutableWrapper == null) {
492 immutableWrapper = new ImmutableFlag<T>(this);
493 }
494 return immutableWrapper;
495 }
496
497 /**
498 * Adds new listener to the flag (strong reference).
499 * <BR><BR>
500 * Using this method is memory-leak prone.
501 *
502 * @param listener
503 */
504 public void addStrongListener(FlagListener<T> listener) {
505 if (listener == null) {
506 return;
507 }
508 listeners.addStrongListener(listener);
509 }
510
511 /**
512 * Adds new listener to the flag with specified param. It will weak-reference the listener so when
513 * you drop the references to it, it will be automatically garbage-collected.
514 * <p><p>
515 * Note that all anonymous
516 * listeners are not subject to gc() because they are reachable from within the object where they were
517 * created.
518 *
519 * @param listener
520 */
521 @Override
522 public void addListener(FlagListener<T> listener) {
523 if (listener == null) {
524 return;
525 }
526 listeners.addStrongListener(listener);
527 }
528
529 /**
530 * Removes all registered 'listener' from the flag.
531 * @param listener
532 */
533 @Override
534 public void removeListener(FlagListener<T> listener) {
535 if (listener == null) {
536 return;
537 }
538 listeners.removeEqualListener(listener);
539 }
540
541 /**
542 * Removes all listeners.
543 */
544 @Override
545 public void removeAllListeners() {
546 listeners.clearListeners();
547 }
548
549
550
551 /**
552 * Checks whether listener is already registered (using equals()).
553 * <BR><BR>
554 * @param listener
555 * @return true if listener is already registered
556 */
557 public boolean isListenning(FlagListener<T> listener) {
558 if (listener == null) {
559 return false;
560 }
561 return listeners.isEqualListening(listener);
562 }
563
564 /**
565 * Call to clear (remove) all the listeners on the flag.
566 * <BR><BR>
567 * Should be used when the flag isn't going to be used again
568 * to allow GC to collect the listeners (for instance anonymous objects).
569 */
570 public void clearListeners() {
571 listeners.clearListeners();
572 }
573
574 }
575