View Javadoc

1   package cz.cuni.amis.pogamut.base.communication.mediator.impl;
2   
3   import java.util.concurrent.CountDownLatch;
4   import java.util.logging.Level;
5   import java.util.logging.Logger;
6   
7   import com.google.inject.Inject;
8   
9   import cz.cuni.amis.pogamut.base.agent.IAgentId;
10  import cz.cuni.amis.pogamut.base.communication.exception.CommunicationException;
11  import cz.cuni.amis.pogamut.base.communication.mediator.IMediator;
12  import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent;
13  import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEventOutput;
14  import cz.cuni.amis.pogamut.base.communication.worldview.IWorldChangeEventInput;
15  import cz.cuni.amis.pogamut.base.component.bus.IComponentBus;
16  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
17  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
18  import cz.cuni.amis.pogamut.base.component.controller.ComponentControlHelper;
19  import cz.cuni.amis.pogamut.base.component.controller.ComponentController;
20  import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencyType;
21  import cz.cuni.amis.pogamut.base.component.controller.IComponentControlHelper;
22  import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
23  import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
24  import cz.cuni.amis.pogamut.base.utils.logging.LogCategory;
25  import cz.cuni.amis.utils.exception.PogamutException;
26  import cz.cuni.amis.utils.flag.Flag;
27  import cz.cuni.amis.utils.token.Token;
28  import cz.cuni.amis.utils.token.Tokens;
29  
30  /**
31   * This class should wrap the reading thread that continuously reads 
32   * {@link IWorldChangeEvent} from the {@link IWorldChangeEventOutput}
33   * passing them to the {@IWorldEventInput} without any delay.
34   * <p><p>
35   * Note that the mediator **NEEDS** {@link Mediator#setConsumer(IWorldChangeEventInput)} called in order to be started
36   * by the {@link IComponentBus}.
37   * <p><p>
38   * Ignores {@link IComponentControlHelper#startPaused()}, performs {@link IComponentControlHelper#start()} in both start cases.
39   */
40  @AgentScoped
41  public class Mediator implements IMediator {
42  	
43  	public static final Token COMPONENT_ID = Tokens.get("Mediator");
44  
45  	/**
46  	 * Name prefix for the worker thread and for the logs.
47  	 */
48  	public static final String WORKER_THREAD_NAME_PREFIX = "MediatorWorker";
49  
50  	/**
51  	 * Worker instance - it implements Runnable interface and is continuously
52  	 * reading messages from the connection object and passing them to the
53  	 * receiver.
54  	 */
55  	protected Worker worker = null;
56  	
57  	/**
58  	 * Thread of the worker.
59  	 */
60  	protected Thread workerThread = null;
61  
62  	/**
63  	 * Mutex for start synchronization.
64  	 */
65  	protected Object threadMutex = new Object();
66  
67  	/**
68  	 * Log category for the mediator (platform log).
69  	 */
70  	private LogCategory log = null;
71  
72  	/**
73  	 * Used to get events from the world.
74  	 */
75  	private IWorldChangeEventOutput producer;
76  	
77  	/**
78  	 * Who events get passed to.
79  	 */
80  	private IWorldChangeEventInput consumer;
81  	
82  	private ComponentController controller;
83  
84  	private IComponentBus eventBus;
85  
86  	private IAgentId agentId;
87  
88  	/**
89  	 * The object in passed to the constructor (IWorldEventOutput) is world
90  	 * event producer.
91  	 * <p><p>
92  	 * The mediator will read events from this producer and pass them to the
93  	 * IWorldEventInput specified during the start() of the mediator.
94  	 * <p><p>
95  	 * Note that the mediator **NEEDS** {@link Mediator#setConsumer(IWorldChangeEventInput)} called in order to be started
96  	 * by the {@link IComponentBus}.
97  	 * 
98  	 * @param connection
99  	 * @param messageParser
100 	 * @param commandSerializer
101 	 * @throws CommunicationException
102 	 */
103 	@Inject
104 	public Mediator(IWorldChangeEventOutput producer, IComponentBus bus, IAgentLogger logger) {
105 		this.agentId = logger.getAgentId();
106 		this.log = logger.getCategory(getComponentId().getToken());
107 		this.producer = producer;
108 		this.eventBus = bus;
109 	}
110 	
111 	private IComponentControlHelper control = new ComponentControlHelper() {
112 		
113 		@Override
114 		public void stop() throws PogamutException {
115 			Worker w = worker;
116 			if (w != null) {
117 				w.stop();
118 			}
119 		}
120 		
121 		@Override
122 		public void startPaused() {
123 			start();
124 		}
125 		
126 		@Override
127 		public void start() throws PogamutException {
128 			if (workerThread != null) {
129 				if (log.isLoggable(Level.WARNING)) log.warning("Mediator worker thread already exists, leaking resources?");
130 			}
131 			synchronized (threadMutex) {
132 				if (log.isLoggable(Level.FINER)) log.finer("Starting mediator thread " + WORKER_THREAD_NAME_PREFIX + ".");
133 				worker = new Worker();
134 				workerThread = new Thread(worker, agentId.getName().getFlag() + " mediator");
135 				workerThread.start();
136 			}
137 		}
138 		
139 		@Override
140 		public void kill() {
141 			Worker w = worker;
142 			if (w != null) w.kill();
143 			Thread thread = workerThread;
144 			if (thread != null) thread.interrupt();
145 			worker = null;
146 			workerThread = null;
147 		}
148 		
149 		public void pause() {
150 			Worker w = worker;
151 			if (w != null) {
152 				w.pause();
153 			}	
154 		}
155 		
156 		public void resume() {
157 			Worker w = worker;
158 			if (w != null) {
159 				w.resume();
160 			}	
161 		}
162 		
163 		@Override
164 		public void reset() {
165 			worker = null;
166 			workerThread = null;
167 		}
168 		
169 	};
170 	
171 	@Override
172 	public Token getComponentId() {
173 		return COMPONENT_ID;
174 	}
175 	
176 	public LogCategory getLog() {
177 		return log;
178 	}
179 	
180 	@Override
181 	public void setConsumer(IWorldChangeEventInput consumer) {
182 		this.consumer = consumer;
183 		this.controller = new ComponentController(this, control, eventBus, log, ComponentDependencyType.STARTS_AFTER, producer, consumer);
184 	}
185 	
186 	@Override
187 	public String toString() {
188 		if (this == null) return "Mediator";
189 		return getClass().getSimpleName() + "[producer=" + producer + ", consumer=" + consumer +"]";
190 	}
191 
192 
193 	private class Worker implements Runnable {
194 
195 		private volatile CountDownLatch stopLatch = new CountDownLatch(1);
196 		
197 		/**
198 		 * Simple flag that is telling us whether the Worker should run.
199 		 */
200 		private volatile boolean shouldRun = true;
201 		
202 		private volatile Flag<Boolean> shouldPause = new Flag<Boolean>(false);
203 		
204 		private volatile boolean running = false;
205 		
206 		private volatile boolean exceptionExpected = false;
207 		
208 		private Thread myThread;
209 		
210 		public void pause() {
211 			shouldPause.setFlag(true);
212 		}
213 		
214 		public void resume() {
215 			shouldPause.setFlag(false);
216 		}
217 		
218 		/**
219 		 * Drops the shouldRun flag.
220 		 */
221 		public void stop() {
222 			this.shouldRun = false;
223 			this.shouldPause.setFlag(false);
224 			exceptionExpected = true;
225 			myThread.interrupt();
226 		}
227 
228 		/**
229 		 * Drops the shouldRun flag, waits for 200ms and then interrupts the
230 		 * thread in hope it helps.
231 		 */
232 		public void kill() {
233 			if (!running) return;
234 			this.shouldRun = false;
235 			this.shouldPause.setFlag(false);
236 			try {
237 				Thread.sleep(200);
238 			} catch (InterruptedException e) {
239 			}
240 			exceptionExpected = true;
241 			myThread.interrupt();
242 		}
243 		
244 		/**
245 		 * Contains main while cycle that is continuously reading messages from
246 		 * the connection (using parser), notifying listeners and then passing
247 		 * them to the message receiver.
248 		 */
249 		@Override
250 		public void run() {
251 			myThread = Thread.currentThread();
252 			
253 			// set the running flag, we've been started
254 			running = true;
255 
256 			// notify that gateway started
257 			logWorker(Level.INFO, "Started.");
258 
259 			try {
260 				IWorldChangeEventInput currentConsumer;
261 				IWorldChangeEventOutput currentProducer;
262 				IWorldChangeEvent worldEvent = null;
263 
264 				while (shouldRun && !myThread.isInterrupted()) {
265 					
266 					if (shouldPause.getFlag()) {
267 						logWorker(Level.INFO, "Paused.");
268 						shouldPause.waitFor(false);
269 						logWorker(Level.INFO, "Resumed.");
270 					}
271 					
272 					// are we alive?
273 					if (!shouldRun || myThread.isInterrupted()) {
274 						break;
275 					}
276 					
277 					currentConsumer = consumer;
278 					if (currentConsumer == null) {
279 						running = false;
280 						if (!exceptionExpected) {
281 							controller.fatalError("Event consumer lost (is null).");
282 						}
283 						break;
284 					}
285 					
286 					currentProducer = producer;
287 					if (currentProducer == null) {
288 						running = false;
289 						if (!exceptionExpected) {
290 							controller.fatalError("Event producer lost (is null).");
291 						}
292 						break;
293 					}
294 					
295 					// do we have cached event?
296 					if (worldEvent == null) {
297 						try {
298 							// following call may block
299 							worldEvent = producer.getEvent();
300 							logWorker(Level.FINEST, "received - " + String.valueOf(worldEvent), worldEvent);
301 						} catch (ComponentPausedException cpe) {
302 							logWorker(Level.INFO, "Producer is paused, pausing mediator.");
303 							shouldPause.setFlag(true);
304 							continue;
305 						} catch (ComponentNotRunningException nre) {
306 							logWorker(Level.WARNING, "Producer is not running, stopping the mediator worker.");
307 							running = false;
308 							break;
309 						} catch (Exception ce) {
310 							running = false;
311 							if (!exceptionExpected) {
312 								controller.fatalError(WORKER_THREAD_NAME_PREFIX + ": Producer exception.", ce);
313 							} else {
314 								logWorker(Level.FINE, "Producer exception expected, caught: " + ce); 
315 							}
316 							break;
317 						}
318 					}
319 					
320 					// are we alive?
321 					if (!shouldRun || myThread.isInterrupted()) {
322 						break;
323 					}
324 					
325 					// yes we are, continue
326 					try {
327 						currentConsumer.notify(worldEvent);
328 					} catch (ComponentPausedException e) {
329 						logWorker(Level.INFO, "Consumer is paused, pausing mediator.");
330 						shouldPause.setFlag(true);
331 						continue;
332 					} catch (ComponentNotRunningException e) {
333 						logWorker(Level.WARNING, "Consumer is not running, stopping mediator worker.");
334 						running = false;
335 						break;
336 					} catch (Exception e) {						
337 						running = false;
338 						if (!exceptionExpected) {
339 							controller.fatalError(WORKER_THREAD_NAME_PREFIX + ": Consumer exception.", e);
340 						} else {
341 							logWorker(Level.FINE, "Consumer exception expected, caught: " + e); 
342 						}
343 						break;
344 					}
345 					
346 					worldEvent = null;
347 				}
348 			} catch (Exception e) {
349 				running = false;
350 				if (!exceptionExpected) {
351 					controller.fatalError(WORKER_THREAD_NAME_PREFIX + ": Exception.", e);
352 				} else {
353 					logWorker(Level.FINE, "Exception expected, caught: " + e);
354 				}
355 			}
356 			
357 			try {
358 				stopLatch.countDown();
359 			} finally {
360 				// clean after yourself
361 				shouldRun = false;			
362 				running = false;
363 				
364 				synchronized(threadMutex) {
365 					if (workerThread == myThread) {
366 						worker = null; 
367 						workerThread = null;
368 					}
369 				}
370 				
371 				logWorker(Level.WARNING, "Stopped.");
372 			}
373 		}
374 
375 		private void logWorker(Level level, String message) {
376 			log.log(level, WORKER_THREAD_NAME_PREFIX + ": " + message);
377 		}
378 
379 		private void logWorker(Level level, String message, Object obj) {
380 			if (obj == null)
381 				log.log(level, WORKER_THREAD_NAME_PREFIX + ": " + message);
382 			else
383 				log.log(level, WORKER_THREAD_NAME_PREFIX + ": " + message, obj);
384 		}
385 
386 	}
387 
388 }