1 package cz.cuni.amis.pogamut.base.agent.utils.runner.impl;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.concurrent.CountDownLatch;
6 import java.util.logging.Level;
7 import java.util.logging.Logger;
8
9 import cz.cuni.amis.pogamut.base.agent.IAgent;
10 import cz.cuni.amis.pogamut.base.agent.IAgentId;
11 import cz.cuni.amis.pogamut.base.agent.impl.AgentId;
12 import cz.cuni.amis.pogamut.base.agent.params.IAgentParameters;
13 import cz.cuni.amis.pogamut.base.agent.state.level0.IAgentState;
14 import cz.cuni.amis.pogamut.base.agent.state.level1.IAgentStateDown;
15 import cz.cuni.amis.pogamut.base.agent.state.level2.IAgentStateFailed;
16 import cz.cuni.amis.pogamut.base.agent.utils.runner.IAgentDescriptor;
17 import cz.cuni.amis.pogamut.base.agent.utils.runner.IAgentRunner;
18 import cz.cuni.amis.pogamut.base.agent.utils.runner.IMultipleAgentRunner;
19 import cz.cuni.amis.pogamut.base.factory.IAgentFactory;
20 import cz.cuni.amis.pogamut.base.factory.guice.GuiceAgentModule;
21 import cz.cuni.amis.pogamut.base.utils.Pogamut;
22 import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
23 import cz.cuni.amis.utils.exception.PogamutException;
24 import cz.cuni.amis.utils.exception.PogamutInterruptedException;
25 import cz.cuni.amis.utils.flag.FlagListener;
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 public abstract class MultipleAgentRunner<AGENT extends IAgent, PARAMS extends IAgentParameters, MODULE extends GuiceAgentModule> implements IMultipleAgentRunner<AGENT, PARAMS, MODULE> {
62
63
64
65
66 private static long ID = 0;
67
68
69
70
71 private static Object idMutex = new Object();
72
73
74
75
76 protected Object mutex = new Object();
77
78
79
80
81
82
83 protected Logger log;
84
85
86
87
88 private boolean pausing = false;
89
90
91
92
93 protected Level defaultLogLevel = Level.WARNING;
94
95
96
97
98 protected boolean consoleLogging = true;
99
100
101
102
103
104
105
106
107 protected CountDownLatch latch;
108
109
110
111
112 protected List<AGENT> agents = null;
113
114
115
116
117 protected boolean killed = false;
118
119
120
121
122 protected boolean main = false;
123
124
125
126
127
128 protected FlagListener<IAgentState> listener = new FlagListener<IAgentState>() {
129 @Override
130 public void flagChanged(IAgentState changedValue) {
131 if (changedValue instanceof IAgentStateFailed) {
132 killAgents(agents);
133 } else
134 if (changedValue instanceof IAgentStateDown) {
135 latch.countDown();
136 }
137 }
138 };
139
140 public MultipleAgentRunner<AGENT, PARAMS, MODULE> setLog(Logger log) {
141 this.log = log;
142 return this;
143 }
144
145 public Logger getLog() {
146 return log;
147 }
148
149
150
151
152
153
154
155
156 public int getAgentCount(IAgentDescriptor<PARAMS,MODULE>... agentDescriptors) {
157 int result = 0;
158 for (IAgentDescriptor<PARAMS, MODULE> descriptor : agentDescriptors) {
159 result += descriptor.getCount();
160 }
161 return result;
162 }
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181 protected abstract IAgentParameters newDefaultAgentParameters();
182
183
184
185
186
187
188
189
190 protected abstract IAgentFactory newAgentFactory(MODULE agentModule);
191
192
193
194
195
196 public synchronized List<AGENT> startAgents(IAgentDescriptor<PARAMS,MODULE>... agentDescriptors) {
197 if (main) {
198 return startAgentsMain(agentDescriptors);
199 } else {
200 return startAgentsStandard(agentDescriptors);
201 }
202 }
203
204 @Override
205 public boolean isPausing() {
206 return pausing;
207 }
208
209 @Override
210 public synchronized MultipleAgentRunner<AGENT, PARAMS, MODULE> setPausing(boolean state) {
211 this.pausing = state;
212 return this;
213 }
214
215 @Override
216 public boolean isMain() {
217 return main;
218 }
219
220 @Override
221 public synchronized MultipleAgentRunner<AGENT, PARAMS, MODULE> setMain(boolean state) {
222 this.main = state;
223 return this;
224 }
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239 public MultipleAgentRunner<AGENT, PARAMS, MODULE> setLogLevel(Level logLevel) {
240 this.defaultLogLevel = logLevel;
241 return this;
242 }
243
244
245
246
247
248
249
250
251
252 public MultipleAgentRunner<AGENT, PARAMS, MODULE> setConsoleLogging(boolean enabled) {
253 this.consoleLogging = enabled;
254 return this;
255 }
256
257
258
259
260
261 protected List<AGENT> startAgentsStandard(IAgentDescriptor<PARAMS,MODULE>... agentDescriptors) {
262 int count = getAgentCount(agentDescriptors);
263 if (count == 0) return new ArrayList<AGENT>(0);
264
265 List<AGENT> result = new ArrayList<AGENT>(count);
266
267 try {
268
269 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preInitHook()...");
270 preInitHook();
271
272 for (IAgentDescriptor<PARAMS,MODULE> descriptor : agentDescriptors) {
273 startAgentsStandard(descriptor, result);
274 }
275
276 if (isPausing()) {
277 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preResumeHook()...");
278 preResumeHook(result);
279 for (AGENT agent : result) {
280 agent.resume();
281 }
282 }
283
284 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartedHookCalled()...");
285 postStartedHook(result);
286
287 return result;
288
289 } catch (PogamutException e) {
290 killAgents(result);
291 throw e;
292 } catch (Exception e) {
293 killAgents(result);
294 throw new PogamutException("Agent's can't be started: " + e.getMessage(), e, this);
295 }
296 };
297
298 protected synchronized List<AGENT> startAgentsMain(IAgentDescriptor<PARAMS,MODULE>... agentDescriptors) {
299 int count = getAgentCount(agentDescriptors);
300 if (count == 0) return new ArrayList<AGENT>(0);
301
302 agents = new ArrayList<AGENT>(count);
303 latch = new CountDownLatch(count);
304 killed = false;
305
306 boolean pausingBehavior = isPausing();
307
308
309
310 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preInitHook()...");
311 preInitHook();
312
313 for (IAgentDescriptor<PARAMS,MODULE> descriptor : agentDescriptors) {
314 if (killed) break;
315 startAgentsMain(descriptor, pausingBehavior, agents);
316 }
317
318 if (pausingBehavior) {
319 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preResumeHook()...");
320 preResumeHook(agents);
321 for (AGENT agent : agents) {
322 if (killed) break;
323 agent.resume();
324 }
325 }
326
327 if (!killed) {
328 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartedHookCalled()...");
329 postStartedHook(agents);
330
331 try {
332 latch.await();
333 } catch (InterruptedException e) {
334 throw new PogamutInterruptedException("Interrupted while waiting for the agents to finish their execution.", e, this);
335 }
336 }
337
338 if (killed) {
339 throw new PogamutException("Could not execute all agents due to an exception, see logs of respective agents.", this);
340 }
341
342 return agents;
343
344
345
346
347
348
349
350
351
352
353 }
354
355
356
357
358
359
360
361
362
363 protected void startAgentsStandard(IAgentDescriptor<PARAMS,MODULE> agentDescriptor, List<AGENT> result) {
364 if (agentDescriptor == null || agentDescriptor.getCount() == 0) return;
365
366 IAgentFactory<AGENT, PARAMS> agentFactory = newAgentFactory(agentDescriptor.getAgentModule());
367
368 PARAMS[] agentParams = agentDescriptor.getAgentParameters();
369
370 for (int i = 0; i < agentDescriptor.getCount(); ++i) {
371 PARAMS params = null;
372 if (agentParams.length > i) {
373 params = agentParams[i];
374 params.assignDefaults(newDefaultAgentParameters());
375 } else {
376 params = (PARAMS) newDefaultAgentParameters();
377 }
378
379 AGENT agent = createAgentWithParams(agentFactory, params);
380
381 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preStartHook()...");
382 preStartHook(agent);
383
384 startAgent(agent);
385
386 if (isPausing()) {
387 agent.pause();
388 }
389
390 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartHook()...");
391 postStartHook(agent);
392
393 result.add(agent);
394 }
395 }
396
397
398
399
400
401 protected void startAgentsMain(IAgentDescriptor<PARAMS,MODULE> agentDescriptor, boolean pausingBehavior, List<AGENT> result) {
402 if (agentDescriptor == null || agentDescriptor.getCount() == 0) return;
403
404 if (killed) return;
405
406 IAgentFactory<AGENT, PARAMS> agentFactory = newAgentFactory(agentDescriptor.getAgentModule());
407
408 PARAMS[] agentParams = agentDescriptor.getAgentParameters();
409
410 for (int i = 0; i < agentDescriptor.getCount(); ++i) {
411 PARAMS params = null;
412 if (agentParams.length > i) {
413 params = agentParams[i];
414 params.assignDefaults(newDefaultAgentParameters());
415 } else {
416 params = (PARAMS) newDefaultAgentParameters();
417 }
418
419 if (killed) return;
420
421 AGENT agent = createAgentWithParams(agentFactory, params);
422 result.add(agent);
423
424 if (killed) return;
425
426 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preStartHook()...");
427 preStartHook(agent);
428
429 if (killed) return;
430
431 startAgent(agent);
432
433 if (pausingBehavior) {
434 if (killed) return;
435 agent.pause();
436 }
437
438 if (killed) return;
439
440 if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartHook()...");
441 postStartHook(agent);
442 }
443 }
444
445
446
447
448
449
450
451 protected IAgentId newAgentId(String name) {
452 if (name == null) name = "Unnamed";
453 synchronized(idMutex) {
454 return new AgentId(name + (++ID));
455 }
456 }
457
458
459
460
461
462
463
464
465
466 protected AGENT createAgentWithParams(IAgentFactory<AGENT, PARAMS> factory, PARAMS params) {
467 if (log != null && log.isLoggable(Level.INFO)) log.info("Instantiating agent with id '" + params.getAgentId().getToken() + "'");
468 AGENT agent = factory.newAgent(params);
469 if (consoleLogging) {
470 agent.getLogger().addDefaultConsoleHandler();
471 }
472 if (defaultLogLevel != null) {
473 agent.getLogger().setLevel(defaultLogLevel);
474 }
475 return agent;
476 }
477
478
479
480
481
482
483
484 protected void startAgent(AGENT agent) {
485 if (log != null && log.isLoggable(Level.INFO)) log.info("Starting agent with id '" + agent.getComponentId().getToken() + "'");
486 if (main) {
487 agent.getState().addListener(listener);
488 }
489 agent.start();
490 }
491
492
493
494
495
496
497
498
499 protected void killAgents(List<AGENT> agents) {
500 if (agents == null) return;
501 synchronized(mutex) {
502 if (main) {
503 if (killed) return;
504 while (latch.getCount() > 0) {
505 latch.countDown();
506 }
507 killed = true;
508 }
509 for (AGENT agent : agents) {
510 if (agent != null) {
511 killAgent(agent);
512 }
513 }
514 }
515
516 }
517
518
519
520
521
522 protected void killAgent(AGENT agent) {
523 if (agent == null) return;
524 synchronized(mutex) {
525 if (main) {
526 agent.getState().removeListener(listener);
527 }
528 if (!(agent.getState().getFlag() instanceof IAgentStateDown)) {
529 if (log != null && log.isLoggable(Level.WARNING)) log.warning("Killing agent with id '" + agent.getComponentId().getToken() + "'");
530 try {
531 agent.kill();
532 } catch (Exception e) {
533 }
534 }
535 }
536 }
537
538
539
540
541 protected void preInitHook() throws PogamutException {
542 }
543
544
545
546
547
548
549 protected void preStartHook(AGENT agent) throws PogamutException {
550 }
551
552
553
554
555
556
557
558 protected void postStartHook(AGENT agent) throws PogamutException {
559 }
560
561
562
563
564
565
566
567 protected void preResumeHook(List<AGENT> agents) {
568 }
569
570
571
572
573
574
575 protected void postStartedHook(List<AGENT> agents) {
576 }
577
578 }