1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activecluster.impl;
19
20 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activecluster.Cluster;
24 import org.codehaus.activecluster.ClusterListener;
25 import org.codehaus.activecluster.LocalNode;
26 import org.codehaus.activecluster.Service;
27
28 import javax.jms.*;
29 import java.io.Serializable;
30 import java.util.Map;
31 import java.util.Timer;
32
33 /***
34 * A default implementation of ActiveCluster which uses standard JMS operations
35 *
36 * @version $Revision: 1.12 $
37 */
38 public class DefaultCluster implements Cluster {
39
40 private final static Log log = LogFactory.getLog(DefaultCluster.class);
41
42 private StateServiceImpl stateService;
43 private LocalNode localNode;
44 private Topic destination;
45 private Connection connection;
46 private Session session;
47 private MessageProducer producer;
48 private MessageConsumer consumer;
49 private Timer timer;
50 private SynchronizedBoolean started = new SynchronizedBoolean(false);
51 private Object clusterLock = new Object();
52
53 public DefaultCluster(final LocalNode localNode, Topic dataTopic, Topic destination, Connection connection, Session session,
54 MessageProducer producer, Timer timer, long inactiveTime) throws JMSException {
55 this.localNode = localNode;
56 this.destination = destination;
57 this.connection = connection;
58 this.session = session;
59 this.producer = producer;
60 this.timer = timer;
61
62 if (producer == null) {
63 throw new IllegalArgumentException("No producer specified!");
64 }
65
66
67 consumer = session.createConsumer(dataTopic, null, true);
68
69 log.info("Creating data consumer on topic: " + dataTopic);
70
71 this.stateService = new StateServiceImpl(this, clusterLock, new Runnable() {
72 public void run() {
73 if (localNode instanceof ReplicatedLocalNode) {
74 ((ReplicatedLocalNode) localNode).pingRemoteNodes();
75 }
76 }
77 }, timer, inactiveTime);
78 consumer.setMessageListener(new StateConsumer(stateService));
79 }
80
81 public synchronized void addClusterListener(ClusterListener listener) {
82 stateService.addClusterListener(listener);
83 }
84
85 public synchronized void removeClusterListener(ClusterListener listener) {
86 stateService.removeClusterListener(listener);
87 }
88
89 public Topic getDestination() {
90 return destination;
91 }
92
93 public LocalNode getLocalNode() {
94 return localNode;
95 }
96
97 public Map getNodes() {
98 return stateService.getNodes();
99 }
100
101 public synchronized void send(Destination destination, Message message) throws JMSException {
102 producer.send(destination, message);
103 }
104
105 public synchronized MessageConsumer createConsumer(Destination destination) throws JMSException {
106 return getSession().createConsumer(destination);
107 }
108
109 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
110 return getSession().createConsumer(destination, selector);
111 }
112
113 public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
114 return getSession().createConsumer(destination, selector, noLocal);
115 }
116
117 public synchronized Message createMessage() throws JMSException {
118 return getSession().createMessage();
119 }
120
121 public synchronized BytesMessage createBytesMessage() throws JMSException {
122 return getSession().createBytesMessage();
123 }
124
125 public synchronized MapMessage createMapMessage() throws JMSException {
126 return getSession().createMapMessage();
127 }
128
129 public synchronized ObjectMessage createObjectMessage() throws JMSException {
130 return getSession().createObjectMessage();
131 }
132
133 public synchronized ObjectMessage createObjectMessage(Serializable object) throws JMSException {
134 return getSession().createObjectMessage(object);
135 }
136
137 public synchronized StreamMessage createStreamMessage() throws JMSException {
138 return getSession().createStreamMessage();
139 }
140
141 public synchronized TextMessage createTextMessage() throws JMSException {
142 return getSession().createTextMessage();
143 }
144
145 public synchronized TextMessage createTextMessage(String text) throws JMSException {
146 return getSession().createTextMessage(text);
147 }
148
149 public synchronized void start() throws JMSException {
150 if (started.commit(false, true)) {
151 connection.start();
152 }
153 }
154
155 public void stop() throws JMSException {
156 try {
157 if (localNode instanceof Service) {
158 ((Service) localNode).stop();
159 }
160 timer.cancel();
161 session.close();
162 connection.stop();
163 connection.close();
164 }
165 finally {
166 connection = null;
167 session = null;
168 }
169 }
170
171 public boolean waitForClusterToComplete(int expectedCount, long timeout) throws InterruptedException {
172 timeout = timeout > 0 ? timeout : Long.MAX_VALUE;
173 long waitTime = timeout;
174 long start = System.currentTimeMillis();
175 synchronized (clusterLock) {
176 while (stateService.getNodes().size() < expectedCount && started.get() && waitTime > 0) {
177 clusterLock.wait(waitTime);
178 waitTime = timeout - (System.currentTimeMillis() - start);
179 }
180 }
181 return stateService.getNodes().size() >= expectedCount;
182 }
183
184 protected Session getSession() throws JMSException {
185 if (session == null) {
186 throw new JMSException("Cannot perform operation, this cluster connection is now closed");
187 }
188 return session;
189 }
190 }