1 /***
2 *
3 * Copyright 2003-2004 The Apache Software Foundation
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.codehaus.activecluster;
20 import java.util.HashMap;
21 import java.util.Map;
22 import javax.jms.Connection;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.MessageListener;
26 import javax.jms.ObjectMessage;
27 import junit.framework.TestCase;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.codehaus.activecluster.Cluster;
31 import org.codehaus.activecluster.ClusterEvent;
32 import org.codehaus.activecluster.ClusterListener;
33 import org.codehaus.activecluster.impl.DefaultClusterFactory;
34 import org.codehaus.activemq.ActiveMQConnectionFactory;
35
36 /***
37 * Test ActiveCluster, ActiveMQ, with an eye to putting WADI on top of them.
38 *
39 * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell </a>
40 * @version $Revision: 1.1 $
41 */
42 public class ClusterFunctionTest extends TestCase {
43 protected Log _log = LogFactory.getLog(ClusterFunctionTest.class);
44
45 public ClusterFunctionTest(String name) {
46 super(name);
47 }
48 protected ActiveMQConnectionFactory _connectionFactory;
49 protected Connection _connection;
50 protected DefaultClusterFactory _clusterFactory;
51 protected Cluster _cluster0;
52 protected Cluster _cluster1;
53
54 protected void setUp() throws Exception {
55 testResponsePassed = false;
56 _connectionFactory = new ActiveMQConnectionFactory("multicast://224.1.2.3:5123");
57 _clusterFactory = new DefaultClusterFactory(_connectionFactory);
58 _cluster0 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER");
59 _cluster1 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER");
60 _cluster0.start();
61 _log.info("started node0: " + _cluster0.getLocalNode().getDestination());
62 _cluster1.start();
63 _log.info("started node1: " + _cluster1.getLocalNode().getDestination());
64 }
65
66 protected void tearDown() throws JMSException {
67
68 _cluster1 = null;
69
70 _cluster0 = null;
71 _clusterFactory = null;
72
73 _connection = null;
74
75 }
76
77 class MyClusterListener implements ClusterListener {
78 public void onNodeAdd(ClusterEvent ce) {
79 _log.info("node added: " + ce.getNode());
80 }
81
82 public void onNodeFailed(ClusterEvent ce) {
83 _log.info("node failed: " + ce.getNode());
84 }
85
86 public void onNodeRemoved(ClusterEvent ce) {
87 _log.info("node removed: " + ce.getNode());
88 }
89
90 public void onNodeUpdate(ClusterEvent ce) {
91 _log.info("node updated: " + ce.getNode());
92 }
93
94 public void onCoordinatorChanged(ClusterEvent ce) {
95 _log.info("coordinator changed: " + ce.getNode());
96 }
97 }
98
99 public void testCluster() throws Exception {
100 _cluster0.addClusterListener(new MyClusterListener());
101 Map map = new HashMap();
102 map.put("text", "testing123");
103 _cluster0.getLocalNode().setState(map);
104 _log.info("nodes: " + _cluster0.getNodes());
105 Thread.sleep(10000);
106 assertTrue(true);
107 }
108 /***
109 * An invokable piece of work.
110 */
111 static interface Invocation extends java.io.Serializable {
112 public void invoke(Cluster cluster, ObjectMessage om);
113 }
114 /***
115 * Listen for messages, if they contain Invocations, invoke() them.
116 */
117 class InvocationListener implements MessageListener {
118 protected Cluster _cluster;
119
120 public InvocationListener(Cluster cluster) {
121 _cluster = cluster;
122 }
123
124 public void onMessage(Message message) {
125 _log.info("message received: " + message);
126 ObjectMessage om = null;
127 Object tmp = null;
128 Invocation invocation = null;
129 try {
130 if (message instanceof ObjectMessage && (om = (ObjectMessage) message) != null
131 && (tmp = om.getObject()) != null && tmp instanceof Invocation
132 && (invocation = (Invocation) tmp) != null) {
133 _log.info("invoking message on: " + _cluster.getLocalNode());
134 invocation.invoke(_cluster, om);
135 _log.info("message successfully invoked on: " + _cluster.getLocalNode());
136 }
137 else {
138 _log.warn("bad message: " + message);
139 }
140 }
141 catch (JMSException e) {
142 _log.warn("unexpected problem", e);
143 }
144 }
145 }
146 /***
147 * A request for a piece of work which involves sending a response back to the original requester.
148 */
149 static class Request implements Invocation {
150 public void invoke(Cluster cluster, ObjectMessage om2) {
151 try {
152 System.out.println("request received");
153 ObjectMessage om = cluster.createObjectMessage();
154 om.setJMSReplyTo(cluster.getLocalNode().getDestination());
155 om.setObject(new Response());
156 System.out.println("sending response");
157 cluster.send(om2.getJMSReplyTo(), om);
158 System.out.println("request processed");
159 }
160 catch (JMSException e) {
161 System.err.println("problem sending response");
162 e.printStackTrace();
163 }
164 }
165 }
166 static boolean testResponsePassed = false;
167 /***
168 * A response containing a piece of work.
169 */
170 static class Response implements Invocation {
171 public void invoke(Cluster cluster, ObjectMessage om) {
172 try {
173 System.out.println("response arrived from: " + om.getJMSReplyTo());
174
175 ClusterFunctionTest.testResponsePassed = true;
176 System.out.println("response processed on: " + cluster.getLocalNode().getDestination());
177 }
178 catch (JMSException e) {
179 System.err.println("problem processing response");
180 }
181 }
182 }
183
184 public void testResponse() throws Exception {
185 MessageListener listener0 = new InvocationListener(_cluster0);
186 MessageListener listener1 = new InvocationListener(_cluster1);
187
188 _cluster0.createConsumer(_cluster0.getDestination(), null, true).setMessageListener(listener0);
189
190 _cluster0.createConsumer(_cluster0.getLocalNode().getDestination()).setMessageListener(listener0);
191
192 _cluster1.createConsumer(_cluster1.getDestination(), null, true).setMessageListener(listener1);
193
194 _cluster1.createConsumer(_cluster1.getLocalNode().getDestination()).setMessageListener(listener1);
195 ObjectMessage om = _cluster0.createObjectMessage();
196 om.setJMSReplyTo(_cluster0.getLocalNode().getDestination());
197 om.setObject(new Request());
198 testResponsePassed = false;
199 _cluster0.send(_cluster0.getLocalNode().getDestination(), om);
200 Thread.sleep(3000);
201 assertTrue(testResponsePassed);
202 _log.info("request/response between same node OK");
203 testResponsePassed = false;
204 _cluster0.send(_cluster1.getLocalNode().getDestination(), om);
205 Thread.sleep(3000);
206 assertTrue(testResponsePassed);
207 _log.info("request/response between two different nodes OK");
208 }
209 }