1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activecluster.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activecluster.Cluster;
23 import org.codehaus.activecluster.ClusterEvent;
24 import org.codehaus.activecluster.ClusterListener;
25 import org.codehaus.activecluster.Node;
26 import org.codehaus.activecluster.election.ElectionStrategy;
27 import org.codehaus.activecluster.election.impl.BullyElectionStrategy;
28
29 import javax.jms.Destination;
30 import javax.jms.JMSException;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Map.Entry;
38 import java.util.Timer;
39 import java.util.TimerTask;
40
41
42 /***
43 * Represents a node list
44 *
45 * @version $Revision: 1.10 $
46 */
47 public class StateServiceImpl implements StateService {
48
49 private final static Log log = LogFactory.getLog(StateServiceImpl.class);
50 private Cluster cluster;
51 private Object clusterLock;
52 private Map nodes = new HashMap();
53 private long inactiveTime;
54 private List listeners = Collections.synchronizedList(new ArrayList());
55 private Destination localDestination;
56 private Runnable localNodePing;
57 private Timer timer;
58 private NodeImpl coordinator;
59 private ElectionStrategy electionStrategy;
60
61 public StateServiceImpl(Cluster cluster, Object clusterLock, Runnable localNodePing, Timer timer, long inactiveTime) {
62 this.cluster = cluster;
63 this.clusterLock = clusterLock;
64 this.localDestination = cluster.getLocalNode().getDestination();
65 this.localNodePing = localNodePing;
66 this.timer = timer;
67 this.inactiveTime = inactiveTime;
68 long delay = inactiveTime / 3;
69 timer.scheduleAtFixedRate(createTimerTask(), delay, delay);
70 (this.coordinator = (NodeImpl)cluster.getLocalNode()).setCoordinator(true);
71 this.electionStrategy = new BullyElectionStrategy();
72 }
73
74 /***
75 * @return the current election strategy
76 */
77 public ElectionStrategy getElectionStrategy(){
78 return electionStrategy;
79 }
80
81 /***
82 * set the election strategy
83 * @param electionStrategy
84 */
85 public void setElectionStrategy(ElectionStrategy electionStrategy){
86 this.electionStrategy = electionStrategy;
87 }
88
89 public long getInactiveTime() {
90 return inactiveTime;
91 }
92
93 public void setInactiveTime(long inactiveTime) {
94 this.inactiveTime = inactiveTime;
95 }
96
97 public synchronized Map getNodes() {
98 HashMap answer = new HashMap(nodes.size());
99 for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
100 Map.Entry entry = (Map.Entry) iter.next();
101 Destination key = (Destination) entry.getKey();
102 NodeEntry nodeEntry = (NodeEntry) entry.getValue();
103 answer.put(key, nodeEntry.node);
104 }
105 return answer;
106 }
107
108 public synchronized void keepAlive(Node node) {
109 Destination key = node.getDestination();
110 if (!localDestination.equals(key)) {
111 NodeEntry entry = (NodeEntry) nodes.get(key);
112 if (entry == null) {
113 entry = new NodeEntry();
114 entry.node = node;
115 nodes.put(key, entry);
116 nodeAdded(node);
117 synchronized (clusterLock) {
118 clusterLock.notifyAll();
119 }
120 }
121 else {
122
123 if (stateHasChanged(entry.node, node)) {
124 entry.node = node;
125 nodeUpdated(node);
126 }
127 }
128
129
130
131 entry.lastKeepAlive = getTimeMillis();
132 }
133 }
134
135 public synchronized void shutdown(Node node) {
136 Destination key = node.getDestination();
137 nodes.remove(key);
138
139 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE);
140
141 Object[] array = listeners.toArray();
142 for (int i = 0, size = array.length; i < size; i++) {
143 ClusterListener listener = (ClusterListener) array[i];
144 listener.onNodeRemoved(event);
145 }
146 }
147
148 public synchronized void checkForTimeouts() {
149 localNodePing.run();
150 long time = getTimeMillis();
151 for (Iterator iter = nodes.entrySet().iterator(); iter.hasNext();) {
152 Map.Entry entry = (Entry) iter.next();
153 NodeEntry nodeEntry = (NodeEntry) entry.getValue();
154 if (nodeEntry.lastKeepAlive + inactiveTime < time) {
155 iter.remove();
156 nodeFailed(nodeEntry.node);
157 }
158 }
159 }
160
161 public TimerTask createTimerTask() {
162 return new TimerTask() {
163 public void run() {
164 checkForTimeouts();
165 }
166 };
167 }
168
169 public void addClusterListener(ClusterListener listener) {
170 listeners.add(listener);
171 }
172
173 public void removeClusterListener(ClusterListener listener) {
174 listeners.remove(listener);
175 }
176
177 protected void nodeAdded(Node node) {
178 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE);
179
180 Object[] array = listeners.toArray();
181 for (int i = 0, size = array.length; i < size; i++) {
182 ClusterListener listener = (ClusterListener) array[i];
183 listener.onNodeAdd(event);
184 }
185 doElection();
186 }
187
188 protected void nodeUpdated(Node node) {
189 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.UPDATE_NODE);
190
191 Object[] array = listeners.toArray();
192 for (int i = 0, size = array.length; i < size; i++) {
193 ClusterListener listener = (ClusterListener) array[i];
194 listener.onNodeUpdate(event);
195 }
196 }
197
198 protected void nodeFailed(Node node) {
199 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.REMOVE_NODE);
200
201 Object[] array = listeners.toArray();
202 for (int i = 0, size = array.length; i < size; i++) {
203 ClusterListener listener = (ClusterListener) array[i];
204 listener.onNodeFailed(event);
205 }
206 doElection();
207 }
208
209 protected void coordinatorChanged(Node node) {
210 ClusterEvent event = new ClusterEvent(cluster, node, ClusterEvent.ELECTED_COORDINATOR);
211
212 Object[] array = listeners.toArray();
213 for (int i = 0, size = array.length; i < size; i++) {
214 ClusterListener listener = (ClusterListener) array[i];
215 listener.onCoordinatorChanged(event);
216 }
217 }
218
219 protected void doElection() {
220 if (electionStrategy != null) {
221 try {
222 NodeImpl newElected = (NodeImpl) electionStrategy.doElection(cluster);
223 if (newElected != null && !newElected.equals(coordinator)) {
224 coordinator.setCoordinator(false);
225 coordinator = newElected;
226 coordinator.setCoordinator(true);
227 coordinatorChanged(coordinator);
228 }
229 }catch(JMSException jmsEx){
230 log.error("do election failed",jmsEx);
231 }
232 }
233 }
234
235 /***
236 * For performance we may wish to use a less granualar timing mechanism
237 * only updating the time every x millis since we're only using
238 * the time as a judge of when a node has not pinged for at least a few
239 * hundred millis etc.
240 *
241 * @return
242 */
243 protected long getTimeMillis() {
244 return System.currentTimeMillis();
245 }
246
247 protected static class NodeEntry {
248 public Node node;
249 public long lastKeepAlive;
250 }
251
252
253 /***
254 * @return true if the node has changed state from the old in memory copy to the
255 * newly arrived copy
256 */
257 protected boolean stateHasChanged(Node oldNode, Node newNode) {
258 Map oldState = oldNode.getState();
259 Map newState = newNode.getState();
260 if (oldState == newState) {
261 return false;
262 }
263 return oldState == null || newState == null || !oldState.equals(newState);
264 }
265 }