1 package cz.cuni.amis.pogamut.base.agent.module;
2
3 import java.util.logging.Level;
4 import java.util.logging.Logger;
5
6 import com.google.inject.Inject;
7
8 import cz.cuni.amis.pogamut.base.agent.IAgent;
9 import cz.cuni.amis.pogamut.base.agent.exceptions.AgentException;
10 import cz.cuni.amis.pogamut.base.agent.module.exception.LogicThreadAlteredException;
11 import cz.cuni.amis.pogamut.base.component.bus.event.EventFilter;
12 import cz.cuni.amis.pogamut.base.component.bus.event.IStartedEvent;
13 import cz.cuni.amis.pogamut.base.component.bus.event.WaitForEvent;
14 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
15 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
16 import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
17 import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencyType;
18 import cz.cuni.amis.pogamut.base.component.controller.ComponentState;
19 import cz.cuni.amis.pogamut.base.component.exception.ComponentCantPauseException;
20 import cz.cuni.amis.pogamut.base.component.exception.ComponentCantResumeException;
21 import cz.cuni.amis.pogamut.base.component.exception.ComponentCantStartException;
22 import cz.cuni.amis.pogamut.base.component.exception.ComponentCantStopException;
23 import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
24 import cz.cuni.amis.utils.Const;
25 import cz.cuni.amis.utils.ExceptionToString;
26 import cz.cuni.amis.utils.StopWatch;
27 import cz.cuni.amis.utils.exception.PogamutInterruptedException;
28 import cz.cuni.amis.utils.flag.Flag;
29
30 @AgentScoped
31 public class LogicModule<AGENT extends IAgent> extends AgentModule<AGENT> {
32
33 private static long THREAD_COUNTER = 0;
34
35 private static final long LOGIC_WAIT_TIME_PLUS_MILLIS = 1000;
36
37
38
39
40 public static final long MIN_LOGIC_PERIOD_MILLIS = 1;
41
42
43
44
45 public static final double MAX_LOGIC_FREQUENCY = 1000 / MIN_LOGIC_PERIOD_MILLIS;
46
47
48
49
50 public static final long MAX_LOGIC_PERIOD_MILLIS = 100000000;
51
52
53
54
55 public static final double MIN_LOGIC_FREQUENCY = 1000 / MAX_LOGIC_PERIOD_MILLIS;
56
57 protected Object mutex = new Object();
58
59 protected IAgentLogic logic;
60
61 protected Thread logicThread = null;
62
63 protected boolean logicShouldRun = true;
64
65 protected Flag<Boolean> logicRunning = new Flag<Boolean>(false);
66
67 protected Flag<Boolean> logicShouldPause = new Flag<Boolean>(false);
68
69 protected Flag<Boolean> logicPaused = new Flag<Boolean>(false);
70
71 protected double logicFrequency = 10;
72
73 protected double logicPeriod = 100;
74
75 protected long lastLogicRun = 0;
76
77 protected Throwable logicException;
78
79 @Inject
80 public LogicModule(AGENT agent, IAgentLogic logic) {
81 this(agent, logic, null, new ComponentDependencies(ComponentDependencyType.STARTS_WITH).add(agent));
82 }
83
84 public LogicModule(AGENT agent, IAgentLogic logic, Logger log) {
85 this(agent, logic, log, new ComponentDependencies(ComponentDependencyType.STARTS_WITH).add(agent));
86 }
87
88 public LogicModule(AGENT agent, IAgentLogic logic, Logger log, ComponentDependencies dependencies) {
89 super(agent, log, dependencies);
90 this.logic = logic;
91 }
92
93
94
95
96 private void clearLogicRunningVars() {
97 logicShouldPause.setFlag(false);
98 logicPaused.setFlag(false);
99 logicRunning.setFlag(false);
100 logicShouldRun = true;
101 }
102
103 @Override
104 protected void start(boolean startPaused) throws AgentException {
105 super.start(startPaused);
106 synchronized(mutex) {
107 if (logicThread != null) {
108 if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is not null! Sending interrupt and dropping the reference, possibly leaking resources.");
109 logicThread.interrupt();
110 logicThread = null;
111 }
112 long counter = THREAD_COUNTER++;
113 String name = agent.getName() + "'s logic (" + counter + ")";
114 clearLogicRunningVars();
115 logicThread = new Thread(new LogicRunner("Thread " + counter), agent.getName() + " logic");
116 }
117 if (startPaused) {
118 if (log.isLoggable(Level.WARNING)) log.fine("Starting logic thread in paused state (start paused requested).");
119 }
120 logicShouldPause.setFlag(startPaused);
121 if (log.isLoggable(Level.FINE)) log.fine("Starting logic thread.");
122 logicThread.start();
123 long waitTime = logic.getLogicInitializeTime() + LOGIC_WAIT_TIME_PLUS_MILLIS;
124 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to initialize (" + waitTime + " ms).");
125 Boolean result = logicRunning.waitFor(waitTime, true);
126 if (!controller.inState(ComponentState.STARTING)) {
127 throw new ComponentCantStartException("Woke up, module state differs. It is not " + ComponentState.STARTING + " but " + controller.getState().getFlag() + ".", log, this);
128 }
129 if (result == null) {
130 throw new ComponentCantStartException("Logic initialization is taking too long, did you correctly specified initialize time via getInitializeTime() method?", log, this);
131 }
132 }
133
134 @Override
135 public void stop() {
136 super.stop();
137 if (Thread.currentThread() == this.logicThread) {
138 inThreadStopping();
139 } else {
140 logicShouldRun = false;
141 logicShouldPause.setFlag(false);
142 long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
143 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
144 Boolean result = logicRunning.waitFor(waitTime, false);
145 if (result == null) {
146 throw new ComponentCantStopException("Logic thread is still running! Is your logic too cpu-demanding?", log, this);
147 }
148 }
149 }
150
151 @Override
152 protected void kill() {
153 super.kill();
154 if (Thread.currentThread() == this.logicThread) {
155 inThreadKilling();
156 } else {
157 logicShouldRun = false;
158 logicShouldPause.setFlag(false);
159 synchronized(mutex) {
160 if (logicThread == null) return;
161 }
162 long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
163 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
164 Boolean result = logicRunning.waitFor(waitTime, false);
165 synchronized(mutex) {
166 if (logicThread == null) return;
167 if (result == null) if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is still running, sending interrupt.");
168 else return;
169 logicThread.interrupt();
170 }
171 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
172 result = logicRunning.waitFor(waitTime, false);
173 synchronized(mutex) {
174 if (logicThread == null) return;
175 if (result == null) if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is still running, is your logic too much cpu demanding?");
176 }
177 }
178 }
179
180 @Override
181 protected void pause() {
182 super.pause();
183 if (Thread.currentThread() == logicThread) {
184 inThreadPausing();
185 } else {
186 logicShouldPause.setFlag(true);
187 long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
188 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to pause (" + waitTime + " ms).");
189 Boolean result = logicPaused.waitFor(waitTime, true);
190 if (result == null) {
191 throw new ComponentCantPauseException("Logic is still running, is your logic cpu demanding too much?", log, this);
192 }
193 }
194 }
195
196 @Override
197 protected void resume() {
198 super.resume();
199 if (Thread.currentThread() == logicThread) {
200 inThreadResuming();
201 } else {
202 logicShouldPause.setFlag(false);
203 long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
204 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to resume (" + waitTime + " ms).");
205 Boolean result = logicPaused.waitFor(waitTime, false);
206 if (result == null) {
207 throw new ComponentCantResumeException("Logic did not resumed.", log, this);
208 }
209 }
210 }
211
212 protected void inThreadStopping() {
213 inThreadWarning("Stopping", "stopped", "stop");
214 logicShouldRun = false;
215 logicShouldPause.setFlag(false);
216 }
217
218 protected void inThreadKilling() {
219 inThreadWarning("Killing", "killed", "kill");
220 logicShouldRun = false;
221 logicShouldPause.setFlag(false);
222 logicThread.interrupt();
223 }
224
225 protected void inThreadPausing() {
226 inThreadWarning("Pausing", "paused", "pause");
227 logicShouldPause.setFlag(true);
228 }
229
230 protected void inThreadResuming() {
231 inThreadWarning("Resuming", "resumed", "resume");
232 logicShouldPause.setFlag(false);
233 }
234
235 private void inThreadWarning(String str1, String str2, String str3) {
236 String warning = "In-Logic-Thread " + str1 + " happens. This occurs whenever the LogicModule is being " + str2 + " from within its own thread. While this may proceed as you have expected, it is unsupported operation with uncertain result." + Const.NEW_LINE
237 + "It is adviced to perform the troubling operation in different thread, e.g.:" + Const.NEW_LINE
238 + " new Thread(new Runnable() {" + Const.NEW_LINE
239 + " @Override" + Const.NEW_LINE
240 + " public void run() {" + Const.NEW_LINE
241 + " // do something that happens to " + str3 + " the logic module //" + Const.NEW_LINE
242 + " }" + Const.NEW_LINE
243 + " }).start();"
244 + Const.NEW_LINE
245 + "Stacktrace:"
246 + Const.NEW_LINE
247 + ExceptionToString.getCurrentStackTrace();
248
249 if (log.isLoggable(Level.WARNING)) log.warning(warning);
250 else if (log.isLoggable(Level.SEVERE)) log.severe(warning);
251 }
252
253
254
255
256 protected void beforeLogic(String threadName) {
257 }
258
259
260
261
262 protected void afterLogic(String threadName) {
263 }
264
265
266
267
268
269 protected void afterLogicException(String threadName, Throwable e) {
270 logicException = e;
271 }
272
273
274
275
276
277
278
279
280
281
282 protected boolean shouldExecuteLogic() {
283 return true;
284 }
285
286
287
288
289 protected void logicLatch(String threadName) {
290 }
291
292 public double getLogicPeriod() {
293 return logicPeriod;
294 }
295
296
297
298
299
300
301 public Throwable getLogicException(){
302 return logicException;
303 }
304
305 public double getLogicFrequency() {
306 return logicFrequency;
307 }
308
309 public void setMinLogicFrequency() {
310 this.logicPeriod = MAX_LOGIC_PERIOD_MILLIS;
311 this.logicFrequency = MIN_LOGIC_FREQUENCY;
312 }
313
314 public void setMaxLogicFrequency() {
315 this.logicPeriod = MIN_LOGIC_PERIOD_MILLIS;
316 this.logicFrequency = MAX_LOGIC_FREQUENCY;
317 }
318
319 public void setLogicFrequency(double frequency) {
320 this.logicFrequency = frequency;
321 if (this.logicFrequency <= MIN_LOGIC_FREQUENCY) {
322 this.logicPeriod = MAX_LOGIC_PERIOD_MILLIS;
323 this.logicFrequency = MIN_LOGIC_FREQUENCY;
324 } else
325 if (this.logicFrequency > MAX_LOGIC_FREQUENCY){
326 this.logicPeriod = MIN_LOGIC_PERIOD_MILLIS;
327 this.logicFrequency = MAX_LOGIC_FREQUENCY;
328 } else {
329 this.logicPeriod = 1000 / frequency;
330 this.logicFrequency = frequency;
331 }
332 }
333
334 private class LogicRunner implements Runnable {
335
336 private String name;
337
338 private WaitForEvent startedEvent = new WaitForEvent(eventBus, new EventFilter(IStartedEvent.class, getComponentId()));
339
340 private boolean firstLogic = true;
341
342 public LogicRunner(String name) {
343 if (name == null) name = "unnamed";
344 this.name = name;
345 }
346
347 @Override
348 public void run() {
349 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Thread started.");
350
351 if (log.isLoggable(Level.FINE)) log.fine(name + ": Initializing logic.");
352 StopWatch logicWatch = new StopWatch();
353 logic.logicInitialize(LogicModule.this);
354 if (log.isLoggable(Level.INFO)) log.info(name + ": Logic initialized (" + logicWatch.stopStr() + ").");
355
356 synchronized(mutex) {
357
358 if (logicThread != Thread.currentThread()) {
359 if (log.isLoggable(Level.SEVERE)) log.severe(name + ": Logic thread altered! Shutdown not called!");
360 return;
361 }
362 logicRunning.setFlag(true);
363 }
364
365
366 long sleepTime = 0;
367
368 try {
369 if (log.isLoggable(Level.FINER)) log.finer(name + ": waiting for the logic module started event.");
370 startedEvent.await();
371 if (log.isLoggable(Level.FINER)) log.finer(name + ": logic module started event received.");
372
373 synchronized(mutex) {
374 if (logicThread != Thread.currentThread()) {
375 throw new LogicThreadAlteredException(name, log, LogicModule.this);
376 }
377 }
378
379 logicLatch(name);
380
381 synchronized(mutex) {
382 if (logicThread != Thread.currentThread()) {
383 throw new LogicThreadAlteredException(name, log, LogicModule.this);
384 }
385 }
386
387 while(!Thread.currentThread().isInterrupted() &&
388 logicShouldRun
389 ) {
390 if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
391
392 if (logicShouldPause.getFlag()) {
393 synchronized(mutex) {
394 if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
395 logicPaused.setFlag(true);
396 }
397 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic paused.");
398 logicShouldPause.waitFor(false);
399 synchronized(mutex) {
400 if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
401 logicPaused.setFlag(false);
402 }
403 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic resumed.");
404 }
405
406 sleepTime = (long)logicPeriod - (System.currentTimeMillis() - lastLogicRun);
407 if (sleepTime > 0) {
408 if (log.isLoggable(Level.FINER)) log.finer(name + ": Sleeping for " + sleepTime + " ms.");
409 Thread.sleep(sleepTime);
410 }
411
412 if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
413
414 try {
415 if (log.isLoggable(Level.FINER)) log.finer(name + ": Logic iteration.");
416 logicWatch.start();
417 beforeLogic(name);
418 try {
419 lastLogicRun = System.currentTimeMillis();
420 if (shouldExecuteLogic()) {
421 if (firstLogic) {
422 logic.beforeFirstLogic();
423 firstLogic = false;
424 }
425 logic.logic();
426 } else {
427 if (log.isLoggable(Level.INFO)) log.info(name + ": Logic should not run now...");
428 }
429 } catch (Exception e1) {
430 try {
431 afterLogicException(name, e1);
432 } catch (Exception e2) {
433 if (log.isLoggable(Level.SEVERE)) log.severe(ExceptionToString.process(name + ": afterLogicException() exception.", e2));
434 }
435 throw e1;
436 }
437 afterLogic(name);
438 if (log.isLoggable(Level.FINE)) log.fine(name + ": Logic iteration finished (" + logicWatch.stopStr() + ").");
439 } catch (ComponentPausedException e) {
440 if (log.isLoggable(Level.INFO)) log.info(name + ": pausing the thread, received ComponentPausedException from " + e.getOrigin() + ".");
441 logicShouldPause.setFlag(true);
442 }
443
444 }
445
446 } catch (LogicThreadAlteredException e1) {
447 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic thread has been altered, this one is not executed anymore.");
448
449 return;
450 } catch (InterruptedException e2) {
451 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Interrupted!");
452 } catch (PogamutInterruptedException e3) {
453 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Interrupted!");
454 } catch (ComponentNotRunningException e5) {
455 log.log(log.getLevel(), name + ": stopping the thread, received ComponentNotRunningException from " + e5.getOrigin() + ".");
456 } catch (Exception e4) {
457 controller.fatalError(name + ": Logic iteration exception.", e4);
458 } finally {
459 synchronized(mutex) {
460 if (logicThread != Thread.currentThread()) {
461 return;
462 }
463 try {
464 if (log.isLoggable(Level.FINE)) log.fine(name + ": Shutting down the logic.");
465 logicWatch.start();
466 logic.logicShutdown();
467 if (log.isLoggable(Level.INFO)) log.info(name + ": Logic shutdown (" + logicWatch.stopStr() + ").");
468 } catch (Exception e) {
469 controller.fatalError(name + ": Logic shutdown exception.", e);
470 } finally {
471 if (logicThread == Thread.currentThread()) {
472 clearLogicRunningVars();
473 logicThread = null;
474 }
475 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic thread stopped.");
476 }
477 }
478 }
479 }
480
481 }
482
483 }