package net.jxta.impl.endpoint;

import java.util.LinkedList;
import java.util.List;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.EndpointListener;
import net.jxta.endpoint.Message;
import net.jxta.impl.endpoint.endpointMeter.InboundMeter;
import net.jxta.impl.util.Cache;
import net.jxta.impl.util.CacheEntry;
import net.jxta.impl.util.CacheEntryListener;
import net.jxta.impl.util.ResourceAccount;
import net.jxta.impl.util.ResourceDispatcher;
import net.jxta.impl.util.UnbiasedQueue;
import net.jxta.util.TimeConstants;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/* loaded from: input_file:activemq-ra-2.0.rar:jxta-2.0.jar:net/jxta/impl/endpoint/QuotaIncomingMessageListener.class */
public class QuotaIncomingMessageListener implements EndpointListener {
    private static final Logger LOG;
    private static ResourceDispatcher threadDispatcher;
    static int GmaxMsgSize;
    static int GmaxSenders;
    static int GminResPerSender;
    static int GmaxResPerSender;
    static int TotalExtra;
    static int MaxExtraPerSender;
    static int NeverReserved;
    private static ResourceDispatcher messageDispatcher;
    private static Cache allSources;
    private UnbiasedQueue messageQueue;
    private EndpointListener listener;
    private String name;
    private InboundMeter incomingMessageListenerMeter;
    private ResourceAccount myAccount;
    static Class class$net$jxta$impl$endpoint$QuotaIncomingMessageListener;

    /* loaded from: input_file:activemq-ra-2.0.rar:jxta-2.0.jar:net/jxta/impl/endpoint/QuotaIncomingMessageListener$ListenerThread.class */
    static class ListenerThread extends Thread {
        private static ThreadGroup listenerGroup = new ThreadGroup("Quota Message Listeners");
        private static List idleThreads = new LinkedList();
        private QuotaIncomingMessageListener current;
        private volatile boolean terminated;

        static ListenerThread newListenerThread(QuotaIncomingMessageListener quotaIncomingMessageListener) {
            synchronized (idleThreads) {
                if (idleThreads.isEmpty()) {
                    return new ListenerThread(quotaIncomingMessageListener);
                }
                ListenerThread listenerThread = (ListenerThread) idleThreads.remove(0);
                listenerThread.newJob(quotaIncomingMessageListener);
                return listenerThread;
            }
        }

        private ListenerThread(QuotaIncomingMessageListener quotaIncomingMessageListener) {
            super(listenerGroup, "QuotaListenerThread");
            this.terminated = false;
            this.current = quotaIncomingMessageListener;
            setDaemon(true);
            start();
        }

        void newJob(QuotaIncomingMessageListener quotaIncomingMessageListener) {
            synchronized (this) {
                this.current = quotaIncomingMessageListener;
                notify();
            }
        }

        void terminate() {
            this.terminated = true;
            interrupt();
        }

