View Javadoc

1   package cz.cuni.amis.pogamut.multi.communication.worldview.impl;
2   
3   import java.util.Collections;
4   import java.util.HashMap;
5   import java.util.HashSet;
6   import java.util.LinkedList;
7   import java.util.List;
8   import java.util.Map;
9   import java.util.Queue;
10  import java.util.Set;
11  import java.util.concurrent.CountDownLatch;
12  import java.util.concurrent.LinkedBlockingQueue;
13  import java.util.logging.Level;
14  
15  import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent;
16  import cz.cuni.amis.pogamut.base.communication.worldview.object.WorldObjectId;
17  import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectDestroyedEvent;
18  import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectEvent;
19  import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectFirstEncounteredEvent;
20  import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectUpdatedEvent;
21  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
22  import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
23  import cz.cuni.amis.pogamut.base.component.lifecyclebus.ILifecycleBus;
24  import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
25  import cz.cuni.amis.pogamut.base3d.worldview.object.IViewable;
26  import cz.cuni.amis.pogamut.base3d.worldview.object.event.WorldObjectAppearedEvent;
27  import cz.cuni.amis.pogamut.multi.agent.ITeamedAgentId;
28  import cz.cuni.amis.pogamut.multi.communication.messages.SharedBatchBeginEvent;
29  import cz.cuni.amis.pogamut.multi.communication.messages.SharedBatchFinishedEvent;
30  import cz.cuni.amis.pogamut.multi.communication.translator.event.ICompositeWorldObjectUpdatedEvent;
31  import cz.cuni.amis.pogamut.multi.communication.translator.event.ILocalWorldObjectUpdatedEvent;
32  import cz.cuni.amis.pogamut.multi.communication.translator.event.ISharedWorldObjectUpdatedEvent;
33  import cz.cuni.amis.pogamut.multi.communication.translator.event.IStaticWorldObjectUpdatedEvent;
34  import cz.cuni.amis.pogamut.multi.communication.worldview.ISharedWorldView;
35  import cz.cuni.amis.pogamut.multi.communication.worldview.object.ICompositeWorldObject;
36  import cz.cuni.amis.pogamut.multi.communication.worldview.object.ILocalViewable;
37  import cz.cuni.amis.pogamut.multi.communication.worldview.object.ILocalWorldObject;
38  import cz.cuni.amis.pogamut.multi.communication.worldview.object.event.DummyObjectEvent;
39  import cz.cuni.amis.pogamut.multi.communication.worldview.object.event.DummyObjectEvent.EventType;
40  import cz.cuni.amis.pogamut.multi.utils.timekey.TimeKey;
41  import cz.cuni.amis.utils.NullCheck;
42  import cz.cuni.amis.utils.exception.PogamutInterruptedException;
43  
44  /**
45   * Implements the batch logic into the worldView.
46   * @author srlok
47   *
48   */
49  public abstract class BatchAwareLocalWorldView extends VisionLocalWorldView {
50  
51  	public BatchAwareLocalWorldView(ComponentDependencies dependencies,
52  			ILifecycleBus bus, IAgentLogger logger,
53  			ISharedWorldView parentWorldView, ITeamedAgentId agentId) {
54  		super(dependencies, bus, logger, parentWorldView, agentId);
55  		objectMutex = new Object();
56  	}
57  
58  	/**
59  	 * Queue of all incoming batches ready to be processed.
60  	 */
61  	private Queue<List<IWorldChangeEvent>> batches = new LinkedBlockingQueue<List<IWorldChangeEvent>>();
62  	
63  	/**
64  	 * The current(incomplete) batch of events waiting to be processed.
65  	 */
66  	private List<IWorldChangeEvent> currentBatch = new LinkedList<IWorldChangeEvent>();
67  	
68  	/**
69  	 * Used to identify events marking beginning of batches. Override to provide correct behavior.
70  	 * @param event
71  	 * @return
72  	 */
73  	protected abstract boolean isBatchBeginEvent( IWorldChangeEvent event );
74  	
75  	/**
76  	 * Used to detect batch end events, needs to be overriden to detect the events properly.
77  	 * @param event
78  	 * @return
79  	 */
80  	protected abstract boolean isBatchEndEvent( IWorldChangeEvent event );
81  	
82  	/**
83  	 * This means, that lock was requested by a thread and it is waiting for finishing the batch.
84  	 */
85  	private boolean lockRequested = false;
86  	
87  	private boolean lockFinished = false;
88  	
89  	
90  	/**
91  	 * This means, that we are waiting for shared worldView to process all events from current batch (between current begin event and current end event)
92  	 */
93  	private boolean waitingForSharedBatch = false; 
94  	
95  	/**
96  	 * The lock for a time is set when the sharedWorldView has been sent a BatchBeginMessage, but it has not yet confirmed that
97  	 * all events for the specified time have been processed.
98  	 */
99  	private Set<Long> sharedWVLocks = Collections.synchronizedSet( new HashSet<Long>(4));
100 	
101 	private boolean timeKeyIncreased = false;
102 	
103 	private Object objectMutex = new Object();	
104 	private CountDownLatch latch = new CountDownLatch(1);
105 	
106 	/**
107 	 * These keys are currently locked (shadowCopies are held)
108 	 */
109 	private List<Long> lockedTimes = new LinkedList<Long>();
110 	
111 	/**
112 	 * Notifies sharedWorldView that a beginEvent has been recieved with with the specified time and the sharedWorldView should notify
113 	 * this worldView back, when all events for the time have been processed.
114 	 * @param time
115 	 */
116 	protected synchronized void notifySharedBegin( long time )
117 	{
118 		log.finer("Notifying sharedWorldView with SharedBegin event of time : " + time);
119 		sharedWorldView.notify( new SharedBatchBeginEvent(time, this.agentId) );
120 	}
121 	
122 	protected Map<WorldObjectId, Set<EventType>> bufferedEvents = new HashMap<WorldObjectId, Set<EventType>>();
123 	protected List<DummyObjectEvent> eventBuffer = new LinkedList<DummyObjectEvent>();
124 	
125 	// OVERRIDING OF
126 	//created, updated, disappeared, appeared, destroyed
127 	// implements buffering and later flushing of events
128 	
129 	@Override
130 	protected void objectCreated( ILocalWorldObject obj, long time )
131 	{
132 		bufferObjectEvent( obj.getId(), EventType.FIRST_ENCOUNTERED, time );
133 		bufferObjectEvent( obj.getId(), EventType.UPDATED, time);
134 		super.objectCreated(obj, time);
135 	}
136 	
137 	@Override
138 	protected void objectUpdated( ILocalWorldObject obj, long time )
139 	{
140 		bufferObjectEvent( obj.getId(), EventType.UPDATED, time );
141 		super.objectUpdated(obj, time);
142 	}
143 	
144 	@Override
145 	protected void objectDestroyed( ILocalWorldObject obj, long time)
146 	{
147 		//Raise now, raising later would try to get the deleted object
148 		raiseEvent( new WorldObjectDestroyedEvent( get(obj.getId(), TimeKey.get(time)), time));
149 		super.objectDestroyed(obj, time); //and delete
150 	}
151 	
152 	@Override
153 	protected void objectAppeared( ILocalViewable obj, long time )
154 	{
155 		super.objectAppeared(obj, time);
156 		bufferObjectEvent( obj.getId(), EventType.APPEARED, time);		
157 	}
158 	
159 	@Override
160 	protected void objectDisappeared( ILocalViewable obj, long time )
161 	{
162 		super.objectDisappeared(obj, time);
163 		bufferObjectEvent( obj.getId(), EventType.DISAPPEARED, time );		
164 	}
165 	
166 	/**
167 	 * Sets the visible property on the object to false by creating a disappeared event
168 	 * also raises correct events
169 	 * @param id
170 	 * @param time
171 	 */
172 	protected abstract void disappearObject( WorldObjectId id, long time);
173 	
174 	/**
175 	 * This is used for raising object events safely
176 	 * by buffering the object events, we make sure that when the events are raised and listeners notified,
177 	 * the update event has been fully processed and the object contains correct and consistent data.
178 	 * If you need to update objects manually and then want to raise events for whatever reason, always use this method.
179 	 * @param id
180 	 * @param eventType
181 	 * @param time
182 	 */
183 	protected void bufferObjectEvent(WorldObjectId id, EventType eventType, long time)
184 	{
185 		if ( log.isLoggable(Level.FINEST ))
186 		{
187 			log.finest("Buffering event for : " + id.toString() + " ; Type :" + eventType.toString() + "; T : " + time);
188 		}
189 		
190 		//lets check if we are not adding the event for the second time (happens when there is an update from shared part as well);
191 		//needs to synchronize because of shared worldViews
192 		synchronized(eventBuffer)
193 		{
194 			Set<EventType> buffered = bufferedEvents.get(id);
195 			if ( buffered == null)
196 			{
197 				eventBuffer.add( new DummyObjectEvent(id, eventType, time) );
198 				buffered = new HashSet<EventType>();
199 				buffered.add(eventType);
200 				bufferedEvents.put( id, buffered);
201 			}
202 			else if ( !buffered.contains(eventType) )
203 			{
204 				buffered.add(eventType);
205 				eventBuffer.add( new DummyObjectEvent(id, eventType, time) );
206 			}
207 		}
208 	
209 	}
210 	
211 	/**
212 	 * Raises all events from this batch
213 	 */
214 	protected void flushEvents()
215 	{
216 		List<DummyObjectEvent> toBuffer = new LinkedList<DummyObjectEvent>();
217 		if (log.isLoggable(Level.FINE) )
218 		{
219 			if ( eventBuffer.isEmpty() )
220 			{
221 				log.fine("No events to flush.");
222 			}
223 			else
224 			{
225 				log.fine("Flushing events for time : " + eventBuffer.iterator().next().getTime() + "; Buffer size : " + eventBuffer.size() );
226 			}
227 		}
228 		synchronized (eventBuffer)
229 		{
230 			List<DummyObjectEvent> toProcess = eventBuffer;
231 			eventBuffer = new LinkedList<DummyObjectEvent>(); 
232 			for (DummyObjectEvent dummy : toProcess)
233 			{
234 				WorldObjectEvent e = null;
235 				long eventTime = dummy.getTime();
236 				try	
237 				{					
238 					switch ( dummy.getType() )
239 					{
240 					case APPEARED :
241 						e = new WorldObjectAppearedEvent<IViewable>((IViewable)this.get(dummy.getObjectId(), TimeKey.get(eventTime)), eventTime );
242 						break;
243 					case DESTROYED :
244 						e = new WorldObjectDestroyedEvent<ICompositeWorldObject>(this.get(dummy.getObjectId(), TimeKey.get(eventTime)), eventTime );
245 						break;
246 					case DISAPPEARED :
247 						//disappear the object
248 						disappearObject( dummy.getObjectId(), eventTime );
249 							
250 						//e = new WorldObjectDisappearedEvent<IViewable>((IViewable)this.get(dummy.getObjectId(), TimeKey.get(eventTime)), eventTime);
251 						
252 						break;
253 					case FIRST_ENCOUNTERED :
254 						e = new WorldObjectFirstEncounteredEvent<ICompositeWorldObject>(this.get(dummy.getObjectId(), TimeKey.get(eventTime)), eventTime);
255 						break;
256 					case UPDATED :
257 						if ( getLocal(dummy.getObjectId() ) != null)
258 						{
259 							e = new WorldObjectUpdatedEvent<ICompositeWorldObject>(this.get(dummy.getObjectId(), TimeKey.get(eventTime)), eventTime);
260 						}			
261 						break;
262 					}
263 					if ( e != null )
264 					{
265 						raiseEvent(e);
266 					}
267 				}
268 				catch (Exception exc) //sometimes updating msgClass can get little slow, lets just postpone the event for now
269 				{					
270 					log.warning("["+dummy.getTime()+"]Exception in raising event |" + dummy.getObjectId() + "| postponing " + exc);
271 					dummy.incTime();
272 					toBuffer.add(dummy);					
273 				}		
274 			}
275 			eventBuffer.addAll(toBuffer);//clear event buffer
276 			bufferedEvents = new HashMap<WorldObjectId, Set<EventType>>();
277 		}// END SYNCHRONIZED
278 	}
279 	
280 	IWorldChangeEvent bufferedEndMessage = null;
281 	
282 	/**
283 	 * This method is called when the SharedBatchFinishedEvent is recieved from the sharedWorldView, notifying us that
284 	 * all sharedEvents for the specified time have been processed and it is safe to run logic on the time.
285 	 * @param time
286 	 */
287 	protected void sharedBatchFinished( long time )
288 	{
289 		//GUICE ERROR
290 		if ( lockedTimes == null )
291 		{
292 			lockedTimes = new LinkedList<Long>();
293 		}
294 		
295 		synchronized( lockedTimes )
296 		{
297 			//flush events
298 			this.flushEvents();
299 			NullCheck.check(bufferedEndMessage, "Buffered End message");
300 			super.notify( bufferedEndMessage );
301 			bufferedEndMessage = null;
302 			if ( log.isLoggable( Level.FINER ) )
303 			{
304 				log.finer("SharedBatchFinishedEvent recieved from the SharedWorldView for time " + time );
305 			}
306 			if ( !lockFinished )
307 			{
308 				log.fine("Setting current timeKey : " + time );
309 				setCurrentTime( TimeKey.get(time) );
310 				timeKeyIncreased = true;
311 				List<Long> newLocks = new LinkedList<Long>();
312 				for ( Long t : lockedTimes )
313 				{
314 					if ( t < time)
315 					{
316 						unlockTime(t);
317 					}
318 					else
319 					{
320 						newLocks.add(t);
321 					}
322 				}
323 				lockedTimes = newLocks;
324 				
325 				//GUICE ERROR
326 				if ( latch == null )
327 				{
328 					latch = new CountDownLatch(1);
329 				}
330 				latch.countDown();
331 				
332 			}		
333 		}
334 	}
335 	
336 	public boolean isLocked()
337 	{
338 		return lockFinished;
339 	}
340 	
341 	/**
342 	 * Must be called before starting logic.
343 	 */
344 	public void lock()
345 	{
346 		log.fine("Locking BatchAwareLocalWorldView");
347 		
348 		synchronized ( objectMutex )
349 		{
350 			if (!isRunning()) throw new ComponentNotRunningException("Can't lock() world view is not running!", log, this);
351 			if ( !lockFinished  )
352 			{
353 				lockRequested = true;
354 			}
355 			else
356 			{
357 				return;
358 			}			
359 		}
360 		
361 		try {
362 			latch.await();
363 		} catch (InterruptedException e) {
364 			throw new PogamutInterruptedException("Interrupted while waiting to acquire lock()!", e, this);
365 		}
366 		
367 		lockFinished = true;
368 		
369 		log.fine("BatchAwareLocalWorldView locked.");
370 	}
371 
372 	/**
373 	 * Called after the logic has finished.
374 	 */
375 	public void unlock()
376 	{
377 		synchronized( objectMutex )
378 		{
379 			if (!isRunning()) throw new ComponentNotRunningException("Can't unlock() world view is not running!", log, this);
380 			LinkedList<Long> newLocks = new LinkedList<Long>();
381 			for ( Long t : lockedTimes )
382 			{
383 				if ( t < currentTimeKey.getTime() )
384 				{
385 					this.unlockTime( t );
386 				}
387 				else
388 				{
389 					newLocks.add(t);
390 				}
391 			}
392 			lockedTimes = newLocks;
393 			lockFinished = false;
394 			lockRequested = false;
395 			latch = new CountDownLatch(1);
396 			log.fine("BatchAwareLocalWorldView unlocked");
397 		}
398 	}
399 	
400 	private boolean timeKeySet = false;
401 	
402 	boolean endMessageCame = false;
403 	boolean sharedFinished = false;
404 	
405 	@Override
406 	public synchronized void notify(IWorldChangeEvent event)
407 	{
408 		log.finest( "BatchAwareLocalWorldView notify : " + event);
409 		
410 		if (!timeKeySet)
411 		{
412 			this.currentTimeKey = TimeKey.get( event.getSimTime() );
413 			timeKeySet = true;
414 		}
415 		
416 		//if the event updates a shared part of a WorldObject, notify sharedWorldView
417     	if (!( event instanceof ILocalWorldObjectUpdatedEvent))
418         {
419     		if ( event instanceof ICompositeWorldObjectUpdatedEvent)
420     		{
421     			IWorldChangeEvent partEvent = ((ICompositeWorldObjectUpdatedEvent)event).getSharedEvent();
422     			if (partEvent != null) //shared part
423     			{
424     				if ( log.isLoggable( Level.FINEST ))
425     				{
426     					log.finest("Notyfying sharedWV " + partEvent.toString() + ")");
427     				}
428     				sharedWorldView.notify(partEvent);	
429     			}
430     			partEvent = ((ICompositeWorldObjectUpdatedEvent)event).getStaticEvent();
431 				if ( partEvent != null) //static part
432 				{
433 					if ( log.isLoggable( Level.FINEST )) log.finest("Notyfying sharedWV " + partEvent.toString() + ")");
434 					sharedWorldView.notify(partEvent);
435 				}
436     		}
437         	//shared or static event will not modify LocalObjects, no need to process it beyond notifying sharedWorldView
438     		else if ( (event instanceof ISharedWorldObjectUpdatedEvent) || (event instanceof IStaticWorldObjectUpdatedEvent) )
439         	{
440     			if ( log.isLoggable( Level.FINEST )) log.finest("Notyfying sharedWV " + event.toString() + ")");    			
441     			sharedWorldView.notify(event);
442         		return;
443         	}
444         }
445     	
446     	//FIXME some guice weird business that objectMutex isnt initialized
447     	//GUICE ERROR
448     	if (objectMutex == null )
449     	{
450     		objectMutex = new Object();
451     	}
452     	
453     	synchronized(objectMutex)
454     	{
455 	    	if ( isBatchBeginEvent(event) )
456 	    	{
457 	    		if ( currentTimeKey == null )
458 	    		{
459 	    			log.info("Setting new currentTimeKey to : " + event.getSimTime());
460 	    			currentTimeKey = TimeKey.get( event.getSimTime() );
461 	    		}
462 		    	lockTime( event.getSimTime());
463 		    	this.lockedTimes.add( event.getSimTime() );
464 		    	notifySharedBegin( event.getSimTime() );
465 		    	super.notify(event);
466 		    	
467 	    	}
468 	    	else if ( isBatchEndEvent(event) )
469 	    	{
470 	    		if ( log.isLoggable(Level.FINER ))
471 	    		{log.finer("Notifying sharedWorldView with EndEvent of time " + event.getSimTime() + " : " + event); };
472 	    		sharedWorldView.notify(event);
473 	    		bufferedEndMessage = event;
474 	    		endMessageCame = true;
475 	    		if ( endMessageCame && sharedFinished)
476 	    		{
477 	    			sharedBatchFinished(event.getSimTime());
478 	    			endMessageCame = false;
479 	    			sharedFinished = false;
480 	    		}
481 	    		//super.notify(event);
482 	    	}    	
483 	    	else if ( event instanceof SharedBatchFinishedEvent )
484 	    	{	    		
485 	    		sharedFinished = true;
486 	    		if ( endMessageCame && sharedFinished)
487 	    		{
488 	    			sharedBatchFinished(event.getSimTime());
489 	    			endMessageCame = false;
490 	    			sharedFinished = false;
491 	    		}
492 	    	}
493 	    	else
494 	    	{
495 	    		super.notify( event );
496 	    	}
497     	}
498 	}
499 	
500 	@Override
501 	protected void stop() {
502 		super.stop();
503 		synchronized(objectMutex) {
504 			while (latch != null && latch.getCount() > 0) latch.countDown();
505 			while (lockedTimes != null && lockedTimes.size() > 0) {
506 				long time = lockedTimes.get(0);
507 				unlockTime(lockedTimes.get(0));
508 				if (lockedTimes.get(0) == time) lockedTimes.remove(0);
509 			}
510 		}
511 	}
512 	
513 	@Override
514 	protected void kill() {
515 		super.kill();
516 		synchronized(objectMutex) {
517 			while (latch != null && latch.getCount() > 0) latch.countDown();
518 			while (lockedTimes != null && lockedTimes.size() > 0) {
519 				try {
520 					long time = lockedTimes.get(0);
521 					unlockTime(lockedTimes.get(0));
522 					if (lockedTimes.get(0) == time) lockedTimes.remove(0);
523 				} catch (Exception e) {					
524 				}
525 			}
526 		}
527 	}
528 	
529 }