View Javadoc

1   package cz.cuni.amis.pogamut.base.agent.module;
2   
3   import java.util.logging.Level;
4   import java.util.logging.Logger;
5   
6   import com.google.inject.Inject;
7   
8   import cz.cuni.amis.pogamut.base.agent.IAgent;
9   import cz.cuni.amis.pogamut.base.agent.exceptions.AgentException;
10  import cz.cuni.amis.pogamut.base.agent.module.exception.LogicThreadAlteredException;
11  import cz.cuni.amis.pogamut.base.component.bus.event.EventFilter;
12  import cz.cuni.amis.pogamut.base.component.bus.event.IStartedEvent;
13  import cz.cuni.amis.pogamut.base.component.bus.event.WaitForEvent;
14  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
15  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
16  import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
17  import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencyType;
18  import cz.cuni.amis.pogamut.base.component.controller.ComponentState;
19  import cz.cuni.amis.pogamut.base.component.exception.ComponentCantPauseException;
20  import cz.cuni.amis.pogamut.base.component.exception.ComponentCantResumeException;
21  import cz.cuni.amis.pogamut.base.component.exception.ComponentCantStartException;
22  import cz.cuni.amis.pogamut.base.component.exception.ComponentCantStopException;
23  import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
24  import cz.cuni.amis.utils.Const;
25  import cz.cuni.amis.utils.ExceptionToString;
26  import cz.cuni.amis.utils.StopWatch;
27  import cz.cuni.amis.utils.exception.PogamutInterruptedException;
28  import cz.cuni.amis.utils.flag.Flag;
29  
30  @AgentScoped
31  public class LogicModule<AGENT extends IAgent> extends AgentModule<AGENT> {
32  	
33  	private static long THREAD_COUNTER = 0;
34  	
35  	private static final long LOGIC_WAIT_TIME_PLUS_MILLIS = 1000;
36  	
37  	/**
38  	 * Must be greater than 0.
39  	 */
40  	public static final long MIN_LOGIC_PERIOD_MILLIS = 1; 
41  	
42  	/**
43  	 * Must be greater than 0.
44  	 */
45  	public static final double MAX_LOGIC_FREQUENCY = 1000 / MIN_LOGIC_PERIOD_MILLIS;
46  	
47  	/**
48  	 * Must be greater than 0.
49  	 */
50  	public static final long MAX_LOGIC_PERIOD_MILLIS = 100000000;
51  	
52  	/**
53  	 * Must be greater than 0.
54  	 */
55  	public static final double MIN_LOGIC_FREQUENCY = 1000 / MAX_LOGIC_PERIOD_MILLIS;
56  	
57  	protected Object mutex = new Object();
58  	
59  	protected IAgentLogic logic;
60  
61  	protected Thread logicThread = null;
62  	
63  	protected boolean logicShouldRun = true;
64  	
65  	protected Flag<Boolean> logicRunning = new Flag<Boolean>(false);
66  	
67  	protected Flag<Boolean> logicShouldPause = new Flag<Boolean>(false);
68  	
69  	protected Flag<Boolean> logicPaused = new Flag<Boolean>(false);
70  	
71  	protected double logicFrequency = 10;
72  	
73  	protected double logicPeriod = 100;
74  	
75  	protected long lastLogicRun = 0;
76  
77  	protected Throwable logicException;
78  
79  	@Inject
80  	public LogicModule(AGENT agent, IAgentLogic logic) {
81  		this(agent, logic, null, new ComponentDependencies(ComponentDependencyType.STARTS_WITH).add(agent));
82  	}
83  	
84  	public LogicModule(AGENT agent, IAgentLogic logic, Logger log) {
85  		this(agent, logic, log, new ComponentDependencies(ComponentDependencyType.STARTS_WITH).add(agent));
86  	}
87  	
88  	public LogicModule(AGENT agent, IAgentLogic logic, Logger log, ComponentDependencies dependencies) {
89  		super(agent, log, dependencies);
90  		this.logic = logic;
91  	}
92  
93  	/**
94  	 * Unsync! Call only within synchronized(mutex) block.
95  	 */
96  	private void clearLogicRunningVars() {
97  		logicShouldPause.setFlag(false);
98  		logicPaused.setFlag(false);
99  		logicRunning.setFlag(false);
100 		logicShouldRun = true;		
101 	}
102 	
103 	@Override
104 	protected void start(boolean startPaused) throws AgentException {
105 		super.start(startPaused);
106 		synchronized(mutex) {
107 			if (logicThread != null) {
108 				if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is not null! Sending interrupt and dropping the reference, possibly leaking resources.");
109 				logicThread.interrupt();
110 				logicThread = null;
111 			}
112 			long counter = THREAD_COUNTER++;
113 			String name = agent.getName() + "'s logic (" + counter + ")"; 
114 			clearLogicRunningVars();
115 			logicThread = new Thread(new LogicRunner("Thread " + counter), agent.getName() + " logic");
116 		}
117 		if (startPaused) {
118 			if (log.isLoggable(Level.WARNING)) log.fine("Starting logic thread in paused state (start paused requested).");
119 		}
120 		logicShouldPause.setFlag(startPaused);
121 		if (log.isLoggable(Level.FINE)) log.fine("Starting logic thread.");
122 		logicThread.start();
123 		long waitTime = logic.getLogicInitializeTime() + LOGIC_WAIT_TIME_PLUS_MILLIS;
124 		if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to initialize (" + waitTime + " ms).");
125 		Boolean result = logicRunning.waitFor(waitTime, true);
126 		if (!controller.inState(ComponentState.STARTING)) {
127 			throw new ComponentCantStartException("Woke up, module state differs. It is not " + ComponentState.STARTING + " but " + controller.getState().getFlag() + ".", log, this);			
128 		}
129 		if (result == null) {
130 			throw new ComponentCantStartException("Logic initialization is taking too long, did you correctly specified initialize time via getInitializeTime() method?", log, this);
131 		}
132 	}
133 	
134 	@Override
135 	public void stop() {
136 		super.stop();
137 		if (Thread.currentThread() == this.logicThread) {
138 			inThreadStopping();
139 		} else {			
140 			logicShouldRun = false;
141 			logicShouldPause.setFlag(false);
142 			long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
143 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
144 			Boolean result = logicRunning.waitFor(waitTime, false);
145 			if (result == null) {
146 				throw new ComponentCantStopException("Logic thread is still running! Is your logic too cpu-demanding?", log, this);
147 			}
148 		}
149 	}
150 	
151 	@Override
152 	protected void kill() {
153 		super.kill();
154 		if (Thread.currentThread() == this.logicThread) {
155 			inThreadKilling();
156 		} else {
157 			logicShouldRun = false;
158 			logicShouldPause.setFlag(false);
159 			synchronized(mutex) {
160 				if (logicThread == null) return;
161 			}
162 			long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
163 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
164 			Boolean result = logicRunning.waitFor(waitTime, false);
165 			synchronized(mutex) {
166 				if (logicThread == null) return;
167 				if (result == null) if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is still running, sending interrupt.");
168 				else return;
169 				logicThread.interrupt();
170 			}
171 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
172 			result = logicRunning.waitFor(waitTime, false);
173 			synchronized(mutex) {
174 				if (logicThread == null) return;
175 				if (result == null) if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is still running, is your logic too much cpu demanding?");
176 			}
177 		}
178 	}
179 
180 	@Override
181 	protected void pause() {
182 		super.pause();
183 		if (Thread.currentThread() == logicThread) {
184 			inThreadPausing();
185 		} else {
186 			logicShouldPause.setFlag(true);
187 			long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
188 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to pause (" + waitTime + " ms).");
189 			Boolean result = logicPaused.waitFor(waitTime, true);
190 			if (result == null) {
191 				throw new ComponentCantPauseException("Logic is still running, is your logic cpu demanding too much?", log, this);
192 			}
193 		}
194 	}
195 
196 	@Override
197 	protected void resume() {
198 		super.resume();
199 		if (Thread.currentThread() == logicThread) {
200 			inThreadResuming();
201 		} else {
202 			logicShouldPause.setFlag(false);
203 			long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
204 			if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to resume (" + waitTime + " ms).");
205 			Boolean result = logicPaused.waitFor(waitTime, false);
206 			if (result == null) {
207 				throw new ComponentCantResumeException("Logic did not resumed.", log, this);
208 			}
209 		}
210 	}
211 	
212 	protected void inThreadStopping() {
213 		inThreadWarning("Stopping", "stopped", "stop");
214 		logicShouldRun = false;
215 		logicShouldPause.setFlag(false);
216 	}
217 	
218 	protected void inThreadKilling() {
219 		inThreadWarning("Killing", "killed", "kill");
220 		logicShouldRun = false;
221 		logicShouldPause.setFlag(false);
222 		logicThread.interrupt();
223 	}
224 	
225 	protected void inThreadPausing() {
226 		inThreadWarning("Pausing", "paused", "pause");
227 		logicShouldPause.setFlag(true);
228 	}
229 	
230 	protected void inThreadResuming() {
231 		inThreadWarning("Resuming", "resumed", "resume");
232 		logicShouldPause.setFlag(false);
233 	}
234 	
235 	private void inThreadWarning(String str1, String str2, String str3) {
236 		String warning = "In-Logic-Thread " + str1 + " happens. This occurs whenever the LogicModule is being " + str2 + " from within its own thread. While this may proceed as you have expected, it is unsupported operation with uncertain result." + Const.NEW_LINE 
237 				  + "It is adviced to perform the troubling operation in different thread, e.g.:" + Const.NEW_LINE
238 				  + "    new Thread(new Runnable() {" + Const.NEW_LINE 
239 				  + "        @Override" + Const.NEW_LINE
240 				  + "        public void run() {" + Const.NEW_LINE 
241 				  + "            // do something that happens to " + str3 + " the logic module //" + Const.NEW_LINE
242 				  + "        }" + Const.NEW_LINE
243 				  + "    }).start();"
244 				  + Const.NEW_LINE
245 				  + "Stacktrace:"
246 				  + Const.NEW_LINE
247 				  + ExceptionToString.getCurrentStackTrace();
248 		
249 		if (log.isLoggable(Level.WARNING)) log.warning(warning);
250 		else if (log.isLoggable(Level.SEVERE)) log.severe(warning);
251 	}
252 	
	/**
	 * Hook called by the logic thread at the beginning of every logic iteration, right before
	 * {@link #shouldExecuteLogic()} is consulted and {@link IAgentLogic#logic()} is (possibly) called.
	 * <p>
	 * Does nothing by default; meant to be overridden.
	 * 
	 * @param threadName name of the logic runner that performs the iteration
	 */
	protected void beforeLogic(String threadName) {
	}
258 
	/**
	 * Hook called by the logic thread at the end of every logic iteration that finished
	 * without an exception, i.e., after {@link IAgentLogic#logic()} returned or was skipped
	 * because {@link #shouldExecuteLogic()} returned false.
	 * <p>
	 * Does nothing by default; meant to be overridden.
	 * 
	 * @param threadName name of the logic runner that performed the iteration
	 */
	protected void afterLogic(String threadName) {
	}
264 	
265 	/**
266 	 * Called whenever some exception is thrown inside {@link LogicRunner}.
267 	 * @param e
268 	 */
269 	protected void afterLogicException(String threadName, Throwable e) {
270 		logicException = e;
271 	}
272 	
	/**
	 * Controls whether {@link IAgentLogic#logic()} of the {@link LogicModule#logic} will be
	 * called in the current iteration of the logic loop.
	 * <p>
	 * If the logic is running and is not paused, you may override this method to fine-control
	 * the moments when the logic should execute.
	 * <p>
	 * Returns 'true' by default.
	 * 
	 * @return whether the logic should be executing 
	 */
	protected boolean shouldExecuteLogic() {
		return true;
	}
285 	
	/**
	 * Hook called once by the logic thread before {@link IAgentLogic#logic()} starts to be
	 * called periodically - allows you to block the logic until the rest of the agent is ready.
	 * <p>
	 * Does nothing by default; meant to be overridden.
	 * 
	 * @param threadName name of the logic runner that is about to enter the logic loop
	 */
	protected void logicLatch(String threadName) {		
	}
291 	
292 	public double getLogicPeriod() {
293 		return logicPeriod;
294 	}
295 	
296 	/**
297 	 * 
298 	 * @return
299 	 * 			the throwable that caused the logic to crash. 
300 	 */
301 	public Throwable getLogicException(){
302 		return logicException;
303 	}
304 	
305 	public double getLogicFrequency() {
306 		return logicFrequency;
307 	}
308 	
309 	public void setMinLogicFrequency() {
310 		this.logicPeriod = MAX_LOGIC_PERIOD_MILLIS;
311 		this.logicFrequency = MIN_LOGIC_FREQUENCY;
312 	}
313 	
314 	public void setMaxLogicFrequency() {
315 		this.logicPeriod = MIN_LOGIC_PERIOD_MILLIS;
316 		this.logicFrequency = MAX_LOGIC_FREQUENCY;
317 	}
318 
319 	public void setLogicFrequency(double frequency) {
320 		this.logicFrequency = frequency;
321 		if (this.logicFrequency <= MIN_LOGIC_FREQUENCY) {
322 			this.logicPeriod = MAX_LOGIC_PERIOD_MILLIS;
323 			this.logicFrequency = MIN_LOGIC_FREQUENCY;
324 		} else 
325 		if (this.logicFrequency > MAX_LOGIC_FREQUENCY){
326 			this.logicPeriod = MIN_LOGIC_PERIOD_MILLIS;
327 			this.logicFrequency = MAX_LOGIC_FREQUENCY;
328 		} else {
329 			this.logicPeriod = 1000 / frequency;
330 			this.logicFrequency = frequency;
331 		}
332 	}
333 
334 	private class LogicRunner implements Runnable {
335 
336 		private String name;
337 		
338 		private WaitForEvent startedEvent = new WaitForEvent(eventBus, new EventFilter(IStartedEvent.class, getComponentId()));
339 
340 		private boolean firstLogic = true;
341 
342 		public LogicRunner(String name) {
343 			if (name == null) name = "unnamed";
344 			this.name = name;
345 		}
346 		
347 		@Override
348 		public void run() {
349 			if (log.isLoggable(Level.WARNING)) log.warning(name + ": Thread started.");
350 			
351 			if (log.isLoggable(Level.FINE)) log.fine(name + ": Initializing logic.");
352 			StopWatch logicWatch = new StopWatch();
353 			logic.logicInitialize(LogicModule.this);
354 			if (log.isLoggable(Level.INFO)) log.info(name + ": Logic initialized (" + logicWatch.stopStr() + ").");
355 			
356 			synchronized(mutex) {
357 				// alter logicRunning only if inside valid thread, otherwise, stop the thread.
358 				if (logicThread != Thread.currentThread()) {
359 					if (log.isLoggable(Level.SEVERE)) log.severe(name + ": Logic thread altered! Shutdown not called!");
360 					return;
361 				}
362 				logicRunning.setFlag(true);
363 			}
364 			
365 			// preallocation
366 			long sleepTime = 0;
367 			
368 			try {
369 				if (log.isLoggable(Level.FINER)) log.finer(name + ": waiting for the logic module started event.");
370 				startedEvent.await();
371 				if (log.isLoggable(Level.FINER)) log.finer(name + ": logic module started event received.");
372 				
373 				synchronized(mutex) {
374 					if (logicThread != Thread.currentThread()) {
375 						throw new LogicThreadAlteredException(name, log, LogicModule.this);
376 					}
377 				}
378 				
379 				logicLatch(name);
380 				
381 				synchronized(mutex) {
382 					if (logicThread != Thread.currentThread()) {
383 						throw new LogicThreadAlteredException(name, log, LogicModule.this);
384 					}
385 				}
386 				
387 				while(!Thread.currentThread().isInterrupted() && // the thread was not interrupted 
388 					  logicShouldRun                             // and the logic should be running 
389 				) {
390 					if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
391 					
392 					if (logicShouldPause.getFlag()) {
393 						synchronized(mutex) {
394 							if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
395 							logicPaused.setFlag(true);
396 						}
397 						if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic paused.");
398 						logicShouldPause.waitFor(false);
399 						synchronized(mutex) {
400 							if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
401 							logicPaused.setFlag(false);
402 						}
403 						if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic resumed.");
404 					}
405 					
406 					sleepTime = (long)logicPeriod - (System.currentTimeMillis() - lastLogicRun);
407 					if (sleepTime > 0) {
408 						if (log.isLoggable(Level.FINER)) log.finer(name + ": Sleeping for " + sleepTime + " ms.");
409 						Thread.sleep(sleepTime);
410 					}
411 					
412 					if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
413 					
414 					try {
415 						if (log.isLoggable(Level.FINER)) log.finer(name + ": Logic iteration.");
416 						logicWatch.start();
417 						beforeLogic(name);
418 						try {
419 							lastLogicRun = System.currentTimeMillis();
420 							if (shouldExecuteLogic()) {
421 								if (firstLogic) {
422 									logic.beforeFirstLogic();
423 									firstLogic = false;
424 								}
425 								logic.logic();
426 							} else {
427 								if (log.isLoggable(Level.INFO)) log.info(name + ": Logic should not run now...");
428 							}
429 						} catch (Exception e1) {
430 							try {
431 								afterLogicException(name, e1);
432 							} catch (Exception e2) {
433 								if (log.isLoggable(Level.SEVERE)) log.severe(ExceptionToString.process(name + ": afterLogicException() exception.", e2));
434 							}
435 							throw e1;
436 						}
437 						afterLogic(name);
438 						if (log.isLoggable(Level.FINE)) log.fine(name + ": Logic iteration finished (" + logicWatch.stopStr() + ").");
439 					} catch (ComponentPausedException e) {
440 						if (log.isLoggable(Level.INFO)) log.info(name + ": pausing the thread, received ComponentPausedException from " + e.getOrigin() + ".");
441 						logicShouldPause.setFlag(true);
442 					}
443 					
444 				}
445 				
446 			} catch (LogicThreadAlteredException e1) {
447 				if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic thread has been altered, this one is not executed anymore.");
448 				// the logic is not being executed in this thread anymore...
449 				return;
450 			} catch (InterruptedException e2) {
451 				if (log.isLoggable(Level.WARNING)) log.warning(name + ": Interrupted!");
452 			} catch (PogamutInterruptedException e3) {
453 				if (log.isLoggable(Level.WARNING)) log.warning(name + ": Interrupted!");
454 			} catch (ComponentNotRunningException e5) {
455 				log.log(log.getLevel(), name + ": stopping the thread, received ComponentNotRunningException from " + e5.getOrigin() + ".");
456 			} catch (Exception e4) {
457 				controller.fatalError(name + ": Logic iteration exception.", e4);
458 			} finally {
459 				synchronized(mutex) {
460 					if (logicThread != Thread.currentThread()) {
461 						return;
462 					}
463 					try {
464 						if (log.isLoggable(Level.FINE)) log.fine(name + ": Shutting down the logic.");
465 						logicWatch.start();
466 						logic.logicShutdown();
467 						if (log.isLoggable(Level.INFO)) log.info(name + ": Logic shutdown (" + logicWatch.stopStr() + ").");
468 					} catch (Exception e) {
469 						controller.fatalError(name + ": Logic shutdown exception.", e);
470 					} finally {
471 						if (logicThread == Thread.currentThread()) {
472 							clearLogicRunningVars();
473 							logicThread = null;
474 						}
475 						if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic thread stopped.");
476 					}
477 				}
478 			}
479 		}
480 		
481 	}
482 
483 }