1 package cz.cuni.amis.utils.flag; 2 3 import java.io.IOException; 4 import java.io.ObjectInputStream; 5 import java.io.Serializable; 6 import java.util.LinkedList; 7 import java.util.List; 8 import java.util.concurrent.Semaphore; 9 import java.util.concurrent.TimeUnit; 10 11 import cz.cuni.amis.utils.exception.PogamutException; 12 import cz.cuni.amis.utils.exception.PogamutIOException; 13 import cz.cuni.amis.utils.exception.PogamutInterruptedException; 14 import cz.cuni.amis.utils.listener.Listeners; 15 16 /** 17 * This class may be used to create an observable value (you may attach change-listeners to it). 18 * <p><p> 19 * This is flag class which is designed for Boolean or Integer types (but 20 * it should work with other types as well as long as they have equals() implemented 21 * correctly). 22 * <p><p> 23 * It allows you to store the state of flag and register listeners on the flag. 24 * <p><p> 25 * Note that the implementation is: 26 * <ol> 27 * <li>thread-safe (truly),</li> 28 * <li>recursion-safe (meaning that the flag may be changed from within the listener it notifies of the flag changes - such events are put into the queue and processed in correct sequence),</li> 29 * <li>setters/getters are non-blocking (or they blocks for finite small amount of time ~ few synchronized statements used, which can't block each other for greater period of time).</li> 30 * </ol> 31 * <p><p> 32 * Also note that can't be a really correct implementation of the flag that always returns 33 * the right value - if you heavily use flag from let's say a tens of threads then there may 34 * be glitches in the getFlag() returned value (but for the most implementation this value 35 * will be correct in 99.99999%!). 36 * <p><p> 37 * Note that the implementation of notifying about flag-change is 38 * strictly time-ordered. Every flag-change event is fully processed before another is raised/received. 39 * There is a possibility that a listener on the flag change will attempt to change the flag again (mentioned recursion-safe). 40 * In that case processing of this flag change is postponed until the previous event has been fully processed 41 * (e.g. all listeners has been notified about it). 42 * <p><p> 43 * Last piece of magic - if you want to change the flag value in-sync (meaning that you need 100% safe reading of the flag value), instantiate Flag.DoInSync<T> class 44 * and submit it via inSync() method - {@link DoInSync#execute(Object)} method will be executed in synchronized state so no one can change the flag value 45 * while you're inside this method. 46 * 47 * @author Jimmy 48 * 49 * @param <T> type of the flag 50 */ 51 public class Flag<T> implements IFlag<T>, Serializable { 52 53 /** 54 * Usage of this abstract class is as simple as it could be ... all you have to do is to instantiate 55 * it (using anonymous objects).<p> 56 * <p> 57 * Example:<p><p> 58 * <code> 59 * Flag<Integer> flag = new Flag<Integer>(10);<p> 60 * flag.inSync( 61 * new Flag.DoInSync<Integer>(flag) {<p> 62 * public abstract void execute(Integer flagValue) {<p> 63 * setFlag(flagValue+1);<p> 64 * }<p> 65 * } 66 * );<p> 67 * </code> 68 * <p><p> 69 * No need to do anything else! The class will submit itself to the flag upon construction. 70 * <p><p> 71 * Use it to create correct counters (or use directly FlagInteger class). 72 * 73 * @author Jimmy 74 * 75 * @param <T> 76 */ 77 public static abstract class DoInSync<T> { 78 79 Flag<T> flag; 80 81 public DoInSync() { 82 } 83 84 void setFlagInstance(Flag<T> flag) { 85 this.flag = flag; 86 } 87 88 /** 89 * Tells you whether you operate over immutable flag (can't call setFlag() then) or not. 90 * @return 91 */ 92 protected boolean isImmutable() { 93 return flag instanceof ImmutableFlag; 94 } 95 96 /** 97 * Set value in sync. 98 */ 99 protected void setFlag(T value) { 100 if (flag instanceof ImmutableFlag) throw new UnsupportedOperationException("trying to set flag of the immutable flag!"); 101 flag.value = value; 102 flag.notifier.setValue(value); 103 flag.listeners.notify(flag.notifier); 104 } 105 106 protected T getFlag() { 107 return flag.getFlag(); 108 } 109 110 /** 111 * @param flag 112 */ 113 public abstract void execute(T flagValue); 114 115 } 116 117 class SetInSync extends DoInSync<T> { 118 119 T newValue; 120 121 public SetInSync(T newValue) { 122 this.newValue = newValue; 123 } 124 125 @Override 126 public void execute(T flagValue) { 127 if ( 128 (newValue == null && flagValue != null) 129 || 130 (newValue != null && !newValue.equals(flagValue)) 131 ) { 132 setFlag(newValue); 133 } 134 } 135 136 } 137 138 transient Listeners<FlagListener<T>> listeners = new Listeners<FlagListener<T>>(); 139 140 /** 141 * Do not read directly - always use getFlag() method. 142 */ 143 T value; 144 145 transient FlagListener.FlagListenerNotifier<T> notifier = new FlagListener.FlagListenerNotifier<T>(); 146 147 /** 148 * Mutex that we synchronized on when the result of getValue() should be changed. 149 */ 150 transient Object setMutex = new Object(); 151 152 /** 153 * Whether the set method is freezed. 154 */ 155 transient boolean setFreezed = false; 156 157 transient Object setFreezedMutex = new Object(); 158 159 transient Semaphore setFreezedSemaphore = new Semaphore(1); 160 161 /** 162 * If availablePermits() == 1, the queue is not being processed by the method processSetFlagQueue() 163 */ 164 transient Semaphore commandQueueProcessing = new Semaphore(1); 165 166 transient Object commandQueueProcessingMutex = new Object(); 167 168 transient List<DoInSync<T>> commandQueue = new LinkedList<DoInSync<T>>(); 169 170 /** Immutable version of this flag. */ 171 transient ImmutableFlag<T> immutableWrapper = null; 172 173 /** 174 * Initialize the flag with 'null' as an initial value. 175 */ 176 public Flag() { 177 value = null; 178 } 179 180 /** 181 * Initialize the flag with 'initialValue'. 182 * @param initialValue 183 */ 184 public Flag(T initialValue) { 185 value = initialValue; 186 } 187 188 /** 189 * Method honoring the de-serialization process, it correctly initializes all 190 * that needs to be. 191 * 192 * @return 193 */ 194 private void readObject(ObjectInputStream ois) { 195 try { 196 ois.defaultReadObject(); 197 } catch (IOException e) { 198 throw new PogamutIOException("Could not read Flag", e); 199 } catch (ClassNotFoundException e) { 200 throw new PogamutException("Could not deserialize Flag", e); 201 } 202 this.listeners = new Listeners<FlagListener<T>>(); 203 this.notifier = new FlagListener.FlagListenerNotifier<T>(); 204 this.setMutex = new Object(); 205 this.setFreezed = false; 206 this.setFreezedMutex = new Object(); 207 this.setFreezedSemaphore = new Semaphore(1); 208 this.commandQueueProcessing = new Semaphore(1); 209 this.commandQueueProcessingMutex = new Object(); 210 this.commandQueue = new LinkedList<DoInSync<T>>(); 211 this.immutableWrapper = null; 212 } 213 214 /** 215 * Unsychronized! 216 * <p><p> 217 * setFlagProcessing must be acquired before calling! It will be released by this method. 218 */ 219 private void processCommandQueue() { 220 // check the events list size, do we have more events to process? 221 // note that the implementation is tricky ... try to think it over before modifying 222 while(true) { 223 DoInSync<T> command = null; 224 synchronized(commandQueue) { 225 if (commandQueue.size() != 0) { 226 command = commandQueue.get(0); 227 commandQueue.remove(0); 228 } 229 } 230 if (command != null) { 231 command.setFlagInstance(this); 232 command.execute(value); 233 } 234 synchronized(commandQueueProcessingMutex) { 235 if (commandQueue.size() == 0) { 236 commandQueueProcessing.release(); 237 return; 238 } 239 } 240 } 241 } 242 243 /** 244 * Add a command that will be executed in-sync with other changes (you may be sure that no other changes 245 * are taking place right now). 246 * <p><p> 247 * This is also used by the setFlag() method. 248 * @param command 249 * @param addAsFirst if true the command will be added as a first to execute 250 */ 251 protected void inSyncInner(DoInSync<T> command, boolean addAsFirst) { 252 // we're going to change the result of getValue() method, synchronized on it 253 synchronized(setMutex) { 254 // we will modify the setFlagValues 255 synchronized(commandQueue) { 256 // we're going to query setFlagProcessing 257 synchronized(commandQueueProcessingMutex) { 258 // we're going to query the setFreezed flag 259 synchronized(setFreezedMutex) { 260 // insert command into the queue 261 if (addAsFirst) commandQueue.add(0, command); 262 else commandQueue.add(command); 263 264 // is the set method freezed? 265 if (setFreezed) return; 266 267 // is the processSetFlagQueue running? 268 if (commandQueueProcessing.availablePermits() <= 0) return; 269 270 // we're not freezed nor the setFlag is running 271 try { 272 // acquire processing semaphore 273 commandQueueProcessing.acquire(); 274 } catch (InterruptedException e) { 275 //throw new RuntimeException("could not happen..."); 276 return; 277 } 278 } 279 } 280 } 281 } 282 // there can never ever be two threads in this place - we've ruled them out in the 283 // previous synchronized statement 284 processCommandQueue(); 285 } 286 287 /** 288 * Add a command (to the end of the queue) that will be executed in-sync with other changes (you may be sure that no other changes 289 * are taking place right now). 290 * <p><p> 291 * This is also used by the setFlag() method. 292 * @param command 293 */ 294 public void inSync(DoInSync<T> command) { 295 inSyncInner(command, false); 296 } 297 298 /** 299 * Changes the flag and informs all listeners. 300 * 301 * @param newValue 302 * @throws InterruptedRuntimeException if interrupted during the await on the freeze latch 303 */ 304 public void setFlag(T newValue) { 305 inSyncInner(new SetInSync(newValue), false); 306 } 307 308 /** 309 * Whether the flag-change has been frozen, i.e., setFlag() won't change the flag value 310 * immediately by will wait till {@link Flag#defreeze()} is called. 311 */ 312 public boolean isFrozen() { 313 return setFreezed; 314 } 315 316 /** 317 * This method will freeze the processing of the setFlag() method. Method is synchronized. 318 * <p><p> 319 * It waits until all setFlag() pending requests are resolved and then returns. 320 * <p><p> 321 * It may be used to for the synchronized registration of the listeners (if you really care 322 * for the value of the flag before creating the listener). Or it may be used to obtain 323 * a true value of the flag in the moment of the method call (as it waits for all the listeners 324 * to execute). 325 * <p> 326 * In one of these cases, do this:<p> 327 * <ol> 328 * <li>flag.freeze()<li> 329 * <li>examine the flag value and/or register new listeners</li> 330 * <li>flag.defreeze() // DO NOT FORGET THIS!</li> 331 * </ol> 332 * <p> 333 * Example: you have a flag that is counting how many alive threads you have, those threads may 334 * be created concurrently ... without this synchronizing method you wouldn't be able to correctly 335 * read the number of threads before incrementing the flag. 336 * <p><p> 337 * Beware of deadlocks when using this method, watch out:<p> 338 * a) infinite recursion during the setFlag() (listeners are changing the flag value repeatedly)<p> 339 * b) incorrect sequences of the freeze() / defreeze() calls 340 * <p><p> 341 * Of course you may simulate this behavior with simple synchronized() statement, but 342 * this isn't always feasible as it blocks all other threads while accessing this flag, 343 * note that this flag implementation promotes non-blocking methods. 344 */ 345 public void freeze() { 346 try { 347 setFreezedSemaphore.acquire(); 348 } catch (InterruptedException e) { 349 throw new PogamutInterruptedException("wait on the freeze semapthore has been interrupter", e, this); 350 } 351 synchronized(setFreezedMutex) { 352 setFreezed = true; 353 } 354 try { 355 commandQueueProcessing.acquire(); // make sure the queue has been fully processed 356 commandQueueProcessing.release(); 357 } catch (InterruptedException e) { 358 throw new PogamutInterruptedException("interrupted during the wait for the setFlag() to finish it's work", e, this); 359 } 360 } 361 362 /** 363 * Method is synchronized. See {@link Flag#freeze()} for info. 364 */ 365 public void defreeze() { 366 synchronized(commandQueueProcessingMutex) { 367 try { 368 commandQueueProcessing.acquire(); 369 } catch (InterruptedException e) { 370 throw new PogamutInterruptedException("interrupted during acquiring setFlagProcessing", e, this); 371 } 372 } 373 synchronized(setFreezedMutex) { 374 if (!setFreezed) throw new PogamutException("flag has been defreezed twice", this); 375 setFreezed = false; 376 } 377 processCommandQueue(); 378 setFreezedSemaphore.release(); 379 } 380 381 /** 382 * Pauses the thread till the flag change to another value. 383 * @return flag value that woke up the thread 384 * @throws PogamutInterrputedException 385 */ 386 public T waitForChange() throws PogamutInterruptedException { 387 return new WaitForFlagChange<T>(this).await(); 388 } 389 390 /** 391 * Pauses the thread till the flag change to another value or timeout. 392 * <p><p> 393 * Returns null if times out, otherwise returns value that woke up the thread. 394 * @param timeoutMillis 395 * @param oneOfTheValue 396 * @return null (timeout) or value that woke up the thread 397 * @throws PogamutInterruptedException 398 */ 399 public T waitForChange(long timeoutMillis) throws PogamutInterruptedException { 400 return new WaitForFlagChange<T>(this).await(timeoutMillis, TimeUnit.MILLISECONDS); 401 } 402 403 /** 404 * Pauses the thread till the flag is set from the outside to one of specified values. 405 * @param oneOfTheValue 406 * @return flag value that woke up the thread 407 * @throws PogamutInterruptedException 408 */ 409 public T waitFor(T... oneOfTheValue) throws PogamutInterruptedException { 410 return new WaitForFlagChange<T>(this, oneOfTheValue).await(); 411 } 412 413 /** 414 * Pauses the thread till the flag is set from the outside to one of specified values or times out. 415 * <p><p> 416 * Returns null if times out, otherwise returns value that woke up the thread. 417 * @param timeoutMillis 418 * @param oneOfTheValue 419 * @return null (timeout) or value that woke up the thread 420 * @throws PogamutInterruptedException 421 */ 422 public T waitFor(long timeoutMillis, T... oneOfTheValue) throws PogamutInterruptedException { 423 return new WaitForFlagChange<T>(this, oneOfTheValue).await(timeoutMillis, TimeUnit.MILLISECONDS); 424 } 425 426 /** 427 * Returns the value of the flag. 428 * <p><p> 429 * Note that if the flag contains any set-flag pending requests queue it will return the last 430 * value from this queue. 431 * <p><p> 432 * This has a big advantage for the multi-thread heavy-listener oriented designs. 433 * <p> 434 * Every time a listener is informed about the flag change it receives a new value of the flag 435 * but additionally it may query the flag for the last value there will be set into it. 436 * <p> 437 * Note that if you use the Flag sparingly this mechanism won't affect you in 99.99999% of time. 438 * <p><p> 439 * Warning - this method won't return truly a correct value if you will use inSync() method because 440 * this time we won't be able to obtain the concrete value of the flag after the DoInSync command 441 * will be carried out - instead we return the first value we are aware of. Again this won't 442 * affect you in any way (... but you should know such behavior exist ;-)) 443 * 444 * @return value of the flag 445 */ 446 public T getFlag() { 447 synchronized(setMutex) { 448 synchronized(commandQueue) { 449 if (commandQueue.size() != 0) { 450 for (int i = commandQueue.size()-1; i >= 0; --i) { 451 DoInSync<T> command = commandQueue.get(i); 452 if (command instanceof Flag.SetInSync) return ((SetInSync) command).newValue; 453 } 454 } 455 return value; 456 } 457 } 458 } 459 460 /** 461 * Tells whether the flag is set to 'one of the values' passed. 462 * @param oneOfTheValue 463 * @return 464 */ 465 public boolean isOne(T... oneOfTheValue) { 466 T value = getFlag(); 467 for (T one : oneOfTheValue) { 468 if (value.equals(one)) return true; 469 } 470 return false; 471 } 472 473 /** 474 * Tells whether the flag is not set to anz of 'one of the values' passed. 475 * @param oneOfTheValue 476 * @return 477 */ 478 public boolean isNone(T... oneOfTheValue) { 479 T value = getFlag(); 480 for (T one : oneOfTheValue) { 481 if (value.equals(one)) return false; 482 } 483 return true; 484 } 485 486 /** 487 * @return Immutable version of this flag, setFlag(T) method of such 488 * a flag will raise an exception. 489 */ 490 public ImmutableFlag<T> getImmutable() { 491 if (immutableWrapper == null) { 492 immutableWrapper = new ImmutableFlag<T>(this); 493 } 494 return immutableWrapper; 495 } 496 497 /** 498 * Adds new listener to the flag (strong reference). 499 * <BR><BR> 500 * Using this method is memory-leak prone. 501 * 502 * @param listener 503 */ 504 public void addStrongListener(FlagListener<T> listener) { 505 if (listener == null) { 506 return; 507 } 508 listeners.addStrongListener(listener); 509 } 510 511 /** 512 * Adds new listener to the flag with specified param. It will weak-reference the listener so when 513 * you drop the references to it, it will be automatically garbage-collected. 514 * <p><p> 515 * Note that all anonymous 516 * listeners are not subject to gc() because they are reachable from within the object where they were 517 * created. 518 * 519 * @param listener 520 */ 521 @Override 522 public void addListener(FlagListener<T> listener) { 523 if (listener == null) { 524 return; 525 } 526 listeners.addStrongListener(listener); 527 } 528 529 /** 530 * Removes all registered 'listener' from the flag. 531 * @param listener 532 */ 533 @Override 534 public void removeListener(FlagListener<T> listener) { 535 if (listener == null) { 536 return; 537 } 538 listeners.removeEqualListener(listener); 539 } 540 541 /** 542 * Removes all listeners. 543 */ 544 @Override 545 public void removeAllListeners() { 546 listeners.clearListeners(); 547 } 548 549 550 551 /** 552 * Checks whether listener is already registered (using equals()). 553 * <BR><BR> 554 * @param listener 555 * @return true if listener is already registered 556 */ 557 public boolean isListenning(FlagListener<T> listener) { 558 if (listener == null) { 559 return false; 560 } 561 return listeners.isEqualListening(listener); 562 } 563 564 /** 565 * Call to clear (remove) all the listeners on the flag. 566 * <BR><BR> 567 * Should be used when the flag isn't going to be used again 568 * to allow GC to collect the listeners (for instance anonymous objects). 569 */ 570 public void clearListeners() { 571 listeners.clearListeners(); 572 } 573 574 } 575