1   /***
2    *
3    * Copyright 2003-2004 The Apache Software Foundation
4    *
5    *  Licensed under the Apache License, Version 2.0 (the "License");
6    *  you may not use this file except in compliance with the License.
7    *  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *  See the License for the specific language governing permissions and
16   *  limitations under the License.
17   */
18  
19  package org.codehaus.activecluster;
20  import java.util.HashMap;
21  import java.util.Map;
22  import javax.jms.Connection;
23  import javax.jms.JMSException;
24  import javax.jms.Message;
25  import javax.jms.MessageListener;
26  import javax.jms.ObjectMessage;
27  import junit.framework.TestCase;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.codehaus.activecluster.Cluster;
31  import org.codehaus.activecluster.ClusterEvent;
32  import org.codehaus.activecluster.ClusterListener;
33  import org.codehaus.activecluster.impl.DefaultClusterFactory;
34  import org.codehaus.activemq.ActiveMQConnectionFactory;
35  
36  /***
37   * Test ActiveCluster, ActiveMQ, with an eye to putting WADI on top of them.
38   * 
39   * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell </a>
40   * @version $Revision: 1.1 $
41   */
42  public class ClusterFunctionTest extends TestCase {
43      protected Log _log = LogFactory.getLog(ClusterFunctionTest.class);
44  
45      public ClusterFunctionTest(String name) {
46          super(name);
47      }
48      protected ActiveMQConnectionFactory _connectionFactory;
49      protected Connection _connection;
50      protected DefaultClusterFactory _clusterFactory;
51      protected Cluster _cluster0;
52      protected Cluster _cluster1;
53  
54      protected void setUp() throws Exception {
55          testResponsePassed = false;
56          _connectionFactory = new ActiveMQConnectionFactory("multicast://224.1.2.3:5123");
57          _clusterFactory = new DefaultClusterFactory(_connectionFactory);
58          _cluster0 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER");
59          _cluster1 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER");
60          _cluster0.start();
61          _log.info("started node0: " + _cluster0.getLocalNode().getDestination());
62          _cluster1.start();
63          _log.info("started node1: " + _cluster1.getLocalNode().getDestination());
64      }
65  
66      protected void tearDown() throws JMSException {
67          //      _cluster1.stop();
68          _cluster1 = null;
69          //      _cluster0.stop();
70          _cluster0 = null;
71          _clusterFactory = null;
72          //      _connection.stop();
73          _connection = null;
74          //      _connectionFactory.stop();
75      }
76      //----------------------------------------
77      class MyClusterListener implements ClusterListener {
78          public void onNodeAdd(ClusterEvent ce) {
79              _log.info("node added: " + ce.getNode());
80          }
81  
82          public void onNodeFailed(ClusterEvent ce) {
83              _log.info("node failed: " + ce.getNode());
84          }
85  
86          public void onNodeRemoved(ClusterEvent ce) {
87              _log.info("node removed: " + ce.getNode());
88          }
89  
90          public void onNodeUpdate(ClusterEvent ce) {
91              _log.info("node updated: " + ce.getNode());
92          }
93  
94          public void onCoordinatorChanged(ClusterEvent ce) {
95              _log.info("coordinator changed: " + ce.getNode());
96          }
97      }
98  
99      public void testCluster() throws Exception {
100         _cluster0.addClusterListener(new MyClusterListener());
101         Map map = new HashMap();
102         map.put("text", "testing123");
103         _cluster0.getLocalNode().setState(map);
104         _log.info("nodes: " + _cluster0.getNodes());
105         Thread.sleep(10000);
106         assertTrue(true);
107     }
108     /***
109      * An invokable piece of work.
110      */
111     static interface Invocation extends java.io.Serializable {
112         public void invoke(Cluster cluster, ObjectMessage om);
113     }
114     /***
115      * Listen for messages, if they contain Invocations, invoke() them.
116      */
117     class InvocationListener implements MessageListener {
118         protected Cluster _cluster;
119 
120         public InvocationListener(Cluster cluster) {
121             _cluster = cluster;
122         }
123 
124         public void onMessage(Message message) {
125             _log.info("message received: " + message);
126             ObjectMessage om = null;
127             Object tmp = null;
128             Invocation invocation = null;
129             try {
130                 if (message instanceof ObjectMessage && (om = (ObjectMessage) message) != null
131                         && (tmp = om.getObject()) != null && tmp instanceof Invocation
132                         && (invocation = (Invocation) tmp) != null) {
133                     _log.info("invoking message on: " + _cluster.getLocalNode());
134                     invocation.invoke(_cluster, om);
135                     _log.info("message successfully invoked on: " + _cluster.getLocalNode());
136                 }
137                 else {
138                     _log.warn("bad message: " + message);
139                 }
140             }
141             catch (JMSException e) {
142                 _log.warn("unexpected problem", e);
143             }
144         }
145     }
146     /***
147      * A request for a piece of work which involves sending a response back to the original requester.
148      */
149     static class Request implements Invocation {
150         public void invoke(Cluster cluster, ObjectMessage om2) {
151             try {
152                 System.out.println("request received");
153                 ObjectMessage om = cluster.createObjectMessage();
154                 om.setJMSReplyTo(cluster.getLocalNode().getDestination());
155                 om.setObject(new Response());
156                 System.out.println("sending response");
157                 cluster.send(om2.getJMSReplyTo(), om);
158                 System.out.println("request processed");
159             }
160             catch (JMSException e) {
161                 System.err.println("problem sending response");
162                 e.printStackTrace();
163             }
164         }
165     }
166     static boolean testResponsePassed = false;
167     /***
168      * A response containing a piece of work.
169      */
170     static class Response implements Invocation {
171         public void invoke(Cluster cluster, ObjectMessage om) {
172             try {
173                 System.out.println("response arrived from: " + om.getJMSReplyTo());
174                 // set a flag to test later
175                 ClusterFunctionTest.testResponsePassed = true;
176                 System.out.println("response processed on: " + cluster.getLocalNode().getDestination());
177             }
178             catch (JMSException e) {
179                 System.err.println("problem processing response");
180             }
181         }
182     }
183 
184     public void testResponse() throws Exception {
185         MessageListener listener0 = new InvocationListener(_cluster0);
186         MessageListener listener1 = new InvocationListener(_cluster1);
187         // 1->(n-1) messages (excludes self)
188         _cluster0.createConsumer(_cluster0.getDestination(), null, true).setMessageListener(listener0);
189         // 1->1 messages
190         _cluster0.createConsumer(_cluster0.getLocalNode().getDestination()).setMessageListener(listener0);
191         // 1->(n-1) messages (excludes self)
192         _cluster1.createConsumer(_cluster1.getDestination(), null, true).setMessageListener(listener1);
193         // 1->1 messages
194         _cluster1.createConsumer(_cluster1.getLocalNode().getDestination()).setMessageListener(listener1);
195         ObjectMessage om = _cluster0.createObjectMessage();
196         om.setJMSReplyTo(_cluster0.getLocalNode().getDestination());
197         om.setObject(new Request());
198         testResponsePassed = false;
199         _cluster0.send(_cluster0.getLocalNode().getDestination(), om);
200         Thread.sleep(3000);
201         assertTrue(testResponsePassed);
202         _log.info("request/response between same node OK");
203         testResponsePassed = false;
204         _cluster0.send(_cluster1.getLocalNode().getDestination(), om);
205         Thread.sleep(3000);
206         assertTrue(testResponsePassed);
207         _log.info("request/response between two different nodes OK");
208     }
209 }