View Javadoc

1   package cz.cuni.amis.pogamut.base.agent.utils.runner.impl;
2   
3   import java.util.ArrayList;
4   import java.util.Arrays;
5   import java.util.List;
6   import java.util.concurrent.CountDownLatch;
7   import java.util.logging.Level;
8   import java.util.logging.Logger;
9   
10  import cz.cuni.amis.pogamut.base.agent.IAgent;
11  import cz.cuni.amis.pogamut.base.agent.IAgentId;
12  import cz.cuni.amis.pogamut.base.agent.impl.AgentId;
13  import cz.cuni.amis.pogamut.base.agent.params.IAgentParameters;
14  import cz.cuni.amis.pogamut.base.agent.state.level0.IAgentState;
15  import cz.cuni.amis.pogamut.base.agent.state.level1.IAgentStateDown;
16  import cz.cuni.amis.pogamut.base.agent.state.level2.IAgentStateFailed;
17  import cz.cuni.amis.pogamut.base.agent.utils.runner.IAgentRunner;
18  import cz.cuni.amis.pogamut.base.factory.IAgentFactory;
19  import cz.cuni.amis.pogamut.base.utils.Pogamut;
20  import cz.cuni.amis.pogamut.base.utils.PogamutPlatform;
21  import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
22  import cz.cuni.amis.utils.NullCheck;
23  import cz.cuni.amis.utils.exception.PogamutException;
24  import cz.cuni.amis.utils.exception.PogamutInterruptedException;
25  import cz.cuni.amis.utils.flag.FlagListener;
26  
27  /**
28   * <b>BASIC USAGE</b>
29   * <p><p>
30   * Class used for starting the agent with certain default parameters. This class can't be used alone
31   * as it is abstract. That's because the GaviaLib does not provide any concrete agent implementation
32   * you may instance. Instead, the GaviaLib must be used together with some environment-Pogamut bridge
33   * that defines a concrete {@link IAgent} with concrete {@link IAgentParameters}.
34   * <p><p>
35   * The class provides start-synchronization behavior of respective agents via {@link AgentRunner#setPausing(boolean)}.
36   * For more information see {@link IAgentRunner#setPausing(boolean)}. 
37   * <p><p>
38   * Note that the class also provides you with hook-methods that can be utilized to additionally configure
39   * agent instances as they are created and started. These are {@link AgentRunner#preInitHook()}, {@link AgentRunner#preStartHook(IAgent)},
40   * {@link AgentRunner#preResumeHook(List)}, {@link AgentRunner#postStartHook(IAgent)} and {@link AgentRunner#postStartedHook(List)}.
41   * <p><p>
42   * This class is (almost complete) implementation that can instantiate and start one or multiple agents (of the same
43   * class). The only thing that is left to be implemented is {@link AgentRunner#newDefaultAgentParameters()} that
44   * are used to {@link IAgentParameters#assignDefaults(IAgentParameters)} into user provided parameters (if any
45   * are provided).
46   * <p><p>
47   * Additional features:
48   * <ul>
49   * <li>{@link AgentRunner#setLogLevel(Level)} - allows you to set default logging level for the newly created agent (default {@link Level#WARNING})</li>
50   * <li>{@link AgentRunner#setConsoleLogging(boolean)} - allows you to attach default console logger (via {@link IAgentLogger#addDefaultConsoleHandler()})</li>
51   * </ul>
52   * <p><p>
53   * This runner is based on the {@link IAgentFactory} interface that is utilized to create new instances. It is advised
54   * that concrete agent runners hide this fact from the user by instantiating the factory themselves so the user
55   * need not dive deep into the GaviaLib architecture.
56   * <p><p>
57   * <b>USING {@link AgentRunner} FROM THE MAIN METHOD</b>
58   * <p><p>
59   * There is one problem using Pogamut agents that comes from the decision to use JMX for the agents management.
60   * If you start the agent in the main method and terminate it at some point in time - the JVM will not exit. That's
61   * because of the rmi registry daemon thread that will hang up in the air preventing JVM from termination. This can
62   * be handled by calling {@link PogamutPlatform#close()} in the end, that shuts down the rmi registry.
63   * <p><p>
64   * If you happen to need to use {@link AgentRunner} from the main method, do not forget to call {@link AgentRunner#setMain(boolean)} with param 'true'.
65   * 
66   * @author Jimmy
67   */
68  public abstract class AgentRunner<AGENT extends IAgent, PARAMS extends IAgentParameters> implements IAgentRunner<AGENT, PARAMS> {
69  
70  	/**
71  	 * Used to uniquely number all agents across all {@link AgentRunner} instances.
72  	 */
73  	private static long ID = 0;
74  	
75  	/**
76  	 * Mutex we're synchronizing on when accessing {@link AgentRunner#ID}.
77  	 */
78  	private static Object idMutex = new Object();
79  	
80  	/**
81  	 * Mutex that synchronize killing of agents due to a failure.
82  	 */
83  	protected Object mutex = new Object();
84  	
85  	/**
86  	 * Used to instantiate new agents.
87  	 */
88      protected IAgentFactory<AGENT, PARAMS> factory;
89      
90      /**
91       * Use to log stuff.
92       * <p><p>
93       * WARNING: may be null! Always check it before logging!
94       */
95  	protected Logger log;
96  	
97  	/**
98  	 * Whether the pausing feature is enables, see {@link AgentRunner#setPausing(boolean)}.
99  	 */
100 	private boolean pausing = false;
101 	
102 	/**
103 	 * Default log level that is set to the agent after its instantiation.
104 	 */
105 	protected Level defaultLogLevel = Level.WARNING;
106 	
107 	/**
108 	 * Whether the console logging is enabled as default. (Default is TRUE.)
109 	 */
110 	protected boolean consoleLogging = true;
111 
112 	////
113 	// FIELDS USED MAINLY FOR 'setMain(true)' FUNCTIONALITY
114 	////
115 	
116 	/**
117      * Latch where we're awaiting till all agents finishes.
118      */
119     protected CountDownLatch latch;
120     
121     /**
122      * List of started agents.
123      */
124     protected List<AGENT> agents = null;
125     
126     /**
127      * Whether we had to kill all agents (i.e., some exception/failure has happened).
128      */
129     protected boolean killed = false;
130     
131     /**
132      * Whether we should provide 'main' feature.
133      */
134     protected boolean main = false;
135     
136     /**
137      * Whether we're currently killing all agents...
138      */
139     protected boolean killingAgents = false;
140     
141     /**
142      * Guards {@link AgentRunner#killingAgents} access.
143      */
144     protected Object killingAgentsMutex = new Object();
145     
146 	/**
147 	 * Listener that lowers the count on the {@link MainAgentRunner#latch} (if reaches zero, start method resumes and closes the Pogamut platform),
148 	 * and watches for the agent's failure ({@link MainAgentRunner#killAgents(IAgent[])} in this case).
149 	 */
150 	protected FlagListener<IAgentState> listener = new FlagListener<IAgentState>() {
151 		@Override
152 		public void flagChanged(IAgentState changedValue) {
153 			if (changedValue instanceof IAgentStateFailed) {
154 				killAgents(agents);
155 			} else
156 			if (changedValue instanceof IAgentStateDown) {
157 				latch.countDown();
158 			}
159 		}    	        	
160     };	
161     
162     /**
163 	 * The runner needs the 'factory' so it know how to construct (instantiate) new agents.
164 	 * 
165 	 * @param factory preconfigured factory that is going to be used to produce new instances of agents
166 	 */
167     public AgentRunner(IAgentFactory<AGENT, PARAMS> factory) {    	
168         this.factory = factory;
169         NullCheck.check(this.factory, "factory");        
170     }  
171     
172     public Logger getLog() {
173 		return log;
174 	}
175 
176 	public AgentRunner<AGENT, PARAMS> setLog(Logger log) {
177 		this.log = log;
178 		return this;
179 	}  
180     
181     // ----------------------
182     // INTERFACE IAgentRunner
183     // ----------------------
184   
185 	@Override
186     public synchronized AGENT startAgent() throws PogamutException {
187 		List<AGENT> agent;
188 		if (main) {
189 			agent = startAgentWithParamsMain(false, (PARAMS) newDefaultAgentParameters());
190 		} else {
191 			agent = startAgentWithParams(false, (PARAMS) newDefaultAgentParameters());
192 		}
193         return agent.get(0);
194     }
195     
196     @Override
197     public synchronized List<AGENT> startAgents(int count) throws PogamutException {
198     	PARAMS[] params = (PARAMS[]) new IAgentParameters[count];
199     	for (int i = 0; i < params.length; ++i) params[i] = (PARAMS) newDefaultAgentParameters();
200     	if (main) {
201 			return startAgentWithParamsMain(false, params);
202 		} else {
203 			return startAgentWithParams(false, params);
204 		}
205     }
206     
207     @Override
208 	public synchronized List<AGENT> startAgents(PARAMS... agentParameters) throws PogamutException {
209     	if (main) {
210     		return startAgentWithParamsMain(true, agentParameters);
211     	} else {
212     		return startAgentWithParams(true, agentParameters);
213     	}
214 	}
215     
216     @Override
217     public boolean isPausing() {
218     	return pausing;
219     }
220     
221     @Override
222     public synchronized AgentRunner<AGENT, PARAMS> setPausing(boolean state) {
223     	this.pausing = state;
224     	return this;
225     }
226     
227     @Override
228     public boolean isMain() {
229     	return main;
230     }
231     
232     @Override
233     public synchronized AgentRunner<AGENT, PARAMS> setMain(boolean state) {
234     	this.main = state;
235     	return this;
236     }
237     
238     // --------------------------
239     // ADDITIONAL UTILITY METHODS
240     // --------------------------
241     
242     /**
243      * Sets default logging level for newly created agents (default is {@link Level#WARNING}).
244      * <p><p>
245      * If set to null, no level is set as default to the agent logger.
246      * <p><p>
247      * This probably violates the way how logging should be set up, but is more transparent for beginners.
248      * 
249      * @return this instance
250      */
251     public AgentRunner<AGENT, PARAMS> setLogLevel(Level logLevel) {
252     	this.defaultLogLevel = logLevel;
253     	return this;
254     }
255     
256     /**
257      * Allows you to disable/enable default console logging for the agent. (Default is TRUE.)
258      * <p><p>
259      * This probably violates the way how logging should be set up, but is more transparent for beginners.
260      * 
261      * @param enabled
262      * @return
263      */
264     public AgentRunner<AGENT, PARAMS> setConsoleLogging(boolean enabled) {
265     	this.consoleLogging = enabled;
266     	return this;
267     }
268     
269     // --------------
270     // IMPLEMENTATION
271     // --------------
272     
273     /**
274      * This method should be internally used to create and start the batch of agent instances.
275      * <p><p>
276      * 
277      * @param fillDefaults whether the method should fill default values into the 'params'
278      * @param params
279      * @return
280      */
281     protected List<AGENT> startAgentWithParams(boolean fillDefaults, PARAMS... params) {
282     	if (params == null || params.length == 0) return new ArrayList<AGENT>(0);
283     	List<AGENT> result = new ArrayList<AGENT>(params.length);
284     	
285     	boolean pausingBehavior = isPausing();
286     	
287     	try {
288     		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preInitHook()...");
289 	    	preInitHook();
290 	    	
291 	    	for (int i = 0; i < params.length; ++i) {
292 	    		if (fillDefaults) {
293 	    			params[i].assignDefaults(newDefaultAgentParameters());
294 	    		}
295 	    		AGENT agent = createAgentWithParams(params[i]);
296 	    		
297 	    		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preStartHook()...");
298 	    		preStartHook(agent);
299 	    		
300 	    		startAgent(agent);
301 	    		
302 	    		if (pausingBehavior) {
303 	    			agent.pause();
304 	    		}
305 	    		
306 	    		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartHook()...");
307 	    		postStartHook(agent);
308 	    		
309 	    		result.add(agent);
310 	    	}
311 	    	
312 	    	if (pausingBehavior) {
313 	    		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preResumeHook()...");
314 	    		preResumeHook(result);
315 	    		for (AGENT agent : result) {
316 	    			agent.resume();
317 	    		} 
318 	    	}
319 	    		
320 	    	if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartedHookCalled()...");
321 	    	postStartedHook(result);
322     	} catch (PogamutException e) {
323     		killAgents(result);
324     		throw e;
325     	} catch (Exception e) {
326     		killAgents(result);
327     		throw new PogamutException("Agent's can't be started: " + e.getMessage(), e, this);
328     	}
329     	
330     	return result;
331     }
332     
333     /**
334      * E.g. {@link AgentRunner#startAgentWithParams(boolean, IAgentParameters[])} but 
335      * provides the blocking mechanism.
336 	 */
337 	protected List<AGENT> startAgentWithParamsMain(boolean fillDefaults, PARAMS... params) {
338 		if (params == null || params.length == 0) return new ArrayList<AGENT>(0);
339 		latch = new CountDownLatch(params.length);
340 		
341 		agents = new ArrayList<AGENT>(params.length);
342     	killed = false;
343 		
344 		boolean pausingBehavior = isPausing();
345     	
346     	try {
347     		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preInitHook()...");
348 	    	preInitHook();
349 	    	
350 	    	for (int i = 0; i < params.length; ++i) {	    		
351 	    		if (killed) break;
352 	    		
353 	    		if (fillDefaults) {
354 	    			params[i].assignDefaults(newDefaultAgentParameters());
355 	    		}
356 	    		AGENT agent = createAgentWithParams(params[i]);
357 	    		
358 	    		if (killed) break;
359 	    		
360 	    		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preStartHook()...");
361 	    		preStartHook(agent);
362 	    		
363 	    		if (killed) break;
364 	    		
365 	    		startAgent(agent);
366 	    		
367 	    		if (killed) {
368 	    			killAgent(agent);
369 	    			break;
370 	    		}
371 	    		
372 	    		if (pausingBehavior) {
373 	    			agent.pause();
374 	    		}
375 	    		
376 	    		if (killed) {
377 	    			killAgent(agent);
378 	    			break;
379 	    		}
380 	    		
381 	    		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartHook()...");
382 	    		postStartHook(agent);
383 	    		
384 	    		if (killed) {
385 	    			killAgent(agent);
386 	    			break;
387 	    		}
388 	    		
389 	    		synchronized(mutex) {
390 	    			if (killed) {
391 	    				killAgent(agent);
392 	    				break;
393 	    			}
394 	    			agents.add(agent);
395 	    		}
396 	    	}
397 	    	
398 	    	if (!killed) {
399 		    	if (pausingBehavior) {
400 		    		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling preResumeHook()...");
401 		    		preResumeHook(agents);
402 		    		for (AGENT agent : agents) {
403 		    			agent.resume();
404 		    		} 
405 		    	}
406 		    	if (!killed) {	
407 		    		if (log != null && log.isLoggable(Level.FINE)) log.fine("Calling postStartedHookCalled()...");
408 		    		postStartedHook(agents);
409 		    	}
410 		    	
411 		    	if (!killed) {
412 		    		try {
413 		    			latch.await();
414 		    		} catch (InterruptedException e) {
415 		    			throw new PogamutInterruptedException("Interrupted while waiting for the agents to finish their execution.", e, this);
416 		    		}
417 		    	}
418 	    	}
419 	    	
420 	    	if (killed) {
421 				throw new PogamutException("Could not execute all agents due to an exception, check logs of respective agents.", this);
422 			}
423 	    	
424 	    	return agents;
425 	    	
426     	} catch (PogamutException e) {
427     		killAgents(agents);
428     		throw e;
429     	} catch (Exception e) {
430     		killAgents(agents);
431     		throw new PogamutException("Agents can't be started: " + e.getMessage(), e, this);
432     	} finally {   
433     		Pogamut.getPlatform().close();
434 		}		
435 	};
436     
    /**
     * Method that is called to provide new default parameters for newly created agents.
     * <p><p>
     * Note that it might be the case that some parameters can't be shared between agent instances,
     * thus you might always need to provide new parameters. This decision is up to you as an implementor
     * (that means you must understand how these parameters are used by the particular agent; the fail-safe
     * behaviour is to always provide a new instance).
     * <p><p>
     * Notice that the method does not require you to provide parameters of the type 'PARAMS', allowing you
     * to provide any params as defaults.
     * 
     * @return new default parameters
     */
    protected abstract IAgentParameters newDefaultAgentParameters();
451     
452     /**
453      * Creates new {@link AgentId} from the 'name' and unique number that is automatically generated
454      * from the {@link AgentRunner#ID}.
455      * 
456      * @param name
457      */
458     protected IAgentId newAgentId(String name) {
459     	if (name == null) name = "Unnamed";
460     	synchronized(idMutex) {
461     		return new AgentId(name + (++ID));
462     	}
463     }
464     
465     /**
466      * Fills defaults parameters into the 'params' by using {@link AgentRunner#newDefaultAgentParameters()}.
467      * @param params
468      */
469     protected void fillInDefaults(PARAMS params) {
470     	params.assignDefaults(newDefaultAgentParameters());
471     }
472     
473     /**
474      * Fills defaults parameters into every 'params' of the array by using {@link AgentRunner#newDefaultAgentParameters()},
475      * i.e., we're creating a new default parameters for every 'params' from the array.
476      * @param params
477      */
478     protected void fillInDefaults(PARAMS[] paramsArray) {
479     	for (PARAMS params : paramsArray) {
480     		params.assignDefaults(newDefaultAgentParameters());
481     	}
482     }
483     
484     /**
485      * Method that is used by {@link AgentRunner#startAgentWithParams(IAgentParameters[])} to instantiate new 
486      * agents. Uses {@link AgentRunner#factory} to do the job. It assumes the params were already configured 
487      * with defaults.
488      * 
489      * @param params
490      * @return
491      */
492     protected AGENT createAgentWithParams(PARAMS params) {
493     	if (log != null && log.isLoggable(Level.INFO)) log.info("Instantiating agent with id '" + params.getAgentId().getToken() + "'");
494     	AGENT agent = factory.newAgent(params);
495     	if (consoleLogging) {
496     		agent.getLogger().addDefaultConsoleHandler();
497     	}
498     	if (defaultLogLevel != null) {
499     		agent.getLogger().setLevel(defaultLogLevel);
500     	}    	
501     	return agent;
502     }
503     
504     /**
505      * Method that is used by {@link AgentRunner#startAgentWithParams(IAgentParameters[])} to start newly
506      * created agent.
507      * 
508      * @param agent
509      */
510     protected void startAgent(AGENT agent) {
511     	if (main) {
512     		agent.getState().addListener(listener);
513     	}
514     	if (log != null && log.isLoggable(Level.INFO)) log.info("Starting agent with id '" + agent.getComponentId().getToken() + "'");
515     	agent.start();
516     }
517     
518     /**
519      * This method is called whenever start/pause/resume of the single agent fails to clean up.
520      * <p><p>
521      * Recalls {@link AgentRunner#killAgent(IAgent)} for every non-null agent instance.
522      * 
523      * @param agents some array elements may be null!
524      */
525     protected void killAgents(List<AGENT> agents) {    	
526     	synchronized(killingAgentsMutex) {
527     		if (killingAgents) return;
528     		killingAgents = true;
529     	}
530     	// WE'RE ALONE HERE!
531     	try {
532 	    	synchronized(mutex) {
533 	    		if (main) {
534 	    			if (killed) return;
535 	    			if (agents == null) return;
536 	    			while (latch.getCount() > 0) {
537 	    				latch.countDown();
538 	    			}
539 	    			killed = true;
540 	    		}
541 	    		if (agents == null) return;
542 	        	for (AGENT agent : agents) {
543 	        		if (agent != null) {
544 	        			killAgent(agent);
545 	        		}
546 	        	}
547 	    	}
548     	} finally {
549     		killingAgents = false;
550     	}
551     }
552     
553     /**
554      * Kills a single agent instance, called during clean up when start/pause/resume of the agent fails.
555      * @param agent
556      */
557     protected void killAgent(AGENT agent) {
558     	if (agent == null) return;
559     	synchronized(mutex) {
560 			if (main) {
561 				agent.getState().removeListener(listener);
562 			}
563 			if (!(agent.getState().getFlag() instanceof IAgentStateDown)) {
564 				if (log != null && log.isLoggable(Level.WARNING)) log.warning("Killing agent with id '" + agent.getComponentId().getToken() + "'");
565 				try {
566 					agent.kill();
567 				} catch (Exception e) {    			
568 				}
569 			}
570     	}
571     }
572 	
    /**
     * Custom hook called before all the agents are going to be instantiated by the {@link AgentRunner#factory}.
     * <p><p>
     * May be utilized by the GaviaLib user to inject additional code into the runner.
     * The default implementation does nothing.
     * 
     * @throws PogamutException
     */
    protected void preInitHook() throws PogamutException {
    }
580 
    /**
     * Custom hook called after the agent is instantiated by the {@link AgentRunner#factory} and before
     * the {@link IAgent#start()} is called.
     * <p><p>
     * May be utilized by the GaviaLib user to inject additional code into the runner.
     * The default implementation does nothing.
     * 
     * @param agent newly created agent that is about to be started
     * @throws PogamutException
     */
    protected void preStartHook(AGENT agent) throws PogamutException {
    }
591     
    /**
     * Custom hook called after the agent is instantiated by the {@link AgentRunner#factory} and
     * started with {@link IAgent#start()}.
     * <p><p>
     * May be utilized by the GaviaLib user to inject additional code into the runner.
     * The default implementation does nothing.
     * 
     * @param agent agent that has just been started
     * @throws PogamutException
     */
    protected void postStartHook(AGENT agent) throws PogamutException {
    }
603     
    /**
     * Custom hook called only iff {@link AgentRunner#isPausing()}. This method is called after all the agents have been instantiated by the {@link AgentRunner#factory}
     * and before they are resumed by {@link IAgent#resume()}.
     * <p><p>
     * May be utilized by the GaviaLib user to inject additional code into the runner.
     * The default implementation does nothing.
     * 
     * @param agents all agents that are about to be resumed
     */
    protected void preResumeHook(List<AGENT> agents) {
    }
614     
    /**
     * Custom hook called after all the agents have been instantiated by the {@link AgentRunner#factory}
     * and started with {@link IAgent#start()}.
     * <p><p>
     * May be utilized by the GaviaLib user to inject additional code into the runner.
     * The default implementation does nothing.
     * 
     * @param agents all agents that have been started
     */
    protected void postStartedHook(List<AGENT> agents) {
    }
625     
626 }