View Javadoc

1   package cz.cuni.amis.utils.future;
2   
3   import java.util.concurrent.CountDownLatch;
4   import java.util.concurrent.Future;
5   import java.util.concurrent.TimeUnit;
6   import java.util.concurrent.TimeoutException;
7   
8   import cz.cuni.amis.utils.exception.PogamutException;
9   import cz.cuni.amis.utils.exception.PogamutInterruptedException;
10  import cz.cuni.amis.utils.listener.Listeners;
11  import cz.cuni.amis.utils.listener.Listeners.ListenerNotifier;
12  
13  /**
14   * Abstract class that represents future result of some computation that allows you to hook
15   * listeners on the status of the future computation (see {@link FutureWithListeners#addFutureListener(IFutureListener)}).
16   * Whenever the computation is completed (or cancelled / exception has happened / etc.) the listeners are informed.
17   * 
18   * @author Jimmy
19   *
20   * @param <RESULT>
21   */
22  public abstract class FutureWithListeners<RESULT> implements IFutureWithListeners<RESULT> {
23  
24  	/**
25  	 * Mutex synchronizing access to internal data structures of the future.
26  	 */
27  	protected Object mutex = new Object();
28  	
29  	/**
30  	 * Future listeners, here we store listeners registred in {@link FutureWithListeners#addFutureListener(IFutureListener)}.
31  	 */
32  	protected Listeners<IFutureListener<RESULT>> listeners = new Listeners<IFutureListener<RESULT>>();
33  	
34  	/**
35  	 * Misc - used by {@link FutureWithListeners#notifier} to know what the old status was.
36  	 */
37  	private FutureStatus oldStatus = null;
38  	
39  	/**
40  	 * Misc - used by {@link FutureWithListeners#notifier} to know what the new status is.
41  	 */
42  	private FutureStatus newStatus = null;
43  	
44  	/**
45  	 * Notifier that raises events on respective listeners.
46  	 */
47  	private ListenerNotifier<IFutureListener<RESULT>> notifier = new ListenerNotifier<IFutureListener<RESULT>>() {
48  		
49  		@Override
50  		public Object getEvent() {
51  			return newStatus;
52  		}
53  
54  		@Override
55  		public void notify(IFutureListener<RESULT> listener) {
56  			listener.futureEvent(FutureWithListeners.this, oldStatus, newStatus);
57  		}
58  	};
59  	
60  	/**
61  	 * Current status of the future computation. The status is changed only via {@link FutureWithListeners#switchStatus(FutureStatus)} that
62  	 * also notifies all listeners about the change.
63  	 */
64  	private FutureStatus status = FutureStatus.FUTURE_IS_BEING_COMPUTED;
65  	
66  	/**
67  	 * Result of the future.
68  	 */
69  	private RESULT result = null;
70  	
71  	/**
72  	 * Latch where threads are waiting when using {@link FutureWithListeners#get()} or {@link FutureWithListeners#get(long, TimeUnit)}. This
73  	 * latch is instantiated whenever needed via method {@link FutureWithListeners#createLatch()}.
74  	 */
75  	protected CountDownLatch latch = null;
76  
77  	/**
78  	 * If the computation results in an exception and the future is informed about such fact, the exception
79  	 * is stored here.
80  	 */
81  	private Exception exception;
82  	
83  	/**
84  	 * Current status of the future computation.
85  	 * @return
86  	 */
87      @Override
88  	public FutureStatus getStatus() {
89  		return status;
90  	}
91  	
92  	/**
93  	 * Adds a listener on a future status (using strong reference). Listeners are automatically
94  	 * removed whenever the future gets its result (or is cancelled or an exception happens).
95  	 * @param listener
96  	 */
97      @Override
98  	public void addFutureListener(IFutureListener<RESULT> listener) {
99  		listeners.addStrongListener(listener);
100 	}
101 	
102 	/**
103 	 * Removes a listener from the future.
104 	 * @param listener
105 	 */
106     @Override
107 	public void removeFutureListener(IFutureListener<RESULT> listener) {
108 		listeners.removeListener(listener);
109 	}
110 	
111 	/**
112 	 * Whether some listener is listening on the future.
113 	 * @param listener
114 	 * @return
115 	 */
116     @Override
117 	public boolean isListening(IFutureListener<RESULT> listener) {
118 		return listeners.isListening(listener);
119 	}
120 	
121 	/**
122 	 * Sets the result of the future computation.
123 	 * <p><p>
124 	 * Switches the status to FUTURE_IS_READY (notifying listeners along the way).
125 	 * <p><p>
126 	 * The result can be set only iff NOT {@link FutureWithListeners#isDone()}, i.e., status is {@link FutureStatus}:FUTURE_IS_BEING_COMPUTED.
127 	 * @param result
128 	 */
129     @Override
130 	public void setResult(RESULT result) {
131 		synchronized(mutex) {
132 			if (status != FutureStatus.FUTURE_IS_BEING_COMPUTED) {
133 				throw new PogamutException("Future is not being computed anymore - can't set result.", this);
134 			}			
135 			this.result = result;
136 			switchStatus(FutureStatus.FUTURE_IS_READY);
137 			if (latch != null) {
138 				while (latch.getCount() > 0) latch.countDown();
139 			} else {
140 				latch = new CountDownLatch(0);
141 			}			
142 			listeners.clearListeners();
143 		}
144 	}
145 	
146 	/** 
147 	 * Informs the future that it can't be computed due to the exception. 
148 	 * <p><p>
149 	 * Switches the status to EXCEPTION (notifying listeners along the way).
150 	 * <p><p>
151 	 * The result can be set only iff NOT {@link FutureWithListeners#isDone()}, i.e., status is {@link FutureStatus}:FUTURE_IS_BEING_COMPUTED.
152 	 * @param e
153 	 */
154     @Override
155 	public void computationException(Exception e) {
156 		synchronized(mutex) {
157 			if (status != FutureStatus.FUTURE_IS_BEING_COMPUTED) {
158 				throw new PogamutException("Future is not being computed anymore - can't process computation exception.", e);
159 			}
160 			this.exception = e;
161 			switchStatus(FutureStatus.COMPUTATION_EXCEPTION);
162 			this.result = null;
163 			if (latch != null) {
164 				while (latch.getCount() > 0) latch.countDown();
165 			} else {
166 				latch = new CountDownLatch(0);
167 			}
168 		}
169 	}
170 	
171 	/**
172 	 * Changes the status of the future (if it is different than current one) and notifies the listeners
173 	 * about this change.
174 	 * 
175 	 * @param newStatus
176 	 */
177 	protected void switchStatus(FutureStatus newStatus) {
178         synchronized(mutex){
179 			if (newStatus == status) return;
180 			oldStatus = status;
181 			this.newStatus = newStatus;
182 			status = newStatus;
183 			listeners.notify(notifier);
184         }
185 	}
186 	
187 	/**
188 	 * Factory method that should return {@link CountDownLatch} or its descendant initialized to 1.
189 	 * @return
190 	 */
191 	protected CountDownLatch createLatch() {
192 		return new CountDownLatch(1);
193 	}
194 	
195 	/**
196 	 * This should cancel the computation of the future. Current implementation returns always false. Override
197 	 * the method to provide correct behavior for particular future.
198 	 * @param mayInterruptIfRunning
199 	 * @return
200 	 */
201 	protected boolean cancelComputation(boolean mayInterruptIfRunning) {
202 		return false;
203 	}
204 	
205 	@Override
206 	public final boolean cancel(boolean mayInterruptIfRunning) {
207 		synchronized(mutex) {
208 			if (cancelComputation(mayInterruptIfRunning)) {
209 				switchStatus(FutureStatus.CANCELED);
210 				return true;
211 			} else {
212 				return false;
213 			}
214 		}
215 	}
216 
217 	@Override
218 	public RESULT get() {
219 		if (status == FutureStatus.FUTURE_IS_READY) return result;
220 		if (status == FutureStatus.FUTURE_IS_BEING_COMPUTED) {
221 			synchronized(mutex) {
222 				if (status == FutureStatus.FUTURE_IS_READY) return result;
223 				if (status == FutureStatus.FUTURE_IS_BEING_COMPUTED) {
224 					latch = createLatch();
225 				}
226 			}
227 			try {
228 				latch.await();
229 			} catch (InterruptedException e) {
230 				throw new PogamutInterruptedException("Interrupted while awaiting furure result.", e, this);
231 			}
232 			if (status == FutureStatus.FUTURE_IS_READY) return result;
233 		}
234 		return null;
235 	}
236 
237 	/**
238 	 * Returns a result or waits for the computation till timeout. 
239 	 * <p><p>
240 	 * Does not throw {@link TimeoutException}! It returns null instead - always examine status of the future
241 	 * via {@link FutureWithListeners#getStatus()} if the null is returned to tell whether the 'null' is the
242 	 * result of the computation (if the status is FUTURE_IS_READY than the 'null' is truly the result).
243 	 * @param timeout
244 	 * @param unit
245 	 * @return
246 	 */
247 	@Override
248 	public RESULT get(long timeout, TimeUnit unit) {
249 		if (status == FutureStatus.FUTURE_IS_READY) return result;
250 		if (status == FutureStatus.FUTURE_IS_BEING_COMPUTED) {
251 			synchronized(mutex) {
252 				if (status == FutureStatus.FUTURE_IS_READY) return result;
253 				if (status == FutureStatus.FUTURE_IS_BEING_COMPUTED) {
254 					latch = createLatch();
255 				}
256 			}
257 			try {
258 				latch.await(timeout, unit);
259 			} catch (InterruptedException e) {
260 				throw new PogamutInterruptedException("Interrupted while awaiting future result.", e, this);
261 			}
262 			if (status == FutureStatus.FUTURE_IS_READY) return result;
263 		}
264 		return null;
265 	}
266 
267 	@Override
268 	public boolean isCancelled() {
269 		return status == FutureStatus.CANCELED;
270 	}
271 
272 	@Override
273 	public boolean isDone() {
274 		return status != FutureStatus.FUTURE_IS_BEING_COMPUTED;
275 	}
276 
277 	/**
278 	 * Contains an exception that has happened during the computation in the case of ({@link FutureWithListeners#getStatus()} == EXCEPTION).
279 	 * @return
280 	 */
281     @Override
282 	public Exception getException() {
283 		return exception;
284 	}
285 	
286 }