/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;

import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.jcs.auxiliary.AuxiliaryCache;
import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
import org.apache.jcs.auxiliary.lateral.socket.tcp.LateralTCPCacheManager;
import org.apache.jcs.auxiliary.lateral.socket.tcp.TCPLateralCacheAttributes;
import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
import org.apache.jcs.auxiliary.lateral.socket.tcp.discovery.UDPDiscoveryMessage;
import org.apache.jcs.auxiliary.lateral.socket.tcp.discovery.UDPDiscoveryService;
import org.apache.jcs.engine.behavior.ICompositeCacheManager;
import org.apache.jcs.engine.behavior.IElementSerializer;
import org.apache.jcs.engine.behavior.IShutdownObserver;
import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;

public class UDPDiscoveryReceiver
implements Runnable,
IShutdownObserver {
    private static final Log log = LogFactory.getLog((Class)UDPDiscoveryReceiver.class);
    private final byte[] m_buffer = new byte[65536];
    private MulticastSocket m_socket;
    private static final int maxPoolSize = 10;
    private PooledExecutor pooledExecutor = null;
    private int cnt = 0;
    protected UDPDiscoveryService service = null;
    private String multicastAddressString = "";
    private int multicastPort = 0;
    private ICompositeCacheManager cacheMgr;
    private boolean shutdown = false;
    protected ICacheEventLogger cacheEventLogger;
    protected IElementSerializer elementSerializer;

    public UDPDiscoveryReceiver(UDPDiscoveryService service, String multicastAddressString, int multicastPort, ICompositeCacheManager cacheMgr, ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer) throws IOException {
        this.service = service;
        this.multicastAddressString = multicastAddressString;
        this.multicastPort = multicastPort;
        this.cacheMgr = cacheMgr;
        this.cacheEventLogger = cacheEventLogger;
        this.elementSerializer = elementSerializer;
        this.pooledExecutor = new PooledExecutor((Channel)new BoundedBuffer(100), 10);
        this.pooledExecutor.discardOldestWhenBlocked();
        this.pooledExecutor.setThreadFactory((ThreadFactory)new MyThreadFactory());
        if (log.isInfoEnabled()) {
            log.info((Object)("constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]"));
        }
        this.createSocket(this.multicastAddressString, this.multicastPort);
    }

    private void createSocket(String multicastAddressString, int multicastPort) throws IOException {
        try {
            this.m_socket = new MulticastSocket(multicastPort);
            this.m_socket.joinGroup(InetAddress.getByName(multicastAddressString));
        }
        catch (IOException e) {
            log.error((Object)("Could not bind to multicast address [" + multicastAddressString + ":" + multicastPort + "]"), (Throwable)e);
            throw e;
        }
    }

    public Object waitForMessage() throws IOException {
        DatagramPacket packet = new DatagramPacket(this.m_buffer, this.m_buffer.length);
        Object obj = null;
        try {
            this.m_socket.receive(packet);
            ByteArrayInputStream byteStream = new ByteArrayInputStream(this.m_buffer, 0, packet.getLength());
            ObjectInputStream objectStream = new ObjectInputStream(byteStream);
            obj = objectStream.readObject();
        }
        catch (Exception e) {
            log.error((Object)"Error receving multicast packet", (Throwable)e);
        }
        return obj;
    }

    public void run() {
        block6: while (true) {
            try {
                while (!this.shutdown) {
                    Object obj = this.waitForMessage();
                    ++this.cnt;
                    if (log.isDebugEnabled()) {
                        log.debug((Object)(this.getCnt() + " messages received."));
                    }
                    UDPDiscoveryMessage message = null;
                    try {
                        message = (UDPDiscoveryMessage)obj;
                        if (message != null) {
                            MessageHandler handler = new MessageHandler(message);
                            this.pooledExecutor.execute((Runnable)handler);
                            if (!log.isDebugEnabled()) continue block6;
                            log.debug((Object)"Passed handler to executor.");
                            continue block6;
                        }
                        log.warn((Object)"message is null");
                        continue block6;
                    }
                    catch (ClassCastException cce) {
                        log.warn((Object)("Received unknown message type " + cce.getMessage()));
                    }
                }
                break;
            }
            catch (Exception e) {
                log.error((Object)"Unexpected exception in UDP receiver.", (Throwable)e);
                try {
                    Thread.sleep(100L);
                    break;
                }
                catch (Exception e2) {
                    log.error((Object)"Problem sleeping", (Throwable)e2);
                    break;
                }
            }
        }
    }

    public void setCnt(int cnt) {
        this.cnt = cnt;
    }

    public int getCnt() {
        return this.cnt;
    }

    public void shutdown() {
        try {
            this.shutdown = true;
            this.m_socket.close();
            this.pooledExecutor.shutdownNow();
        }
        catch (Exception e) {
            log.error((Object)"Problem closing socket");
        }
    }

    class MyThreadFactory
    implements ThreadFactory {
        MyThreadFactory() {
        }

        public Thread newThread(Runnable runner) {
            Thread t = new Thread(runner);
            String oldName = t.getName();
            t.setName("JCS-UDPDiscoveryReceiver-" + oldName);
            t.setDaemon(true);
            t.setPriority(1);
            return t;
        }
    }

    public class MessageHandler
    implements Runnable {
        private UDPDiscoveryMessage message = null;

        public MessageHandler(UDPDiscoveryMessage message) {
            this.message = message;
        }

        public void run() {
            if (this.message.getRequesterId() == LateralCacheInfo.listenerId) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)"from self");
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug((Object)"from another");
                    log.debug((Object)("Message = " + this.message));
                }
                if (this.message.getMessageType() == 1) {
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"Message is a Request Broadcase, will have the service handle it.");
                    }
                    UDPDiscoveryReceiver.this.service.serviceRequestBroadcast();
                    return;
                }
                try {
                    ITCPLateralCacheAttributes lca = null;
                    lca = UDPDiscoveryReceiver.this.service.getTcpLateralCacheAttributes() != null ? (ITCPLateralCacheAttributes)UDPDiscoveryReceiver.this.service.getTcpLateralCacheAttributes().copy() : new TCPLateralCacheAttributes();
                    lca.setTransmissionType(3);
                    lca.setTcpServer(this.message.getHost() + ":" + this.message.getPort());
                    LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance(lca, UDPDiscoveryReceiver.this.cacheMgr, UDPDiscoveryReceiver.this.cacheEventLogger, UDPDiscoveryReceiver.this.elementSerializer);
                    ArrayList regions = this.message.getCacheNames();
                    if (regions != null) {
                        Iterator it = regions.iterator();
                        while (it.hasNext()) {
                            String cacheName = (String)it.next();
                            try {
                                AuxiliaryCache ic = lcm.getCache(cacheName);
                                if (log.isDebugEnabled()) {
                                    log.debug((Object)("Got cache, ic = " + ic));
                                }
                                if (ic == null) continue;
                                UDPDiscoveryReceiver.this.service.addNoWait((LateralCacheNoWait)ic);
                                if (!log.isDebugEnabled()) continue;
                                log.debug((Object)("Called addNoWait for cacheName " + cacheName));
                            }
                            catch (Exception e) {
                                log.error((Object)"Problem creating no wait", (Throwable)e);
                            }
                        }
                    } else {
                        log.warn((Object)("No cache names found in message " + this.message));
                    }
                }
                catch (Exception e) {
                    log.error((Object)"Problem getting lateral maanger", (Throwable)e);
                }
            }
        }
    }
}

