1 package cz.cuni.amis.pogamut.base.agent.utils.runner.impl;
2
3 import java.util.ArrayList;
4 import java.util.Arrays;
5 import java.util.List;
6 import java.util.concurrent.CountDownLatch;
7 import java.util.logging.Level;
8 import java.util.logging.Logger;
9
10 import cz.cuni.amis.pogamut.base.agent.IAgent;
11 import cz.cuni.amis.pogamut.base.agent.IAgentId;
12 import cz.cuni.amis.pogamut.base.agent.impl.AgentId;
13 import cz.cuni.amis.pogamut.base.agent.params.IAgentParameters;
14 import cz.cuni.amis.pogamut.base.agent.state.level0.IAgentState;
15 import cz.cuni.amis.pogamut.base.agent.state.level1.IAgentStateDown;
16 import cz.cuni.amis.pogamut.base.agent.state.level2.IAgentStateFailed;
17 import cz.cuni.amis.pogamut.base.agent.utils.runner.IAgentRunner;
18 import cz.cuni.amis.pogamut.base.factory.IAgentFactory;
19 import cz.cuni.amis.pogamut.base.utils.Pogamut;
20 import cz.cuni.amis.pogamut.base.utils.PogamutPlatform;
21 import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
22 import cz.cuni.amis.utils.NullCheck;
23 import cz.cuni.amis.utils.exception.PogamutException;
24 import cz.cuni.amis.utils.exception.PogamutInterruptedException;
25 import cz.cuni.amis.utils.flag.FlagListener;
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public abstract class AgentRunner<AGENT extends IAgent, PARAMS extends IAgentParameters> implements IAgentRunner<AGENT, PARAMS> {
69
70
71
72
73 private static long ID = 0;
74
75
76
77
78 private static Object idMutex = new Object();
79
80
81
82
83 protected Object mutex = new Object();
84
85
86
87
88 protected IAgentFactory<AGENT, PARAMS> factory;
89
90
91
92
93
94
95 protected Logger log;
96
97
98
99
100 private boolean pausing = false;
101
102
103
104
105 protected Level defaultLogLevel = Level.WARNING;
106
107
108
109
110 protected boolean consoleLogging = true;
111
112
113
114
115
116
117
118
119 protected CountDownLatch latch;
120
121
122
123
124 protected List<AGENT> agents = null;
125
126
127
128
129 protected boolean killed = false;
130
131
132
133
134 protected boolean main = false;
135
136
137
138
139 protected boolean killingAgents = false;
140
141
142
143
144 protected Object killingAgentsMutex = new Object();
145
146
147
148
149
150 protected FlagListener<IAgentState> listener = new FlagListener<IAgentState>() {
151 @Override
152 public void flagChanged(IAgentState changedValue) {
153 if (changedValue instanceof IAgentStateFailed) {
154 killAgents(agents);
155 } else
156 if (changedValue instanceof IAgentStateDown) {
157 latch.countDown();
158 }
159 }
160 };
161
162
163
164
165
166
167 public AgentRunner(IAgentFactory<AGENT, PARAMS> factory) {
168 this.factory = factory;
169 NullCheck.check(this.factory, "factory");
170 }
171
172 public Logger getLog() {
173 return log;
174 }
175
176 public AgentRunner<AGENT, PARAMS> setLog(Logger log) {
177 this.log = log;
178 return this;
179 }
180
181
182
183
184
185 @Override
186 public synchronized AGENT startAgent() throws PogamutException {
187 List<AGENT> agent;
188 if (main) {
189 agent = startAgentWithParamsMain(false, (PARAMS) newDefaultAgentParameters());
190 } else {
191 agent = startAgentWithParams(false, (PARAMS) newDefaultAgentParameters());
192 }
193 return agent.get(0);
194 }
195
196 @Override
197 public synchronized List<AGENT> startAgents(int count) throws PogamutException {
198 PARAMS[] params = (PARAMS[]) new IAgentParameters[count];
199 for (int i = 0; i < params.length; ++i) params[i] = (PARAMS) newDefaultAgentParameters();
200 if (main) {
201 return startAgentWithParamsMain(false, params);
202 } else {
203 return startAgentWithParams(false, params);
204 }
205 }
206
207 @Override
208 public synchronized List<AGENT> startAgents(PARAMS... agentParameters) throws PogamutException {
209 if (main) {
210 return startAgentWithParamsMain(true, agentParameters);
211 } else {
212 return startAgentWithParams(true, agentParameters);
213 }
214 }
215
216 @Override
217 public boolean isPausing() {
218 return pausing;
219 }
220
221 @Override
222 public synchronized AgentRunner<AGENT, PARAMS> setPausing(boolean state) {
223 this.pausing = state;
224 return this;
225 }
226
227 @Override
228 public boolean isMain() {
229 return main;
230 }
231
232 @Override
233 public synchronized AgentRunner<AGENT, PARAMS> setMain(boolean state) {
234 this.main = state;
235 return this;
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 public AgentRunner<AGENT, PARAMS> setLogLevel(Level logLevel) {
252 this.defaultLogLevel = logLevel;
253 return this;
254 }
255
256
257
258
259
260
261
262
263
264 public AgentRunner<AGENT, PARAMS> setConsoleLogging(boolean enabled) {
265 this.consoleLogging = enabled;
266 return this;
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281 protected List<AGENT> startAgentWithParams(boolean fillDefaults, PARAMS... params) {
282 if (params == null || params.length == 0) return new ArrayList<AGENT>(0);
283 List<AGENT> result = new ArrayList<AGENT>(params.length);
284
285 boolean pausingBehavior = isPausing();
286
287 try {
288 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preInitHook()...");
289 preInitHook();
290
291 for (int i = 0; i < params.length; ++i) {
292 if (fillDefaults) {
293 params[i].assignDefaults(newDefaultAgentParameters());
294 }
295 AGENT agent = createAgentWithParams(params[i]);
296
297 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preStartHook()...");
298 preStartHook(agent);
299
300 startAgent(agent);
301
302 if (pausingBehavior) {
303 agent.pause();
304 }
305
306 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartHook()...");
307 postStartHook(agent);
308
309 result.add(agent);
310 }
311
312 if (pausingBehavior) {
313 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preResumeHook()...");
314 preResumeHook(result);
315 for (AGENT agent : result) {
316 agent.resume();
317 }
318 }
319
320 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartedHookCalled()...");
321 postStartedHook(result);
322 } catch (PogamutException e) {
323 killAgents(result);
324 throw e;
325 } catch (Exception e) {
326 killAgents(result);
327 throw new PogamutException("Agent's can't be started: " + e.getMessage(), e, this);
328 }
329
330 return result;
331 }
332
333
334
335
336
337 protected List<AGENT> startAgentWithParamsMain(boolean fillDefaults, PARAMS... params) {
338 if (params == null || params.length == 0) return new ArrayList<AGENT>(0);
339 latch = new CountDownLatch(params.length);
340
341 agents = new ArrayList<AGENT>(params.length);
342 killed = false;
343
344 boolean pausingBehavior = isPausing();
345
346 try {
347 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preInitHook()...");
348 preInitHook();
349
350 for (int i = 0; i < params.length; ++i) {
351 if (killed) break;
352
353 if (fillDefaults) {
354 params[i].assignDefaults(newDefaultAgentParameters());
355 }
356 AGENT agent = createAgentWithParams(params[i]);
357
358 if (killed) break;
359
360 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preStartHook()...");
361 preStartHook(agent);
362
363 if (killed) break;
364
365 startAgent(agent);
366
367 if (killed) {
368 killAgent(agent);
369 break;
370 }
371
372 if (pausingBehavior) {
373 agent.pause();
374 }
375
376 if (killed) {
377 killAgent(agent);
378 break;
379 }
380
381 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartHook()...");
382 postStartHook(agent);
383
384 if (killed) {
385 killAgent(agent);
386 break;
387 }
388
389 synchronized(mutex) {
390 if (killed) {
391 killAgent(agent);
392 break;
393 }
394 agents.add(agent);
395 }
396 }
397
398 if (!killed) {
399 if (pausingBehavior) {
400 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preResumeHook()...");
401 preResumeHook(agents);
402 for (AGENT agent : agents) {
403 agent.resume();
404 }
405 }
406 if (!killed) {
407 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartedHookCalled()...");
408 postStartedHook(agents);
409 }
410
411 if (!killed) {
412 try {
413 latch.await();
414 } catch (InterruptedException e) {
415 throw new PogamutInterruptedException("Interrupted while waiting for the agents to finish their execution.", e, this);
416 }
417 }
418 }
419
420 if (killed) {
421 throw new PogamutException("Could not execute all agents due to an exception, check logs of respective agents.", this);
422 }
423
424 return agents;
425
426 } catch (PogamutException e) {
427 killAgents(agents);
428 throw e;
429 } catch (Exception e) {
430 killAgents(agents);
431 throw new PogamutException("Agents can't be started: " + e.getMessage(), e, this);
432 } finally {
433 Pogamut.getPlatform().close();
434 }
435 };
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450 protected abstract IAgentParameters newDefaultAgentParameters();
451
452
453
454
455
456
457
458 protected IAgentId newAgentId(String name) {
459 if (name == null) name = "Unnamed";
460 synchronized(idMutex) {
461 return new AgentId(name + (++ID));
462 }
463 }
464
465
466
467
468
469 protected void fillInDefaults(PARAMS params) {
470 params.assignDefaults(newDefaultAgentParameters());
471 }
472
473
474
475
476
477
478 protected void fillInDefaults(PARAMS[] paramsArray) {
479 for (PARAMS params : paramsArray) {
480 params.assignDefaults(newDefaultAgentParameters());
481 }
482 }
483
484
485
486
487
488
489
490
491
492 protected AGENT createAgentWithParams(PARAMS params) {
493 if (log != null && log.isLoggable(Level.INFO)) log.info("Instantiating agent with id '" + params.getAgentId().getToken() + "'");
494 AGENT agent = factory.newAgent(params);
495 if (consoleLogging) {
496 agent.getLogger().addDefaultConsoleHandler();
497 }
498 if (defaultLogLevel != null) {
499 agent.getLogger().setLevel(defaultLogLevel);
500 }
501 return agent;
502 }
503
504
505
506
507
508
509
510 protected void startAgent(AGENT agent) {
511 if (main) {
512 agent.getState().addListener(listener);
513 }
514 if (log != null && log.isLoggable(Level.INFO)) log.info("Starting agent with id '" + agent.getComponentId().getToken() + "'");
515 agent.start();
516 }
517
518
519
520
521
522
523
524
525 protected void killAgents(List<AGENT> agents) {
526 synchronized(killingAgentsMutex) {
527 if (killingAgents) return;
528 killingAgents = true;
529 }
530
531 try {
532 synchronized(mutex) {
533 if (main) {
534 if (killed) return;
535 if (agents == null) return;
536 while (latch.getCount() > 0) {
537 latch.countDown();
538 }
539 killed = true;
540 }
541 if (agents == null) return;
542 for (AGENT agent : agents) {
543 if (agent != null) {
544 killAgent(agent);
545 }
546 }
547 }
548 } finally {
549 killingAgents = false;
550 }
551 }
552
553
554
555
556
557 protected void killAgent(AGENT agent) {
558 if (agent == null) return;
559 synchronized(mutex) {
560 if (main) {
561 agent.getState().removeListener(listener);
562 }
563 if (!(agent.getState().getFlag() instanceof IAgentStateDown)) {
564 if (log != null && log.isLoggable(Level.WARNING)) log.warning("Killing agent with id '" + agent.getComponentId().getToken() + "'");
565 try {
566 agent.kill();
567 } catch (Exception e) {
568 }
569 }
570 }
571 }
572
573
574
575
576
577
578 protected void preInitHook() throws PogamutException {
579 }
580
581
582
583
584
585
586
587
588
589 protected void preStartHook(AGENT agent) throws PogamutException {
590 }
591
592
593
594
595
596
597
598
599
600
601 protected void postStartHook(AGENT agent) throws PogamutException {
602 }
603
604
605
606
607
608
609
610
611
612 protected void preResumeHook(List<AGENT> agents) {
613 }
614
615
616
617
618
619
620
621
622
623 protected void postStartedHook(List<AGENT> agents) {
624 }
625
626 }