View Javadoc

1   package cz.cuni.amis.pogamut.base.component.bus.event;
2   
3   import java.util.concurrent.CountDownLatch;
4   import java.util.concurrent.TimeUnit;
5   
6   import cz.cuni.amis.pogamut.base.component.IComponent;
7   import cz.cuni.amis.pogamut.base.component.bus.IComponentBus;
8   import cz.cuni.amis.pogamut.base.component.bus.IComponentEventListener;
9   import cz.cuni.amis.utils.NullCheck;
10  import cz.cuni.amis.utils.exception.PogamutInterruptedException;
11  import cz.cuni.amis.utils.token.IToken;
12  
13  /**
14   * Extends CoundDownLatch with ability to stop waiting when some component connected
15   * to the bus fails which may indicate termination of all other components on the bus.
16   * Thus further waiting doesn't make sense.
17   * @author ik
18   */
19  public class BusAwareCountDownLatch extends CountDownLatch {
20  
21      public static class BusStoppedInterruptedException extends PogamutInterruptedException {
22          public BusStoppedInterruptedException(Object origin) {
23              super("Interrupted because bus was stopped (fatal error, or watched component stopped) while waiting on the latch.", origin);
24          }
25      }
26      
27      private static IToken[] getTokens(IComponent... components) {
28      	if (components == null) return null;
29      	IToken[] tokens = new IToken[components.length];
30      	int i = 0;
31      	for (IComponent component : components) {
32      		tokens[i++] = component.getComponentId();
33      	}
34      	return tokens;
35      }
36      
37      private final IComponentBus bus;
38      
39      private final IToken[] componentIds;
40      
41      /**
42       * Indication that bus (or one of dependent component) stopped before the latch was raised externaly.
43       */
44      private boolean stoppedEvent = false;
45      
46      private Object stopMutex = new Object();
47      
48      /**
49       * Wether the listeners has been removed from component bus."
50       */
51      private boolean removed = false;
52      
53      private IComponentEventListener componentListener = new IComponentEventListener() {
54          @Override
55          public void notify(Object event) {
56          	stopped();            
57          }
58      };
59      
60      //
61      //
62      // CONSTRUCTORS
63      //
64      //
65      
66      public BusAwareCountDownLatch(int count, IComponentBus bus) {
67          this(count, bus, (IToken[])null);
68      }
69      
70      public BusAwareCountDownLatch(int count, IComponentBus bus, IComponent... components) {
71          this(count, bus, getTokens(components));
72      }
73      
74      public BusAwareCountDownLatch(int count, IComponentBus bus, IToken... componentIds) {
75          super(count);
76          this.bus = bus;
77          NullCheck.check(bus, "bus");        
78          this.componentIds  = componentIds;   
79          NullCheck.check(componentIds, "componentIds");
80          if (!bus.isRunning()) {        	
81          	removed = true;
82          	stopped();
83          } else {        
84  	        if (count > 0) {
85  	        	removed = false;
86  	        	bus.addEventListener(IFatalErrorEvent.class, componentListener);
87  		        synchronized(this.componentIds) {
88  			        for (IToken componentId : componentIds) {
89  			        	bus.addEventListener(IStoppingEvent.class, componentId, componentListener); 
90  			        	bus.addEventListener(IStoppedEvent.class, componentId, componentListener);
91  			        }
92  		        }
93  	        }        
94  	        if (!bus.isRunning()) stopped();
95          }
96      }
97      
98      private void stopped() {
99      	if (stoppedEvent) return;
100     	synchronized(stopMutex) {
101     		if (stoppedEvent) return;
102     		stoppedEvent = true;
103     	}
104         removeListeners();
105         totalCountDown();
106     }
107    
108     private void totalCountDown() {
109     	while (getCount() > 0) countDown();
110     }
111     
112     private void removeListeners() {
113     	if (removed) return;
114     	synchronized(componentListener) {
115     		if (removed) return;
116     		removed = true;
117     	}
118     	
119     	bus.removeEventListener(IFatalErrorEvent.class, componentListener);
120 		if (componentIds != null) {
121            	synchronized(componentIds) {
122 	           	for (IToken token : componentIds) {
123 	           		bus.removeEventListener(IStoppingEvent.class, token, componentListener);
124 	           		bus.removeEventListener(IStoppedEvent.class, token, componentListener);
125 	           	}
126            	}
127 		}
128     }
129     
130     
131     
132     @Override
133     public void countDown() {
134     	super.countDown();
135     	if (getCount() <= 0) removeListeners();
136     }
137 
138     /**
139      * @throws cz.cuni.amis.pogamut.base.component.bus.event.BusAwareCountDownLatch.BusStoppedInterruptedException when the waiting was stopped because some component of the bus stopped
140      * @throws PogamutInterruptedException
141      */
142     @Override
143     public void await() throws BusStoppedInterruptedException, PogamutInterruptedException {
144         try {
145 			super.await();
146 		} catch (InterruptedException e) {
147 			throw new PogamutInterruptedException(e, this);
148 		}
149         checkBusStop();
150     }
151 
152     /**
153      * @param timeout
154      * @param unit
155      * @return
156      * @throws cz.cuni.amis.pogamut.base.component.bus.event.BusAwareCountDownLatch.BusStoppedInterruptedException when the waiting was stopped because some component of the bus stopped
157      * @throws PogamutInterruptedException
158      */
159     @Override
160     public boolean await(long timeout, TimeUnit unit) throws BusStoppedInterruptedException, PogamutInterruptedException {
161         boolean val;
162 		try {
163 			val = super.await(timeout, unit);
164 		} catch (InterruptedException e) {
165 			throw new PogamutInterruptedException(e, this);
166 		}
167         checkBusStop();
168         return val;
169     }
170 
171     protected void checkBusStop() throws BusStoppedInterruptedException {
172     	if (stoppedEvent) {
173             throw new BusStoppedInterruptedException(this);
174         }        
175     }
176     
177     public String toString() {
178     	return "BusAwareCountDownLatch";
179     }
180     
181 }