/***
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
18  package org.codehaus.activecluster.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activecluster.Cluster;
23  import org.codehaus.activecluster.ClusterException;
24  import org.codehaus.activecluster.ClusterFactory;
25  
26  import javax.jms.Connection;
27  import javax.jms.ConnectionFactory;
28  import javax.jms.Destination;
29  import javax.jms.JMSException;
30  import javax.jms.MessageProducer;
31  import javax.jms.Session;
32  import javax.jms.Topic;
33  import javax.jms.DeliveryMode;
34  import java.util.Timer;
35  
36  /***
37   * A Factory of DefaultCluster instances
38   *
39   * @version $Revision: 1.12 $
40   */
41  public class DefaultClusterFactory implements ClusterFactory {
42  
43      private final static Log log = LogFactory.getLog(DefaultClusterFactory.class);
44  
45      private ConnectionFactory connectionFactory;
46      private boolean transacted;
47      private int acknowledgeMode;
48      private String dataTopicPrefix;
49      private long inactiveTime;
50      private boolean useQueueForInbox = false;
51      private int deliveryMode = DeliveryMode.NON_PERSISTENT;
52  
53      public DefaultClusterFactory(ConnectionFactory connectionFactory, boolean transacted, int acknowledgeMode, String dataTopicPrefix, long inactiveTime) {
54          this.connectionFactory = connectionFactory;
55          this.transacted = transacted;
56          this.acknowledgeMode = acknowledgeMode;
57          this.dataTopicPrefix = dataTopicPrefix;
58          this.inactiveTime = inactiveTime;
59      }
60  
61      public DefaultClusterFactory(ConnectionFactory connectionFactory) {
62          this(connectionFactory, false, Session.AUTO_ACKNOWLEDGE, "ACTIVECLUSTER.DATA.", 5000L);
63      }
64  
65      public Cluster createCluster(Topic groupDestination) throws ClusterException, JMSException {
66          Connection connection = getConnectionFactory().createConnection();
67          Session session = createSession(connection);
68          return createCluster(connection, session, groupDestination);
69      }
70  
71      public Cluster createCluster(String topic) throws ClusterException, JMSException {
72          Connection connection = getConnectionFactory().createConnection();
73          Session session = createSession(connection);
74          Topic groupDestination = session.createTopic(topic);
75          return createCluster(connection, session, groupDestination);
76      }
77  
78      // Properties
79      //-------------------------------------------------------------------------
80      public String getDataTopicPrefix() {
81          return dataTopicPrefix;
82      }
83  
84      public void setDataTopicPrefix(String dataTopicPrefix) {
85          this.dataTopicPrefix = dataTopicPrefix;
86      }
87  
88      public int getAcknowledgeMode() {
89          return acknowledgeMode;
90      }
91  
92      public void setAcknowledgeMode(int acknowledgeMode) {
93          this.acknowledgeMode = acknowledgeMode;
94      }
95  
96      public long getInactiveTime() {
97          return inactiveTime;
98      }
99  
100     public void setInactiveTime(long inactiveTime) {
101         this.inactiveTime = inactiveTime;
102     }
103 
104     public boolean isTransacted() {
105         return transacted;
106     }
107 
108     public void setTransacted(boolean transacted) {
109         this.transacted = transacted;
110     }
111 
112     public boolean isUseQueueForInbox() {
113         return useQueueForInbox;
114     }
115 
116     public void setUseQueueForInbox(boolean useQueueForInbox) {
117         this.useQueueForInbox = useQueueForInbox;
118     }
119 
120     public ConnectionFactory getConnectionFactory() {
121         return connectionFactory;
122     }
123 
124     public void setConnectionFactory(ConnectionFactory connectionFactory) {
125         this.connectionFactory = connectionFactory;
126     }
127 
128     public int getDeliveryMode() {
129         return deliveryMode;
130     }
131 
132     /***
133      * Sets the delivery mode of the group based producer
134      */
135     public void setDeliveryMode(int deliveryMode) {
136         this.deliveryMode = deliveryMode;
137     }
138 
139     // Implementation methods
140     //-------------------------------------------------------------------------
141     protected Cluster createCluster(Connection connection, Session session, Topic groupDestination) throws JMSException {
142         Topic dataTopic = session.createTopic(dataTopicPrefix + groupDestination.getTopicName());
143 
144         log.info("Creating cluster group producer on topic: " + groupDestination);
145 
146         MessageProducer producer = createProducer(session, null);
147         producer.setDeliveryMode(deliveryMode);
148 
149         log.info("Creating cluster data producer on topic: " + dataTopic);
150 
151         MessageProducer keepAliveProducer = session.createProducer(dataTopic);
152         keepAliveProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
153         StateService serviceStub = new StateServiceStub(session, keepAliveProducer);
154 
155         Destination localInbox = null;
156         if (useQueueForInbox) {
157             localInbox = session.createTemporaryQueue();
158         }
159         else {
160             localInbox = session.createTemporaryTopic();
161         }
162         ReplicatedLocalNode localNode = new ReplicatedLocalNode(localInbox, serviceStub);
163         Timer timer = new Timer();
164         DefaultCluster answer = new DefaultCluster(localNode, dataTopic, groupDestination, connection, session, producer, timer, inactiveTime);
165         return answer;
166     }
167 
168     /*
169      protected Cluster createInternalCluster(Session session, Topic dataDestination) {
170          MessageProducer producer = createProducer(session);
171          return new DefaultCluster(new NonReplicatedLocalNode(), dataDestination, connection, session, producer);
172      }
173      */
174     
175     protected MessageProducer createProducer(Session session, Topic groupDestination) throws JMSException {
176         return session.createProducer(groupDestination);
177     }
178 
179     protected Session createSession(Connection connection) throws JMSException {
180         return connection.createSession(transacted, acknowledgeMode);
181     }
182 }