View Javadoc

1   package cz.cuni.amis.pogamut.base.agent.utils.runner.impl;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   import java.util.concurrent.CountDownLatch;
6   import java.util.logging.Level;
7   import java.util.logging.Logger;
8   
9   import cz.cuni.amis.pogamut.base.agent.IAgent;
10  import cz.cuni.amis.pogamut.base.agent.IAgentId;
11  import cz.cuni.amis.pogamut.base.agent.impl.AgentId;
12  import cz.cuni.amis.pogamut.base.agent.params.IAgentParameters;
13  import cz.cuni.amis.pogamut.base.agent.state.level0.IAgentState;
14  import cz.cuni.amis.pogamut.base.agent.state.level1.IAgentStateDown;
15  import cz.cuni.amis.pogamut.base.agent.state.level2.IAgentStateFailed;
16  import cz.cuni.amis.pogamut.base.agent.utils.runner.IAgentDescriptor;
17  import cz.cuni.amis.pogamut.base.agent.utils.runner.IAgentRunner;
18  import cz.cuni.amis.pogamut.base.agent.utils.runner.IMultipleAgentRunner;
19  import cz.cuni.amis.pogamut.base.factory.IAgentFactory;
20  import cz.cuni.amis.pogamut.base.factory.guice.GuiceAgentModule;
21  import cz.cuni.amis.pogamut.base.utils.Pogamut;
22  import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
23  import cz.cuni.amis.utils.exception.PogamutException;
24  import cz.cuni.amis.utils.exception.PogamutInterruptedException;
25  import cz.cuni.amis.utils.flag.FlagListener;
26  
27  /**
28   * Class used for starting the agent with certain default parameters. 
29   * <p><p>
30   * The class is similar to the {@link MultipleAgentRunner} but implements different interface ({@link IMultipleAgentRunner}),
31   * that is, it allows to start different classes of agents at once. In fact it uses {@link MultipleAgentRunner} to do the job.
32   * <p><p>
33   * 
34   * TODO!!!
35   * 
36   * Additional features:
37   * <ul>
38   * <li>{@link AgentRunner#setLogLevel(Level)} - allows you to set default logging level for the newly created agent (default {@link Level#WARNING})</li>
39   * <li>{@link AgentRunner#setConsoleLogging(boolean)} - allows you to attach default console logger (via {@link IAgentLogger#addDefaultConsoleHandler()})</li>
40   * </ul>
41  
42   * 
43   * The class provides start-synchronization behavior of respective agents via {@link MultipleAgentRunner#setPausing(boolean)}.
44   * For more information see {@link IAgentRunner#setPausing(boolean)}. 
45   * <p><p>
46   * Note that the class also provides you with hook-methods that can be utilized to additionally configure
47   * agent instances as they are created and started. These are {@link MultipleAgentRunner#preInitHook()}, {@link MultipleAgentRunner#preStartHook(IAgent)},
48   * {@link MultipleAgentRunner#preResumeHook(IAgent[])}, {@link MultipleAgentRunner#postStartHook(IAgent)} and {@link MultipleAgentRunner#postStartedHook(IAgent[])}.
49   * <p><p>
50   * This class is (almost complete) implementation that can instantiate and start one or multiple agents (of the same
51   * class). The only thing that is left to be implemented is {@link MultipleAgentRunner#newDefaultAgentParameters()} that
52   * are used to {@link IAgentParameters#assignDefaults(IAgentParameters)} into user provided parameters (if any of
53   * are provided).
54   * <p><p>
55   * This runner is based on the {@link IAgentFactory} interface that is utilized to create new instances. It is advised
56   * that concrete agent runners hides this fact from the user instantiating the factory themselves so the user
57   * must not dive deep into GaviaLib architecture.
58   * 
59   * @author Jimmy
60   */
61  public abstract class MultipleAgentRunner<AGENT extends IAgent, PARAMS extends IAgentParameters, MODULE extends GuiceAgentModule> implements IMultipleAgentRunner<AGENT, PARAMS, MODULE> {
62      
63  	/**
64  	 * Used to uniquely number all agents across all {@link MultipleAgentRunner} instances.
65  	 */
66  	private static long ID = 0;
67  	
68  	/**
69  	 * Mutex we're synchronizing on when accessing {@link MultipleAgentRunner#ID}.
70  	 */
71  	private static Object idMutex = new Object();
72  	
73  	/**
74  	 * Mutex that synchronize killing of agents due to a failure.
75  	 */
76  	protected Object mutex = new Object();
77  	
78      /**
79       * Use to log stuff.
80       * <p><p>
81       * WARNING: may be null! Always check it before logging!
82       */
83  	protected Logger log;
84  	
85  	/**
86  	 * Whether the pausing feature is enables, see {@link AgentRunner#setPausing(boolean)}.
87  	 */
88  	private boolean pausing = false;
89  	
90  	/**
91  	 * Default log level that is set to the agent after its instantiation.
92  	 */
93  	protected Level defaultLogLevel = Level.WARNING;
94  	
95  	/**
96  	 * Whether the console logging is enabled as default. (Default is TRUE.)
97  	 */
98  	protected boolean consoleLogging = true;
99  	
100 	////
101 	// FIELDS USED MAINLY FOR 'setMain(true)' FUNCTIONALITY
102 	////
103 	
104 	/**
105      * Latch where we're awaiting till all agents finishes.
106      */
107     protected CountDownLatch latch;
108     
109     /**
110      * List of started agents.
111      */
112     protected List<AGENT> agents = null;
113     
114     /**
115      * Whether we had to kill all agents (i.e., some exception/failure has happened).
116      */
117     protected boolean killed = false;
118     
119     /**
120      * Whether we should provide 'main' feature.
121      */
122     protected boolean main = false;
123     
124 	/**
125 	 * Listener that lowers the count on the {@link MultipleAgentRunner#latch} (if reaches zero, start method resumes and closes the Pogamut platform),
126 	 * and watches for the agent's failure ({@link MultipleAgentRunner#killAgents(IAgent[])} in this case).
127 	 */
128 	protected FlagListener<IAgentState> listener = new FlagListener<IAgentState>() {
129 		@Override
130 		public void flagChanged(IAgentState changedValue) {
131 			if (changedValue instanceof IAgentStateFailed) {
132 				killAgents(agents);
133 			} else
134 			if (changedValue instanceof IAgentStateDown) {
135 				latch.countDown();
136 			}
137 		}    	        	
138     };   
139 
140 	public MultipleAgentRunner<AGENT, PARAMS, MODULE> setLog(Logger log) {
141 		this.log = log;
142 		return this;
143 	}
144 	
145 	public Logger getLog() {
146 		return log;
147 	}
148 
149 	/**
150      * Sums all {@link IAgentDescriptor#getCount()}.
151      * 
152      * @param agentDescriptors
153      * 
154      * @return number of agents described by 'agentDescriptors'
155      */
156     public int getAgentCount(IAgentDescriptor<PARAMS,MODULE>... agentDescriptors) {
157     	int result = 0;
158     	for (IAgentDescriptor<PARAMS, MODULE> descriptor : agentDescriptors) {
159     		result += descriptor.getCount();
160     	}
161     	return result;
162     }
163     
164     // ---------
165     // ABSTRACTS
166     // ---------
167     
168     /**
169      * Method that is called to provide another default parameters for newly created agents.
170      * <p><p>
171      * Note that it might be the case, that some parameters can't be shared between agent instances,
172      * thus you might always need to provide a new parameters. This decision is up to you as an implementor
173      * (that means you must understand how these parameters are used by the particular agent, fail-safe behaviour
174      * is to always provide a new one).
175      * <p><p>
176      * Notice that the method does not require you to provide parameters of the type 'PARAMS' allowing you
177      * to provide any params as defaults.
178      * 
179      * @return new default parameters
180      */
181     protected abstract IAgentParameters newDefaultAgentParameters();
182     
183     /**
184      * Creates a new factory for the given 'agentModule'. Called from within {@link MultipleAgentRunner#startAgents(IAgentDescriptor, List)}
185      * to obtain a factory for the {@link IAgentDescriptor#getAgentModule()}.
186      * 
187      * @param module
188      * @return new factory configured with 'agentModule'
189      */
190     protected abstract IAgentFactory newAgentFactory(MODULE agentModule);
191     
192     // ------------------------------
193     // INTERFACE IMultipleAgentRunner
194     // ------------------------------
195     
196     public synchronized List<AGENT> startAgents(IAgentDescriptor<PARAMS,MODULE>... agentDescriptors) {
197     	if (main) {
198     		return startAgentsMain(agentDescriptors);
199     	} else {
200     		return startAgentsStandard(agentDescriptors);
201     	}
202     }
203     
204     @Override
205     public boolean isPausing() {
206     	return pausing;
207     }
208     
209     @Override
210     public synchronized MultipleAgentRunner<AGENT, PARAMS, MODULE> setPausing(boolean state) {
211     	this.pausing = state;
212     	return this;
213     }
214     
215     @Override
216     public boolean isMain() {
217     	return main;
218     }
219     
220     @Override
221     public synchronized MultipleAgentRunner<AGENT, PARAMS, MODULE> setMain(boolean state) {
222     	this.main = state;
223     	return this;
224     }
225     
226     // --------------------------
227     // ADDITIONAL UTILITY METHODS
228     // --------------------------
229     
230     /**
231      * Sets default logging level for newly created agents (default is {@link Level#WARNING}).
232      * <p><p>
233      * If set to null, no level is set as default to the agent logger.
234      * <p><p>
235      * This probably violates the way how logging should be set up, but is more transparent for beginners.
236      * 
237      * @return this instance
238      */
239     public MultipleAgentRunner<AGENT, PARAMS, MODULE> setLogLevel(Level logLevel) {
240     	this.defaultLogLevel = logLevel;
241     	return this;
242     }
243     
244     /**
245      * Allows you to disable/enable default console logging for the agent. (Default is TRUE.)
246      * <p><p>
247      * This probably violates the way how logging should be set up, but is more transparent for beginners.
248      * 
249      * @param enabled
250      * @return
251      */
252     public MultipleAgentRunner<AGENT, PARAMS, MODULE> setConsoleLogging(boolean enabled) {
253     	this.consoleLogging = enabled;
254     	return this;
255     }
256     
257     // --------------
258     // IMPLEMENTATION
259     // --------------
260     
261     protected List<AGENT> startAgentsStandard(IAgentDescriptor<PARAMS,MODULE>... agentDescriptors) {
262     	int count = getAgentCount(agentDescriptors);
263     	if (count == 0) return new ArrayList<AGENT>(0);
264     	
265     	List<AGENT> result = new ArrayList<AGENT>(count);    	
266     	
267     	try {
268     	
269 	    	if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preInitHook()...");
270 	    	preInitHook();
271 	    	
272 	    	for (IAgentDescriptor<PARAMS,MODULE> descriptor : agentDescriptors) {
273 	    		startAgentsStandard(descriptor, result);
274 	    	}
275 	    	
276 	    	if (isPausing()) {
277 	    		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preResumeHook()...");
278 	    		preResumeHook(result);
279 	    		for (AGENT agent : result) {
280 	    			agent.resume();
281 	    		} 
282 	    	}
283 	    	
284 	    	if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartedHookCalled()...");
285 	    	postStartedHook(result);
286 	    	
287 	    	return result;
288 	    	
289     	}  catch (PogamutException e) {
290     		killAgents(result);
291     		throw e;
292     	} catch (Exception e) {
293     		killAgents(result);
294     		throw new PogamutException("Agent's can't be started: " + e.getMessage(), e, this);
295     	}
296     };
297     
298 	protected synchronized List<AGENT> startAgentsMain(IAgentDescriptor<PARAMS,MODULE>... agentDescriptors) {
299     	int count = getAgentCount(agentDescriptors);
300     	if (count == 0) return new ArrayList<AGENT>(0);
301     	
302     	agents = new ArrayList<AGENT>(count);
303     	latch = new CountDownLatch(count);
304     	killed = false;
305     	
306     	boolean pausingBehavior = isPausing();
307     	
308     	//try {
309     	
310 	    	if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preInitHook()...");
311 	    	preInitHook();
312 	    	
313 	    	for (IAgentDescriptor<PARAMS,MODULE> descriptor : agentDescriptors) {
314 	    		if (killed) break;
315 	    		startAgentsMain(descriptor, pausingBehavior, agents);
316 	    	}
317 	    	
318 	    	if (pausingBehavior) {
319 	    		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preResumeHook()...");
320 	    		preResumeHook(agents);
321 	    		for (AGENT agent : agents) {
322 	    			if (killed) break;
323 	    			agent.resume();	    			
324 	    		} 
325 	    	}
326 	    	
327 	    	if (!killed) {
328 		    	if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartedHookCalled()...");
329 		    	postStartedHook(agents);
330 		    	
331 		    	try {
332 					latch.await();
333 				} catch (InterruptedException e) {
334 					throw new PogamutInterruptedException("Interrupted while waiting for the agents to finish their execution.", e, this);
335 				}
336 	    	}
337 	    	
338 	    	if (killed) {
339 	    		throw new PogamutException("Could not execute all agents due to an exception, see logs of respective agents.", this);
340 	    	}
341 			
342 	    	return agents;
343 	    	
344 //    	}  catch (PogamutException e) {
345 //    		killAgents(agents);
346 //    		throw e;
347 //    	} catch (Exception e) {
348 //    		killAgents(agents);
349 //    		throw new PogamutException("Agent's can't be started: " + e.getMessage(), e, this);
350 //    	} finally {
351 //    		Pogamut.getPlatform().close();    		
352 //    	}
353     }
354     
355     /**
356      * Starts all agents described by 'agentDescriptor', puts new agent instances into 'result'.
357      * <p><p>
358      * Does not catch exceptions (they are propagated by JVM as usual).
359      * 
360      * @param agentDescriptor
361      * @param result
362      */
363     protected void startAgentsStandard(IAgentDescriptor<PARAMS,MODULE> agentDescriptor, List<AGENT> result) {
364     	if (agentDescriptor == null || agentDescriptor.getCount() == 0) return;
365     	
366     	IAgentFactory<AGENT, PARAMS> agentFactory = newAgentFactory(agentDescriptor.getAgentModule());
367     	
368     	PARAMS[] agentParams = agentDescriptor.getAgentParameters();
369     	
370     	for (int i = 0; i < agentDescriptor.getCount(); ++i) {
371     		PARAMS params = null;
372     		if (agentParams.length > i) {
373     			params = agentParams[i];
374     			params.assignDefaults(newDefaultAgentParameters());
375     		} else {
376     			params = (PARAMS) newDefaultAgentParameters();
377     		}
378     		
379     		AGENT agent = createAgentWithParams(agentFactory, params);
380     		
381     		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preStartHook()...");
382     		preStartHook(agent);
383     		
384     		startAgent(agent);
385     		
386     		if (isPausing()) {
387     			agent.pause();
388     		}
389     		
390     		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartHook()...");
391     		postStartHook(agent);
392     		
393     		result.add(agent);
394     	}
395     }    
396     
397     /**
398 	 * Overridden to provide the blocking mechanism. (For more info see {@link AgentRunner#startAgentWithParams(boolean, IAgentParameters...)}.
399 	 * <p><p>
400 	 */
401 	protected void startAgentsMain(IAgentDescriptor<PARAMS,MODULE> agentDescriptor, boolean pausingBehavior, List<AGENT> result) {
402     	if (agentDescriptor == null || agentDescriptor.getCount() == 0) return;
403     	
404     	if (killed) return;
405     	
406     	IAgentFactory<AGENT, PARAMS> agentFactory = newAgentFactory(agentDescriptor.getAgentModule());
407     	
408     	PARAMS[] agentParams = agentDescriptor.getAgentParameters();
409     	
410     	for (int i = 0; i < agentDescriptor.getCount(); ++i) {
411     		PARAMS params = null;
412     		if (agentParams.length > i) {
413     			params = agentParams[i];
414     			params.assignDefaults(newDefaultAgentParameters());
415     		} else {
416     			params = (PARAMS) newDefaultAgentParameters();
417     		}
418     		
419     		if (killed) return;
420     		
421     		AGENT agent = createAgentWithParams(agentFactory, params);
422     		result.add(agent);
423     		
424     		if (killed) return;
425     		
426     		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preStartHook()...");
427     		preStartHook(agent);
428     		
429     		if (killed) return;
430     		
431     		startAgent(agent);
432     		
433     		if (pausingBehavior) {
434     			if (killed) return;
435     			agent.pause();
436     		}
437     		
438     		if (killed) return;
439     		
440     		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartHook()...");
441     		postStartHook(agent);
442     	}
443     }
444     
445     /**
446      * Creates new {@link AgentId} from the 'name' and unique number that is automatically generated
447      * from the {@link MultipleAgentRunner#ID}.
448      * 
449      * @param name
450      */
451     protected IAgentId newAgentId(String name) {
452     	if (name == null) name = "Unnamed";
453     	synchronized(idMutex) {
454     		return new AgentId(name + (++ID));
455     	}
456     }
457     
458     /**
459      * Method that is used by {@link MultipleAgentRunner#startAgentWithParams(IAgentParameters[])} to instantiate new 
460      * agents. Uses {@link MultipleAgentRunner#factory} to do the job. It assumes the params were already configured 
461      * with defaults.
462      * 
463      * @param params
464      * @return
465      */
466     protected AGENT createAgentWithParams(IAgentFactory<AGENT, PARAMS> factory, PARAMS params) {
467     	if (log != null && log.isLoggable(Level.INFO)) log.info("Instantiating agent with id '" + params.getAgentId().getToken() + "'");
468     	AGENT agent = factory.newAgent(params);
469     	if (consoleLogging) {
470     		agent.getLogger().addDefaultConsoleHandler();
471     	}
472     	if (defaultLogLevel != null) {
473     		agent.getLogger().setLevel(defaultLogLevel);
474     	}    	
475     	return agent;
476     }
477     
478     /**
479      * Method that is used by {@link MultipleAgentRunner#startAgentWithParams(IAgentParameters[])} to start newly
480      * created agent.
481      * 
482      * @param agent
483      */
484     protected void startAgent(AGENT agent) {
485     	if (log != null && log.isLoggable(Level.INFO)) log.info("Starting agent with id '" + agent.getComponentId().getToken() + "'");
486     	if (main) {
487     		agent.getState().addListener(listener);
488     	}
489     	agent.start();
490     }
491     
492     /**
493      * This method is called whenever start/pause/resume of the single agent fails to clean up.
494      * <p><p>
495      * Recalls {@link MultipleAgentRunner#killAgent(IAgent)} for every non-null agent instance.
496      * 
497      * @param agents some array elements may be null!
498      */
499     protected void killAgents(List<AGENT> agents) {
500     	if (agents == null) return;
501     	synchronized(mutex) {
502 			if (main) {
503 				if (killed) return;
504 				while (latch.getCount() > 0) {
505 					latch.countDown();
506 				}
507 				killed = true;
508 			}			
509 			for (AGENT agent : agents) {
510 	    		if (agent != null) {
511 	    			killAgent(agent);
512 	    		}
513 	    	}
514 		}
515     	
516     }
517     
518     /**
519      * Kills a single agent instance, called during clean up when start/pause/resume of the agent fails.
520      * @param agent
521      */
522     protected void killAgent(AGENT agent) {
523     	if (agent == null) return;
524     	synchronized(mutex) {
525     		if (main) {
526     			agent.getState().removeListener(listener);
527     		}
528     		if (!(agent.getState().getFlag() instanceof IAgentStateDown)) {
529         		if (log != null && log.isLoggable(Level.WARNING)) log.warning("Killing agent with id '" + agent.getComponentId().getToken() + "'");
530         		try {
531         			agent.kill();
532         		} catch (Exception e) {    			
533         		}
534         	}
535     	}    	
536     }
537 	
538     /**
539      * Custom hook called before all the agents are going to be instantiated.
540      */
541     protected void preInitHook() throws PogamutException {    	
542     }
543 
544     /**
545      * Custom hook called after the agent is instantiated  and before
546      * the {@link IAgent#start()} is called.
547      * @param agent
548      */
549     protected void preStartHook(AGENT agent) throws PogamutException {
550     }
551     
552     /**
553      * Custom hook called after the agent is instantiated and
554      * started with {@link IAgent#start()}.
555      * @param agent
556      * @throws PogamutException
557      */
558     protected void postStartHook(AGENT agent) throws PogamutException {    	
559     }
560     
561     /**
562      * Custom hook called only iff {@link MultipleAgentRunner#isPausing()}. 
563      * This method is called after all the agents have been instantiated by the {@link MultipleAgentRunner#factory}
564      * and resumed with {@link IAgent#resume()}.
565      * @param agents
566      */
567     protected void preResumeHook(List<AGENT> agents) {
568     }
569     
570     /**
571      * Custom hook called after all the agents have been instantiated by the {@link MultipleAgentRunner#factory}
572      * and started with {@link IAgent#start()}.
573      * @param agents
574      */
575     protected void postStartedHook(List<AGENT> agents) {
576     }
577 
578 }