1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activecluster;
19
20 import javax.jms.Destination;
21 import javax.jms.Message;
22 import java.util.List;
23 import java.util.Map;
24
25 /***
26 * @version $Revision: 1.4 $
27 */
28 public class ClusterTest extends ClusterTestSupport {
29
30 protected int count = 5;
31
32 public void testCluster() throws Exception {
33 cluster = createCluster();
34
35 subscribeToCluster();
36
37 cluster.start();
38
39 Destination destination = cluster.getDestination();
40 Message message = cluster.createTextMessage("abcdef");
41 cluster.send(destination, message);
42
43 clusterListener.waitForMessageToArrive();
44
45 List list = clusterListener.flushMessages();
46 assertEquals("Should have received a message: " + list, 1, list.size());
47
48 System.out.println("Received message: " + list.get(0));
49 }
50
51
52 public void testMembershipCluster() throws Exception {
53 Cluster[] clusters = new Cluster[count];
54 for (int i = 0; i < count; i++) {
55 Cluster cluster = createCluster("node:" + i);
56 clusters[i] = cluster;
57 cluster.addClusterListener(new TestingClusterListener(cluster));
58 cluster.start();
59 System.out.println("started " + clusters[i].getLocalNode().getName());
60
61 }
62
63 System.out.println("waiting to complete ...");
64 for (int i = count - 1; i >= 0; i--) {
65 Cluster cluster = clusters[i];
66 String localName = cluster.getLocalNode().getName();
67 boolean completed = cluster.waitForClusterToComplete(count - 1, 5000);
68 assertTrue("Node: " + i + " with contents: " + dumpConnectedNodes(cluster.getNodes()), completed);
69
70 System.out.println(localName + " completed = " + completed + " nodes = "
71 + dumpConnectedNodes(cluster.getNodes()));
72 }
73
74 assertClusterMembership(clusters);
75
76
77 Thread.sleep(10000L);
78
79 assertClusterMembership(clusters);
80
81 Cluster testCluster = clusters[0];
82 LocalNode testNode = testCluster.getLocalNode();
83 String key = "key";
84 String value = "value";
85
86 Map map = testNode.getState();
87 map.put(key, value);
88 testNode.setState(map);
89
90 Thread.sleep(500);
91 for (int i = 1; i < count; i++) {
92 Node node = (Node) clusters[i].getNodes().get(testNode.getDestination());
93 assertTrue("The current test node should be in the cluster: " + i, node != null);
94 assertTrue(node.getState().get(key).equals(value));
95 }
96
97 for (int i = 0; i < count; i++) {
98 System.out.println(clusters[i].getLocalNode().getName() + " Is coordinator = " + clusters[i].getLocalNode().isCoordinator());
99 clusters[i].stop();
100 Thread.sleep(250);
101
102 }
103 }
104
105 protected void assertClusterMembership(Cluster[] clusters) {
106 for (int i = 0; i < count; i++) {
107 System.out.println("Cluster: " + i + " = " + clusters[i].getNodes());
108
109 assertEquals("Size of clusters for cluster: " + i, count - 1, clusters[i].getNodes().size());
110 System.out.println(clusters[i].getLocalNode().getName() + " Is coordinator = " + clusters[i].getLocalNode().isCoordinator());
111 }
112 }
113
114 }