View Javadoc

1   package cz.cuni.amis.pogamut.multi.communication.worldview.impl;
2   
3   import java.util.Collections;
4   import java.util.HashMap;
5   import java.util.Map;
6   import java.util.Set;
7   import java.util.logging.Logger;
8   
9   import cz.cuni.amis.pogamut.base.agent.IAgentId;
10  import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent;
11  import cz.cuni.amis.pogamut.base.utils.guice.AgentTeamScoped;
12  import cz.cuni.amis.pogamut.multi.communication.messages.SharedBatchBeginEvent;
13  import cz.cuni.amis.pogamut.multi.communication.messages.SharedBatchFinishedEvent;
14  import cz.cuni.amis.utils.exception.PogamutException;
15  import cz.cuni.amis.utils.maps.HashMapSet;
16  
17  
18  /**
19   * SharedWorldView with batch-driven implementation.
20   * The worldView manages all its localWorldViews and will notify them after a batch has been fully processed from all worldViews
21   * thus preventing inconsistency in shared data.
22   * LocalWorldViews must notify this worldView with correct events ({@link SharedBatchBeginEvent} and batchEndEvents which are left for the user to override
23   * -> the UT2004 version has its own implementation in descendant worldview )
24   * the sharedWorldView will then notify back with {@link SharedBatchFinishedEvent} when all events have been processed.
25   * @author srlok
26   *
27   */
28  public abstract class BatchAwareSharedWorldView extends EventDrivenSharedWorldView
29  {
30  	/**
31  	 * Construtor - all we need is logger. Shared world view gets all other information at runtime.
32  	 * @param logger
33  	 */
34  	public BatchAwareSharedWorldView(Logger logger) {
35  		super(logger);
36  	}
37  	
38  	/**
39  	 * This map counts the number of unfinished message batches for the respective TimeKey, once the count reaches zero,
40  	 * all worldViews waiting for the lock to release are notifies.
41  	 */
42  	private Map<Long, Integer> timeLocks = Collections.synchronizedMap( new HashMap<Long, Integer>() );
43  	
44  	/**
45  	 * Map containing time->agent for agents which are waiting for the time to be completely exported. 
46  	 */
47  	private HashMapSet<Long, IAgentId> waitingLocalWorldViews = new HashMapSet<Long, IAgentId>();
48  	
49  	private Object objectMutex = new Object();
50  		
51  	protected abstract boolean isBatchEndEvent( IWorldChangeEvent event );
52  	
53  	/**
54  	 * Notifies all waiting local world views, that batch belonging to 'time' has been exported by all local world views.
55  	 * I.e. {@link SharedBatchBeginEvent} occurs for 'time'.
56  	 *  
57  	 * @param waiting agent which local wvs should be notified
58  	 * @param time time for which the batch has finished
59  	 */
60  	protected void notifyLocalWorldViews(Set<IAgentId> waiting, long time) 
61  	{
62  		if (waiting != null) {
63  			for ( IAgentId id : waiting ) {
64  				localWorldViews.get(id).notify(new SharedBatchFinishedEvent(time) );
65  			}
66  		}
67  	}
68  	
69  	/**
70  	 * Adds a lock for the desired time.
71  	 * This method is called when a SharedBatchBeginEvent is recieved, it means that the localWorldViews should wait
72  	 * until the entire batch has been processed.
73  	 * @param time
74  	 */
75  	protected void processBeginEvent( SharedBatchBeginEvent event) {
76  		//log.info("Processing: " + event);
77  		
78  		synchronized(timeLocks) {
79  			// INCREASE TIME LOCKS FOR A GIVEN TIME
80  			Integer n = timeLocks.get(event.getSimTime());
81  			if ( n == null) {
82  				timeLocks.put(event.getSimTime(), 1);
83  			} else {
84  				timeLocks.put(event.getSimTime(), ++n);
85  			}
86  			// SUBSCRIBE LOCAL WORLD VIEW AS WAITING FOR THE SHARED-BATCH-END-EVENT
87  			waitingLocalWorldViews.add( event.getSimTime(), event.getAgentId() );			
88  		}
89  	}
90  	
91  	/**
92  	 * Processes batch-end event ... correctly synchronize access to timeLocks in lock-free manner.
93  	 * 
94  	 * @param time
95  	 */
96  	protected void processEndEvent( IWorldChangeEvent event) {
97  		//log.info("Processing:" + event + " ;" + event.getSimTime());
98  		Set<IAgentId> waiting = null;
99  		synchronized(timeLocks) {
100 			Integer locks = timeLocks.remove(event.getSimTime());
101 			if (locks == null) {
102 				throw new PogamutException("BatchEndEvent came for time that has no locks == no previous BatchBeginEvent came!", this);
103 			}
104 			if (locks <= 0) {
105 				throw new PogamutException("BatchEndEvent came for time that " + locks + " <= 0 locks! INVALID STATE!", this);
106 			}
107 			--locks;
108 			if (locks == 0)	{
109 				waiting = waitingLocalWorldViews.remove(event.getSimTime());				
110 			} else {
111 				timeLocks.put(event.getSimTime(), locks);
112 			}
113 		}
114 		notifyLocalWorldViews(waiting, event.getSimTime());
115 	}
116 
117 	//Object objMutex = new Object();
118 	
119 	@Override
120 	public void notify(IWorldChangeEvent event) {
121 		log.finest("BatchAwareSharedWorldView notify : " + event);
122 		
123 			if ( event instanceof SharedBatchBeginEvent ) {
124 				log.fine("SharedWorldView : SharedBatchBeginEvent for time : " + event.getSimTime());
125 				processBeginEvent( (SharedBatchBeginEvent) event );
126 			}
127 			else 
128 			if ( isBatchEndEvent(event) ) {
129 				log.fine("SharedWorldView : SharedBatchEndEvent for time : " + event.getSimTime());
130 				processEndEvent(event);
131 			}
132 			else
133 			{
134 				super.notify(event);
135 			}
136 	}
137 }