1 package cz.cuni.amis.pogamut.base.agent.module;
2
3 import java.util.logging.Level;
4 import java.util.logging.Logger;
5
6 import com.google.inject.Inject;
7
8 import cz.cuni.amis.pogamut.base.agent.IAgent;
9 import cz.cuni.amis.pogamut.base.agent.exceptions.AgentException;
10 import cz.cuni.amis.pogamut.base.agent.module.exception.LogicThreadAlteredException;
11 import cz.cuni.amis.pogamut.base.component.bus.event.EventFilter;
12 import cz.cuni.amis.pogamut.base.component.bus.event.IStartedEvent;
13 import cz.cuni.amis.pogamut.base.component.bus.event.WaitForEvent;
14 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
15 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
16 import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
17 import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencyType;
18 import cz.cuni.amis.pogamut.base.component.controller.ComponentState;
19 import cz.cuni.amis.pogamut.base.component.exception.ComponentCantPauseException;
20 import cz.cuni.amis.pogamut.base.component.exception.ComponentCantResumeException;
21 import cz.cuni.amis.pogamut.base.component.exception.ComponentCantStartException;
22 import cz.cuni.amis.pogamut.base.component.exception.ComponentCantStopException;
23 import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
24 import cz.cuni.amis.utils.Const;
25 import cz.cuni.amis.utils.ExceptionToString;
26 import cz.cuni.amis.utils.StopWatch;
27 import cz.cuni.amis.utils.exception.PogamutInterruptedException;
28 import cz.cuni.amis.utils.flag.Flag;
29
30 @AgentScoped
31 public class LogicModule<AGENT extends IAgent> extends AgentModule<AGENT> {
32
33 private static long THREAD_COUNTER = 0;
34
35 private static final long LOGIC_WAIT_TIME_PLUS_MILLIS = 1000;
36
37
38
39
40 public static final long MIN_LOGIC_PERIOD_MILLIS = 1;
41
42
43
44
45 public static final double MAX_LOGIC_FREQUENCY = 1000 / MIN_LOGIC_PERIOD_MILLIS;
46
47
48
49
50 public static final long MAX_LOGIC_PERIOD_MILLIS = 100000000;
51
52
53
54
55 public static final double MIN_LOGIC_FREQUENCY = 1000 / MAX_LOGIC_PERIOD_MILLIS;
56
57 protected Object mutex = new Object();
58
59 protected IAgentLogic logic;
60
61 protected Thread logicThread = null;
62
63 protected boolean logicShouldRun = true;
64
65 protected Flag<Boolean> logicRunning = new Flag<Boolean>(false);
66
67 protected Flag<Boolean> logicShouldPause = new Flag<Boolean>(false);
68
69 protected Flag<Boolean> logicPaused = new Flag<Boolean>(false);
70
71 protected double logicFrequency = 10;
72
73 protected double logicPeriod = 100;
74
75 protected long lastLogicRun = 0;
76
77 protected Throwable logicException;
78
/**
 * Injection constructor: no logger is supplied ({@code null} is handed to the full constructor)
 * and the module is declared to start together with the agent.
 *
 * @param agent agent owning this module
 * @param logic logic to be driven by the module
 */
@Inject
public LogicModule(AGENT agent, IAgentLogic logic) {
    this(
        agent,
        logic,
        null,
        new ComponentDependencies(ComponentDependencyType.STARTS_WITH).add(agent)
    );
}
83
/**
 * Creates the module with an explicit logger; the module is declared to start together with the agent.
 *
 * @param agent agent owning this module
 * @param logic logic to be driven by the module
 * @param log   logger handed over to the full constructor
 */
public LogicModule(AGENT agent, IAgentLogic logic, Logger log) {
    this(
        agent,
        logic,
        log,
        new ComponentDependencies(ComponentDependencyType.STARTS_WITH).add(agent)
    );
}
87
/**
 * Full constructor: forwards agent, logger and dependencies to the superclass and remembers the logic.
 *
 * @param agent        agent owning this module
 * @param logic        logic to be driven by the module
 * @param log          logger passed to the superclass
 * @param dependencies component dependencies passed to the superclass
 */
public LogicModule(
        AGENT agent,
        IAgentLogic logic,
        Logger log,
        ComponentDependencies dependencies) {
    super(agent, log, dependencies);
    this.logic = logic;
}
92
93
94
95
96 private void clearLogicRunningVars() {
97 logicShouldPause.setFlag(false);
98 logicPaused.setFlag(false);
99 logicRunning.setFlag(false);
100 logicShouldRun = true;
101 }
102
103 @Override
104 protected void start(boolean startPaused) throws AgentException {
105 super.start(startPaused);
106 synchronized(mutex) {
107 if (logicThread != null) {
108 if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is not null! Sending interrupt and dropping the reference, possibly leaking resources.");
109 logicThread.interrupt();
110 logicThread = null;
111 }
112 long counter = THREAD_COUNTER++;
113 String name = agent.getName() + "'s logic (" + counter + ")";
114 clearLogicRunningVars();
115 logicThread = new Thread(new LogicRunner("Thread " + counter), agent.getName() + " logic");
116 }
117 if (startPaused) {
118 if (log.isLoggable(Level.WARNING)) log.fine("Starting logic thread in paused state (start paused requested).");
119 }
120 logicShouldPause.setFlag(startPaused);
121 if (log.isLoggable(Level.FINE)) log.fine("Starting logic thread.");
122 logicThread.start();
123 long waitTime = logic.getLogicInitializeTime() + LOGIC_WAIT_TIME_PLUS_MILLIS;
124 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to initialize (" + waitTime + " ms).");
125 Boolean result = logicRunning.waitFor(waitTime, true);
126 if (!controller.inState(ComponentState.STARTING)) {
127 throw new ComponentCantStartException("Woke up, module state differs. It is not " + ComponentState.STARTING + " but " + controller.getState().getFlag() + ".", log, this);
128 }
129 if (result == null) {
130 throw new ComponentCantStartException("Logic initialization is taking too long, did you correctly specified initialize time via getInitializeTime() method?", log, this);
131 }
132 }
133
134 @Override
135 public void stop() {
136 super.stop();
137 if (Thread.currentThread() == this.logicThread) {
138 inThreadStopping();
139 } else {
140 logicShouldRun = false;
141 logicShouldPause.setFlag(false);
142 long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
143 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
144 Boolean result = logicRunning.waitFor(waitTime, false);
145 if (result == null) {
146 throw new ComponentCantStopException("Logic thread is still running! Is your logic too cpu-demanding?", log, this);
147 }
148 }
149 }
150
151 @Override
152 protected void kill() {
153 super.kill();
154 if (Thread.currentThread() == this.logicThread) {
155 inThreadKilling();
156 } else {
157 logicShouldRun = false;
158 logicShouldPause.setFlag(false);
159 synchronized(mutex) {
160 if (logicThread == null) return;
161 }
162 long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
163 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
164 Boolean result = logicRunning.waitFor(waitTime, false);
165 synchronized(mutex) {
166 if (logicThread == null) return;
167 if (result == null) if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is still running, sending interrupt.");
168 else return;
169 logicThread.interrupt();
170 }
171 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to stop (" + waitTime + " ms).");
172 result = logicRunning.waitFor(waitTime, false);
173 synchronized(mutex) {
174 if (logicThread == null) return;
175 if (result == null) if (log.isLoggable(Level.WARNING)) log.warning("Logic thread is still running, is your logic too much cpu demanding?");
176 }
177 }
178 }
179
180 @Override
181 protected void pause() {
182 super.pause();
183 if (Thread.currentThread() == logicThread) {
184 inThreadPausing();
185 } else {
186 logicShouldPause.setFlag(true);
187 long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
188 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to pause (" + waitTime + " ms).");
189 Boolean result = logicPaused.waitFor(waitTime, true);
190 if (result == null) {
191 throw new ComponentCantPauseException("Logic is still running, is your logic cpu demanding too much?", log, this);
192 }
193 }
194 }
195
196 @Override
197 protected void resume() {
198 super.resume();
199 if (Thread.currentThread() == logicThread) {
200 inThreadResuming();
201 } else {
202 logicShouldPause.setFlag(false);
203 long waitTime = (long)logicPeriod + LOGIC_WAIT_TIME_PLUS_MILLIS;
204 if (log.isLoggable(Level.INFO)) log.info("Waiting for the logic to resume (" + waitTime + " ms).");
205 Boolean result = logicPaused.waitFor(waitTime, false);
206 if (result == null) {
207 throw new ComponentCantResumeException("Logic did not resumed.", log, this);
208 }
209 }
210 }
211
212 protected void inThreadStopping() {
213 inThreadWarning("Stopping", "stopped", "stop");
214 logicShouldRun = false;
215 logicShouldPause.setFlag(false);
216 }
217
218 protected void inThreadKilling() {
219 inThreadWarning("Killing", "killed", "kill");
220 logicShouldRun = false;
221 logicShouldPause.setFlag(false);
222 logicThread.interrupt();
223 }
224
225 protected void inThreadPausing() {
226 inThreadWarning("Pausing", "paused", "pause");
227 logicShouldPause.setFlag(true);
228 }
229
230 protected void inThreadResuming() {
231 inThreadWarning("Resuming", "resumed", "resume");
232 logicShouldPause.setFlag(false);
233 }
234
235 private void inThreadWarning(String str1, String str2, String str3) {
236 String warning = "In-Logic-Thread " + str1 + " happens. This occurs whenever the LogicModule is being " + str2 + " from within its own thread. While this may proceed as you have expected, it is unsupported operation with uncertain result." + Const.NEW_LINE
237 + "It is adviced to perform the troubling operation in different thread, e.g.:" + Const.NEW_LINE
238 + " new Thread(new Runnable() {" + Const.NEW_LINE
239 + " @Override" + Const.NEW_LINE
240 + " public void run() {" + Const.NEW_LINE
241 + " // do something that happens to " + str3 + " the logic module //" + Const.NEW_LINE
242 + " }" + Const.NEW_LINE
243 + " }).start();";
244 if (log.isLoggable(Level.WARNING)) log.warning(warning);
245 else if (log.isLoggable(Level.SEVERE)) log.severe(warning);
246 }
247
248
249
250
251 protected void beforeLogic(String threadName) {
252 }
253
254
255
256
257 protected void afterLogic(String threadName) {
258 }
259
260
261
262
263
264 protected void afterLogicException(String threadName, Throwable e) {
265 logicException = e;
266 }
267
268
269
270
271
272
273
274
275
276
277 protected boolean shouldExecuteLogic() {
278 return true;
279 }
280
281
282
283
284 protected void logicLatch(String threadName) {
285 }
286
287 public double getLogicPeriod() {
288 return logicPeriod;
289 }
290
291
292
293
294
295
296 public Throwable getLogicException(){
297 return logicException;
298 }
299
300 public double getLogicFrequency() {
301 return logicFrequency;
302 }
303
304 public void setMinLogicFrequency() {
305 this.logicPeriod = MAX_LOGIC_PERIOD_MILLIS;
306 this.logicFrequency = MIN_LOGIC_FREQUENCY;
307 }
308
309 public void setMaxLogicFrequency() {
310 this.logicPeriod = MIN_LOGIC_PERIOD_MILLIS;
311 this.logicFrequency = MAX_LOGIC_FREQUENCY;
312 }
313
314 public void setLogicFrequency(double frequency) {
315 this.logicFrequency = frequency;
316 if (this.logicFrequency <= MIN_LOGIC_FREQUENCY) {
317 this.logicPeriod = MAX_LOGIC_PERIOD_MILLIS;
318 this.logicFrequency = MIN_LOGIC_FREQUENCY;
319 } else
320 if (this.logicFrequency > MAX_LOGIC_FREQUENCY){
321 this.logicPeriod = MIN_LOGIC_PERIOD_MILLIS;
322 this.logicFrequency = MAX_LOGIC_FREQUENCY;
323 } else {
324 this.logicPeriod = 1000 / frequency;
325 this.logicFrequency = frequency;
326 }
327 }
328
329 private class LogicRunner implements Runnable {
330
331 private String name;
332
333 private WaitForEvent startedEvent = new WaitForEvent(eventBus, new EventFilter(IStartedEvent.class, getComponentId()));
334
335 private boolean firstLogic = true;
336
337 public LogicRunner(String name) {
338 if (name == null) name = "unnamed";
339 this.name = name;
340 }
341
342 @Override
343 public void run() {
344 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Thread started.");
345
346 if (log.isLoggable(Level.FINE)) log.fine(name + ": Initializing logic.");
347 StopWatch logicWatch = new StopWatch();
348 logic.logicInitialize(LogicModule.this);
349 if (log.isLoggable(Level.INFO)) log.info(name + ": Logic initialized (" + logicWatch.stopStr() + ").");
350
351 synchronized(mutex) {
352
353 if (logicThread != Thread.currentThread()) {
354 if (log.isLoggable(Level.SEVERE)) log.severe(name + ": Logic thread altered! Shutdown not called!");
355 return;
356 }
357 logicRunning.setFlag(true);
358 }
359
360
361 long sleepTime = 0;
362
363 try {
364 if (log.isLoggable(Level.FINER)) log.finer(name + ": waiting for the logic module started event.");
365 startedEvent.await();
366 if (log.isLoggable(Level.FINER)) log.finer(name + ": logic module started event received.");
367
368 synchronized(mutex) {
369 if (logicThread != Thread.currentThread()) {
370 throw new LogicThreadAlteredException(name, log, LogicModule.this);
371 }
372 }
373
374 logicLatch(name);
375
376 synchronized(mutex) {
377 if (logicThread != Thread.currentThread()) {
378 throw new LogicThreadAlteredException(name, log, LogicModule.this);
379 }
380 }
381
382 while(!Thread.currentThread().isInterrupted() &&
383 logicShouldRun
384 ) {
385 if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
386
387 if (logicShouldPause.getFlag()) {
388 synchronized(mutex) {
389 if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
390 logicPaused.setFlag(true);
391 }
392 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic paused.");
393 logicShouldPause.waitFor(false);
394 synchronized(mutex) {
395 if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
396 logicPaused.setFlag(false);
397 }
398 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic resumed.");
399 }
400
401 sleepTime = (long)logicPeriod - (System.currentTimeMillis() - lastLogicRun);
402 if (sleepTime > 0) {
403 if (log.isLoggable(Level.FINER)) log.finer(name + ": Sleeping for " + sleepTime + " ms.");
404 Thread.sleep(sleepTime);
405 }
406
407 if (logicThread != Thread.currentThread()) throw new LogicThreadAlteredException(name, log, LogicModule.this);
408
409 try {
410 if (log.isLoggable(Level.FINER)) log.finer(name + ": Logic iteration.");
411 logicWatch.start();
412 beforeLogic(name);
413 try {
414 lastLogicRun = System.currentTimeMillis();
415 if (shouldExecuteLogic()) {
416 if (firstLogic) {
417 logic.beforeFirstLogic();
418 firstLogic = false;
419 }
420 logic.logic();
421 } else {
422 if (log.isLoggable(Level.INFO)) log.info(name + ": Logic should not run now...");
423 }
424 } catch (Exception e1) {
425 try {
426 afterLogicException(name, e1);
427 } catch (Exception e2) {
428 if (log.isLoggable(Level.SEVERE)) log.severe(ExceptionToString.process(name + ": afterLogicException() exception.", e2));
429 }
430 throw e1;
431 }
432 afterLogic(name);
433 if (log.isLoggable(Level.FINE)) log.fine(name + ": Logic iteration finished (" + logicWatch.stopStr() + ").");
434 } catch (ComponentPausedException e) {
435 if (log.isLoggable(Level.INFO)) log.info(name + ": pausing the thread, received ComponentPausedException from " + e.getOrigin() + ".");
436 logicShouldPause.setFlag(true);
437 }
438
439 }
440
441 } catch (LogicThreadAlteredException e1) {
442 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic thread has been altered, this one is not executed anymore.");
443
444 return;
445 } catch (InterruptedException e2) {
446 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Interrupted!");
447 } catch (PogamutInterruptedException e3) {
448 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Interrupted!");
449 } catch (ComponentNotRunningException e5) {
450 log.log(log.getLevel(), name + ": stopping the thread, received ComponentNotRunningException from " + e5.getOrigin() + ".");
451 } catch (Exception e4) {
452 controller.fatalError(name + ": Logic iteration exception.", e4);
453 } finally {
454 synchronized(mutex) {
455 if (logicThread != Thread.currentThread()) {
456 return;
457 }
458 try {
459 if (log.isLoggable(Level.FINE)) log.fine(name + ": Shutting down the logic.");
460 logicWatch.start();
461 logic.logicShutdown();
462 if (log.isLoggable(Level.INFO)) log.info(name + ": Logic shutdown (" + logicWatch.stopStr() + ").");
463 } catch (Exception e) {
464 controller.fatalError(name + ": Logic shutdown exception.", e);
465 } finally {
466 if (logicThread == Thread.currentThread()) {
467 clearLogicRunningVars();
468 logicThread = null;
469 }
470 if (log.isLoggable(Level.WARNING)) log.warning(name + ": Logic thread stopped.");
471 }
472 }
473 }
474 }
475
476 }
477
478 }