View Javadoc

1   package cz.cuni.amis.pogamut.multi.communication.worldview.impl;
2   
3   import java.util.Collection;
4   import java.util.Collections;
5   import java.util.Comparator;
6   import java.util.concurrent.PriorityBlockingQueue;
7   import java.util.logging.Level;
8   import java.util.logging.Logger;
9   
10  import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent;
11  import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldEventWrapper;
12  import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldObjectUpdateResult;
13  import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldObjectUpdatedEvent;
14  import cz.cuni.amis.pogamut.base.communication.worldview.event.IWorldEvent;
15  import cz.cuni.amis.pogamut.base.communication.worldview.impl.EventDrivenWorldView;
16  import cz.cuni.amis.pogamut.base.communication.worldview.object.IWorldObject;
17  import cz.cuni.amis.pogamut.base.communication.worldview.object.WorldObjectId;
18  import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectDestroyedEvent;
19  import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectFirstEncounteredEvent;
20  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
21  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
22  import cz.cuni.amis.pogamut.base.component.controller.ComponentController;
23  import cz.cuni.amis.pogamut.base.utils.guice.AgentTeamScoped;
24  import cz.cuni.amis.pogamut.multi.agent.ITeamId;
25  import cz.cuni.amis.pogamut.multi.agent.impl.TeamedAgentId;
26  import cz.cuni.amis.pogamut.multi.communication.translator.event.ISharedPropertyUpdateResult;
27  import cz.cuni.amis.pogamut.multi.communication.translator.event.ISharedPropertyUpdatedEvent;
28  import cz.cuni.amis.pogamut.multi.communication.translator.event.ISharedWorldObjectUpdatedEvent;
29  import cz.cuni.amis.pogamut.multi.communication.translator.event.IStaticWorldObjectUpdatedEvent;
30  import cz.cuni.amis.pogamut.multi.communication.worldview.ILocalWorldView;
31  import cz.cuni.amis.pogamut.multi.communication.worldview.object.ISharedProperty;
32  import cz.cuni.amis.pogamut.multi.communication.worldview.object.ISharedWorldObject;
33  import cz.cuni.amis.pogamut.multi.communication.worldview.object.IStaticWorldObject;
34  import cz.cuni.amis.pogamut.multi.communication.worldview.object.event.DummyObjectEvent.EventType;
35  import cz.cuni.amis.pogamut.multi.utils.timekey.TimeKey;
36  import cz.cuni.amis.utils.NullCheck;
37  import cz.cuni.amis.utils.exception.PogamutException;
38  
39  /**
 * SharedWorldView implementing basic event management (notifying listeners --- not yet fully functional)
 * and updating shared objects using the events. While some synchronization is done here, this worldview does not consider
 * the events to come in batches and thus may create inconsistent data structures (this is addressed by {@link BatchAwareSharedWorldView}).
43   * @author srlok
44   *
45   */
46  @AgentTeamScoped
47  public abstract class EventDrivenSharedWorldView extends AbstractSharedWorldView {
48  
49  	public EventDrivenSharedWorldView(Logger logger) {
50  		super(logger);
51  	}
52  	
53  	private ISharedProperty copyProperty(ISharedProperty original)
54  	{
55  		return original.clone();
56  	}
57  
58  	public static final String WORLDVIEW_DEPENDENCY = "EventDrivenSharedWorldViewDependency";
59  
60      /**
61       * Flag that is telling us whether there is an event being processed or not.
62       * <p><p>
63       * It is managed only by notify() method - DO NOT MODIFY OUTSIDE IT!
64       */
65      protected boolean receiveEventProcessing = false;
66      
67      /**
68       * List of events we have to process.
69       * <p><p>
70       * It is managed only by notify() method - DO NOT MODIFY OUTSIDE IT!
71       */
72      protected PriorityBlockingQueue<IWorldChangeEvent> notifyEventsList =
73      	new PriorityBlockingQueue<IWorldChangeEvent>(
74      		64, 
75      		new Comparator<IWorldChangeEvent>() {
76  				@Override
77  				public int compare(IWorldChangeEvent arg0, IWorldChangeEvent arg1) {
78  					return (int) Math.signum( arg0.getSimTime() - arg1.getSimTime() );
79  				}
80  			}
81      	);
82      
83      protected Collection<IWorldChangeEvent> syncEventList = Collections.synchronizedCollection( notifyEventsList );
84      
85      @Override
86      public void notify(IWorldChangeEvent event) throws ComponentNotRunningException, ComponentPausedException {
87      	log.finest("SharedWorldView notify : [" + event.getSimTime() + " ; " + event);
88      	if (isPaused()) {
89      		throw new ComponentPausedException(controller.getState().getFlag(), this);
90      	}
91      	if (!isRunning()) {
92      		throw new ComponentNotRunningException(controller.getState().getFlag(), this);
93      	}
94  
95      	synchronized(syncEventList) {
96      		// ADD EVENT AS A SUBJECT FOR FUTURE PROCESSING
97      		syncEventList.add(event);
98  	        // is this method recursively called?
99  	        if (receiveEventProcessing) {
100 	            // yes it is -> that means the previous event has not been
101 	            // processed! ... store this event and allows the previous one
102 	            // to be fully processed (e.g. postpone raising this event)
103 	        	log.finest("Added event; events :" + notifyEventsList.size());
104 	            return;
105 	        } else {
106 	            // no it is not ... so raise the flag that we're inside the method
107 	            receiveEventProcessing = true;
108 	        }
109     	}
110     	
111     	// SINGLE THREAD ONLY AT THIS POINT!
112     	
113     	// check the events list size, do we have more events to process?
114         while (true) {
115         	IWorldChangeEvent ev = null;
116         	synchronized(syncEventList) {
117         		if (notifyEventsList.size() == 0) {
118         			// NO MORE EVENTS TO BE PROCESSED
119         			receiveEventProcessing = false;
120         			return;
121         		}
122         		ev = notifyEventsList.poll();
123         	}        	  
124             if (ev != null) {
125             	boolean exception = false;
126             	try {
127             		innerNotify(ev);
128             	} catch (PogamutException e1) {
129             		exception = true;
130             		throw e1;
131             	} catch (Exception e2) {
132             		exception = true;
133             		throw new PogamutException("Failed to process: " + ev, e2, this);
134             	} finally {
135             		if (exception) {
136             			// we're going to jump out of the method!
137             			receiveEventProcessing = false;
138             			// NOTE THAT PROCESSING OF EVENTS AFTER THIS POINT IS A BIT NON-DETERMINISTIC!
139             		}
140             	}
141             }
142         }
143     }
144         
145     /**
146      * Used to process IWorldChangeEvent - it has to be either IWorldChangeEvent or IWorldObjectUpdateEvent. Forbids recursion.
147      * <p>
148      * DO NOT CALL SEPARATELY - should be called only from notifyEvent().
149      * <p><p>
150      * You may override it to provide event-specific processing behavior.
151      * <p><p>
152      * MUST NOT BE CALLED CONCURRENTLY - SINGLE THREAD AT THIS POINT ONLY! MUST BE ENFORCED FROM THE OUTSIDE!
153      *
154      * @param event
155      */
156     protected void innerNotify(IWorldChangeEvent event) {    	
157     	NullCheck.check(event, "event");
158     	if (log.isLoggable(Level.FINEST)) log.finest("SharedWorldView processing " + event);
159     	
160     	//update shared part
161         if (event instanceof ISharedWorldObjectUpdatedEvent) {
162         	sharedObjectUpdatedEvent((ISharedWorldObjectUpdatedEvent)event);
163         }
164         else 
165         if ( event instanceof ISharedPropertyUpdatedEvent) {
166         	propertyUpdatedEvent((ISharedPropertyUpdatedEvent)event);
167         }
168         else 
169         if ( event instanceof IStaticWorldObjectUpdatedEvent ) {
170         	staticObjectUpdatedEvent((IStaticWorldObjectUpdatedEvent)event);
171         }
172         else
173         if (event instanceof IWorldEventWrapper) {
174             raiseEvent(((IWorldEventWrapper) event).getWorldEvent());
175         } else
176         if (event instanceof IWorldEvent) {
177           	raiseEvent((IWorldEvent)event);
178         } else {
179             throw new PogamutException("Unsupported event type received (" + event.getClass() + "): " + event, this);
180         }
181     }
182     
183     /**
184      * Catches exceptions. If exception is caught, it calls {@link ComponentController}.fatalError() and this.kill(). 
185      */
186     @Override
187     protected void raiseEvent(IWorldEvent event) {
188     	try {
189     		super.raiseEvent(event);
190     	} catch (Exception e) {
191     		this.controller.fatalError("Exception raising event " + event, e);
192     		this.kill();
193     	}
194     }
195     
196     Object objectMutex = new Object();
197     
198     public void addMsgClass(WorldObjectId id, Class msgClass)
199     {
200     	synchronized(objectMutex)
201     	{
202     		this.idClassMap.put(id, msgClass);
203     	}
204     }
205     
206     /**
207      * Called from {@link EventDrivenWorldView#innerNotify(IWorldChangeEvent)} if the event is {@link IWorldObjectUpdatedEvent}
208      * to process it.
209      * <p><p>
210      * MUST NOT BE CALLED CONCURRENTLY - SINGLE THREAD AT THIS POINT ONLY! MUST BE ENFORCED FROM THE OUTSIDE!
211      * 
212      * @param updateEvent
213      */
214     protected void sharedObjectUpdatedEvent(ISharedWorldObjectUpdatedEvent updateEvent) {
215         //update results
216         boolean created = false;
217         boolean updated = false;
218         boolean destroyed = false;
219         
220        if ( !syncIdClassMap.containsKey(updateEvent.getId() ))
221        {
222     	   NullCheck.check(updateEvent.getCompositeObjectClass(), "CompositeClass");
223     	   syncIdClassMap.put(updateEvent.getId(), updateEvent.getCompositeObjectClass());
224        }
225         
226         for ( ISharedPropertyUpdatedEvent propertyEvent : updateEvent.getPropertyEvents() )
227         {
228         	ISharedProperty property = currentSharedProperties.get(updateEvent.getTeamId(), updateEvent.getId(), propertyEvent.getPropertyId());
229         	ISharedProperty copy = null;
230         	
231         	if (property != null)
232         	{
233         		copy = copyProperty(property);
234         	};
235         	
236         	ISharedPropertyUpdateResult updateResult = propertyEvent.update(copy);
237         	
238         	switch (updateResult.getResult())
239         	{
240         	case CREATED:
241         		created = true;
242         		propertyCreated(updateResult.getProperty(), updateEvent.getTeamId());
243         		break;
244         	case UPDATED:
245         		if ( updateResult.getProperty() != copy)
246         		{
247                		throw new PogamutException("Update event " + updateEvent + " did not return the same instance of the object (result UPDATED).", this);
248         		}
249         		//add old property to maps
250         		updated = true;
251         		addOldSharedProperty( property, updateEvent.getTeamId(), propertyEvent.getSimTime());
252         		//update the value
253         		propertyUpdated(copy, updateEvent.getTeamId());
254         		break;
255         	case DESTROYED:
256         		//add value to old object maps
257         		addOldSharedProperty(property, updateEvent.getTeamId(), propertyEvent.getSimTime());
258         		//remove from current maps
259         		removeSharedProperty(property, updateEvent.getTeamId());        		
260         	case SAME:
261         		break;
262         	default:
263         		throw new PogamutException("Unhandled object update result " + updateResult.getResult() + " for the object " + updateEvent.getId() + "." + "Property : " + property, this);
264         	}
265         }
266         //now all properties are updated, let's raise the correct object events
267        if ( created )
268        {
269     	   objectCreated( getShared(updateEvent.getTeamId(), updateEvent.getId(), TimeKey.get(updateEvent.getSimTime()) ), updateEvent.getSimTime());
270     	   objectUpdated(updateEvent.getTeamId(), updateEvent.getId(), updateEvent.getSimTime());
271        }
272        else if ( updated )
273        {
274     	   objectUpdated(updateEvent.getTeamId(), updateEvent.getId(), updateEvent.getSimTime());
275        }
276        else if ( destroyed )
277        {
278     	   //remove cached object
279     	   objectDestroyed( getShared(updateEvent.getTeamId(), updateEvent.getId(), TimeKey.get(updateEvent.getSimTime())), updateEvent.getSimTime());
280        }
281     }
282     
283     /**
284      * MUST NOT BE CALLED CONCURRENTLY - SINGLE THREAD AT THIS POINT ONLY! MUST BE ENFORCED FROM THE OUTSIDE!
285      * 
286      * @param event
287      */
288     protected void propertyUpdatedEvent( ISharedPropertyUpdatedEvent event)
289     {
290     	ISharedProperty property = null;
291     	ISharedProperty copy = null;
292     	
293     	property = getSharedProperty(event.getPropertyId(), event.getTeamId(), TimeKey.get( event.getSimTime() )); //??
294     	
295     	if ( property != null)
296     	{
297     		copy = property.clone();
298     	}
299     	ISharedPropertyUpdateResult result = event.update(copy);
300     	switch ( result.getResult() )
301     	{
302     	case CREATED:
303     		propertyCreated( result.getProperty(), event.getTeamId() );
304     		break;
305     	case UPDATED:
306     		addOldSharedProperty(property, event.getTeamId(), event.getSimTime());
307     		propertyUpdated(copy, event.getTeamId() );
308     		break;
309     	case DESTROYED:
310     		addOldSharedProperty(property, event.getTeamId(), event.getSimTime() );
311     		removeSharedProperty(property, event.getTeamId() );
312     		break;
313     	case SAME:
314     		break;
315     	default:
316     		throw new PogamutException("Unexpected update result " + result.getResult() + " for property " + property.toString() + " .", this);    	
317     	}
318     }
319     
320     /**
321      * Manages updating static objects,
322      * only possible event types are CREATED and DESTROYED, any other event type raises an exception.
323      * <p><p>
324      * MUST NOT BE CALLED CONCURRENTLY - SINGLE THREAD AT THIS POINT ONLY! MUST BE ENFORCED FROM THE OUTSIDE!
325      * 
326      * @param event
327      */
328     protected void staticObjectUpdatedEvent( IStaticWorldObjectUpdatedEvent event)
329     {
330     	IStaticWorldObject current = super.getStatic( event.getId());
331     	IWorldObjectUpdateResult<IStaticWorldObject> result = event.update(current);
332     	switch ( result.getResult() )
333     	{
334     	case CREATED:
335     		super.addStaticWorldObject( result.getObject() );
336     		break;
337     	case DESTROYED:
338     		super.removeStaticWorldObject( result.getObject() );
339     		break;
340     	case SAME:
341     		return;
342     	default:
343     		throw new PogamutException("Wrong static object update result " + result.getResult() + " for the object " + result.getObject().toString() + " . ", this);
344     	}
345     }
346     
347     /**
348      * If team is null, the property will be created for all teams.
349      * <p><p>
350      * MUST NOT BE CALLED CONCURRENTLY - SINGLE THREAD AT THIS POINT ONLY! MUST BE ENFORCED FROM THE OUTSIDE!
351      * 
352      * @param property
353      * @param team
354      */
355     protected void propertyCreated(ISharedProperty property, ITeamId team)
356     {
357     	if (team == null)
358     	{
359     		addSharedProperty(property);
360     		return;
361     	}
362     	addSharedProperty(property, team );
363     	//TODO event raise
364     }
365     
366     /**
367      * Updates the property.
368      * This method is not responsible for adding old versions of the object.
369      * <p><p>
370      * MUST NOT BE CALLED CONCURRENTLY - SINGLE THREAD AT THIS POINT ONLY! MUST BE ENFORCED FROM THE OUTSIDE!
371      * 
372      * @param property
373      * @param team
374      */
375     protected void propertyUpdated(ISharedProperty property, ITeamId team)
376     {
377     	currentSharedProperties.put(team,property.getObjectId(),property.getPropertyId(),property);
378     	//TODO raise events
379     }
380 
381     /**
382      * Must be called whenever an object was created, raises correct events.
383      * <p><p>
384      * Might be overridden to provide different behavior.
385      * <p><p>
386      * MUST NOT BE CALLED CONCURRENTLY - SINGLE THREAD AT THIS POINT ONLY! MUST BE ENFORCED FROM THE OUTSIDE!
387      * 
388      * @param obj
389      */
390     protected void objectCreated(ISharedWorldObject obj, long time) {
391     	//no event raise here as of now...
392     	//so far we only notify local worldview with update events
393     }
394     
395     /**
396      * Must be called whenever an object was updated - raises correct event.
397      * <p><p>
398      * Might be overridden to provide a mechanism that will forbid
399      * update of certain objects (like items that can't move).
400      * <p><p>
401      * MUST NOT BE CALLED CONCURRENTLY - SINGLE THREAD AT THIS POINT ONLY! MUST BE ENFORCED FROM THE OUTSIDE!
402      *
403      * @param obj
404      */
405     protected void objectUpdated(ITeamId teamId, WorldObjectId objectId, long time)
406     {
407        //notify local worldView listeners
408     	for ( TeamedAgentId agentId : this.localWorldViews.keySet() )
409     	{
410     		if ( agentId.getTeamId().equals(teamId))
411     		{
412     			ILocalWorldView wv = localWorldViews.get(agentId);
413     			if ( wv instanceof BatchAwareLocalWorldView)
414     			{
415     				//buffer the objectEvent
416     				((BatchAwareLocalWorldView)wv).bufferObjectEvent(objectId, EventType.UPDATED, time);
417     			}
418     		}
419     	}
420     }
421     
422     /**
423      * Must be called whenever an object was destroyed - raises correct events.
424      * <p><p>
425      * Might be overriden to provide different behavior.
426      * <p><p>
427      * MUST NOT BE CALLED CONCURRENTLY - SINGLE THREAD AT THIS POINT ONLY! MUST BE ENFORCED FROM THE OUTSIDE!
428      * 
429      * @param obj
430      */
431     protected void objectDestroyed(IWorldObject obj, long time) {
432         //raiseEvent(new WorldObjectDestroyedEvent<IWorldObject>(obj, time));        
433     }
434 
435 	
436 }