package org.codehaus.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.DestinationMap;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
import org.codehaus.activemq.service.DeadLetterPolicy;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageContainerManager;
import org.codehaus.activemq.service.RedeliveryPolicy;

/* loaded from: input_file:activemq-ra-1.2.rar:activemq-core-1.2.jar:org/codehaus/activemq/service/boundedvm/TransientQueueBoundedMessageManager.class */
public class TransientQueueBoundedMessageManager implements MessageContainerManager, Runnable {
    private static final int DEFAULT_GARBAGE_COLLECTION_CAPACITY_LIMIT = 10;
    private static final long DEFAULT_INACTIVE_TIMEOUT = 30000;
    private static final Log log;
    private MemoryBoundedQueueManager queueManager;
    private long inactiveTimeout;
    private int garbageCoolectionCapacityLimit;
    private RedeliveryPolicy redeliveryPolicy;
    private DeadLetterPolicy deadLetterPolicy;
    static Class class$org$codehaus$activemq$service$boundedvm$TransientQueueBoundedMessageManager;
    private ConcurrentHashMap containers = new ConcurrentHashMap();
    private DestinationMap destinationMap = new DestinationMap();
    private Map destinations = new ConcurrentHashMap();
    private ConcurrentHashMap subscriptions = new ConcurrentHashMap();
    private FilterFactory filterFactory = new FilterFactoryImpl();
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    private SynchronizedBoolean doingGarbageCollection = new SynchronizedBoolean(false);
    private PooledExecutor threadPool = new PooledExecutor();

    /* loaded from: input_file:activemq-ra-1.2.rar:activemq-core-1.2.jar:org/codehaus/activemq/service/boundedvm/TransientQueueBoundedMessageManager$TransientQueueThreadFactory.class */
    protected static class TransientQueueThreadFactory implements ThreadFactory {
        protected TransientQueueThreadFactory() {
        }