        boolean getJob() {
            if (this.terminated) {
                return false;
            }
            synchronized (idleThreads) {
                idleThreads.add(0, this);
            }
            while (true) {
                synchronized (this) {
                    if (this.current != null) {
                        return true;
                    }
                    try {
                        wait(TimeConstants.FOUR_SECONDS);
                    } catch (InterruptedException e) {
                    }
                    if (this.current != null) {
                        return true;
                    }
                    synchronized (idleThreads) {
                        if (idleThreads.remove(this)) {
                            return false;
                        }
                    }
                }
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    if (this.current != null) {
                        this.current = this.current.doOne();
                    } else if (!getJob()) {
                        return;
                    }
                } catch (Throwable th) {
                    QuotaIncomingMessageListener.LOG.fatal(new StringBuffer().append("Uncaught Throwable in thread :").append(Thread.currentThread().getName()).toString(), th);
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:activemq-ra-2.0.rar:jxta-2.0.jar:net/jxta/impl/endpoint/QuotaIncomingMessageListener$MessageFromSource.class */
    public static class MessageFromSource {
        Message msg;
        EndpointAddress srcAddress;
        EndpointAddress destAddress;
        ResourceAccount src;
        long timeReceived;
        long size;

        MessageFromSource(Message message, EndpointAddress endpointAddress, EndpointAddress endpointAddress2, ResourceAccount resourceAccount, long j, long j2) {
            this.msg = message;
            this.src = resourceAccount;
            this.srcAddress = endpointAddress;
            this.destAddress = endpointAddress2;
            this.timeReceived = j;
            this.size = j2;
        }
    }

    /* loaded from: input_file:activemq-ra-2.0.rar:jxta-2.0.jar:net/jxta/impl/endpoint/QuotaIncomingMessageListener$MyCacheListener.class */
    static class MyCacheListener implements CacheEntryListener {
        MyCacheListener() {
        }

        @Override // net.jxta.impl.util.CacheEntryListener
        public void purged(CacheEntry cacheEntry) {
            ((ResourceAccount) cacheEntry.getValue()).close();
        }
    }

    public QuotaIncomingMessageListener(EndpointListener endpointListener) {
        this(endpointListener.getClass().getName(), endpointListener);
    }

    public QuotaIncomingMessageListener(String str, EndpointListener endpointListener) {
        this(str, endpointListener, null);
    }

    public QuotaIncomingMessageListener(String str, EndpointListener endpointListener, InboundMeter inboundMeter) {
        this.messageQueue = new UnbiasedQueue(Integer.MAX_VALUE, false, new LinkedList());
        this.listener = null;
        this.name = null;
        this.incomingMessageListenerMeter = null;
        this.myAccount = null;
        this.listener = endpointListener;
        this.name = str;
        this.incomingMessageListenerMeter = inboundMeter;
        synchronized (threadDispatcher) {
            this.myAccount = threadDispatcher.newAccount(1L, -1L, this);
            threadDispatcher.notify();
        }
        Thread.yield();
    }

    public String toString() {
        return this.name;
    }

    public EndpointListener getListener() {
        return this.listener;
    }

    public void close() {
        LinkedList linkedList = new LinkedList();
        synchronized (threadDispatcher) {
            this.messageQueue.close();
            if (this.myAccount.isIdle()) {
                this.myAccount.close();
            }
            while (true) {
                MessageFromSource messageFromSource = (MessageFromSource) this.messageQueue.pop();
                if (messageFromSource == null) {
                    break;
                } else {
                    linkedList.add(messageFromSource);
                }
            }
            threadDispatcher.notify();
        }
        Thread.yield();
        synchronized (messageDispatcher) {
            while (!linkedList.isEmpty()) {
                MessageFromSource messageFromSource2 = (MessageFromSource) linkedList.removeFirst();
                messageFromSource2.src.inNeed(false);
                messageFromSource2.src.releaseQuantity(messageFromSource2.size);
                if (messageFromSource2.src.isIdle()) {
                    allSources.stickyCacheEntry((CacheEntry) messageFromSource2.src.getUserObject(), false);
                }
            }
            messageDispatcher.notify();
        }
        this.listener = null;
    }

    public QuotaIncomingMessageListener doOne() {
        MessageFromSource messageFromSource;
        ResourceAccount releaseItem;
        synchronized (threadDispatcher) {
            messageFromSource = (MessageFromSource) this.messageQueue.pop();
            this.myAccount.inNeed(this.messageQueue.getCurrentInQueue() != 0);
            threadDispatcher.notify();
        }
        if (messageFromSource != null) {
            synchronized (messageDispatcher) {
                messageFromSource.src.inNeed(false);
                messageFromSource.src.releaseQuantity(messageFromSource.size);
                if (messageFromSource.src.isIdle()) {
                    allSources.stickyCacheEntry((CacheEntry) messageFromSource.src.getUserObject(), false);
                }
                messageDispatcher.notify();
            }
            try {
                this.listener.processIncomingMessage(messageFromSource.msg, messageFromSource.srcAddress, messageFromSource.destAddress);
            } catch (Throwable th) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error(new StringBuffer().append("Uncaught Throwable in listener : ").append(this.name).append("(").append(this.listener.getClass().getName()).append(")").toString(), th);
                }
            }
        }
        synchronized (threadDispatcher) {
            this.myAccount.inNeed(this.messageQueue.getCurrentInQueue() > 0);
            releaseItem = this.myAccount.releaseItem();
            if (this.messageQueue.isClosed() && this.myAccount.isIdle()) {
                this.myAccount.close();
            }
            threadDispatcher.notify();
        }
        if (releaseItem == null) {
            return null;
        }
        return (QuotaIncomingMessageListener) releaseItem.getUserObject();
    }

