View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activecluster.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activecluster.Cluster;
23  import org.codehaus.activecluster.ClusterEvent;
24  import org.codehaus.activecluster.ClusterListener;
25  import org.codehaus.activecluster.Node;
26  import org.codehaus.activecluster.election.ElectionStrategy;
27  import org.codehaus.activecluster.election.impl.BullyElectionStrategy;
28  
29  import javax.jms.Destination;
30  import javax.jms.JMSException;
31  import java.util.ArrayList;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Map.Entry;
38  import java.util.Timer;
39  import java.util.TimerTask;
40  
41  
42  /***
43   * Represents a node list
44   *
45   * @version $Revision: 1.10 $
46   */
47  public class StateServiceImpl implements StateService {
48  
49      private final static Log log = LogFactory.getLog(StateServiceImpl.class);
50      private Cluster cluster;
51      private Object clusterLock;
52      private Map nodes = new HashMap();
53      private long inactiveTime;
54      private List listeners = Collections.synchronizedList(new ArrayList());
55      private Destination localDestination;
56      private Runnable localNodePing;
57      private Timer timer;
58      private NodeImpl coordinator;
59      private ElectionStrategy electionStrategy;
60  
61      public StateServiceImpl(Cluster cluster, Object clusterLock, Runnable localNodePing, Timer timer, long inactiveTime) {
62          this.cluster = cluster;
63          this.clusterLock = clusterLock;
64          this.localDestination = cluster.getLocalNode().getDestination();
65          this.localNodePing = localNodePing;
66          this.timer = timer;
67          this.inactiveTime = inactiveTime;
68          long delay = inactiveTime / 3;
69          timer.scheduleAtFixedRate(createTimerTask(), delay, delay);
70          (this.coordinator = (NodeImpl)cluster.getLocalNode()).setCoordinator(true);
71          this.electionStrategy = new BullyElectionStrategy();
72      }
73      
74      /***
75       * @return the current election strategy
76       */
77      public ElectionStrategy getElectionStrategy(){
78          return electionStrategy;
79      }
80      
81      /***
82       * set the election strategy
83       * @param electionStrategy
84       */
85      public void setElectionStrategy(ElectionStrategy electionStrategy){
86          this.electionStrategy = electionStrategy;
87      }
88  
89      public long getInactiveTime() {
90          return inactiveTime;
91      }
92  
93      public void setInactiveTime(long inactiveTime) {
94          this.inactiveTime = inactiveTime;
95      }
96  
97      public synchronized Map getNodes() {
98          HashMap answer = new HashMap(nodes.size());
99          for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
100             Map.Entry entry = (Map.Entry) iter.next();
101             Destination key = (Destination) entry.getKey();
102             NodeEntry nodeEntry = (NodeEntry) entry.getValue();
103             answer.put(key, nodeEntry.node);
104         }
105         return answer;
106     }
107 
108     public synchronized void keepAlive(Node node) {
109         Destination key = node.getDestination();
110         if (!localDestination.equals(key)) {
111             NodeEntry entry = (NodeEntry) nodes.get(key);
112             if (entry == null) {
113                 entry = new NodeEntry();
114                 entry.node = node;
115                 nodes.put(key, entry);
116                 nodeAdded(node);
117                 synchronized (clusterLock) {
118                     clusterLock.notifyAll();
119                 }
120             }
121             else {
122                 // has the data changed
123                 if (stateHasChanged(entry.node, node)) {
124                     entry.node = node;
125                     nodeUpdated(node);
126                 }
127             }
128 
129             // lets update the timer at which the node will be considered
130             // to be dead
131             entry.lastKeepAlive = getTimeMillis();
132         }
133     }
134 
135     public synchronized void shutdown(Node node) {
136         Destination key = node.getDestination();
137         nodes.remove(key);
138 
139         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE);
140         // lets take a copy to make contention easier
141         Object[] array = listeners.toArray();
142         for (int i = 0, size = array.length; i < size; i++) {
143             ClusterListener listener = (ClusterListener) array[i];
144             listener.onNodeRemoved(event);
145         }
146     }
147 
148     public synchronized void checkForTimeouts() {
149         localNodePing.run();
150         long time = getTimeMillis();
151         for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
152             Map.Entry entry = (Entry) iter.next();
153             NodeEntry nodeEntry = (NodeEntry) entry.getValue();
154             if (nodeEntry.lastKeepAlive + inactiveTime < time) {
155                 iter.remove();
156                 nodeFailed(nodeEntry.node);
157             }
158         }
159     }
160 
161     public TimerTask createTimerTask() {
162         return new TimerTask() {
163             public void run() {
164                 checkForTimeouts();
165             }
166         };
167     }
168 
169     public void addClusterListener(ClusterListener listener) {
170         listeners.add(listener);
171     }
172 
173     public void removeClusterListener(ClusterListener listener) {
174         listeners.remove(listener);
175     }
176 
177     protected void nodeAdded(Node node) {
178         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE);
179         // lets take a copy to make contention easier
180         Object[] array = listeners.toArray();
181         for (int i = 0, size = array.length; i < size; i++) {
182             ClusterListener listener = (ClusterListener) array[i];
183             listener.onNodeAdd(event);
184         }
185         doElection();
186     }
187 
188     protected void nodeUpdated(Node node) {
189         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.UPDATE_NODE);
190         // lets take a copy to make contention easier
191         Object[] array = listeners.toArray();
192         for (int i = 0, size = array.length; i < size; i++) {
193             ClusterListener listener = (ClusterListener) array[i];
194             listener.onNodeUpdate(event);
195         }
196     }
197 
198     protected void nodeFailed(Node node) {
199         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.REMOVE_NODE);
200         // lets take a copy to make contention easier
201         Object[] array = listeners.toArray();
202         for (int i = 0, size = array.length; i < size; i++) {
203             ClusterListener listener = (ClusterListener) array[i];
204             listener.onNodeFailed(event);
205         }
206         doElection();
207     }
208     
209     protected void coordinatorChanged(Node node) {
210         ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ELECTED_COORDINATOR);
211         // lets take a copy to make contention easier
212         Object[] array = listeners.toArray();
213         for (int i = 0, size = array.length; i < size; i++) {
214             ClusterListener listener = (ClusterListener) array[i];
215             listener.onCoordinatorChanged(event);
216         }
217     }
218     
219     protected void doElection()  {
220         if (electionStrategy != null) {
221             try {
222                 NodeImpl newElected = (NodeImpl) electionStrategy.doElection(cluster);
223                 if (newElected != null && !newElected.equals(coordinator)) {
224                     coordinator.setCoordinator(false);
225                     coordinator = newElected;
226                     coordinator.setCoordinator(true);
227                     coordinatorChanged(coordinator);
228                 }
229             }catch(JMSException jmsEx){
230                 log.error("do election failed",jmsEx);
231             }
232         }
233     }
234 
235     /***
236      * For performance we may wish to use a less granualar timing mechanism
237      * only updating the time every x millis since we're only using
238      * the time as a judge of when a node has not pinged for at least a few
239      * hundred millis etc.
240      *
241      * @return
242      */
243     protected long getTimeMillis() {
244         return System.currentTimeMillis();
245     }
246 
247     protected static class NodeEntry {
248         public Node node;
249         public long lastKeepAlive;
250     }
251 
252 
253     /***
254      * @return true if the node has changed state from the old in memory copy to the
255      *         newly arrived copy
256      */
257     protected boolean stateHasChanged(Node oldNode, Node newNode) {
258         Map oldState = oldNode.getState();
259         Map newState = newNode.getState();
260         if (oldState == newState) {
261             return false;
262         }
263         return oldState == null || newState == null || !oldState.equals(newState);
264     }
265 }