1 package cz.cuni.amis.pogamut.base.component.bus;
2
3 import java.util.Collection;
4 import java.util.Collections;
5 import java.util.HashSet;
6 import java.util.Map;
7 import java.util.Queue;
8 import java.util.Set;
9 import java.util.concurrent.ConcurrentHashMap;
10 import java.util.concurrent.ConcurrentLinkedQueue;
11 import java.util.concurrent.locks.Lock;
12 import java.util.concurrent.locks.ReadWriteLock;
13 import java.util.concurrent.locks.ReentrantReadWriteLock;
14
15 import com.google.inject.Inject;
16
17 import cz.cuni.amis.pogamut.base.agent.IAgentId;
18 import cz.cuni.amis.pogamut.base.component.IComponent;
19 import cz.cuni.amis.pogamut.base.component.IComponentAware;
20 import cz.cuni.amis.pogamut.base.component.bus.event.IFatalErrorEvent;
21 import cz.cuni.amis.pogamut.base.component.bus.event.IResetEvent;
22 import cz.cuni.amis.pogamut.base.component.bus.event.impl.ComponentBusErrorEvent;
23 import cz.cuni.amis.pogamut.base.component.bus.event.impl.FatalErrorEvent;
24 import cz.cuni.amis.pogamut.base.component.bus.event.impl.FatalErrorPropagatingEvent;
25 import cz.cuni.amis.pogamut.base.component.bus.event.impl.ResetEvent;
26 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentBusErrorException;
27 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentBusNotRunningException;
28 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentIdClashException;
29 import cz.cuni.amis.pogamut.base.component.bus.exception.FatalErrorPropagatingEventException;
30 import cz.cuni.amis.pogamut.base.component.bus.exception.MoreComponentsForClassException;
31 import cz.cuni.amis.pogamut.base.component.bus.exception.ResetFailedException;
32 import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
33 import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
34 import cz.cuni.amis.pogamut.base.utils.logging.LogCategory;
35 import cz.cuni.amis.utils.ClassUtils;
36 import cz.cuni.amis.utils.Const;
37 import cz.cuni.amis.utils.ExceptionToString;
38 import cz.cuni.amis.utils.NullCheck;
39 import cz.cuni.amis.utils.maps.LazyMap;
40 import cz.cuni.amis.utils.sets.ConcurrentHashSet;
41 import cz.cuni.amis.utils.sets.ConcurrentLinkedHashSet;
42 import cz.cuni.amis.utils.token.IToken;
43 import cz.cuni.amis.utils.token.Tokens;
44 import java.util.logging.Level;
45 import java.util.logging.Logger;
46
47
48
49
50
51
52
53
54 @AgentScoped
55 @SuppressWarnings("unchecked")
56 public class ComponentBus implements IComponentBus, IComponentAware {
57
58 public static final IToken COMPONENT_ID = Tokens.get("ComponentBus");
59
60 private Map<IToken, IComponent> componentsByToken = new ConcurrentHashMap<IToken, IComponent>();
61
62 private Map<Class, Set<IComponent>> componentsByClass = new LazyMap<Class, Set<IComponent>>(new ConcurrentHashMap<Class, Set<IComponent>>()) {
63
64 @Override
65 protected Set<IComponent> create(Class key) {
66 return new ConcurrentHashSet<IComponent>();
67 }
68
69 };
70
71 private Map<Class, Set<IComponentEventListener>> eventListeners = new LazyMap<Class, Set<IComponentEventListener>>(new ConcurrentHashMap<Class, Set<IComponentEventListener>>()) {
72
73 @Override
74 protected Set<IComponentEventListener> create(Class key) {
75 return new ConcurrentLinkedHashSet<IComponentEventListener>();
76 }
77
78 };
79
80 private Map<Class, Map<Class, Set<IComponentEventListener>>> componentEventListeners = new LazyMap<Class, Map<Class, Set<IComponentEventListener>>>(new ConcurrentHashMap<Class, Map<Class, Set<IComponentEventListener>>>()) {
81
82 @Override
83 protected Map<Class, Set<IComponentEventListener>> create(Class key) {
84 return new LazyMap<Class, Set<IComponentEventListener>>(new ConcurrentHashMap<Class, Set<IComponentEventListener>>()) {
85
86 @Override
87 protected Set<IComponentEventListener> create(Class key) {
88 return new ConcurrentLinkedHashSet<IComponentEventListener>();
89 }
90
91 };
92 }
93
94 };
95
96 private Map<IToken, Map<Class, Set<IComponentEventListener>>> componentNameEventListeners = new LazyMap<IToken, Map<Class, Set<IComponentEventListener>>>() {
97
98 @Override
99 protected Map<Class, Set<IComponentEventListener>> create(IToken key) {
100 return new LazyMap<Class, Set<IComponentEventListener>>(new ConcurrentHashMap<Class, Set<IComponentEventListener>>()) {
101
102 @Override
103 protected Set<IComponentEventListener> create(Class key) {
104 return new ConcurrentLinkedHashSet<IComponentEventListener>();
105 }
106
107 };
108 }
109
110 };
111
112
113
114
115 private boolean running = true;
116
117
118
119
120
121
122 private Queue<IComponentEvent> queue = new ConcurrentLinkedQueue();
123
124
125
126
127
128
129 private boolean queueProcessing = false;
130
131
132
133
134
135
136 private boolean fatalErrorProcessing = false;
137
138
139
140
141 private Object queueProcessingMutex = new Object();
142
143 private LogCategory log;
144
145 private IAgentId agentId;
146
147 @Inject
148 public ComponentBus(IAgentLogger logger) {
149 NullCheck.check(logger, "logger");
150 this.agentId = logger.getAgentId();
151 this.log = logger.getCategory(this);
152 NullCheck.check(this.log, "log category returned by the logger");
153 }
154
155 @Override
156 public String toString() {
157 return "ComponentBus[" + agentId.getToken() + ", running=" + running + ", queue length=" + (this.queue == null ? "null" : this.queue.size()) + "]";
158 }
159
160 @Override
161 public IComponentBus getEventBus() {
162 return this;
163 }
164
165 @Override
166 public IToken getComponentId() {
167 return COMPONENT_ID;
168 }
169
170 public Logger getLog() {
171 return log;
172 }
173
174 @Override
175 public boolean isRunning() {
176 return running;
177 }
178
179 @Override
180 public synchronized void reset() throws ResetFailedException {
181 if (log.isLoggable(Level.WARNING)) log.warning("reset() called.");
182 try {
183 if (running) {
184 if (log.isLoggable(Level.WARNING)) log.warning(ComponentBus.COMPONENT_ID.getToken() + " is still running, broadcasting fatal error to stop all components.");
185 event(new FatalErrorEvent<IComponent>(this, "Resetting."));
186 }
187 if (log.isLoggable(Level.WARNING)) log.warning("Broadcasting reset event.");
188 resetBus();
189 innerRaiseEvent(new ResetEvent(this));
190 } catch (Exception e) {
191 if (e instanceof ComponentBusErrorException) {
192 innerRaiseEvent(new FatalErrorEvent(this, "Reset failed.", e.getCause()));
193 throw new ResetFailedException(e.getCause(), log, this);
194 } else {
195 innerRaiseEvent(new FatalErrorEvent(this, "Reset failed.", e));
196 throw new ResetFailedException(e, log, this);
197 }
198 }
199 if (log.isLoggable(Level.WARNING)) log.warning("Reseted, bus is running again.");
200 }
201
202 private void resetBus() {
203 running = true;
204 queue.clear();
205 queueProcessing = false;
206 fatalErrorProcessing = false;
207 }
208
209
210
211
212
213
214
215 @Override
216 public <T> T getComponent(Class<T> cls) throws MoreComponentsForClassException {
217 Set<T> components = (Set<T>) componentsByClass.get(cls);
218 if (components.size() > 0) throw new MoreComponentsForClassException(cls, components, this);
219 return components.iterator().next();
220 }
221
222 @Override
223 public <T> Set<T> getComponents(Class<T> cls) {
224 return (Set<T>) Collections.unmodifiableSet(componentsByClass.get(cls));
225 }
226
227 @Override
228 public void register(IComponent component) throws ComponentIdClashException {
229 synchronized(componentsByToken) {
230 NullCheck.check(component.getComponentId(), "component's id is null ("+ component + ")");
231 if (componentsByToken.get(component.getComponentId()) != null) {
232 if (componentsByToken.get(component.getComponentId()) == component) {
233 return;
234 } else {
235 ComponentIdClashException e = new ComponentIdClashException(component.getComponentId(), log, this);
236 try {
237 event(new FatalErrorEvent(this, e));
238 } catch (Exception e1) {
239 }
240 throw e;
241 }
242 }
243 registerComponent(component);
244 }
245 }
246
247 @Override
248 public void remove(IComponent component) {
249 synchronized(componentsByToken) {
250 componentsByToken.remove(component.getComponentId());
251 Collection<Class> componentClasses = ClassUtils.getSubclasses(component.getClass());
252 for (Class cls : componentClasses) {
253 componentsByClass.get(cls).remove(component);
254 }
255 if (log.isLoggable(Level.INFO)) log.info(component + " of the id " + component.getComponentId().getToken() + " removed from the bus.");
256 }
257 }
258
259
260
261
262
263
264
265
266
267
268
269 private void registerComponent(IComponent component) {
270 componentsByToken.put(component.getComponentId(), component);
271 Collection<Class> componentClasses = ClassUtils.getSubclasses(component.getClass());
272 for (Class cls : componentClasses) {
273 componentsByClass.get(cls).add(component);
274 }
275 if (log.isLoggable(Level.INFO)) log.info(component + " registered under id " + component.getComponentId().getToken());
276 }
277
278 @Override
279 public IComponent getComponent(IToken name) {
280 return componentsByToken.get(name);
281 }
282
283
284
285
286
287
288
289 @Override
290 public void addEventListener(Class<?> event, IComponentEventListener<?> listener) {
291 NullCheck.check(event, "event");
292 NullCheck.check(listener, "listener");
293 eventListeners.get(event).add(listener);
294 }
295
296 @Override
297 public void addEventListener(Class<?> event, Class<?> component, IComponentEventListener<?> listener) {
298 NullCheck.check(event, "event");
299 NullCheck.check(component, "comopnent");
300 NullCheck.check(listener, "listener");
301 componentEventListeners.get(component).get(event).add(listener);
302 }
303
304 @Override
305 public void addEventListener(Class<?> event, IToken componentName, IComponentEventListener<?> listener) {
306 NullCheck.check(event, "event");
307 NullCheck.check(componentName, "componentName");
308 NullCheck.check(listener, "listener");
309 componentNameEventListeners.get(componentName).get(event).add(listener);
310 }
311
312 @Override
313 public void addEventListener(Class<?> event, IComponent component, IComponentEventListener<?> listener) {
314 NullCheck.check(component, "component");
315 addEventListener(event, component.getComponentId(), listener);
316 }
317
318 @Override
319 public boolean isListening(Class<?> event, IComponentEventListener<?> listener) {
320 NullCheck.check(event, "event");
321 NullCheck.check(listener, "listener");
322 if (!eventListeners.containsKey(event)) return false;
323 return eventListeners.get(event).contains(listener);
324 }
325
326 @Override
327 public boolean isListening(Class<?> event, Class<?> component, IComponentEventListener<?> listener) {
328 NullCheck.check(event, "event");
329 NullCheck.check(component, "component");
330 NullCheck.check(listener, "listener");
331 if (!componentEventListeners.containsKey(component)) return false;
332 Map<Class, Set<IComponentEventListener>> listeners = componentEventListeners.get(component);
333 if (!listeners.containsKey(event)) return false;
334 return listeners.get(event).contains(listener);
335 }
336
337 @Override
338 public boolean isListening(Class<?> event, IToken componentId, IComponentEventListener<?> listener) {
339 NullCheck.check(event, "event");
340 NullCheck.check(componentId, "componentId");
341 NullCheck.check(listener, "listener");
342 if (!componentNameEventListeners.containsKey(componentId)) return false;
343 Map<Class, Set<IComponentEventListener>> listeners = componentNameEventListeners.get(componentId);
344 if (!listeners.containsKey(event)) return false;
345 return listeners.get(event).contains(listener);
346 }
347
348 @Override
349 public boolean isListening(Class<?> event, IComponent component, IComponentEventListener<?> listener) {
350 NullCheck.check(component, "component");
351 return isListening(event, component.getComponentId(), listener);
352 }
353
354 @Override
355 public void removeEventListener(Class<?> event, IComponentEventListener<?> listener) {
356 NullCheck.check(event, "event");
357 NullCheck.check(listener, "listener");
358 if (!eventListeners.containsKey(event)) return;
359 eventListeners.get(event).remove(listener);
360 }
361
362 @Override
363 public void removeEventListener(Class<?> event, Class<?> component, IComponentEventListener<?> listener) {
364 NullCheck.check(event, "event");
365 NullCheck.check(component, "component");
366 NullCheck.check(listener, "listener");
367 if (!componentEventListeners.containsKey(component)) return;
368 Map<Class, Set<IComponentEventListener>> listeners = componentEventListeners.get(component);
369 if (!listeners.containsKey(event)) return;
370 listeners.get(event).remove(listener);
371 }
372
373 @Override
374 public void removeEventListener(Class<?> event, IToken componentId, IComponentEventListener<?> listener) {
375 NullCheck.check(event, "event");
376 NullCheck.check(componentId, "componentId");
377 NullCheck.check(listener, "listener");
378 if (!componentNameEventListeners.containsKey(componentId)) return;
379 Map<Class, Set<IComponentEventListener>> listeners = componentNameEventListeners.get(componentId);
380 if (!listeners.containsKey(event)) return;
381 listeners.get(event).remove(listener);
382 }
383
384 @Override
385 public void removeEventListener(Class<?> event, IComponent component, IComponentEventListener<?> listener) {
386 NullCheck.check(component, "component");
387 removeEventListener(event, component.getComponentId(), listener);
388 }
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403 private void notifyListenersA(IComponentEvent event) {
404 Collection<Class> eventClasses = ClassUtils.getSubclasses(event.getClass());
405 for (Class eventClass : eventClasses) {
406 if (!eventListeners.containsKey(eventClass)) continue;
407 for (IComponentEventListener listener : eventListeners.get(eventClass)) {
408 if (!isRunning()) return;
409 listener.notify(event);
410 }
411 }
412 }
413
414
415
416
417
418
419
420
421 private void notifyListenersB(IComponentEvent event) {
422 Collection<Class> componentClasses = ClassUtils.getSubclasses(event.getSource().getClass());
423 Collection<Class> eventClasses = ClassUtils.getSubclasses(event.getClass());
424 for (Class componentClass : componentClasses) {
425 if (!componentEventListeners.containsKey(componentClass)) continue;
426 Map<Class, Set<IComponentEventListener>> listeners = componentEventListeners.get(componentClass);
427 for (Class eventClass : eventClasses) {
428 if (!listeners.containsKey(eventClass)) continue;
429 for (IComponentEventListener listener : listeners.get(eventClass)) {
430 if (!isRunning()) return;
431 listener.notify(event);
432 }
433 }
434 }
435 }
436
437
438
439
440
441
442
443
444 private void notifyListenersC(IComponentEvent event) {
445 if (!componentNameEventListeners.containsKey(event.getSource().getComponentId())) return;
446 Map<Class, Set<IComponentEventListener>> listeners = componentNameEventListeners.get(event.getSource().getComponentId());
447 Collection<Class> eventClasses = ClassUtils.getSubclasses(event.getClass());
448 for (Class eventClass : eventClasses) {
449 if (!listeners.containsKey(eventClass)) continue;
450 for (IComponentEventListener listener : listeners.get(eventClass)) {
451 if (!isRunning()) return;
452 listener.notify(event);
453 }
454 }
455 }
456
457
458
459
460
461
462
463
464
465
466 private void notifyListenersA_Safe(IComponentEvent event) {
467 Collection<Class> eventClasses = ClassUtils.getSubclasses(event.getClass());
468 for (Class eventClass : eventClasses) {
469 if (!eventListeners.containsKey(eventClass)) continue;
470 for (IComponentEventListener listener : eventListeners.get(eventClass)) {
471 try {
472 listener.notify(event);
473 } catch (Exception e) {
474 if (log.isLoggable(Level.WARNING)) log.warning(ExceptionToString.process("Exception happened during notification of event " + event + " on listener " + listener + ".", e));
475 }
476 }
477 }
478 }
479
480
481
482
483
484
485
486
487
488
489 private void notifyListenersB_Safe(IComponentEvent event) {
490 Collection<Class> componentClasses = ClassUtils.getSubclasses(event.getSource().getClass());
491 Collection<Class> eventClasses = ClassUtils.getSubclasses(event.getClass());
492 for (Class componentClass : componentClasses) {
493 if (!componentEventListeners.containsKey(componentClass)) continue;
494 Map<Class, Set<IComponentEventListener>> listeners = componentEventListeners.get(componentClass);
495 for (Class eventClass : eventClasses) {
496 if (!listeners.containsKey(eventClass)) continue;
497 for (IComponentEventListener listener : listeners.get(eventClass)) {
498 try {
499 listener.notify(event);
500 } catch (Exception e) {
501 if (log.isLoggable(Level.WARNING)) log.warning(ExceptionToString.process("Exception happened during notification of event " + event + " on listener " + listener + ".", e));
502 }
503 }
504 }
505 }
506 }
507
508
509
510
511
512
513
514
515
516
517 private void notifyListenersC_Safe(IComponentEvent event) {
518 if (!componentNameEventListeners.containsKey(event.getSource().getComponentId())) return;
519 Map<Class, Set<IComponentEventListener>> listeners = componentNameEventListeners.get(event.getSource().getComponentId());
520 Collection<Class> eventClasses = ClassUtils.getSubclasses(event.getClass());
521 for (Class eventClass : eventClasses) {
522 if (!listeners.containsKey(eventClass)) continue;
523 for (IComponentEventListener listener : listeners.get(eventClass)) {
524 try {
525 listener.notify(event);
526 } catch (Exception e) {
527 if (log.isLoggable(Level.WARNING)) log.warning(ExceptionToString.process("Exception happened during notification of event " + event + " on listener " + listener + ".", e));
528 }
529 }
530 }
531 }
532
533
534
535
536
537
538
539
540 private void innerRaiseEvent(IComponentEvent event) {
541 if (event instanceof IFatalErrorEvent) {
542 if (log.isLoggable(Level.SEVERE)) log.severe("Fatal error happenned - component bus is stopping." + Const.NEW_LINE + ((IFatalErrorEvent)event).getSummary());
543 queue.clear();
544 running = false;
545 notifyListenersA_Safe(event);
546 notifyListenersB_Safe(event);
547 notifyListenersC_Safe(event);
548 return;
549 } else {
550 if (log.isLoggable(Level.FINE)) log.fine("Notifying " + event);
551 }
552 if (!isRunning()) return;
553 notifyListenersA(event);
554 if (!isRunning()) return;
555 notifyListenersB(event);
556 if (!isRunning()) return;
557 notifyListenersC(event);
558 }
559
560
561
562
563
564
565
566
567
568
569
570 private void innerRaiseEvent_Safe(IComponentEvent event) {
571 if (event instanceof IFatalErrorEvent) {
572 if (log.isLoggable(Level.SEVERE)) log.severe("Fatal error happenned - component bus is stopping." + Const.NEW_LINE + ((IFatalErrorEvent)event).getSummary());
573 queue.clear();
574 running = false;
575 } else {
576 if (log.isLoggable(Level.FINE)) log.fine("Notifying (safe) " + event);
577 }
578 notifyListenersA_Safe(event);
579 notifyListenersB_Safe(event);
580 notifyListenersC_Safe(event);
581 }
582
583 @Override
584 public synchronized boolean event(IComponentEvent event) throws ComponentBusNotRunningException, ComponentBusErrorException, FatalErrorPropagatingEventException {
585
586
587 NullCheck.check(event, "event");
588 if (event instanceof IResetEvent) throw new IllegalArgumentException("you can't broadcast reset event this way, use reset() instead");
589
590 if (fatalErrorProcessing) {
591 if (log.isLoggable(Level.WARNING)) log.warning("FatalErrorEvent is being processed, cannot propagate " + event + ".");
592 return false;
593 }
594
595
596
597 if (event instanceof IFatalErrorEvent) {
598
599 fatalErrorProcessing = true;
600 innerRaiseEvent_Safe(event);
601 return false;
602 }
603
604
605 if (!isRunning()) {
606 throw new ComponentBusNotRunningException(event, log, this);
607 }
608
609
610 if (queueProcessing) {
611
612
613
614 queue.add(event);
615 return false;
616 }
617
618
619 if (queue.size() > 0) {
620 ComponentBusErrorException e = new ComponentBusErrorException("Previous events has not been fully processed! ComponenBus fatal error.", event, this);
621 innerRaiseEvent_Safe(
622 new ComponentBusErrorEvent(this, e)
623 );
624 throw e;
625 }
626
627
628 queue.add(event);
629
630 processQueue();
631
632
633 return true;
634 }
635
636 @Override
637 public synchronized void eventTransactional(IComponentEvent event) throws ComponentBusNotRunningException, ComponentBusErrorException, FatalErrorPropagatingEventException {
638
639
640 NullCheck.check(event, "event");
641 if (event instanceof IResetEvent) throw new IllegalArgumentException("you can't broadcast reset event this way, use reset() instead");
642
643 if (fatalErrorProcessing) {
644 if (log.isLoggable(Level.WARNING)) log.warning("FatalErrorEvent is being processed, cannot propagate " + event + ".");
645 return;
646 }
647
648
649
650 if (event instanceof IFatalErrorEvent) {
651
652 fatalErrorProcessing = true;
653 innerRaiseEvent_Safe(event);
654 return;
655 }
656
657 if (!queueProcessing) {
658
659 event(event);
660 return;
661 }
662
663
664 innerRaiseEvent(event);
665 }
666
667
668
669
670 private void processQueue() throws FatalErrorPropagatingEventException, ComponentBusErrorException {
671
672 boolean dropQueueProcessing = !queueProcessing;
673 queueProcessing = true;
674 IComponentEvent event = null;
675 while(queue.size() != 0) {
676
677 try {
678 event = queue.poll();
679 } catch (Exception e) {
680 ComponentBusErrorException e1 = new ComponentBusErrorException("Can't poll next event.", e, this);
681 innerRaiseEvent_Safe(
682 new ComponentBusErrorEvent(this, e)
683 );
684 throw e1;
685 }
686 try {
687 innerRaiseEvent(event);
688 } catch (FatalErrorPropagatingEventException e1) {
689 throw e1;
690 } catch (ComponentBusErrorException e2) {
691 throw e2;
692 } catch (Exception e3) {
693 innerRaiseEvent_Safe(
694 new FatalErrorPropagatingEvent<ComponentBus>(this, "Exception happened during the event propagation.", e3, event)
695 );
696 queueProcessing = false;
697 throw new FatalErrorPropagatingEventException(event, e3, this);
698 }
699 }
700 if (!isRunning()) {
701
702 if (log.isLoggable(Level.SEVERE)) log.severe("Stopped.");
703 if (event != null && (!(event instanceof IFatalErrorEvent))) {
704
705 throw new FatalErrorPropagatingEventException(event, this);
706 }
707 }
708
709 if (dropQueueProcessing) {
710
711 queueProcessing = false;
712 }
713 }
714
715 }