    @Override // net.jxta.endpoint.EndpointListener
    public void processIncomingMessage(Message message, EndpointAddress endpointAddress, EndpointAddress endpointAddress2) {
        ResourceAccount resourceAccount;
        boolean push;
        int currentInQueue;
        if (this.messageQueue.isClosed()) {
            return;
        }
        String endpointAddress3 = endpointAddress.toString();
        long byteLength = message.getByteLength();
        synchronized (messageDispatcher) {
            CacheEntry cacheEntry = allSources.getCacheEntry(endpointAddress3);
            if (cacheEntry == null) {
                resourceAccount = messageDispatcher.newAccount(40960L, -1L, endpointAddress3);
                if (resourceAccount.getNbReserved() < 1) {
                    resourceAccount.close();
                    allSources.purge(10);
                    resourceAccount = messageDispatcher.newAccount(40960L, -1L, new StringBuffer().append("retrying:").append(endpointAddress3).toString());
                }
                allSources.put(endpointAddress3, resourceAccount);
                cacheEntry = allSources.getCacheEntry(endpointAddress3);
                resourceAccount.setUserObject(cacheEntry);
            } else {
                resourceAccount = (ResourceAccount) cacheEntry.getValue();
            }
            if (!resourceAccount.obtainQuantity(byteLength)) {
                if (LOG.isEnabledFor(Level.INFO)) {
                    LOG.info("Peer exceeds queuing limits; msg discarded.");
                }
                messageDispatcher.notify();
                return;
            }
            allSources.stickyCacheEntry(cacheEntry, true);
            messageDispatcher.notify();
            boolean z = false;
            synchronized (threadDispatcher) {
                while (true) {
                    push = this.messageQueue.push(new MessageFromSource(message, endpointAddress, endpointAddress2, resourceAccount, 0L, byteLength));
                    if (push || !this.messageQueue.isClosed()) {
                        if (push) {
                            break;
                        }
                    } else if (LOG.isEnabledFor(Level.INFO)) {
                        LOG.info("queue closed, message discarded");
                    }
                }
                if (LOG.isEnabledFor(Level.WARN) && (currentInQueue = this.messageQueue.getCurrentInQueue()) > 100) {
                    LOG.warn(new StringBuffer().append("Very long queue (").append(currentInQueue).append(") for listener: ").append(this).toString());
                }
                if (push) {
                    z = this.myAccount.obtainItem();
                }
                threadDispatcher.notify();
            }
            if (push) {
                Thread.yield();
                if (z) {
                    ListenerThread.newListenerThread(this);
                    return;
                } else {
                    if (LOG.isEnabledFor(Level.INFO)) {
                        LOG.info(new StringBuffer().append("Listener '").append(this).append("' exceeds thread's limits; msg waits.").toString());
                        return;
                    }
                    return;
                }
            }
            synchronized (messageDispatcher) {
                resourceAccount.inNeed(false);
                resourceAccount.releaseQuantity(byteLength);
                if (resourceAccount.isIdle()) {
                    allSources.stickyCacheEntry(cacheEntry, false);
                }
                messageDispatcher.notify();
            }
            Thread.yield();
        }
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError(e.getMessage());
        }
    }

    static {
        Class cls;
        if (class$net$jxta$impl$endpoint$QuotaIncomingMessageListener == null) {
            cls = class$("net.jxta.impl.endpoint.QuotaIncomingMessageListener");
            class$net$jxta$impl$endpoint$QuotaIncomingMessageListener = cls;
        } else {
            cls = class$net$jxta$impl$endpoint$QuotaIncomingMessageListener;
        }
        LOG = Logger.getLogger(cls.getName());
        threadDispatcher = new ResourceDispatcher(100L, 1L, 3L, 150L, 6L, 5L, true, "threadDispatcher");
        GmaxMsgSize = 20480;
        GmaxSenders = 150;
        GminResPerSender = 2 * GmaxMsgSize;
        GmaxResPerSender = 2 * GminResPerSender;
        TotalExtra = 2 * GmaxResPerSender * GmaxSenders;
        MaxExtraPerSender = 10 * GmaxResPerSender;
        NeverReserved = TotalExtra / 8;
        messageDispatcher = new ResourceDispatcher(GmaxSenders, GminResPerSender, GmaxResPerSender, TotalExtra, MaxExtraPerSender, NeverReserved, false, "messageDispatcher");
        allSources = new Cache(100L, new MyCacheListener());
    }
}
