1 package cz.cuni.amis.pogamut.base.component.bus.event;
2
3 import java.util.concurrent.CountDownLatch;
4 import java.util.concurrent.TimeUnit;
5
6 import cz.cuni.amis.pogamut.base.component.IComponent;
7 import cz.cuni.amis.pogamut.base.component.bus.IComponentBus;
8 import cz.cuni.amis.pogamut.base.component.bus.IComponentEventListener;
9 import cz.cuni.amis.utils.NullCheck;
10 import cz.cuni.amis.utils.exception.PogamutInterruptedException;
11 import cz.cuni.amis.utils.token.IToken;
12
13
14
15
16
17
18
19 public class BusAwareCountDownLatch extends CountDownLatch {
20
21 public static class BusStoppedInterruptedException extends PogamutInterruptedException {
22 public BusStoppedInterruptedException(Object origin) {
23 super("Interrupted because bus was stopped (fatal error, or watched component stopped) while waiting on the latch.", origin);
24 }
25 }
26
27 private static IToken[] getTokens(IComponent... components) {
28 if (components == null) return null;
29 IToken[] tokens = new IToken[components.length];
30 int i = 0;
31 for (IComponent component : components) {
32 tokens[i++] = component.getComponentId();
33 }
34 return tokens;
35 }
36
37 private final IComponentBus bus;
38
39 private final IToken[] componentIds;
40
41
42
43
44 private boolean stoppedEvent = false;
45
46 private Object stopMutex = new Object();
47
48
49
50
51 private boolean removed = false;
52
53 private IComponentEventListener componentListener = new IComponentEventListener() {
54 @Override
55 public void notify(Object event) {
56 stopped();
57 }
58 };
59
60
61
62
63
64
65
66 public BusAwareCountDownLatch(int count, IComponentBus bus) {
67 this(count, bus, (IToken[])null);
68 }
69
70 public BusAwareCountDownLatch(int count, IComponentBus bus, IComponent... components) {
71 this(count, bus, getTokens(components));
72 }
73
74 public BusAwareCountDownLatch(int count, IComponentBus bus, IToken... componentIds) {
75 super(count);
76 this.bus = bus;
77 NullCheck.check(bus, "bus");
78 this.componentIds = componentIds;
79 NullCheck.check(componentIds, "componentIds");
80 if (!bus.isRunning()) {
81 removed = true;
82 stopped();
83 } else {
84 if (count > 0) {
85 removed = false;
86 bus.addEventListener(IFatalErrorEvent.class, componentListener);
87 synchronized(this.componentIds) {
88 for (IToken componentId : componentIds) {
89 bus.addEventListener(IStoppingEvent.class, componentId, componentListener);
90 bus.addEventListener(IStoppedEvent.class, componentId, componentListener);
91 }
92 }
93 }
94 if (!bus.isRunning()) stopped();
95 }
96 }
97
98 private void stopped() {
99 if (stoppedEvent) return;
100 synchronized(stopMutex) {
101 if (stoppedEvent) return;
102 stoppedEvent = true;
103 }
104 removeListeners();
105 totalCountDown();
106 }
107
108 private void totalCountDown() {
109 while (getCount() > 0) countDown();
110 }
111
112 private void removeListeners() {
113 if (removed) return;
114 synchronized(componentListener) {
115 if (removed) return;
116 removed = true;
117 }
118
119 bus.removeEventListener(IFatalErrorEvent.class, componentListener);
120 if (componentIds != null) {
121 synchronized(componentIds) {
122 for (IToken token : componentIds) {
123 bus.removeEventListener(IStoppingEvent.class, token, componentListener);
124 bus.removeEventListener(IStoppedEvent.class, token, componentListener);
125 }
126 }
127 }
128 }
129
130
131
132 @Override
133 public void countDown() {
134 super.countDown();
135 if (getCount() <= 0) removeListeners();
136 }
137
138
139
140
141
142 @Override
143 public void await() throws BusStoppedInterruptedException, PogamutInterruptedException {
144 try {
145 super.await();
146 } catch (InterruptedException e) {
147 throw new PogamutInterruptedException(e, this);
148 }
149 checkBusStop();
150 }
151
152
153
154
155
156
157
158
159 @Override
160 public boolean await(long timeout, TimeUnit unit) throws BusStoppedInterruptedException, PogamutInterruptedException {
161 boolean val;
162 try {
163 val = super.await(timeout, unit);
164 } catch (InterruptedException e) {
165 throw new PogamutInterruptedException(e, this);
166 }
167 checkBusStop();
168 return val;
169 }
170
171 protected void checkBusStop() throws BusStoppedInterruptedException {
172 if (stoppedEvent) {
173 throw new BusStoppedInterruptedException(this);
174 }
175 }
176
177 public String toString() {
178 return "BusAwareCountDownLatch";
179 }
180
181 }