1 package cz.cuni.amis.pogamut.multi.communication.worldview.impl;
2
3 import java.util.Collections;
4 import java.util.HashMap;
5 import java.util.HashSet;
6 import java.util.LinkedList;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Queue;
10 import java.util.Set;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.logging.Level;
14
15 import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent;
16 import cz.cuni.amis.pogamut.base.communication.worldview.object.WorldObjectId;
17 import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectDestroyedEvent;
18 import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectEvent;
19 import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectFirstEncounteredEvent;
20 import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectUpdatedEvent;
21 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
22 import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
23 import cz.cuni.amis.pogamut.base.component.lifecyclebus.ILifecycleBus;
24 import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
25 import cz.cuni.amis.pogamut.base3d.worldview.object.IViewable;
26 import cz.cuni.amis.pogamut.base3d.worldview.object.event.WorldObjectAppearedEvent;
27 import cz.cuni.amis.pogamut.multi.agent.ITeamedAgentId;
28 import cz.cuni.amis.pogamut.multi.communication.messages.SharedBatchBeginEvent;
29 import cz.cuni.amis.pogamut.multi.communication.messages.SharedBatchFinishedEvent;
30 import cz.cuni.amis.pogamut.multi.communication.translator.event.ICompositeWorldObjectUpdatedEvent;
31 import cz.cuni.amis.pogamut.multi.communication.translator.event.ILocalWorldObjectUpdatedEvent;
32 import cz.cuni.amis.pogamut.multi.communication.translator.event.ISharedWorldObjectUpdatedEvent;
33 import cz.cuni.amis.pogamut.multi.communication.translator.event.IStaticWorldObjectUpdatedEvent;
34 import cz.cuni.amis.pogamut.multi.communication.worldview.ISharedWorldView;
35 import cz.cuni.amis.pogamut.multi.communication.worldview.object.ICompositeWorldObject;
36 import cz.cuni.amis.pogamut.multi.communication.worldview.object.ILocalViewable;
37 import cz.cuni.amis.pogamut.multi.communication.worldview.object.ILocalWorldObject;
38 import cz.cuni.amis.pogamut.multi.communication.worldview.object.event.DummyObjectEvent;
39 import cz.cuni.amis.pogamut.multi.communication.worldview.object.event.DummyObjectEvent.EventType;
40 import cz.cuni.amis.pogamut.multi.utils.timekey.TimeKey;
41 import cz.cuni.amis.utils.NullCheck;
42 import cz.cuni.amis.utils.exception.PogamutInterruptedException;
43
44
45
46
47
48
49 public abstract class BatchAwareLocalWorldView extends VisionLocalWorldView {
50
/**
 * Creates the world view by handing every argument unchanged to the
 * {@link VisionLocalWorldView} constructor.
 *
 * @param dependencies    passed to the superclass constructor
 * @param bus             passed to the superclass constructor
 * @param logger          passed to the superclass constructor
 * @param parentWorldView passed to the superclass constructor
 * @param agentId         passed to the superclass constructor
 */
public BatchAwareLocalWorldView(
        ComponentDependencies dependencies,
        ILifecycleBus bus,
        IAgentLogger logger,
        ISharedWorldView parentWorldView,
        ITeamedAgentId agentId) {
    super(dependencies, bus, logger, parentWorldView, agentId);
    // Install a fresh monitor object once the superclass constructor has returned.
    this.objectMutex = new Object();
}
57
58
59
60
61 private Queue<List<IWorldChangeEvent>> batches = new LinkedBlockingQueue<List<IWorldChangeEvent>>();
62
63
64
65
66 private List<IWorldChangeEvent> currentBatch = new LinkedList<IWorldChangeEvent>();
67
68
69
70
71
72
73 protected abstract boolean isBatchBeginEvent( IWorldChangeEvent event );
74
75
76
77
78
79
80 protected abstract boolean isBatchEndEvent( IWorldChangeEvent event );
81
82
83
84
85 private boolean lockRequested = false;
86
87 private boolean lockFinished = false;
88
89
90
91
92
93 private boolean waitingForSharedBatch = false;
94
95
96
97
98
99 private Set<Long> sharedWVLocks = Collections.synchronizedSet( new HashSet<Long>(4));
100
101 private boolean timeKeyIncreased = false;
102
103 private Object objectMutex = new Object();
104 private CountDownLatch latch = new CountDownLatch(1);
105
106
107
108
109 private List<Long> lockedTimes = new LinkedList<Long>();
110
111
112
113
114
115
116 protected synchronized void notifySharedBegin( long time )
117 {
118 log.finer("Notifying sharedWorldView with SharedBegin event of time : " + time);
119 sharedWorldView.notify( new SharedBatchBeginEvent(time, this.agentId) );
120 }
121
122 protected Map<WorldObjectId, Set<EventType>> bufferedEvents = new HashMap<WorldObjectId, Set<EventType>>();
123 protected List<DummyObjectEvent> eventBuffer = new LinkedList<DummyObjectEvent>();
124
125
126
127
128
129 @Override
130 protected void objectCreated( ILocalWorldObject obj, long time )
131 {
132 bufferObjectEvent( obj.getId(), EventType.FIRST_ENCOUNTERED, time );
133 bufferObjectEvent( obj.getId(), EventType.UPDATED, time);
134 super.objectCreated(obj, time);
135 }
136
137 @Override
138 protected void objectUpdated( ILocalWorldObject obj, long time )
139 {
140 bufferObjectEvent( obj.getId(), EventType.UPDATED, time );
141 super.objectUpdated(obj, time);
142 }
143
144 @Override
145 protected void objectDestroyed( ILocalWorldObject obj, long time)
146 {
147
148 raiseEvent( new WorldObjectDestroyedEvent( get(obj.getId(), TimeKey.get(time)), time));
149 super.objectDestroyed(obj, time);
150 }
151
152 @Override
153 protected void objectAppeared( ILocalViewable obj, long time )
154 {
155 super.objectAppeared(obj, time);
156 bufferObjectEvent( obj.getId(), EventType.APPEARED, time);
157 }
158
159 @Override
160 protected void objectDisappeared( ILocalViewable obj, long time )
161 {
162 super.objectDisappeared(obj, time);
163 bufferObjectEvent( obj.getId(), EventType.DISAPPEARED, time );
164 }
165
166
167
168
169
170
171
172 protected abstract void disappearObject( WorldObjectId id, long time);
173
174
175
176
177
178
179
180
181
182
183 protected void bufferObjectEvent(WorldObjectId id, EventType eventType, long time)
184 {
185 if ( log.isLoggable(Level.FINEST ))
186 {
187 log.finest("Buffering event for : " + id.toString() + " ; Type :" + eventType.toString() + "; T : " + time);
188 }
189
190
191
192 synchronized(eventBuffer)
193 {
194 Set<EventType> buffered = bufferedEvents.get(id);
195 if ( buffered == null)
196 {
197 eventBuffer.add( new DummyObjectEvent(id, eventType, time) );
198 buffered = new HashSet<EventType>();
199 buffered.add(eventType);
200 bufferedEvents.put( id, buffered);
201 }
202 else if ( !buffered.contains(eventType) )
203 {
204 buffered.add(eventType);
205 eventBuffer.add( new DummyObjectEvent(id, eventType, time) );
206 }
207 }
208
209 }
210
211
212
213
214 protected void flushEvents()
215 {
216 List<DummyObjectEvent> toBuffer = new LinkedList<DummyObjectEvent>();
217 if (log.isLoggable(Level.FINE) )
218 {
219 if ( eventBuffer.isEmpty() )
220 {
221 log.fine("No events to flush.");
222 }
223 else
224 {
225 log.fine("Flushing events for time : " + eventBuffer.iterator().next().getTime() + "; Buffer size : " + eventBuffer.size() );
226 }
227 }
228 synchronized (eventBuffer)
229 {
230 List<DummyObjectEvent> toProcess = eventBuffer;
231 eventBuffer = new LinkedList<DummyObjectEvent>();
232 for (DummyObjectEvent dummy : toProcess)
233 {
234 WorldObjectEvent e = null;
235 long eventTime = dummy.getTime();
236 try
237 {
238 switch ( dummy.getType() )
239 {
240 case APPEARED :
241 e = new WorldObjectAppearedEvent<IViewable>((IViewable)this.get(dummy.getObjectId(), TimeKey.get(eventTime)), eventTime );
242 break;
243 case DESTROYED :
244 e = new WorldObjectDestroyedEvent<ICompositeWorldObject>(this.get(dummy.getObjectId(), TimeKey.get(eventTime)), eventTime );
245 break;
246 case DISAPPEARED :
247
248 disappearObject( dummy.getObjectId(), eventTime );
249
250
251
252 break;
253 case FIRST_ENCOUNTERED :
254 e = new WorldObjectFirstEncounteredEvent<ICompositeWorldObject>(this.get(dummy.getObjectId(), TimeKey.get(eventTime)), eventTime);
255 break;
256 case UPDATED :
257 if ( getLocal(dummy.getObjectId() ) != null)
258 {
259 e = new WorldObjectUpdatedEvent<ICompositeWorldObject>(this.get(dummy.getObjectId(), TimeKey.get(eventTime)), eventTime);
260 }
261 break;
262 }
263 if ( e != null )
264 {
265 raiseEvent(e);
266 }
267 }
268 catch (Exception exc)
269 {
270 log.warning("["+dummy.getTime()+"]Exception in raising event |" + dummy.getObjectId() + "| postponing " + exc);
271 dummy.incTime();
272 toBuffer.add(dummy);
273 }
274 }
275 eventBuffer.addAll(toBuffer);
276 bufferedEvents = new HashMap<WorldObjectId, Set<EventType>>();
277 }
278 }
279
280 IWorldChangeEvent bufferedEndMessage = null;
281
282
283
284
285
286
287 protected void sharedBatchFinished( long time )
288 {
289
290 if ( lockedTimes == null )
291 {
292 lockedTimes = new LinkedList<Long>();
293 }
294
295 synchronized( lockedTimes )
296 {
297
298 this.flushEvents();
299 NullCheck.check(bufferedEndMessage, "Buffered End message");
300 super.notify( bufferedEndMessage );
301 bufferedEndMessage = null;
302 if ( log.isLoggable( Level.FINER ) )
303 {
304 log.finer("SharedBatchFinishedEvent recieved from the SharedWorldView for time " + time );
305 }
306 if ( !lockFinished )
307 {
308 log.fine("Setting current timeKey : " + time );
309 setCurrentTime( TimeKey.get(time) );
310 timeKeyIncreased = true;
311 List<Long> newLocks = new LinkedList<Long>();
312 for ( Long t : lockedTimes )
313 {
314 if ( t < time)
315 {
316 unlockTime(t);
317 }
318 else
319 {
320 newLocks.add(t);
321 }
322 }
323 lockedTimes = newLocks;
324
325
326 if ( latch == null )
327 {
328 latch = new CountDownLatch(1);
329 }
330 latch.countDown();
331
332 }
333 }
334 }
335
336 public boolean isLocked()
337 {
338 return lockFinished;
339 }
340
341
342
343
344 public void lock()
345 {
346 log.fine("Locking BatchAwareLocalWorldView");
347
348 synchronized ( objectMutex )
349 {
350 if (!isRunning()) throw new ComponentNotRunningException("Can't lock() world view is not running!", log, this);
351 if ( !lockFinished )
352 {
353 lockRequested = true;
354 }
355 else
356 {
357 return;
358 }
359 }
360
361 try {
362 latch.await();
363 } catch (InterruptedException e) {
364 throw new PogamutInterruptedException("Interrupted while waiting to acquire lock()!", e, this);
365 }
366
367 lockFinished = true;
368
369 log.fine("BatchAwareLocalWorldView locked.");
370 }
371
372
373
374
375 public void unlock()
376 {
377 synchronized( objectMutex )
378 {
379 if (!isRunning()) throw new ComponentNotRunningException("Can't unlock() world view is not running!", log, this);
380 LinkedList<Long> newLocks = new LinkedList<Long>();
381 for ( Long t : lockedTimes )
382 {
383 if ( t < currentTimeKey.getTime() )
384 {
385 this.unlockTime( t );
386 }
387 else
388 {
389 newLocks.add(t);
390 }
391 }
392 lockedTimes = newLocks;
393 lockFinished = false;
394 lockRequested = false;
395 latch = new CountDownLatch(1);
396 log.fine("BatchAwareLocalWorldView unlocked");
397 }
398 }
399
400 private boolean timeKeySet = false;
401
402 boolean endMessageCame = false;
403 boolean sharedFinished = false;
404
405 @Override
406 public synchronized void notify(IWorldChangeEvent event)
407 {
408 log.finest( "BatchAwareLocalWorldView notify : " + event);
409
410 if (!timeKeySet)
411 {
412 this.currentTimeKey = TimeKey.get( event.getSimTime() );
413 timeKeySet = true;
414 }
415
416
417 if (!( event instanceof ILocalWorldObjectUpdatedEvent))
418 {
419 if ( event instanceof ICompositeWorldObjectUpdatedEvent)
420 {
421 IWorldChangeEvent partEvent = ((ICompositeWorldObjectUpdatedEvent)event).getSharedEvent();
422 if (partEvent != null)
423 {
424 if ( log.isLoggable( Level.FINEST ))
425 {
426 log.finest("Notyfying sharedWV " + partEvent.toString() + ")");
427 }
428 sharedWorldView.notify(partEvent);
429 }
430 partEvent = ((ICompositeWorldObjectUpdatedEvent)event).getStaticEvent();
431 if ( partEvent != null)
432 {
433 if ( log.isLoggable( Level.FINEST )) log.finest("Notyfying sharedWV " + partEvent.toString() + ")");
434 sharedWorldView.notify(partEvent);
435 }
436 }
437
438 else if ( (event instanceof ISharedWorldObjectUpdatedEvent) || (event instanceof IStaticWorldObjectUpdatedEvent) )
439 {
440 if ( log.isLoggable( Level.FINEST )) log.finest("Notyfying sharedWV " + event.toString() + ")");
441 sharedWorldView.notify(event);
442 return;
443 }
444 }
445
446
447
448 if (objectMutex == null )
449 {
450 objectMutex = new Object();
451 }
452
453 synchronized(objectMutex)
454 {
455 if ( isBatchBeginEvent(event) )
456 {
457 if ( currentTimeKey == null )
458 {
459 log.info("Setting new currentTimeKey to : " + event.getSimTime());
460 currentTimeKey = TimeKey.get( event.getSimTime() );
461 }
462 lockTime( event.getSimTime());
463 this.lockedTimes.add( event.getSimTime() );
464 notifySharedBegin( event.getSimTime() );
465 super.notify(event);
466
467 }
468 else if ( isBatchEndEvent(event) )
469 {
470 if ( log.isLoggable(Level.FINER ))
471 {log.finer("Notifying sharedWorldView with EndEvent of time " + event.getSimTime() + " : " + event); };
472 sharedWorldView.notify(event);
473 bufferedEndMessage = event;
474 endMessageCame = true;
475 if ( endMessageCame && sharedFinished)
476 {
477 sharedBatchFinished(event.getSimTime());
478 endMessageCame = false;
479 sharedFinished = false;
480 }
481
482 }
483 else if ( event instanceof SharedBatchFinishedEvent )
484 {
485 sharedFinished = true;
486 if ( endMessageCame && sharedFinished)
487 {
488 sharedBatchFinished(event.getSimTime());
489 endMessageCame = false;
490 sharedFinished = false;
491 }
492 }
493 else
494 {
495 super.notify( event );
496 }
497 }
498 }
499
500 @Override
501 protected void stop() {
502 super.stop();
503 synchronized(objectMutex) {
504 while (latch != null && latch.getCount() > 0) latch.countDown();
505 while (lockedTimes != null && lockedTimes.size() > 0) {
506 long time = lockedTimes.get(0);
507 unlockTime(lockedTimes.get(0));
508 if (lockedTimes.get(0) == time) lockedTimes.remove(0);
509 }
510 }
511 }
512
513 @Override
514 protected void kill() {
515 super.kill();
516 synchronized(objectMutex) {
517 while (latch != null && latch.getCount() > 0) latch.countDown();
518 while (lockedTimes != null && lockedTimes.size() > 0) {
519 try {
520 long time = lockedTimes.get(0);
521 unlockTime(lockedTimes.get(0));
522 if (lockedTimes.get(0) == time) lockedTimes.remove(0);
523 } catch (Exception e) {
524 }
525 }
526 }
527 }
528
529 }