View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activecluster.impl;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activecluster.Cluster;
24  import org.codehaus.activecluster.ClusterListener;
25  import org.codehaus.activecluster.LocalNode;
26  import org.codehaus.activecluster.Service;
27  
28  import javax.jms.*;
29  import java.io.Serializable;
30  import java.util.Map;
31  import java.util.Timer;
32  
33  /***
34   * A default implementation of ActiveCluster which uses standard JMS operations
35   *
36   * @version $Revision: 1.12 $
37   */
38  public class DefaultCluster implements Cluster {
39  
40      private final static Log log = LogFactory.getLog(DefaultCluster.class);
41  
42      private StateServiceImpl stateService;
43      private LocalNode localNode;
44      private Topic destination;
45      private Connection connection;
46      private Session session;
47      private MessageProducer producer;
48      private MessageConsumer consumer;
49      private Timer timer;
50      private SynchronizedBoolean started = new SynchronizedBoolean(false);
51      private Object clusterLock = new Object();
52  
53      public DefaultCluster(final LocalNode localNode, Topic dataTopic, Topic destination, Connection connection, Session session,
54                            MessageProducer producer, Timer timer, long inactiveTime) throws JMSException {
55          this.localNode = localNode;
56          this.destination = destination;
57          this.connection = connection;
58          this.session = session;
59          this.producer = producer;
60          this.timer = timer;
61  
62          if (producer == null) {
63              throw new IllegalArgumentException("No producer specified!");
64          }
65  
66          // now lets subscribe the service to the updates from the data topic
67          consumer = session.createConsumer(dataTopic, null, true);
68  
69          log.info("Creating data consumer on topic: " + dataTopic);
70  
71          this.stateService = new StateServiceImpl(this, clusterLock, new Runnable() {
72              public void run() {
73                  if (localNode instanceof ReplicatedLocalNode) {
74                      ((ReplicatedLocalNode) localNode).pingRemoteNodes();
75                  }
76              }
77          }, timer, inactiveTime);
78          consumer.setMessageListener(new StateConsumer(stateService));
79      }
80  
81      public synchronized void addClusterListener(ClusterListener listener) {
82          stateService.addClusterListener(listener);
83      }
84  
85      public synchronized void removeClusterListener(ClusterListener listener) {
86          stateService.removeClusterListener(listener);
87      }
88  
89      public Topic getDestination() {
90          return destination;
91      }
92  
93      public LocalNode getLocalNode() {
94          return localNode;
95      }
96  
97      public Map getNodes() {
98          return stateService.getNodes();
99      }
100 
101     public synchronized void send(Destination destination, Message message) throws JMSException {
102         producer.send(destination, message);
103     }
104 
105     public synchronized MessageConsumer createConsumer(Destination destination) throws JMSException {
106         return getSession().createConsumer(destination);
107     }
108 
109     public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
110         return getSession().createConsumer(destination, selector);
111     }
112 
113     public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
114         return getSession().createConsumer(destination, selector, noLocal);
115     }
116 
117     public synchronized Message createMessage() throws JMSException {
118         return getSession().createMessage();
119     }
120 
121     public synchronized BytesMessage createBytesMessage() throws JMSException {
122         return getSession().createBytesMessage();
123     }
124 
125     public synchronized MapMessage createMapMessage() throws JMSException {
126         return getSession().createMapMessage();
127     }
128 
129     public synchronized ObjectMessage createObjectMessage() throws JMSException {
130         return getSession().createObjectMessage();
131     }
132 
133     public synchronized ObjectMessage createObjectMessage(Serializable object) throws JMSException {
134         return getSession().createObjectMessage(object);
135     }
136 
137     public synchronized StreamMessage createStreamMessage() throws JMSException {
138         return getSession().createStreamMessage();
139     }
140 
141     public synchronized TextMessage createTextMessage() throws JMSException {
142         return getSession().createTextMessage();
143     }
144 
145     public synchronized TextMessage createTextMessage(String text) throws JMSException {
146         return getSession().createTextMessage(text);
147     }
148 
149     public synchronized void start() throws JMSException {
150         if (started.commit(false, true)) {
151             connection.start();
152         }
153     }
154 
155     public void stop() throws JMSException {
156         try {
157             if (localNode instanceof Service) {
158                 ((Service) localNode).stop();
159             }
160             timer.cancel();
161             session.close();
162             connection.stop();
163             connection.close();
164         }
165         finally {
166             connection = null;
167             session = null;
168         }
169     }
170 
171     public boolean waitForClusterToComplete(int expectedCount, long timeout) throws InterruptedException {
172         timeout = timeout > 0 ? timeout : Long.MAX_VALUE;
173         long waitTime = timeout;
174         long start = System.currentTimeMillis();
175         synchronized (clusterLock) {
176             while (stateService.getNodes().size() < expectedCount && started.get() && waitTime > 0) {
177                 clusterLock.wait(waitTime);
178                 waitTime = timeout - (System.currentTimeMillis() - start);
179             }
180         }
181         return stateService.getNodes().size() >= expectedCount;
182     }
183 
184     protected Session getSession() throws JMSException {
185         if (session == null) {
186             throw new JMSException("Cannot perform operation, this cluster connection is now closed");
187         }
188         return session;
189     }
190 }