1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activecluster.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activecluster.Cluster;
23 import org.codehaus.activecluster.ClusterException;
24 import org.codehaus.activecluster.ClusterFactory;
25
26 import javax.jms.Connection;
27 import javax.jms.ConnectionFactory;
28 import javax.jms.Destination;
29 import javax.jms.JMSException;
30 import javax.jms.MessageProducer;
31 import javax.jms.Session;
32 import javax.jms.Topic;
33 import javax.jms.DeliveryMode;
34 import java.util.Timer;
35
36 /***
37 * A Factory of DefaultCluster instances
38 *
39 * @version $Revision: 1.12 $
40 */
41 public class DefaultClusterFactory implements ClusterFactory {
42
43 private final static Log log = LogFactory.getLog(DefaultClusterFactory.class);
44
45 private ConnectionFactory connectionFactory;
46 private boolean transacted;
47 private int acknowledgeMode;
48 private String dataTopicPrefix;
49 private long inactiveTime;
50 private boolean useQueueForInbox = false;
51 private int deliveryMode = DeliveryMode.NON_PERSISTENT;
52
53 public DefaultClusterFactory(ConnectionFactory connectionFactory, boolean transacted, int acknowledgeMode, String dataTopicPrefix, long inactiveTime) {
54 this.connectionFactory = connectionFactory;
55 this.transacted = transacted;
56 this.acknowledgeMode = acknowledgeMode;
57 this.dataTopicPrefix = dataTopicPrefix;
58 this.inactiveTime = inactiveTime;
59 }
60
61 public DefaultClusterFactory(ConnectionFactory connectionFactory) {
62 this(connectionFactory, false, Session.AUTO_ACKNOWLEDGE, "ACTIVECLUSTER.DATA.", 5000L);
63 }
64
65 public Cluster createCluster(Topic groupDestination) throws ClusterException, JMSException {
66 Connection connection = getConnectionFactory().createConnection();
67 Session session = createSession(connection);
68 return createCluster(connection, session, groupDestination);
69 }
70
71 public Cluster createCluster(String topic) throws ClusterException, JMSException {
72 Connection connection = getConnectionFactory().createConnection();
73 Session session = createSession(connection);
74 Topic groupDestination = session.createTopic(topic);
75 return createCluster(connection, session, groupDestination);
76 }
77
78
79
80 public String getDataTopicPrefix() {
81 return dataTopicPrefix;
82 }
83
84 public void setDataTopicPrefix(String dataTopicPrefix) {
85 this.dataTopicPrefix = dataTopicPrefix;
86 }
87
88 public int getAcknowledgeMode() {
89 return acknowledgeMode;
90 }
91
92 public void setAcknowledgeMode(int acknowledgeMode) {
93 this.acknowledgeMode = acknowledgeMode;
94 }
95
96 public long getInactiveTime() {
97 return inactiveTime;
98 }
99
100 public void setInactiveTime(long inactiveTime) {
101 this.inactiveTime = inactiveTime;
102 }
103
104 public boolean isTransacted() {
105 return transacted;
106 }
107
108 public void setTransacted(boolean transacted) {
109 this.transacted = transacted;
110 }
111
112 public boolean isUseQueueForInbox() {
113 return useQueueForInbox;
114 }
115
116 public void setUseQueueForInbox(boolean useQueueForInbox) {
117 this.useQueueForInbox = useQueueForInbox;
118 }
119
120 public ConnectionFactory getConnectionFactory() {
121 return connectionFactory;
122 }
123
124 public void setConnectionFactory(ConnectionFactory connectionFactory) {
125 this.connectionFactory = connectionFactory;
126 }
127
128 public int getDeliveryMode() {
129 return deliveryMode;
130 }
131
132 /***
133 * Sets the delivery mode of the group based producer
134 */
135 public void setDeliveryMode(int deliveryMode) {
136 this.deliveryMode = deliveryMode;
137 }
138
139
140
141 protected Cluster createCluster(Connection connection, Session session, Topic groupDestination) throws JMSException {
142 Topic dataTopic = session.createTopic(dataTopicPrefix + groupDestination.getTopicName());
143
144 log.info("Creating cluster group producer on topic: " + groupDestination);
145
146 MessageProducer producer = createProducer(session, null);
147 producer.setDeliveryMode(deliveryMode);
148
149 log.info("Creating cluster data producer on topic: " + dataTopic);
150
151 MessageProducer keepAliveProducer = session.createProducer(dataTopic);
152 keepAliveProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
153 StateService serviceStub = new StateServiceStub(session, keepAliveProducer);
154
155 Destination localInbox = null;
156 if (useQueueForInbox) {
157 localInbox = session.createTemporaryQueue();
158 }
159 else {
160 localInbox = session.createTemporaryTopic();
161 }
162 ReplicatedLocalNode localNode = new ReplicatedLocalNode(localInbox, serviceStub);
163 Timer timer = new Timer();
164 DefaultCluster answer = new DefaultCluster(localNode, dataTopic, groupDestination, connection, session, producer, timer, inactiveTime);
165 return answer;
166 }
167
168
169
170
171
172
173
174
175 protected MessageProducer createProducer(Session session, Topic groupDestination) throws JMSException {
176 return session.createProducer(groupDestination);
177 }
178
179 protected Session createSession(Connection connection) throws JMSException {
180 return connection.createSession(transacted, acknowledgeMode);
181 }
182 }