1 package cz.cuni.amis.pogamut.multi.communication.worldview.impl;
2
3 import java.util.Collection;
4 import java.util.Collections;
5 import java.util.Comparator;
6 import java.util.concurrent.PriorityBlockingQueue;
7 import java.util.logging.Level;
8 import java.util.logging.Logger;
9
10 import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent;
11 import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldEventWrapper;
12 import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldObjectUpdateResult;
13 import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldObjectUpdatedEvent;
14 import cz.cuni.amis.pogamut.base.communication.worldview.event.IWorldEvent;
15 import cz.cuni.amis.pogamut.base.communication.worldview.impl.EventDrivenWorldView;
16 import cz.cuni.amis.pogamut.base.communication.worldview.object.IWorldObject;
17 import cz.cuni.amis.pogamut.base.communication.worldview.object.WorldObjectId;
18 import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectDestroyedEvent;
19 import cz.cuni.amis.pogamut.base.communication.worldview.object.event.WorldObjectFirstEncounteredEvent;
20 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
21 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
22 import cz.cuni.amis.pogamut.base.component.controller.ComponentController;
23 import cz.cuni.amis.pogamut.base.utils.guice.AgentTeamScoped;
24 import cz.cuni.amis.pogamut.multi.agent.ITeamId;
25 import cz.cuni.amis.pogamut.multi.agent.impl.TeamedAgentId;
26 import cz.cuni.amis.pogamut.multi.communication.translator.event.ISharedPropertyUpdateResult;
27 import cz.cuni.amis.pogamut.multi.communication.translator.event.ISharedPropertyUpdatedEvent;
28 import cz.cuni.amis.pogamut.multi.communication.translator.event.ISharedWorldObjectUpdatedEvent;
29 import cz.cuni.amis.pogamut.multi.communication.translator.event.IStaticWorldObjectUpdatedEvent;
30 import cz.cuni.amis.pogamut.multi.communication.worldview.ILocalWorldView;
31 import cz.cuni.amis.pogamut.multi.communication.worldview.object.ISharedProperty;
32 import cz.cuni.amis.pogamut.multi.communication.worldview.object.ISharedWorldObject;
33 import cz.cuni.amis.pogamut.multi.communication.worldview.object.IStaticWorldObject;
34 import cz.cuni.amis.pogamut.multi.communication.worldview.object.event.DummyObjectEvent.EventType;
35 import cz.cuni.amis.pogamut.multi.utils.timekey.TimeKey;
36 import cz.cuni.amis.utils.NullCheck;
37 import cz.cuni.amis.utils.exception.PogamutException;
38
39
40
41
42
43
44
45
46 @AgentTeamScoped
47 public abstract class EventDrivenSharedWorldView extends AbstractSharedWorldView {
48
/**
 * Creates the shared world view and passes {@code logger} on to the {@link AbstractSharedWorldView}
 * constructor.
 *
 * @param logger logger handed to the superclass constructor
 */
public EventDrivenSharedWorldView(Logger logger) {
super(logger);
}
52
53 private ISharedProperty copyProperty(ISharedProperty original)
54 {
55 return original.clone();
56 }
57
/** Name constant for this world view's dependency; where it is consumed is not visible in this file. */
public static final String WORLDVIEW_DEPENDENCY = "EventDrivenSharedWorldViewDependency";
59
60
61
62
63
64
/**
 * Set to {@code true} while a call to {@link #notify(IWorldChangeEvent)} is draining
 * {@link #notifyEventsList}. While it is {@code true}, further notify calls only enqueue their event and
 * return; the flag is cleared once the queue is found empty or processing of an event fails.
 */
protected boolean receiveEventProcessing = false;
66
67
68
69
70
71
72 protected PriorityBlockingQueue<IWorldChangeEvent> notifyEventsList =
73 new PriorityBlockingQueue<IWorldChangeEvent>(
74 64,
75 new Comparator<IWorldChangeEvent>() {
76 @Override
77 public int compare(IWorldChangeEvent arg0, IWorldChangeEvent arg1) {
78 return (int) Math.signum( arg0.getSimTime() - arg1.getSimTime() );
79 }
80 }
81 );
82
/**
 * Synchronized view over {@link #notifyEventsList} obtained from
 * {@link Collections#synchronizedCollection(Collection)}. {@link #notify(IWorldChangeEvent)} adds events
 * through it and synchronizes on it while touching the queue and {@link #receiveEventProcessing}.
 */
protected Collection<IWorldChangeEvent> syncEventList = Collections.synchronizedCollection( notifyEventsList );
84
85 @Override
86 public void notify(IWorldChangeEvent event) throws ComponentNotRunningException, ComponentPausedException {
87 log.finest("SharedWorldView notify : [" + event.getSimTime() + " ; " + event);
88 if (isPaused()) {
89 throw new ComponentPausedException(controller.getState().getFlag(), this);
90 }
91 if (!isRunning()) {
92 throw new ComponentNotRunningException(controller.getState().getFlag(), this);
93 }
94
95 synchronized(syncEventList) {
96
97 syncEventList.add(event);
98
99 if (receiveEventProcessing) {
100
101
102
103 log.finest("Added event; events :" + notifyEventsList.size());
104 return;
105 } else {
106
107 receiveEventProcessing = true;
108 }
109 }
110
111
112
113
114 while (true) {
115 IWorldChangeEvent ev = null;
116 synchronized(syncEventList) {
117 if (notifyEventsList.size() == 0) {
118
119 receiveEventProcessing = false;
120 return;
121 }
122 ev = notifyEventsList.poll();
123 }
124 if (ev != null) {
125 boolean exception = false;
126 try {
127 innerNotify(ev);
128 } catch (PogamutException e1) {
129 exception = true;
130 throw e1;
131 } catch (Exception e2) {
132 exception = true;
133 throw new PogamutException("Failed to process: " + ev, e2, this);
134 } finally {
135 if (exception) {
136
137 receiveEventProcessing = false;
138
139 }
140 }
141 }
142 }
143 }
144
145
146
147
148
149
150
151
152
153
154
155
156 protected void innerNotify(IWorldChangeEvent event) {
157 NullCheck.check(event, "event");
158 if (log.isLoggable(Level.FINEST)) log.finest("SharedWorldView processing " + event);
159
160
161 if (event instanceof ISharedWorldObjectUpdatedEvent) {
162 sharedObjectUpdatedEvent((ISharedWorldObjectUpdatedEvent)event);
163 }
164 else
165 if ( event instanceof ISharedPropertyUpdatedEvent) {
166 propertyUpdatedEvent((ISharedPropertyUpdatedEvent)event);
167 }
168 else
169 if ( event instanceof IStaticWorldObjectUpdatedEvent ) {
170 staticObjectUpdatedEvent((IStaticWorldObjectUpdatedEvent)event);
171 }
172 else
173 if (event instanceof IWorldEventWrapper) {
174 raiseEvent(((IWorldEventWrapper) event).getWorldEvent());
175 } else
176 if (event instanceof IWorldEvent) {
177 raiseEvent((IWorldEvent)event);
178 } else {
179 throw new PogamutException("Unsupported event type received (" + event.getClass() + "): " + event, this);
180 }
181 }
182
183
184
185
186 @Override
187 protected void raiseEvent(IWorldEvent event) {
188 try {
189 super.raiseEvent(event);
190 } catch (Exception e) {
191 this.controller.fatalError("Exception raising event " + event, e);
192 this.kill();
193 }
194 }
195
/** Monitor held by {@link #addMsgClass(WorldObjectId, Class)} while it writes into {@code idClassMap}. */
Object objectMutex = new Object();
197
198 public void addMsgClass(WorldObjectId id, Class msgClass)
199 {
200 synchronized(objectMutex)
201 {
202 this.idClassMap.put(id, msgClass);
203 }
204 }
205
206
207
208
209
210
211
212
213
214 protected void sharedObjectUpdatedEvent(ISharedWorldObjectUpdatedEvent updateEvent) {
215
216 boolean created = false;
217 boolean updated = false;
218 boolean destroyed = false;
219
220 if ( !syncIdClassMap.containsKey(updateEvent.getId() ))
221 {
222 NullCheck.check(updateEvent.getCompositeObjectClass(), "CompositeClass");
223 syncIdClassMap.put(updateEvent.getId(), updateEvent.getCompositeObjectClass());
224 }
225
226 for ( ISharedPropertyUpdatedEvent propertyEvent : updateEvent.getPropertyEvents() )
227 {
228 ISharedProperty property = currentSharedProperties.get(updateEvent.getTeamId(), updateEvent.getId(), propertyEvent.getPropertyId());
229 ISharedProperty copy = null;
230
231 if (property != null)
232 {
233 copy = copyProperty(property);
234 };
235
236 ISharedPropertyUpdateResult updateResult = propertyEvent.update(copy);
237
238 switch (updateResult.getResult())
239 {
240 case CREATED:
241 created = true;
242 propertyCreated(updateResult.getProperty(), updateEvent.getTeamId());
243 break;
244 case UPDATED:
245 if ( updateResult.getProperty() != copy)
246 {
247 throw new PogamutException("Update event " + updateEvent + " did not return the same instance of the object (result UPDATED).", this);
248 }
249
250 updated = true;
251 addOldSharedProperty( property, updateEvent.getTeamId(), propertyEvent.getSimTime());
252
253 propertyUpdated(copy, updateEvent.getTeamId());
254 break;
255 case DESTROYED:
256
257 addOldSharedProperty(property, updateEvent.getTeamId(), propertyEvent.getSimTime());
258
259 removeSharedProperty(property, updateEvent.getTeamId());
260 case SAME:
261 break;
262 default:
263 throw new PogamutException("Unhandled object update result " + updateResult.getResult() + " for the object " + updateEvent.getId() + "." + "Property : " + property, this);
264 }
265 }
266
267 if ( created )
268 {
269 objectCreated( getShared(updateEvent.getTeamId(), updateEvent.getId(), TimeKey.get(updateEvent.getSimTime()) ), updateEvent.getSimTime());
270 objectUpdated(updateEvent.getTeamId(), updateEvent.getId(), updateEvent.getSimTime());
271 }
272 else if ( updated )
273 {
274 objectUpdated(updateEvent.getTeamId(), updateEvent.getId(), updateEvent.getSimTime());
275 }
276 else if ( destroyed )
277 {
278
279 objectDestroyed( getShared(updateEvent.getTeamId(), updateEvent.getId(), TimeKey.get(updateEvent.getSimTime())), updateEvent.getSimTime());
280 }
281 }
282
283
284
285
286
287
288 protected void propertyUpdatedEvent( ISharedPropertyUpdatedEvent event)
289 {
290 ISharedProperty property = null;
291 ISharedProperty copy = null;
292
293 property = getSharedProperty(event.getPropertyId(), event.getTeamId(), TimeKey.get( event.getSimTime() ));
294
295 if ( property != null)
296 {
297 copy = property.clone();
298 }
299 ISharedPropertyUpdateResult result = event.update(copy);
300 switch ( result.getResult() )
301 {
302 case CREATED:
303 propertyCreated( result.getProperty(), event.getTeamId() );
304 break;
305 case UPDATED:
306 addOldSharedProperty(property, event.getTeamId(), event.getSimTime());
307 propertyUpdated(copy, event.getTeamId() );
308 break;
309 case DESTROYED:
310 addOldSharedProperty(property, event.getTeamId(), event.getSimTime() );
311 removeSharedProperty(property, event.getTeamId() );
312 break;
313 case SAME:
314 break;
315 default:
316 throw new PogamutException("Unexpected update result " + result.getResult() + " for property " + property.toString() + " .", this);
317 }
318 }
319
320
321
322
323
324
325
326
327
328 protected void staticObjectUpdatedEvent( IStaticWorldObjectUpdatedEvent event)
329 {
330 IStaticWorldObject current = super.getStatic( event.getId());
331 IWorldObjectUpdateResult<IStaticWorldObject> result = event.update(current);
332 switch ( result.getResult() )
333 {
334 case CREATED:
335 super.addStaticWorldObject( result.getObject() );
336 break;
337 case DESTROYED:
338 super.removeStaticWorldObject( result.getObject() );
339 break;
340 case SAME:
341 return;
342 default:
343 throw new PogamutException("Wrong static object update result " + result.getResult() + " for the object " + result.getObject().toString() + " . ", this);
344 }
345 }
346
347
348
349
350
351
352
353
354
355 protected void propertyCreated(ISharedProperty property, ITeamId team)
356 {
357 if (team == null)
358 {
359 addSharedProperty(property);
360 return;
361 }
362 addSharedProperty(property, team );
363
364 }
365
366
367
368
369
370
371
372
373
374
/**
 * Puts the given property into {@code currentSharedProperties} under the key formed by the team, the
 * property's object id and its property id.
 *
 * @param property updated property to store; must not be {@code null}
 * @param team team the property is stored for
 */
protected void propertyUpdated(ISharedProperty property, ITeamId team)
{
currentSharedProperties.put(team,property.getObjectId(),property.getPropertyId(),property);

}
380
381
382
383
384
385
386
387
388
389
/**
 * Hook called by {@link #sharedObjectUpdatedEvent(ISharedWorldObjectUpdatedEvent)} when at least one
 * property of the processed event was created. This implementation does nothing.
 *
 * @param obj shared object obtained via {@code getShared(...)} for the event's team, id and time
 * @param time simulation time of the event
 */
protected void objectCreated(ISharedWorldObject obj, long time) {
// No work is done here in this class.

}
394
395
396
397
398
399
400
401
402
403
404
405 protected void objectUpdated(ITeamId teamId, WorldObjectId objectId, long time)
406 {
407
408 for ( TeamedAgentId agentId : this.localWorldViews.keySet() )
409 {
410 if ( agentId.getTeamId().equals(teamId))
411 {
412 ILocalWorldView wv = localWorldViews.get(agentId);
413 if ( wv instanceof BatchAwareLocalWorldView)
414 {
415
416 ((BatchAwareLocalWorldView)wv).bufferObjectEvent(objectId, EventType.UPDATED, time);
417 }
418 }
419 }
420 }
421
422
423
424
425
426
427
428
429
430
/**
 * Hook referenced from the destroyed branch of
 * {@link #sharedObjectUpdatedEvent(ISharedWorldObjectUpdatedEvent)}. This implementation does nothing.
 *
 * @param obj object obtained via {@code getShared(...)} for the event's team, id and time
 * @param time simulation time of the event
 */
protected void objectDestroyed(IWorldObject obj, long time) {
// No work is done here in this class.
}
434
435
436 }