View Javadoc

1   package cz.cuni.amis.pogamut.base.agent.module;
2   
3   import java.util.logging.Level;
4   import java.util.logging.Logger;
5   
6   import com.google.inject.Inject;
7   
8   import cz.cuni.amis.pogamut.base.agent.IAgent;
9   import cz.cuni.amis.pogamut.base.agent.exceptions.AgentException;
10  import cz.cuni.amis.pogamut.base.agent.module.exception.LogicThreadAlteredException;
11  import cz.cuni.amis.pogamut.base.component.bus.event.EventFilter;
12  import cz.cuni.amis.pogamut.base.component.bus.event.IStartedEvent;
13  import cz.cuni.amis.pogamut.base.component.bus.event.WaitForEvent;
14  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
15  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
16  import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
17  import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencyType;
18  import cz.cuni.amis.pogamut.base.component.controller.ComponentState;
19  import cz.cuni.amis.pogamut.base.component.exception.ComponentCantPauseException;
20  import cz.cuni.amis.pogamut.base.component.exception.ComponentCantResumeException;
21  import cz.cuni.amis.pogamut.base.component.exception.ComponentCantStartException;
22  import cz.cuni.amis.pogamut.base.component.exception.ComponentCantStopException;
23  import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
24  import cz.cuni.amis.utils.Const;
25  import cz.cuni.amis.utils.ExceptionToString;
26  import cz.cuni.amis.utils.StopWatch;
27  import cz.cuni.amis.utils.exception.PogamutInterruptedException;
28  import cz.cuni.amis.utils.flag.Flag;
29  
30  @AgentScoped
31  public class LogicModule<AGENT extends IAgent> extends AgentModule<AGENT> {
32  	
33  	private static long THREAD_COUNTER = 0;
34  	
35  	private static final long LOGIC_WAIT_TIME_PLUS_MILLIS = 1000;
36  	
37  	/**
38  	 * Must be greater than 0.
39  	 */
40  	public static final long MIN_LOGIC_PERIOD_MILLIS = 1; 
41  	
42  	/**
43  	 * Must be greater than 0.
44  	 */
45  	public static final double MAX_LOGIC_FREQUENCY = 1000 / MIN_LOGIC_PERIOD_MILLIS;
46  	
47  	/**
48  	 * Must be greater than 0.
49  	 */
50  	public static final long MAX_LOGIC_PERIOD_MILLIS = 100000000;
51  	
52  	/**
53  	 * Must be greater than 0.
54  	 */
55  	public static final double MIN_LOGIC_FREQUENCY = 1000 / MAX_LOGIC_PERIOD_MILLIS;
56  	
57  	protected Object mutex = new Object();
58  	
59  	protected IAgentLogic logic;
60  
61  	protected Thread logicThread = null;
62  	
63  	protected boolean logicShouldRun = true;
64  	
65  	protected Flag<Boolean> logicRunning = new Flag<Boolean>(false);
66  	
67  	protected Flag<Boolean> logicShouldPause = new Flag<Boolean>(false);
68  	
69  	protected Flag<Boolean> logicPaused = new Flag<Boolean>(false);
70  	
71  	protected double logicFrequency = 10;
72  	
73  	protected double logicPeriod = 100;
74  	
75  	protected long lastLogicRun = 0;
76  
77  	protected Throwable logicException;
78  
79  	@Inject
80  	public LogicModule(AGENT agent, IAgentLogic logic) {
81  		this(agent, logic, null, new ComponentDependencies(ComponentDependencyType.STARTS_WITH).add(agent));
82  	}
83  	
84  	public LogicModule(AGENT agent, IAgentLogic logic, Logger log) {
85  		this(agent, logic, log, new ComponentDependencies(ComponentDependencyType.STARTS_WITH).add(agent));
86  	}
87  	
88  	public LogicModule(AGENT agent, IAgentLogic logic, Logger log, ComponentDependencies dependencies) {
89  		super(agent, log, dependencies);
90  		this.logic = logic;
91  	}
92  
93  	/**
94  	 * Unsync! Call only within synchronized(mutex) block.
95  	 */
96  	private void clearLogicRunningVars() {
97  		logicShouldPause.setFlag(false);
98  		logicPaused.setFlag(false);
99  		logicRunning.setFlag(false);
100 		logicShouldRun = true;		
101 	}
102 	
103 	@Override
104 	protected void start(boolean startPaused) throws AgentException {
105 		super.start(startPaused);
106 		synchronized(mutex) {
107 			if (logicThread != null) {
108 				if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is not null! Sending interrupt and dropping the reference, possibly leaking resources.");
109 				logicThread.interrupt();
110 				logicThread = null;
111 			}
112 			long counter = THREAD_COUNTER++;
113 			String name = agent.getName() + "'s logic (" + counter + ")"; 
114 			clearLogicRunningVars();
115 			logicThread = new Thread(new LogicRunner("Thread " + counter), agent.getName() + " logic");
116 		}
117 		if (startPaused) {
118 			if (log.isLoggable(Level.WARNING)) log.fine("Starting logic thread in paused state (start paused requested).");
119 		}
120 		logicShouldPause.setFlag(startPaused);
121 		if (log.isLoggable(Level.FINE)) log.fine("Starting logic thread.");
122 		logicThread.start();
123 		long waitTime = logic.getLogicInitializeTime() + LOGIC_WAIT_TIME_PLUS_MILLIS;
124 		if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to initialize (" + waitTime + " ms).");
125 		Boolean result = logicRunning.waitFor(waitTime, true);
126 		if (!controller.inState(ComponentState.STARTING)) {
127 			throw new ComponentCantStartException("Woke up, module state differs. It is not " + ComponentState.STARTING + " but " + controller.getState().getFlag() + ".", log, this);			
128 		}
129 		if (result == null) {
130 			throw new ComponentCantStartException("Logic initialization is taking too long, did you correctly specified initialize time via getInitializeTime() method?", log, this);
131 		}
132 	}
133 	
134 	@Override
135 	public void stop() {
136 		super.stop();
137 		if (Thread.currentThread() == this.logicThread) {
138 			inThreadStopping();
139 		} else {			
140 			logicShouldRun = false;
141 			logicShouldPause.setFlag(false);
142 			long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
143 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
144 			Boolean result = logicRunning.waitFor(waitTime, false);
145 			if (result == null) {
146 				throw new ComponentCantStopException("Logic thread is still running! Is your logic too cpu-demanding?", log, this);
147 			}
148 		}
149 	}
150 	
151 	@Override
152 	protected void kill() {
153 		super.kill();
154 		if (Thread.currentThread() == this.logicThread) {
155 			inThreadKilling();
156 		} else {
157 			logicShouldRun = false;
158 			logicShouldPause.setFlag(false);
159 			synchronized(mutex) {
160 				if (logicThread == null) return;
161 			}
162 			long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
163 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
164 			Boolean result = logicRunning.waitFor(waitTime, false);
165 			synchronized(mutex) {
166 				if (logicThread == null) return;
167 				if (result == null) if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is still running, sending interrupt.");
168 				else return;
169 				logicThread.interrupt();
170 			}
171 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
172 			result = logicRunning.waitFor(waitTime, false);
173 			synchronized(mutex) {
174 				if (logicThread == null) return;
175 				if (result == null) if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is still running, is your logic too much cpu demanding?");
176 			}
177 		}
178 	}
179 
180 	@Override
181 	protected void pause() {
182 		super.pause();
183 		if (Thread.currentThread() == logicThread) {
184 			inThreadPausing();
185 		} else {
186 			logicShouldPause.setFlag(true);
187 			long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
188 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to pause (" + waitTime + " ms).");
189 			Boolean result = logicPaused.waitFor(waitTime, true);
190 			if (result == null) {
191 				throw new ComponentCantPauseException("Logic is still running, is your logic cpu demanding too much?", log, this);
192 			}
193 		}
194 	}
195 
196 	@Override
197 	protected void resume() {
198 		super.resume();
199 		if (Thread.currentThread() == logicThread) {
200 			inThreadResuming();
201 		} else {
202 			logicShouldPause.setFlag(false);
203 			long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
204 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to resume (" + waitTime + " ms).");
205 			Boolean result = logicPaused.waitFor(waitTime, false);
206 			if (result == null) {
207 				throw new ComponentCantResumeException("Logic did not resumed.", log, this);
208 			}
209 		}
210 	}
211 	
212 	protected void inThreadStopping() {
213 		inThreadWarning("Stopping", "stopped", "stop");
214 		logicShouldRun = false;
215 		logicShouldPause.setFlag(false);
216 	}
217 	
218 	protected void inThreadKilling() {
219 		inThreadWarning("Killing", "killed", "kill");
220 		logicShouldRun = false;
221 		logicShouldPause.setFlag(false);
222 		logicThread.interrupt();
223 	}
224 	
225 	protected void inThreadPausing() {
226 		inThreadWarning("Pausing", "paused", "pause");
227 		logicShouldPause.setFlag(true);
228 	}
229 	
230 	protected void inThreadResuming() {
231 		inThreadWarning("Resuming", "resumed", "resume");
232 		logicShouldPause.setFlag(false);
233 	}
234 	
235 	private void inThreadWarning(String str1, String str2, String str3) {
236 		String warning = "In-Logic-Thread " + str1 + " happens. This occurs whenever the LogicModule is being " + str2 + " from within its own thread. While this may proceed as you have expected, it is unsupported operation with uncertain result." + Const.NEW_LINE 
237 				  + "It is adviced to perform the troubling operation in different thread, e.g.:" + Const.NEW_LINE
238 				  + "    new Thread(new Runnable() {" + Const.NEW_LINE 
239 				  + "        @Override" + Const.NEW_LINE
240 				  + "        public void run() {" + Const.NEW_LINE 
241 				  + "            // do something that happens to " + str3 + " the logic module //" + Const.NEW_LINE
242 				  + "        }" + Const.NEW_LINE
243 				  + "    }).start();";
244 		if (log.isLoggable(Level.WARNING)) log.warning(warning);
245 		else if (log.isLoggable(Level.SEVERE)) log.severe(warning);
246 	}
247 	
248 	/**
249 	 * Called right before the {@link LogicModule#logic}.doLogic() is called.
250 	 */
251 	protected void beforeLogic(String threadName) {
252 	}
253 
254 	/**
255 	 * Called right after the {@link LogicModule#logic}.doLogic() is called.
256 	 */
257 	protected void afterLogic(String threadName) {
258 	}
259 	
260 	/**
261 	 * Called whenever some exception is thrown inside {@link LogicRunner}.
262 	 * @param e
263 	 */
264 	protected void afterLogicException(String threadName, Throwable e) {
265 		logicException = e;
266 	}
267 	
268 	/**
269 	 * Controls whether the {@link LogicModule#logic}.logic() will be called.
270 	 * <p><p>
271 	 * If logic is running & is not paused you may use this to fine control the moments when the logic should execute.
272 	 * <p><p>
273 	 * Returns 'true' as default.
274 	 * 
275 	 * @return whether the logic should be executing 
276 	 */
277 	protected boolean shouldExecuteLogic() {
278 		return true;
279 	}
280 	
281 	/**
282 	 * Called before the {@link IAgentLogic#logic()} is periodically called - allows you to sleep the logic until the rest of the agent is ready.
283 	 */
284 	protected void logicLatch(String threadName) {		
285 	}
286 	
287 	public double getLogicPeriod() {
288 		return logicPeriod;
289 	}
290 	
291 	/**
292 	 * 
293 	 * @return
294 	 * 			the throwable that caused the logic to crash. 
295 	 */
296 	public Throwable getLogicException(){
297 		return logicException;
298 	}
299 	
300 	public double getLogicFrequency() {
301 		return logicFrequency;
302 	}
303 	
304 	public void setMinLogicFrequency() {
305 		this.logicPeriod = MAX_LOGIC_PERIOD_MILLIS;
306 		this.logicFrequency = MIN_LOGIC_FREQUENCY;
307 	}
308 	
309 	public void setMaxLogicFrequency() {
310 		this.logicPeriod = MIN_LOGIC_PERIOD_MILLIS;
311 		this.logicFrequency = MAX_LOGIC_FREQUENCY;
312 	}
313 
314 	public void setLogicFrequency(double frequency) {
315 		this.logicFrequency = frequency;
316 		if (this.logicFrequency <= MIN_LOGIC_FREQUENCY) {
317 			this.logicPeriod = MAX_LOGIC_PERIOD_MILLIS;
318 			this.logicFrequency = MIN_LOGIC_FREQUENCY;
319 		} else 
320 		if (this.logicFrequency > MAX_LOGIC_FREQUENCY){
321 			this.logicPeriod = MIN_LOGIC_PERIOD_MILLIS;
322 			this.logicFrequency = MAX_LOGIC_FREQUENCY;
323 		} else {
324 			this.logicPeriod = 1000 / frequency;
325 			this.logicFrequency = frequency;
326 		}
327 	}
328 
329 	private class LogicRunner implements Runnable {
330 
331 		private String name;
332 		
333 		private WaitForEvent startedEvent = new WaitForEvent(eventBus, new EventFilter(IStartedEvent.class, getComponentId()));
334 
335 		private boolean firstLogic = true;
336 
337 		public LogicRunner(String name) {
338 			if (name == null) name = "unnamed";
339 			this.name = name;
340 		}
341 		
342 		@Override
343 		public void run() {
344 			if (log.isLoggable(Level.WARNING)) log.warning(name + ": Thread started.");
345 			
346 			if (log.isLoggable(Level.FINE)) log.fine(name + ": Initializing logic.");
347 			StopWatch logicWatch = new StopWatch();
348 			logic.logicInitialize(LogicModule.this);
349 			if (log.isLoggable(Level.INFO)) log.info(name + ": Logic initialized (" + logicWatch.stopStr() + ").");
350 			
351 			synchronized(mutex) {
352 				// alter logicRunning only if inside valid thread, otherwise, stop the thread.
353 				if (logicThread != Thread.currentThread()) {
354 					if (log.isLoggable(Level.SEVERE)) log.severe(name + ": Logic thread altered! Shutdown not called!");
355 					return;
356 				}
357 				logicRunning.setFlag(true);
358 			}
359 			
360 			// preallocation
361 			long sleepTime = 0;
362 			
363 			try {
364 				if (log.isLoggable(Level.FINER)) log.finer(name + ": waiting for the logic module started event.");
365 				startedEvent.await();
366 				if (log.isLoggable(Level.FINER)) log.finer(name + ": logic module started event received.");
367 				
368 				synchronized(mutex) {
369 					if (logicThread != Thread.currentThread()) {
370 						throw new LogicThreadAlteredException(name, log, LogicModule.this);
371 					}
372 				}
373 				
374 				logicLatch(name);
375 				
376 				synchronized(mutex) {
377 					if (logicThread != Thread.currentThread()) {
378 						throw new LogicThreadAlteredException(name, log, LogicModule.this);
379 					}
380 				}
381 				
382 				while(!Thread.currentThread().isInterrupted() && // the thread was not interrupted 
383 					  logicShouldRun                             // and the logic should be running 
384 				) {
385 					if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
386 					
387 					if (logicShouldPause.getFlag()) {
388 						synchronized(mutex) {
389 							if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
390 							logicPaused.setFlag(true);
391 						}
392 						if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic paused.");
393 						logicShouldPause.waitFor(false);
394 						synchronized(mutex) {
395 							if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
396 							logicPaused.setFlag(false);
397 						}
398 						if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic resumed.");
399 					}
400 					
401 					sleepTime = (long)logicPeriod - (System.currentTimeMillis() - lastLogicRun);
402 					if (sleepTime > 0) {
403 						if (log.isLoggable(Level.FINER)) log.finer(name + ": Sleeping for " + sleepTime + " ms.");
404 						Thread.sleep(sleepTime);
405 					}
406 					
407 					if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
408 					
409 					try {
410 						if (log.isLoggable(Level.FINER)) log.finer(name + ": Logic iteration.");
411 						logicWatch.start();
412 						beforeLogic(name);
413 						try {
414 							lastLogicRun = System.currentTimeMillis();
415 							if (shouldExecuteLogic()) {
416 								if (firstLogic) {
417 									logic.beforeFirstLogic();
418 									firstLogic = false;
419 								}
420 								logic.logic();
421 							} else {
422 								if (log.isLoggable(Level.INFO)) log.info(name + ": Logic should not run now...");
423 							}
424 						} catch (Exception e1) {
425 							try {
426 								afterLogicException(name, e1);
427 							} catch (Exception e2) {
428 								if (log.isLoggable(Level.SEVERE)) log.severe(ExceptionToString.process(name + ": afterLogicException() exception.", e2));
429 							}
430 							throw e1;
431 						}
432 						afterLogic(name);
433 						if (log.isLoggable(Level.FINE)) log.fine(name + ": Logic iteration finished (" + logicWatch.stopStr() + ").");
434 					} catch (ComponentPausedException e) {
435 						if (log.isLoggable(Level.INFO)) log.info(name + ": pausing the thread, received ComponentPausedException from " + e.getOrigin() + ".");
436 						logicShouldPause.setFlag(true);
437 					}
438 					
439 				}
440 				
441 			} catch (LogicThreadAlteredException e1) {
442 				if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic thread has been altered, this one is not executed anymore.");
443 				// the logic is not being executed in this thread anymore...
444 				return;
445 			} catch (InterruptedException e2) {
446 				if (log.isLoggable(Level.WARNING)) log.warning(name + ": Interrupted!");
447 			} catch (PogamutInterruptedException e3) {
448 				if (log.isLoggable(Level.WARNING)) log.warning(name + ": Interrupted!");
449 			} catch (ComponentNotRunningException e5) {
450 				log.log(log.getLevel(), name + ": stopping the thread, received ComponentNotRunningException from " + e5.getOrigin() + ".");
451 			} catch (Exception e4) {
452 				controller.fatalError(name + ": Logic iteration exception.", e4);
453 			} finally {
454 				synchronized(mutex) {
455 					if (logicThread != Thread.currentThread()) {
456 						return;
457 					}
458 					try {
459 						if (log.isLoggable(Level.FINE)) log.fine(name + ": Shutting down the logic.");
460 						logicWatch.start();
461 						logic.logicShutdown();
462 						if (log.isLoggable(Level.INFO)) log.info(name + ": Logic shutdown (" + logicWatch.stopStr() + ").");
463 					} catch (Exception e) {
464 						controller.fatalError(name + ": Logic shutdown exception.", e);
465 					} finally {
466 						if (logicThread == Thread.currentThread()) {
467 							clearLogicRunningVars();
468 							logicThread = null;
469 						}
470 						if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic thread stopped.");
471 					}
472 				}
473 			}
474 		}
475 		
476 	}
477 
478 }