        @Override // EDU.oswego.cs.dl.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setPriority(6);
            thread.setDaemon(true);
            return thread;
        }
    }

    public TransientQueueBoundedMessageManager(MemoryBoundedQueueManager memoryBoundedQueueManager, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this.queueManager = memoryBoundedQueueManager;
        this.redeliveryPolicy = redeliveryPolicy;
        this.deadLetterPolicy = deadLetterPolicy;
        this.threadPool.setThreadFactory(new TransientQueueThreadFactory());
        this.inactiveTimeout = DEFAULT_INACTIVE_TIMEOUT;
        this.garbageCoolectionCapacityLimit = 10;
    }

    public int getGarbageCoolectionCapacityLimit() {
        return this.garbageCoolectionCapacityLimit;
    }

    public void setGarbageCoolectionCapacityLimit(int i) {
        this.garbageCoolectionCapacityLimit = i;
    }

    public long getInactiveTimeout() {
        return this.inactiveTimeout;
    }

    public void setInactiveTimeout(long j) {
        this.inactiveTimeout = j;
    }

    @Override // org.codehaus.activemq.service.Service
    public void start() throws JMSException {
        if (this.started.commit(false, true)) {
            Iterator it = this.containers.values().iterator();
            while (it.hasNext()) {
                ((TransientQueueBoundedMessageContainer) it.next()).start();
            }
            try {
                this.threadPool.execute(this);
            } catch (InterruptedException e) {
                JMSException jMSException = new JMSException("Garbage collection interupted on start()");
                jMSException.setLinkedException(e);
                throw jMSException;
            }
        }
    }

    @Override // org.codehaus.activemq.service.Service
    public void stop() throws JMSException {
        if (this.started.commit(true, false)) {
            Iterator it = this.containers.values().iterator();
            while (it.hasNext()) {
                ((TransientQueueBoundedMessageContainer) it.next()).stop();
            }
            this.threadPool.interruptAll();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.started.get()) {
            doGarbageCollection();
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
            }
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public synchronized void addMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        ActiveMQDestination destination = consumerInfo.getDestination();
        if (destination.isQueue()) {
            TransientQueueBoundedMessageContainer transientQueueBoundedMessageContainer = (TransientQueueBoundedMessageContainer) this.containers.get(destination);
            if (transientQueueBoundedMessageContainer == null) {
                this.queueManager.getMemoryBoundedQueue(brokerClient.toString());
                transientQueueBoundedMessageContainer = new TransientQueueBoundedMessageContainer(this.threadPool, this.queueManager, destination, this.redeliveryPolicy, this.deadLetterPolicy);
                addContainer(transientQueueBoundedMessageContainer);
                if (this.started.get()) {
                    transientQueueBoundedMessageContainer.start();
                }
            }
            TransientQueueSubscription addConsumer = transientQueueBoundedMessageContainer.addConsumer(createFilter(consumerInfo), consumerInfo, brokerClient);
            if (addConsumer != null) {
                this.subscriptions.put(consumerInfo.getConsumerId(), addConsumer);
            }
            String physicalName = destination.getPhysicalName();
            if (this.destinations.containsKey(physicalName)) {
                return;
            }
            this.destinations.put(physicalName, destination);
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public synchronized void removeMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        if (consumerInfo.getDestination().isQueue()) {
            for (TransientQueueBoundedMessageContainer transientQueueBoundedMessageContainer : this.containers.values()) {
                if (transientQueueBoundedMessageContainer != null) {
                    transientQueueBoundedMessageContainer.removeConsumer(consumerInfo);
                }
            }
            this.subscriptions.remove(consumerInfo.getConsumerId());
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void deleteSubscription(String str, String str2) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void sendMessage(BrokerClient brokerClient, ActiveMQMessage activeMQMessage) throws JMSException {
        ActiveMQDestination jMSActiveMQDestination = activeMQMessage.getJMSActiveMQDestination();
        if (jMSActiveMQDestination.isQueue() && activeMQMessage.isTemporary()) {
            if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
                doGarbageCollection();
            }
            if (((TransientQueueBoundedMessageContainer) this.containers.get(jMSActiveMQDestination)) == null) {
                this.queueManager.getMemoryBoundedQueue(brokerClient.toString());
                TransientQueueBoundedMessageContainer transientQueueBoundedMessageContainer = new TransientQueueBoundedMessageContainer(this.threadPool, this.queueManager, jMSActiveMQDestination, this.redeliveryPolicy, this.deadLetterPolicy);
                addContainer(transientQueueBoundedMessageContainer);
                if (this.started.get()) {
                    transientQueueBoundedMessageContainer.start();
                }
            }
            Iterator it = this.destinationMap.get(activeMQMessage.getJMSActiveMQDestination()).iterator();
            while (it.hasNext()) {
                ((TransientQueueBoundedMessageContainer) it.next()).enqueue(activeMQMessage);
            }
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void acknowledgeMessage(BrokerClient brokerClient, MessageAck messageAck) throws JMSException {
        ActiveMQMessage acknowledgeMessage;
        TransientQueueSubscription transientQueueSubscription = (TransientQueueSubscription) this.subscriptions.get(messageAck.getConsumerId());
        if (transientQueueSubscription == null || (acknowledgeMessage = transientQueueSubscription.acknowledgeMessage(messageAck.getMessageID())) == null) {
            return;
        }
        if (!messageAck.isMessageRead() || messageAck.isExpired()) {
            acknowledgeMessage.setJMSRedelivered(true);
            if (acknowledgeMessage.incrementDeliveryCount() >= this.redeliveryPolicy.getMaximumRetryCount()) {
                if (log.isDebugEnabled()) {
                    log.debug(new StringBuffer().append("Message: ").append(acknowledgeMessage).append(" has exceeded its retry count").toString());
                }
                this.deadLetterPolicy.sendToDeadLetter(acknowledgeMessage);
            } else if (messageAck.isExpired()) {
                if (log.isDebugEnabled()) {
                    log.debug(new StringBuffer().append("Message: ").append(acknowledgeMessage).append(" has expired").toString());
                }
                this.deadLetterPolicy.sendToDeadLetter(acknowledgeMessage);
            } else {
                Iterator it = this.destinationMap.get(acknowledgeMessage.getJMSActiveMQDestination()).iterator();
                if (it.hasNext()) {
                    ((TransientQueueBoundedMessageContainer) it.next()).enqueueFirst(acknowledgeMessage);
                }
            }
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void acknowledgeTransactedMessage(BrokerClient brokerClient, String str, MessageAck messageAck) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void redeliverMessage(BrokerClient brokerClient, MessageAck messageAck) throws JMSException {
        ActiveMQMessage acknowledgeMessage;
        TransientQueueSubscription transientQueueSubscription = (TransientQueueSubscription) this.subscriptions.get(messageAck.getConsumerId());
        if (transientQueueSubscription == null || (acknowledgeMessage = transientQueueSubscription.acknowledgeMessage(messageAck.getMessageID())) == null) {
            return;
        }
        acknowledgeMessage.setJMSRedelivered(true);
        Iterator it = this.destinationMap.get(acknowledgeMessage.getJMSActiveMQDestination()).iterator();
        if (it.hasNext()) {
            ((TransientQueueBoundedMessageContainer) it.next()).enqueueFirst(acknowledgeMessage);
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void poll() throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void commitTransaction(BrokerClient brokerClient, String str) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void rollbackTransaction(BrokerClient brokerClient, String str) {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public MessageContainer getContainer(String str) throws JMSException {
        Object obj = this.destinations.get(str);
        if (obj != null) {
            return (MessageContainer) this.containers.get(obj);
        }
        return null;
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public Map getDestinations() {
        return Collections.unmodifiableMap(this.destinations);
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public DeadLetterPolicy getDeadLetterPolicy() {
        return this.deadLetterPolicy;
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
        this.deadLetterPolicy = deadLetterPolicy;
    }

    protected Filter createFilter(ConsumerInfo consumerInfo) throws JMSException {
        Filter createFilter = this.filterFactory.createFilter(consumerInfo.getDestination(), consumerInfo.getSelector());
        if (consumerInfo.isNoLocal()) {
            createFilter = new AndFilter(createFilter, new NoLocalFilter(consumerInfo.getClientId()));
        }
        return createFilter;
    }

    private void doGarbageCollection() {
        if (this.doingGarbageCollection.commit(true, false)) {
            if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
                Iterator it = this.containers.values().iterator();
                while (it.hasNext()) {
                    ((TransientQueueBoundedMessageContainer) it.next()).removeExpiredMessages();
                }
            }
            if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
                for (TransientQueueBoundedMessageContainer transientQueueBoundedMessageContainer : this.containers.values()) {
                    if (!transientQueueBoundedMessageContainer.isActive() && transientQueueBoundedMessageContainer.getIdleTimestamp() < System.currentTimeMillis() - this.inactiveTimeout) {
                        removeContainer(transientQueueBoundedMessageContainer);
                        log.warn(new StringBuffer().append("memory limit low - forced to remove inactive and idle queue: ").append(transientQueueBoundedMessageContainer.getDestinationName()).toString());
                    }
                }
            }
            if (this.queueManager.getCurrentCapacity() <= this.garbageCoolectionCapacityLimit) {
                for (TransientQueueBoundedMessageContainer transientQueueBoundedMessageContainer2 : this.containers.values()) {
                    if (!transientQueueBoundedMessageContainer2.isActive() && transientQueueBoundedMessageContainer2.getDestination().isTemporary()) {
                        removeContainer(transientQueueBoundedMessageContainer2);
                        log.warn(new StringBuffer().append("memory limit low - forced to remove inactive temporary queue: ").append(transientQueueBoundedMessageContainer2.getDestinationName()).toString());
                    }
                }
            }
            for (TransientQueueBoundedMessageContainer transientQueueBoundedMessageContainer3 : this.containers.values()) {
                if (!transientQueueBoundedMessageContainer3.isActive() && !transientQueueBoundedMessageContainer3.isEmpty()) {
                    removeContainer(transientQueueBoundedMessageContainer3);
                }
            }
            this.doingGarbageCollection.set(false);
        }
    }

    private synchronized void addContainer(TransientQueueBoundedMessageContainer transientQueueBoundedMessageContainer) {
        this.containers.put(transientQueueBoundedMessageContainer.getDestination(), transientQueueBoundedMessageContainer);
        this.destinationMap.put(transientQueueBoundedMessageContainer.getDestination(), transientQueueBoundedMessageContainer);
    }

    private synchronized void removeContainer(TransientQueueBoundedMessageContainer transientQueueBoundedMessageContainer) {
        try {
            transientQueueBoundedMessageContainer.close();
            log.info(new StringBuffer().append("closed inactive transient queue container: ").append(transientQueueBoundedMessageContainer.getDestinationName()).toString());
        } catch (JMSException e) {
            log.warn("failure closing container", e);
        }
        this.containers.remove(transientQueueBoundedMessageContainer.getDestination());
        this.destinationMap.remove(transientQueueBoundedMessageContainer.getDestination(), transientQueueBoundedMessageContainer);
    }

    protected Executor getThreadPool() {
        return this.threadPool;
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void createMessageContainer(ActiveMQDestination activeMQDestination) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void destroyMessageContainer(ActiveMQDestination activeMQDestination) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public Map getMessageContainerAdmins() throws JMSException {
        return Collections.EMPTY_MAP;
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$codehaus$activemq$service$boundedvm$TransientQueueBoundedMessageManager == null) {
            cls = class$("org.codehaus.activemq.service.boundedvm.TransientQueueBoundedMessageManager");
            class$org$codehaus$activemq$service$boundedvm$TransientQueueBoundedMessageManager = cls;
        } else {
            cls = class$org$codehaus$activemq$service$boundedvm$TransientQueueBoundedMessageManager;
        }
        log = LogFactory.getLog(cls);
    }
}
