View Javadoc

1   package cz.cuni.amis.utils.flag;
2   
3   import java.io.IOException;
4   import java.io.ObjectInputStream;
5   import java.io.Serializable;
6   import java.util.LinkedList;
7   import java.util.List;
8   import java.util.concurrent.Semaphore;
9   import java.util.concurrent.TimeUnit;
10  
11  import cz.cuni.amis.utils.exception.PogamutException;
12  import cz.cuni.amis.utils.exception.PogamutIOException;
13  import cz.cuni.amis.utils.exception.PogamutInterruptedException;
14  import cz.cuni.amis.utils.listener.Listeners;
15  
16  /**
17   * This class may be used to create an observable value (you may attach change-listeners to it).
18   * <p><p> 
19   * This is flag class which is designed for Boolean or Integer types (but 
20   * it should work with other types as well as long as they have equals() implemented
21   * correctly).
22   * <p><p>
23   * It allows you to store the state of flag and register listeners on the flag.
24   * <p><p>
25   * Note that the implementation is:
26   * <ol>
27   * <li>thread-safe (truly),</li>
28   * <li>recursion-safe (meaning that the flag may be changed from within the listener it notifies of the flag changes - such events are put into the queue and processed in correct sequence),</li>
29   * <li>setters/getters are non-blocking (or they blocks for finite small amount of time ~ few synchronized statements used, which can't block each other for greater period of time).</li>
30   * </ol>
31   * <p><p>
32   * Also note that can't be a really correct implementation of the flag that always returns
33   * the right value - if you heavily use flag from let's say a tens of threads then there may
34   * be glitches in the getFlag() returned value (but for the most implementation this value
35   * will be correct in 99.99999%!).
36   * <p><p>
37   * Note that the implementation of notifying about flag-change is
38   * strictly time-ordered. Every flag-change event is fully processed before another is raised/received.
39   * There is a possibility that a listener on the flag change will attempt to change the flag again (mentioned recursion-safe).
40   * In that case processing of this flag change is postponed until the previous event has been fully processed
41   * (e.g. all listeners has been notified about it).
42   * <p><p>
43   * Last piece of magic - if you want to change the flag value in-sync (meaning that you need 100% safe reading of the flag value), instantiate Flag.DoInSync<T> class
44   * and submit it via inSync() method - {@link DoInSync#execute(Object)} method will be executed in synchronized state so no one can change the flag value
45   * while you're inside this method. 
46   * 
47   * @author Jimmy
48   *
49   * @param <T> type of the flag
50   */
51  public class Flag<T> implements IFlag<T>, Serializable {
52  
53  	/**
54  	 * Usage of this abstract class is as simple as it could be ... all you have to do is to instantiate
55  	 * it (using anonymous objects).<p>
56  	 * <p>
57  	 * Example:<p><p>
58  	 * <code>
59  	 * Flag&lt;Integer> flag = new Flag&lt;Integer>(10);<p>
60  	 * flag.inSync(
61  	 *     new Flag.DoInSync&lt;Integer>(flag) {<p>
62  	 *         public abstract void execute(Integer flagValue) {<p>
63  	 *             setFlag(flagValue+1);<p>
64  	 *         }<p>
65  	 *     }
66  	 * );<p>
67  	 * </code>
68  	 * <p><p>
69  	 * No need to do anything else! The class will submit itself to the flag upon construction.
70  	 * <p><p>
71  	 * Use it to create correct counters (or use directly FlagInteger class).
72  	 * 
73  	 * @author Jimmy
74  	 *
75  	 * @param <T>
76  	 */
77  	public static abstract class DoInSync<T> {
78  		
79  		Flag<T> flag;
80  		
81  		public DoInSync() {
82  		}
83  		
84  		void setFlagInstance(Flag<T> flag) {
85  			this.flag = flag;
86  		}
87  		
88  		/**
89  		 * Tells you whether you operate over immutable flag (can't call setFlag() then) or not.
90  		 * @return
91  		 */
92  		protected boolean isImmutable() {
93  			return flag instanceof ImmutableFlag;
94  		}
95  		
96  		/**
97  		 * Set value in sync.
98  		 */
99  		protected void setFlag(T value) {
100 			if (flag instanceof ImmutableFlag) throw new UnsupportedOperationException("trying to set flag of the immutable flag!");
101 			flag.value = value;
102 			flag.notifier.setValue(value);
103 			flag.listeners.notify(flag.notifier);			
104 		}
105 		
106 		protected T getFlag() {
107 			return flag.getFlag();
108 		}
109 		
110 		/**
111 		 * @param flag
112 		 */
113 		public abstract void execute(T flagValue);
114 		
115 	}
116 	
117 	class SetInSync extends DoInSync<T> {
118 		
119 		T newValue;
120 		
121 		public SetInSync(T newValue) {
122 			this.newValue = newValue;
123 		}
124 
125 		@Override
126 		public void execute(T flagValue) {
127 			if (
128 				(newValue == null && flagValue != null)
129 				||				
130 				(newValue != null && !newValue.equals(flagValue))
131 		       ) {
132 				setFlag(newValue);				
133 			}
134 		}
135 		
136 	}
137 		
138 	transient Listeners<FlagListener<T>> listeners = new Listeners<FlagListener<T>>();
139 	
140 	/**
141 	 * Do not read directly - always use getFlag() method.
142 	 */
143     T value;
144     
145     transient FlagListener.FlagListenerNotifier<T> notifier = new FlagListener.FlagListenerNotifier<T>();
146     
147     /**
148      * Mutex that we synchronized on when the result of getValue() should be changed.
149      */
150     transient Object setMutex = new Object();
151     
152     /**
153      * Whether the set method is freezed.
154      */
155     transient boolean setFreezed = false;
156     
157     transient Object setFreezedMutex = new Object();
158     
159     transient Semaphore setFreezedSemaphore = new Semaphore(1);
160     
161     /**
162      * If availablePermits() == 1, the queue is not being processed by the method processSetFlagQueue()
163      */
164     transient Semaphore commandQueueProcessing = new Semaphore(1);
165     
166     transient Object commandQueueProcessingMutex = new Object();
167     
168     transient List<DoInSync<T>> commandQueue = new LinkedList<DoInSync<T>>();    
169     
170     /** Immutable version of this flag. */
171     transient ImmutableFlag<T> immutableWrapper = null;
172 
173     /**
174      * Initialize the flag with 'null' as an initial value.
175      */
176     public Flag() {
177         value = null;
178     }
179 
180     /**
181      * Initialize the flag with 'initialValue'.
182      * @param initialValue
183      */
184     public Flag(T initialValue) {
185         value = initialValue;
186     }
187     
188     /**
189      * Method honoring the de-serialization process, it correctly initializes all
190      * that needs to be.
191      * 
192      * @return
193      */
194     private void readObject(ObjectInputStream ois) {
195     	try {
196 			ois.defaultReadObject();
197 		} catch (IOException e) {
198 			throw new PogamutIOException("Could not read Flag", e);
199 		} catch (ClassNotFoundException e) {
200 			throw new PogamutException("Could not deserialize Flag", e);
201 		}
202     	this.listeners = new Listeners<FlagListener<T>>();
203     	this.notifier = new FlagListener.FlagListenerNotifier<T>();
204         this.setMutex = new Object();
205         this.setFreezed = false;
206         this.setFreezedMutex = new Object();
207         this.setFreezedSemaphore = new Semaphore(1);
208         this.commandQueueProcessing = new Semaphore(1);
209         this.commandQueueProcessingMutex = new Object();
210         this.commandQueue = new LinkedList<DoInSync<T>>();    
211         this.immutableWrapper = null;        
212     }
213     
214     /**
215      * Unsychronized!
216      * <p><p>
217      * setFlagProcessing must be acquired before calling! It will be released by this method.
218      */
219     private void processCommandQueue() {
220     	// check the events list size, do we have more events to process?
221 	    // note that the implementation is tricky ... try to think it over before modifying
222         while(true) {
223         	DoInSync<T> command = null;        	
224         	synchronized(commandQueue) {        		
225 	        	if (commandQueue.size() != 0) {
226 	        		command = commandQueue.get(0);
227 	        		commandQueue.remove(0);
228 	        	}
229         	}
230         	if (command != null) {
231         		command.setFlagInstance(this);
232         		command.execute(value);
233         	}
234         	synchronized(commandQueueProcessingMutex) {
235         		if (commandQueue.size() == 0) {
236         			commandQueueProcessing.release();
237         			return;
238         		}
239         	}
240         }    	
241     }
242     
243     /**
244      * Add a command that will be executed in-sync with other changes (you may be sure that no other changes
245      * are taking place right now).
246      * <p><p>
247      * This is also used by the setFlag() method.
248      * @param command
249      * @param addAsFirst if true the command will be added as a first to execute
250      */
251     protected void inSyncInner(DoInSync<T> command, boolean addAsFirst) {
252     	// we're going to change the result of getValue() method, synchronized on it
253     	synchronized(setMutex) {
254     		// we will modify the setFlagValues
255     		synchronized(commandQueue) {
256     			// we're going to query setFlagProcessing
257     			synchronized(commandQueueProcessingMutex) {
258     				// we're going to query the setFreezed flag
259 		    		synchronized(setFreezedMutex) {
260 		    			// insert command into the queue
261 		    			if (addAsFirst) commandQueue.add(0, command);
262 			        	else commandQueue.add(command);
263 		    			
264 						// is the set method freezed?		
265 				        if (setFreezed) return;
266 				        
267 				        // is the processSetFlagQueue running?
268 			    		if (commandQueueProcessing.availablePermits() <= 0) return;
269 			    		
270 			    		// we're not freezed nor the setFlag is running
271 			    		try {
272 			    			// acquire processing semaphore
273                                                 commandQueueProcessing.acquire();
274                                         } catch (InterruptedException e) {
275                                                 //throw new RuntimeException("could not happen...");
276                                                 return;
277                                         }
278 		    		}    			
279     			}
280 	    	}
281     	}
282     	// there can never ever be two threads in this place - we've ruled them out in the 
283     	// previous synchronized statement
284 		processCommandQueue();
285     }
286     
287     /**
288      * Add a command (to the end of the queue) that will be executed in-sync with other changes (you may be sure that no other changes
289      * are taking place right now).
290      * <p><p>
291      * This is also used by the setFlag() method.
292      * @param command
293      */
294     public void inSync(DoInSync<T> command) {
295     	inSyncInner(command, false);
296     }
297         
298     /**
299      * Changes the flag and informs all listeners.
300      * 
301      * @param newValue
302      * @throws InterruptedRuntimeException if interrupted during the await on the freeze latch
303      */
304     public void setFlag(T newValue) {
305     	inSyncInner(new SetInSync(newValue), false);
306     }
307     
308     /**
309      * Whether the flag-change has been frozen, i.e., setFlag() won't change the flag value
310      * immediately by will wait till {@link Flag#defreeze()} is called.
311      */
312     public boolean isFrozen() {
313     	return setFreezed;
314     }
315     
316     /**
317      * This method will freeze the processing of the setFlag() method. Method is synchronized.
318      * <p><p>
319      * It waits until all setFlag() pending requests are resolved and then returns.
320      * <p><p>
321      * It may be used to for the synchronized registration of the listeners (if you really care
322      * for the value of the flag before creating the listener). Or it may be used to obtain
323      * a true value of the flag in the moment of the method call (as it waits for all the listeners
324      * to execute).
325      * <p>
326      * In one of these cases, do this:<p>
327      * <ol>
328      * <li>flag.freeze()<li>
329      * <li>examine the flag value and/or register new listeners</li>
330      * <li>flag.defreeze() // DO NOT FORGET THIS!</li>
331      * </ol>
332      * <p>
333      * Example: you have a flag that is counting how many alive threads you have, those threads may
334      * be created concurrently ... without this synchronizing method you wouldn't be able to correctly
335      * read the number of threads before incrementing the flag.
336      * <p><p>
337      * Beware of deadlocks when using this method, watch out:<p>
338      * a) infinite recursion during the setFlag() (listeners are changing the flag value repeatedly)<p>
339      * b) incorrect sequences of the freeze() / defreeze() calls
340      * <p><p>
341      * Of course you may simulate this behavior with simple synchronized() statement, but
342      * this isn't always feasible as it blocks all other threads while accessing this flag,
343      * note that this flag implementation promotes non-blocking methods.
344      */
345     public void freeze() {
346     	try {
347 			setFreezedSemaphore.acquire();
348 		} catch (InterruptedException e) {
349 			throw new PogamutInterruptedException("wait on the freeze semapthore has been interrupter", e, this);
350 		}
351     	synchronized(setFreezedMutex) {
352     		setFreezed = true;
353     	}
354     	try {
355 			commandQueueProcessing.acquire(); // make sure the queue has been fully processed
356 			commandQueueProcessing.release();
357 		} catch (InterruptedException e) {
358 			throw new PogamutInterruptedException("interrupted during the wait for the setFlag() to finish it's work", e, this);
359 		}
360     }
361     
362     /**
363      * Method is synchronized. See {@link Flag#freeze()} for info.
364      */
365     public void defreeze() {    	    
366     	synchronized(commandQueueProcessingMutex) {
367     		try {
368 				commandQueueProcessing.acquire();
369 			} catch (InterruptedException e) {
370 				throw new PogamutInterruptedException("interrupted during acquiring setFlagProcessing", e, this);
371 			}    		
372     	}
373     	synchronized(setFreezedMutex) {
374     		if (!setFreezed) throw new PogamutException("flag has been defreezed twice", this);
375      		setFreezed = false;    		
376     	}
377     	processCommandQueue();
378     	setFreezedSemaphore.release();
379     }
380     
381     /**
382      * Pauses the thread till the flag change to another value.
383      * @return flag value that woke up the thread
384      * @throws PogamutInterrputedException
385      */
386     public T waitForChange() throws PogamutInterruptedException {
387     	return new WaitForFlagChange<T>(this).await();
388     }
389     
390     /** 
391      * Pauses the thread till the flag change to another value or timeout.
392      * <p><p>
393      * Returns null if times out, otherwise returns value that woke up the thread.
394      * @param timeoutMillis
395      * @param oneOfTheValue
396      * @return null (timeout) or value that woke up the thread
397      * @throws PogamutInterruptedException
398      */
399     public T waitForChange(long timeoutMillis) throws PogamutInterruptedException {
400     	return new WaitForFlagChange<T>(this).await(timeoutMillis, TimeUnit.MILLISECONDS);
401     }
402     
403     /**
404      * Pauses the thread till the flag is set from the outside to one of specified values.
405      * @param oneOfTheValue
406      * @return flag value that woke up the thread
407      * @throws PogamutInterruptedException
408      */
409     public T waitFor(T... oneOfTheValue) throws PogamutInterruptedException {
410     	return new WaitForFlagChange<T>(this, oneOfTheValue).await();		
411     }
412     
413     /** 
414      * Pauses the thread till the flag is set from the outside to one of specified values or times out.
415      * <p><p>
416      * Returns null if times out, otherwise returns value that woke up the thread.
417      * @param timeoutMillis
418      * @param oneOfTheValue
419      * @return null (timeout) or value that woke up the thread
420      * @throws PogamutInterruptedException
421      */
422     public T waitFor(long timeoutMillis, T... oneOfTheValue) throws PogamutInterruptedException {
423     	return new WaitForFlagChange<T>(this, oneOfTheValue).await(timeoutMillis, TimeUnit.MILLISECONDS);
424     }
425     
426     /**
427      * Returns the value of the flag.
428      * <p><p>
429      * Note that if the flag contains any set-flag pending requests queue it will return the last
430      * value from this queue.
431      * <p><p>
432      * This has a big advantage for the multi-thread heavy-listener oriented designs.
433      * <p>
434      * Every time a listener is informed about the flag change it receives a new value of the flag
435      * but additionally it may query the flag for the last value there will be set into it.
436      * <p>
437      * Note that if you use the Flag sparingly this mechanism won't affect you in 99.99999% of time.
438      * <p><p>
439      * Warning - this method won't return truly a correct value if you will use inSync() method because
440      * this time we won't be able to obtain the concrete value of the flag after the DoInSync command
441      * will be carried out - instead we return the first value we are aware of. Again this won't
442      * affect you in any way (... but you should know such behavior exist ;-)) 
443      * 
444      * @return value of the flag
445      */
446     public T getFlag() {
447     	synchronized(setMutex) {
448 	    	synchronized(commandQueue) {
449 	    		if (commandQueue.size() != 0) {
450 	    			for (int i = commandQueue.size()-1; i >= 0; --i) {
451 	    				DoInSync<T> command = commandQueue.get(i);
452 	    				if (command instanceof Flag.SetInSync) return ((SetInSync) command).newValue;
453 	    			}	    			
454 	    		}
455 	    		return value;
456 	    	}    	
457     	}
458     }
459     
460     /**
461      * Tells whether the flag is set to 'one of the values' passed.
462      * @param oneOfTheValue
463      * @return
464      */
465     public boolean isOne(T... oneOfTheValue) {
466     	T value = getFlag();
467     	for (T one : oneOfTheValue) {
468     		if (value.equals(one)) return true;
469     	}
470     	return false;
471     }
472     
473     /**
474      * Tells whether the flag is not set to anz of 'one of the values' passed.
475      * @param oneOfTheValue
476      * @return
477      */
478     public boolean isNone(T... oneOfTheValue) {
479     	T value = getFlag();
480     	for (T one : oneOfTheValue) {
481     		if (value.equals(one)) return false;
482     	}
483     	return true;
484     }
485     
486     /**
487      * @return Immutable version of this flag, setFlag(T) method of such 
488      * a flag will raise an exception. 
489      */
490     public ImmutableFlag<T> getImmutable() {
491         if (immutableWrapper == null) {
492             immutableWrapper = new ImmutableFlag<T>(this);
493         }
494         return immutableWrapper;
495     }
496 
497     /**
498      * Adds new listener to the flag (strong reference).
499      * <BR><BR>
500      * Using this method is memory-leak prone.
501      * 
502      * @param listener
503      */
504     public void addStrongListener(FlagListener<T> listener) {
505         if (listener == null) {
506             return;
507         }
508         listeners.addStrongListener(listener);
509     }
510         
511     /**
512      * Adds new listener to the flag with specified param. It will weak-reference the listener so when
513      * you drop the references to it, it will be automatically garbage-collected.
514      * <p><p>
515      * Note that all anonymous
516      * listeners are not subject to gc() because they are reachable from within the object where they were
517      * created.
518      * 
519      * @param listener
520      */
521     @Override
522     public void addListener(FlagListener<T> listener) {
523     	if (listener == null) {
524             return;
525         }
526     	listeners.addStrongListener(listener);
527     }
528     
529     /**
530      * Removes all registered 'listener' from the flag.
531      * @param listener
532      */
533     @Override
534     public void removeListener(FlagListener<T> listener) {
535         if (listener == null) {
536             return;
537         }
538         listeners.removeEqualListener(listener);        
539     }
540 
541     /**
542      * Removes all listeners.
543      */
544     @Override
545     public void removeAllListeners() {
546         listeners.clearListeners();
547     }
548 
549 
550 
551     /**
552      * Checks whether listener is already registered (using equals()).
553      * <BR><BR>
554      * @param listener
555      * @return true if listener is already registered
556      */
557     public boolean isListenning(FlagListener<T> listener) {
558         if (listener == null) {
559             return false;
560         }
561         return listeners.isEqualListening(listener);
562     }
563 
564     /** 
565      * Call to clear (remove) all the listeners on the flag.
566      * <BR><BR>
567      * Should be used when the flag isn't going to be used again
568      * to allow GC to collect the listeners (for instance anonymous objects).
569      */
570     public void clearListeners() {    	
571     	listeners.clearListeners();
572     }
573 
574 }
575