View Javadoc

1   package cz.cuni.amis.pogamut.base.agent.module.comm;
2   
3   import java.util.ArrayList;
4   import java.util.HashMap;
5   import java.util.HashSet;
6   import java.util.List;
7   import java.util.Map;
8   import java.util.Set;
9   import java.util.concurrent.ConcurrentLinkedQueue;
10  import java.util.concurrent.LinkedBlockingQueue;
11  import java.util.concurrent.ThreadPoolExecutor;
12  import java.util.concurrent.TimeUnit;
13  import java.util.concurrent.locks.Lock;
14  import java.util.concurrent.locks.ReadWriteLock;
15  import java.util.concurrent.locks.ReentrantReadWriteLock;
16  import java.util.logging.Level;
17  import java.util.logging.Logger;
18  
19  import cz.cuni.amis.pogamut.base.agent.IAgentId;
20  import cz.cuni.amis.pogamut.base.agent.IObservingAgent;
21  import cz.cuni.amis.pogamut.base.agent.impl.AbstractAgent;
22  import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent;
23  import cz.cuni.amis.pogamut.base.communication.worldview.IWorldView;
24  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
25  import cz.cuni.amis.pogamut.base.utils.PogamutPlatform;
26  import cz.cuni.amis.pogamut.base.utils.logging.LogCategory;
27  import cz.cuni.amis.utils.ExceptionToString;
28  import cz.cuni.amis.utils.Tuple2;
29  import cz.cuni.amis.utils.maps.HashMapSet;
30  
31  /**
32   * Simple support for inter-agent in-single-JVM communication.
33   * 
34   * This object can be used for setting up simple communication via {@link IWorldView}. 
35   * You can use {@link PogamutJVMComm#getInstance()} to obtain singleton-instance
36   * and register your agent to some channel using {@link PogamutJVMComm#registerAgent(IObservingAgent, int)}.
37   * DO NOT FORGET TO USE {@link PogamutJVMComm#unregisterAgent(IObservingAgent)} once your agent has finished its lifecycle,
38   * otherwise you would likely to leak memory.
39   * 
40   * Any agent may send events to registered agents at any time via {@link PogamutJVMComm#send(IWorldChangeEvent, int)}
41   * or {@link PogamutJVMComm#broadcast(IWorldChangeEvent)}. 
42   * 
43   * The object uses {@link IWorldView#notify(IWorldChangeEvent)} to propagate events to respective agents' worldviews.
44   *  
45   * @author Jimmy
46   */
47  public class PogamutJVMComm {
48  
49  	protected static Object instanceMutex = new Object();
50  	
51  	protected static PogamutJVMComm instance;
52  	
53  	protected static ConcurrentLinkedQueue<PogamutJVMComm> comms = new ConcurrentLinkedQueue<PogamutJVMComm>();
54  
55  	/**
56  	 * Getter for JVM singleton.
57  	 * @return
58  	 */
59  	public static PogamutJVMComm getInstance() {
60  		if (instance == null) {
61  			synchronized(instanceMutex) {
62  				if (instance == null) {
63  					instance = new PogamutJVMComm();
64  					((LogCategory)instance.getLog()).addConsoleHandler();
65  					instance.getLog().setLevel(Level.INFO);
66  				}
67  			}
68  		}
69  		return instance;
70  	}
71  	
72  	public static void platformClose() {
73  		while(comms.size() > 0) {
74  			comms.poll().destroy();
75  		}
76  	}
77  	
78  	/**
79  	 * If you wish to listen on all CHANNELS you can use this constant.
80  	 */
81  	public static final int ALL_CHANNELS = -1;
82  	
83  	/**
84  	 * {@link AbstractAgent#getComponentId()} maps to actual 'agent' registered + number of registration for a given agent.
85  	 */
86  	protected Map<IAgentId, Tuple2<IObservingAgent, Integer>> registeredAgents = new HashMap<IAgentId, Tuple2<IObservingAgent, Integer>>();
87  	
88  	/**
89  	 * Agents registered for "ALL_CHANNELS".
90  	 */
91  	protected Set<IObservingAgent> allChannels = new HashSet<IObservingAgent>();
92  	
93  	/**
94  	 * Agents registered for respective channels.
95  	 */
96  	protected HashMapSet<Integer, IObservingAgent> channels = new HashMapSet<Integer, IObservingAgent>();
97  	
98  	/**
99  	 * Mutex for reading/writing channels/allChannels and sending events.
100 	 */
101 	protected ReadWriteLock lock = new ReentrantReadWriteLock(true);
102 	
103 	protected Lock readLock = lock.readLock();
104 	
105 	protected Lock writeLock = lock.writeLock();
106 
107 	protected Logger log;		
108 	
109 	public PogamutJVMComm() {
110 		this(null);
111 	}
112 	
113 	/**
114 	 * @param log can be null, default will be provided
115 	 */
116 	public PogamutJVMComm(Logger log) {
117 		this.log = log;
118 		synchronized(comms) {
119 			if (this.log == null) {
120 				this.log = new LogCategory("AgentJVMComm" + comms.size());				
121 			}
122 			comms.add(this);
123 		}
124 	}
125 	
126 	public Logger getLog() {
127 		return log;
128 	}
129 	
130 	
131 	// ===========================
132 	// AGENT REGISTRATION TRACKING
133 	// ===========================
134 	
135 	/**
136 	 * @param agent
137 	 * @return how many times is 'agent' registered (after increase)
138 	 */
139 	protected int incRegisteredAgent(IObservingAgent agent) {
140 		synchronized(registeredAgents) {
141 			Tuple2<IObservingAgent, Integer> record = registeredAgents.get(agent.getComponentId());
142 			if (record == null) {
143 				record = new Tuple2<IObservingAgent, Integer>(agent, 0);
144 				registeredAgents.put(agent.getComponentId(), record);
145 			}
146 			// record != null
147 			if (record.getFirst() != agent) {
148 				throw new RuntimeException("agent.getComponentId() clash! Under " + agent.getComponentId() + " is registered agent " + record.getFirst() + " NOT AGENT " + agent + ".");
149 			}
150 			record.setSecond(record.getSecond()+1);
151 			return record.getSecond();
152 		}
153 	}
154 
155 	/**
156 	 * @param agent
157 	 * @return how many times is 'agent' still registered (after decrease)
158 	 */
159 	protected int decRegisteredAgent(IObservingAgent agent) {
160 		synchronized(registeredAgents) {
161 			Tuple2<IObservingAgent, Integer> record = registeredAgents.get(agent.getComponentId());
162 			if (record == null) {
163 				throw new RuntimeException("Attempt to decrease registration count for agent that is not registered, agent.getComponentId() == " + agent.getComponentId() + ".");
164 			}
165 			record.setSecond(record.getSecond()-1);
166 			if (record.getSecond() == 0) {
167 				registeredAgents.remove(agent.getComponentId());
168 			}
169 			return record.getSecond();
170 		}
171 	}
172 	
173 	// =======================
174 	// REGISTER AGENT
175 	// =======================
176 	
177 	/**
178 	 * Register an agent to receive events send through 'channel'. 
179 	 * 
180 	 * Use {@link PogamutJVMComm#ALL_CHANNELS} constant to listen to all channels (existing or future).
181 	 * 
182 	 * @param agent
183 	 * @param channel
184 	 */
185 	public void registerAgent(IObservingAgent agent, int channel) {
186 		if (channel < 0 && channel != ALL_CHANNELS) {
187 			throw new RuntimeException("channel == " + channel + " < 0, INVALID");
188 		}
189 		if (writeLock.tryLock()) {
190 			try {
191 				registerAgentUnsyncImpl(agent, channel);
192 			} finally {
193 				writeLock.unlock();
194 			}
195 		} else {
196 			execute(new RegisterAgent(agent, channel), true);
197 		}
198 	}
199 		
200 	protected void registerAgentSyncImpl(IObservingAgent agent, int channel) {
201 		writeLock.lock();
202 		try {
203 			registerAgentUnsyncImpl(agent, channel);
204 		} finally {
205 			writeLock.unlock();
206 		}
207 	}
208 	
209 	protected void registerAgentUnsyncImpl(IObservingAgent agent, int channel) {
210 		if (channel == ALL_CHANNELS) {
211 			// REGISTER FOR RECIEVING ANY MESSAGES
212 			if (allChannels.contains(agent)) {
213 				// ALREADY REGISTERED
214 				if (log != null && log.isLoggable(Level.WARNING)) log.warning("Agent " + agent.getComponentId() + " is already registered for ALL_CHANNELS (ignoring this request).");
215 				return;
216 			}
217 			allChannels.add(agent);
218 		} else {
219 			if (channels.get(channel).contains(agent)) {
220 				// ALREADY REGISTERED
221 				if (log != null && log.isLoggable(Level.WARNING)) log.warning("Agent " + agent.getComponentId() + " is already registered for channel " + channel + " (ignoring this request).");
222 				return;
223 			}
224 			channels.add(channel, agent);
225 		}
226 		int registerCount = incRegisteredAgent(agent);
227 		if (log != null && log.isLoggable(Level.INFO)) log.info("Registered " + agent.getComponentId() + " for " + (channel == ALL_CHANNELS ? "ALL_CHANNELS" : "channel " + channel) + ". Agent is registered " + registerCount + "x (in total).");
228 	}
229 
230 	// =======================
231 	// IS AGENT REGISTERED
232 	// =======================
233 
234 	/**
235 	 * Whether an 'agent' is listening on 'channel'.
236 	 * 
237 	 * Use {@link PogamutJVMComm#ALL_CHANNELS} constant to check whether an agent is listening to ALL CHANNELS (existing and future).
238 	 * 
239 	 * Potentially BLOCKING METHOD, waiting for {@link PogamutJVMComm#readLock} to be locked.
240 	 * 
241 	 * @param agent
242 	 * @param channel
243 	 * @return
244 	 */
245 	public boolean isAgentRegistered(IObservingAgent agent, int channel) {
246 		if (channel < 0 && channel != ALL_CHANNELS) {
247 			throw new RuntimeException("channel == " + channel + " < 0, INVALID");
248 		}
249 		readLock.lock();
250 		try {
251 			if (channel == ALL_CHANNELS) {
252 				return allChannels.contains(agent);
253 			} else {
254 				return getChannel(channel).contains(agent);
255 			}
256 		} finally {
257 			readLock.unlock();
258 		}
259 	}
260 	
261 	protected Set<IObservingAgent> getChannel(int channel) {
262 		if (channel == ALL_CHANNELS) {
263 			return allChannels;
264 		} else {
265 			if (channels.containsKey(channel)) {
266 				return channels.get(channel);
267 			}
268 			synchronized(channels) {
269 				return channels.get(channel);
270 			}
271 		}
272 	}
273 	
274 	// =============================
275 	// UNREGISTER AGENT FROM CHANNEL
276 	// =============================
277 	
278 	/**
279 	 * Removes agent from listening to some channels.
280 	 * 
281 	 * If you specify ALL_CHANNELS, agent is registered from ALL_CHANNELS (does not mean it is unregistered from respective channels, it
282 	 * is "unregister-from-broadcast-listening").
283 	 * 
284 	 * @param agent
285 	 * @param channel
286 	 */
287 	public void unregisterAgent(IObservingAgent agent, int channel) {		
288 		if (channel < 0 && channel != ALL_CHANNELS) {
289 			throw new RuntimeException("channel == " + channel + " < 0, INVALID");
290 		}
291 		if (writeLock.tryLock()) {
292 			try {
293 				unregisterAgentUnsyncImpl(agent, channel);
294 			} finally {
295 				writeLock.unlock();
296 			}
297 		} else {
298 			execute(new UnregisterAgentFromChannel(agent, channel), false);
299 		}
300 	}
301 	
302 	protected void unregisterAgentSyncImpl(IObservingAgent agent, int channel) {
303 		writeLock.lock();
304 		try {
305 			unregisterAgentUnsyncImpl(agent, channel);
306 		} finally {
307 			writeLock.unlock();
308 		}
309 	}
310 
311 	protected void unregisterAgentUnsyncImpl(IObservingAgent agent, int channel) {
312 		if (channel == ALL_CHANNELS) {
313 			if (allChannels.remove(agent)) {
314 				int registerCount = decRegisteredAgent(agent);
315 				if (log != null && log.isLoggable(Level.INFO)) log.info("UNregistered " + agent.getComponentId() + " from ALL_CHANNELS. " + (registerCount == 0 ? "Agent is now fully UNREGISTERED." : "Agent remains registered for other channels (" + registerCount + "x in total)."));
316 				if (registeredAgents.size() == 0) {
317 					shutdown(false);
318 				}
319 			}
320 		} else {
321 			if (getChannel(channel).remove(agent)) {
322 				int registerCount = decRegisteredAgent(agent);
323 				if (log != null && log.isLoggable(Level.INFO)) log.info("UNregistered " + agent.getComponentId() + " from channel " + channel + ". " + (registerCount == 0 ? "Agent is now fully UNREGISTERED." : "Agent remains registered for other channels (" + registerCount + "x in total)."));
324 				if (registeredAgents.size() == 0) {
325 					shutdown(false);
326 				}
327 			}
328 		}			
329 	}
330 
331 	// =======================
332 	// UNREGISTER AGENT
333 	// =======================
334 	
335 	/**
336 	 * Totally unregister the agent (all channels + ALL_CHANNELS).
337 	 * @param bot
338 	 */
339 	public void unregisterAgent(IObservingAgent agent) {
340 		if (writeLock.tryLock()) {
341 			try {
342 				unregisterAgentUnsyncImpl(agent);
343 			} finally {
344 				writeLock.unlock();
345 			}
346 		} else {
347 			execute(new UnregisterAgent(agent), false);
348 		}
349 		
350 	}
351 	
352 	protected void unregisterAgentSyncImpl(IObservingAgent agent) {
353 		writeLock.lock();
354 		try {
355 			unregisterAgentUnsyncImpl(agent);
356 		} finally {
357 			writeLock.unlock();
358 		}
359 	}
360 	
361 	protected void unregisterAgentUnsyncImpl(IObservingAgent agent) {
362 		synchronized(channels) {
363 			for (Integer channel : channels.keySet()) {
364 				unregisterAgentUnsyncImpl(agent, channel);
365 			}
366 		}
367 		unregisterAgentUnsyncImpl(agent, ALL_CHANNELS);
368 	}
369 
370 	// =======================
371 	// SEND EVENT
372 	// =======================
373 	
374 	/**
375 	 * Send 'event' to 'channel'.
376 	 * 
377 	 * Note that this version does not have "SENDER", thus it may notify even the SENDER with 'event'
378 	 * if it is subscribed to channel 'channel'.
379 	 * 
380 	 * If you wish to "sendToAllOthers" use {@link PogamutJVMComm#sendToOthers(IWorldChangeEvent, int, IObservingAgent)}.
381 	 * 
382 	 * @param event
383 	 * @param channel
384 	 */
385 	public void send(IWorldChangeEvent event, int channel) {
386 		execute(new Send(event, channel), false);
387 	}
388 	
389 	/**
390 	 * Send 'event' to 'channel' but does not notify 'sender'.
391 	 * 
392 	 * Note that 'sender' (that should be you) won't receive the 'event'.
393 	 * 
394 	 * @param event
395 	 * @param channel
396 	 */
397 	public void sendToOthers(IWorldChangeEvent event, int channel, IObservingAgent sender) {
398 		execute(new SendToOthers(event, channel, sender), false);
399 	}	
400 
401 	protected void sendSyncImpl(IWorldChangeEvent event, int channel) {
402 		readLock.lock();
403 		try {
404 			sendUnsyncImpl(event, channel);
405 		} finally {
406 			readLock.unlock();
407 		}
408 	}
409 	
410 	protected void sendToOthersSyncImpl(IWorldChangeEvent event, int channel, IObservingAgent sender) {
411 		readLock.lock();
412 		try {
413 			sendToOthersUnsyncImpl(event, channel, sender);
414 		} finally {
415 			readLock.unlock();
416 		}
417 	}
418 	
419 	protected void sendUnsyncImpl(IWorldChangeEvent event, int channel) {
420 		if (channel == ALL_CHANNELS) {
421 			broadcastUnsyncImpl(event);
422 		} else {
423 			for (IObservingAgent agent : getChannel(channel)) {
424 				sendToAgentUnsyncImpl(agent, event);
425 			}
426 			for (IObservingAgent agent : allChannels) {
427 				sendToAgentUnsyncImpl(agent, event);
428 			}			
429 		}
430 	}
431 	
432 	protected void sendToOthersUnsyncImpl(IWorldChangeEvent event, int channel, IObservingAgent sender) {
433 		if (channel == ALL_CHANNELS) {
434 			broadcastToOthersUnsyncImpl(event, sender);
435 		} else {
436 			for (IObservingAgent agent : getChannel(channel)) {
437 				if (agent != sender) {
438 					sendToAgentUnsyncImpl(agent, event);
439 				}
440 			}
441 			for (IObservingAgent agent : allChannels) {
442 				if (agent != sender) {
443 					sendToAgentUnsyncImpl(agent, event);
444 				}
445 			}			
446 		}
447 	}
448 	
449 	protected void sendToAgentUnsyncImpl(IObservingAgent agent, IWorldChangeEvent event) {
450 		try {
451 			if (log != null && log.isLoggable(Level.FINE))log.fine(event + " -> " + agent.getComponentId());
452 			agent.getWorldView().notify(event);
453 		} catch (ComponentNotRunningException e1) {
454 			// NOTHING TO SEE, move along...	
455 			if (log != null && log.isLoggable(Level.WARNING)) log.warning("Agent " + agent.getComponentId() + " is not running, did not receive: " + event);
456 		} catch (Exception e2) {
457 			if (log != null && log.isLoggable(Level.WARNING)) log.warning(ExceptionToString.process("Agent " + agent.getComponentId() + " failed to process " + event + ".", e2));
458 		}
459 	}
460 	
461 	// =======================
462 	// BROADCAST EVENT
463 	// =======================
464 
465 	/**
466 	 * Broadcast 'event' to all channels == all listening agents.
467 	 * 
468 	 * Note that this version does not have "SENDER", thus it may notify even the SENDER with 'event'
469 	 * if it is subscribed to any / all channels.
470 	 * 
471 	 * If you wish to "broadcastToAllOthers" use {@link PogamutJVMComm#broadcastToOthers(IWorldChangeEvent, IObservingAgent)}.
472 	 * 
473 	 * @param event
474 	 */
475 	public void broadcast(IWorldChangeEvent event) {
476 		execute(new Broadcast(event), false);
477 	}
478 	
479 	/**
480 	 * Broadcast 'event' to all channels == all listening agents.
481 	 * 
482 	 * Note that 'sender' (that should be you) won't receive the 'event'.
483 	 * 
484 	 * @param event
485 	 * @param sender 
486 	 */
487 	public void broadcastToOthers(IWorldChangeEvent event, IObservingAgent sender) {
488 		if (sender == null) {
489 			if (log != null && log.isLoggable(Level.WARNING)) log.warning("broadcast(event, null) called, sender unspecified");
490 			broadcast(event);
491 		} else {
492 			execute(new BroadcastToOthers(event, sender), false);
493 		}
494 	}
495 	
496 	protected void broadcastSyncImpl(IWorldChangeEvent event) {
497 		readLock.lock();
498 		try {
499 			broadcastUnsyncImpl(event);
500 		} finally {
501 			readLock.unlock();
502 		}
503 	}
504 	
505 	protected void broadcastToOthersSyncImpl(IWorldChangeEvent event, IObservingAgent sender) {
506 		readLock.lock();
507 		try {
508 			broadcastToOthersUnsyncImpl(event, sender);
509 		} finally {
510 			readLock.unlock();
511 		}
512 	}
513 	
514 	protected void broadcastUnsyncImpl(IWorldChangeEvent event) {
515 		List<Integer> existingChannels;
516 		synchronized(channels) {
517 			 existingChannels = new ArrayList<Integer>(channels.keySet());
518 		}
519 		for (Integer channel : existingChannels) {
520 			for (IObservingAgent agent : getChannel(channel)) {
521 				sendToAgentUnsyncImpl(agent, event);
522 			}
523 		}
524 		for (IObservingAgent agent : allChannels) {
525 			sendToAgentUnsyncImpl(agent, event);
526 		}		
527 	}
528 	
529 	protected void broadcastToOthersUnsyncImpl(IWorldChangeEvent event, IObservingAgent sender) {
530 		List<Integer> existingChannels;
531 		synchronized(channels) {
532 			 existingChannels = new ArrayList<Integer>(channels.keySet());
533 		}
534 		for (Integer channel : existingChannels) {
535 			for (IObservingAgent agent : getChannel(channel)) {
536 				if (agent != sender) {
537 					sendToAgentUnsyncImpl(agent, event);
538 				}
539 			}
540 		}
541 		for (IObservingAgent agent : allChannels) {
542 			if (agent != sender) {
543 				sendToAgentUnsyncImpl(agent, event);
544 			}
545 		}		
546 	}
547 	
548 	// =======================
549 	// CLEANUP
550 	// =======================
551 	
552 	/**
553 	 * UTILITY METHOD FOR DESTROYING THE COMMUNICATION.
554 	 * 
555 	 * Call this method if you want the instance to be safely GC()ed.
556 	 * 
557 	 * Note that {@link PogamutPlatform#close()} will call this for you on {@link PogamutJVMComm#getInstance()}.
558 	 */
559 	public void destroy() {
560 		try {
561 			try {
562 				shutdown(true);
563 			} finally {
564 				try {
565 					synchronized(channels) {
566 						channels.clear();
567 					}
568 				} finally {
569 					try {
570 						synchronized(allChannels) {
571 							allChannels.clear();
572 						}
573 					} finally {
574 						synchronized(registeredAgents) {
575 							registeredAgents.clear();
576 						}					
577 					}
578 				}
579 			}
580 		} catch (Exception e) {
581 			if (log != null && log.isLoggable(Level.WARNING)) log.warning(ExceptionToString.process("Failed to fully PogamutJVMComm.destroy().", e));
582 		}
583 	}
584 	
585 	// =======================
586 	// JOB PROCESSING
587 	// =======================
588 	
589 	protected Object executorMutex = new Object();
590 	
591 	protected ThreadPoolExecutor executor = null;
592 	
593 	protected Object numberOfRegisterAgentPendingMutex = new Object();
594 	
595 	protected int numberOfRegisterAgentPending = 0;
596 	
597 	protected void execute(Runnable job, boolean forceStart) {
598 		if (executor == null) {
599 			synchronized(executorMutex) {
600 				if (executor == null) {
601 					if (forceStart || registeredAgents.size() > 0) {
602 						if (log != null && log.isLoggable(Level.INFO)) log.info("Starting thread pool executor.");
603 						executor = new ThreadPoolExecutor(1, 20, 10000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
604 					}
605 				}
606 			}
607 		}
608 		if (executor == null) return;
609 		executor.execute(job);
610 	}
611 	
612 	protected void shutdown(boolean forced) {
613 		if (executor != null) {
614 			synchronized(executorMutex) {
615 				if (executor != null) {
616 					if (!forced) {
617 						synchronized(numberOfRegisterAgentPendingMutex) {
618 							if (numberOfRegisterAgentPending > 0) {
619 								if (log != null && log.isLoggable(Level.INFO)) log.info("Won't shutdown thread pool executor, there are unprocessed agent-registration jobs in queue and shutdown is NOT forced.");
620 								return;
621 							}
622 						}
623 					}
624 					if (log != null && log.isLoggable(Level.INFO)) log.info("Shutting down thread pool executor.");
625 					try {
626 						executor.shutdownNow();
627 					} catch (Exception e) {
628 						if (log != null && log.isLoggable(Level.WARNING)) log.warning(ExceptionToString.process("Error shutting down thread pool executor.", e));
629 					}
630 					executor = null;
631 				}
632 			}
633 		}
634 	}
635 	
636 	protected class RegisterAgent implements Runnable {
637 
638 		protected IObservingAgent agent;
639 		protected int channel;
640 		
641 		public RegisterAgent(IObservingAgent agent, int channel) {
642 			synchronized(numberOfRegisterAgentPendingMutex) {
643 				++numberOfRegisterAgentPending;
644 			}
645 			this.agent = agent;
646 			this.channel = channel;
647 		}
648 		
649 		@Override
650 		public void run() {
651 			try {
652 				registerAgentSyncImpl(agent, channel);
653 			} finally {
654 				synchronized(numberOfRegisterAgentPendingMutex) {
655 					--numberOfRegisterAgentPending;
656 				}
657 			}
658 		}
659 		
660 	}
661 	
662 	protected class UnregisterAgentFromChannel implements Runnable {
663 
664 		protected IObservingAgent agent;
665 		protected int channel;
666 		
667 		public UnregisterAgentFromChannel(IObservingAgent agent, int channel) {
668 			this.agent = agent;
669 			this.channel = channel;
670 		}
671 		
672 		@Override
673 		public void run() {
674 			unregisterAgentSyncImpl(agent, channel);
675 		}
676 		
677 	}
678 	
679 	protected class UnregisterAgent implements Runnable {
680 
681 		protected IObservingAgent agent;
682 		
683 		public UnregisterAgent(IObservingAgent agent) {
684 			this.agent = agent;
685 		}
686 		
687 		@Override
688 		public void run() {
689 			unregisterAgentSyncImpl(agent);
690 		}
691 		
692 	}
693 	
694 	protected class Send implements Runnable {
695 
696 		protected IWorldChangeEvent event;
697 		protected int channel;
698 
699 		public Send(IWorldChangeEvent event, int channel) {
700 			this.event = event;
701 			this.channel = channel;
702 		}
703 		
704 		@Override
705 		public void run() {
706 			sendSyncImpl(event, channel);
707 		}
708 		
709 	}
710 	
711 	protected class SendToOthers implements Runnable {
712 
713 		protected IWorldChangeEvent event;
714 		protected int channel;
715 		protected IObservingAgent sender;
716 
717 		public SendToOthers(IWorldChangeEvent event, int channel, IObservingAgent sender) {
718 			this.event = event;
719 			this.channel = channel;
720 			this.sender = sender;
721 		}
722 		
723 		@Override
724 		public void run() {
725 			sendToOthersSyncImpl(event, channel, sender);
726 		}
727 		
728 	}
729 	
730 	protected class Broadcast implements Runnable {
731 
732 		protected IWorldChangeEvent event;
733 		
734 		public Broadcast(IWorldChangeEvent event) {
735 			this.event = event;
736 		}
737 		
738 		@Override
739 		public void run() {
740 			broadcastSyncImpl(event);
741 		}
742 		
743 	}
744 	
745 	protected class BroadcastToOthers implements Runnable {
746 
747 		protected IWorldChangeEvent event;
748 		protected IObservingAgent sender;
749 
750 		public BroadcastToOthers(IWorldChangeEvent event, IObservingAgent sender) {
751 			this.event = event;
752 			this.sender = sender;
753 		}
754 		
755 		@Override
756 		public void run() {
757 			broadcastToOthersSyncImpl(event, sender);
758 		}
759 		
760 	}
761 
762 }