package backtype.storm.messaging.netty;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:backtype/storm/messaging/netty/Server.class */
/**
 * Netty-based receiving side of a Storm messaging connection.
 *
 * <p>Incoming {@link TaskMessage}s are handed to {@link #enqueue(List)}, grouped by
 * destination task, and placed on one of {@code queueCount} blocking queues. Each task id
 * is bound to a queue the first time it is seen, in round-robin order. Consumers drain a
 * queue through {@link #recv(int, int)}.
 *
 * <p>Thread-safety: {@code enqueue}, {@code recv}, {@code addChannel},
 * {@code closeChannel} and {@code close} may be invoked concurrently. The task-to-queue map
 * is replaced copy-on-write under a private lock and published through a volatile
 * reference, so lookups never observe a map that is being modified.
 */
public class Server implements IConnection {
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    Map storm_conf;
    int port;
    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
    final ChannelFactory factory;
    final ServerBootstrap bootstrap;
    private int queueCount;
    /** Guards queue assignment (roundRobinQueueId and replacement of taskToQueueId). */
    private final Object queueAssignmentLock = new Object();
    /** Task id to queue index. Never mutated in place: a new map is published on every insert. */
    volatile HashMap<Integer, Integer> taskToQueueId;
    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
    /** Set once a message for task -1 is seen; volatile so threads calling recv/enqueue observe it. */
    volatile boolean closing = false;
    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
    int roundRobinQueueId = 0;

    /**
     * Creates the receive queues, configures the Netty server bootstrap and binds it.
     *
     * @param map Storm configuration; read for the receiver thread count (default 1), the
     *            Netty buffer size and the Netty server worker thread count
     * @param i   port to bind
     */
    @SuppressWarnings("unchecked") // generic array creation for the queue array
    public Server(Map map, int i) {
        this.storm_conf = map;
        this.port = i;
        this.queueCount = Utils.getInt(map.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1).intValue();
        this.taskToQueueId = new HashMap<>();
        this.message_queue = new LinkedBlockingQueue[this.queueCount];
        for (int q = 0; q < this.queueCount; q++) {
            this.message_queue[q] = new LinkedBlockingQueue<>();
        }
        int bufferSize = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)).intValue();
        int maxWorkers = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS)).intValue();
        NettyRenameThreadFactory bossFactory = new NettyRenameThreadFactory(name() + "-boss");
        NettyRenameThreadFactory workerFactory = new NettyRenameThreadFactory(name() + "-worker");
        // A non-positive worker count means "let Netty choose its default".
        if (maxWorkers > 0) {
            this.factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers);
        } else {
            this.factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory));
        }
        LOG.info("Create Netty Server " + name() + ", buffer_size: " + bufferSize + ", maxWorkers: " + maxWorkers);
        this.bootstrap = new ServerBootstrap(this.factory);
        this.bootstrap.setOption("child.tcpNoDelay", true);
        this.bootstrap.setOption("child.receiveBufferSize", Integer.valueOf(bufferSize));
        this.bootstrap.setOption("child.keepAlive", true);
        this.bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
        // The bound server channel joins the group so close() shuts it down with the rest.
        this.allChannels.add(this.bootstrap.bind(new InetSocketAddress(i)));
    }

    /**
     * Buckets a batch of messages by the queue assigned to each message's task.
     *
     * @param list messages to group
     * @return an array indexed by queue id (entries are {@code null} for queues that got no
     *         message), or {@code null} if a message for task -1 was encountered, in which
     *         case {@code closing} is set and the whole batch is dropped
     */
    @SuppressWarnings("unchecked") // generic array creation
    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> list) {
        ArrayList<TaskMessage>[] grouped = new ArrayList[this.queueCount];
        for (int idx = 0; idx < list.size(); idx++) {
            TaskMessage message = list.get(idx);
            int task = message.task();
            if (task == -1) {
                // Task -1 is the close marker (see closeMessage).
                this.closing = true;
                return null;
            }
            int queueId = getMessageQueueId(task).intValue();
            if (null == grouped[queueId]) {
                grouped[queueId] = new ArrayList<>();
            }
            grouped[queueId].add(message);
        }
        return grouped;
    }

    /**
     * Returns the queue index bound to the given task, binding it round-robin on first use.
     *
     * @param i task id
     * @return the queue index for the task; never {@code null}
     */
    private Integer getMessageQueueId(int i) {
        Integer task = Integer.valueOf(i);
        // Fast path: lock-free lookup on the currently published (never mutated) map.
        Integer queueId = this.taskToQueueId.get(task);
        if (null != queueId) {
            return queueId;
        }
        synchronized (this.queueAssignmentLock) {
            // Re-read under the lock: another thread may have bound this task in the
            // meantime, and that binding must be returned rather than null.
            queueId = this.taskToQueueId.get(task);
            if (null == queueId) {
                queueId = Integer.valueOf(this.roundRobinQueueId);
                this.roundRobinQueueId++;
                if (this.roundRobinQueueId == this.queueCount) {
                    this.roundRobinQueueId = 0;
                }
                // Copy-on-write so concurrent fast-path readers never see a map mid-update.
                HashMap<Integer, Integer> updated = new HashMap<>(this.taskToQueueId);
                updated.put(task, queueId);
                this.taskToQueueId = updated;
            }
        }
        return queueId;
    }

    /**
     * Groups the given messages by destination queue and appends each group to its queue.
     * Does nothing for a {@code null} or empty batch, or once the server is closing.
     *
     * @param list messages received from a channel
     * @throws InterruptedException if interrupted while putting onto a queue
     */
    public void enqueue(List<TaskMessage> list) throws InterruptedException {
        if (null == list || list.size() == 0 || this.closing) {
            return;
        }
        ArrayList<TaskMessage>[] grouped = groupMessages(list);
        // groupMessages returns null (and sets closing) when it meets the close marker.
        if (null == grouped || this.closing) {
            return;
        }
        for (int queueId = 0; queueId < grouped.length; queueId++) {
            ArrayList<TaskMessage> batch = grouped[queueId];
            if (null != batch) {
                this.message_queue[queueId].put(batch);
            }
        }
    }

    /**
     * Fetches the next batch of messages from one receive queue.
     *
     * @param i  flags; when the lowest bit is set the call polls without blocking,
     *           otherwise it blocks until a batch is available
     * @param i2 receiver id; the queue used is {@code i2 % queueCount}
     * @return an iterator over the batch; an iterator over the single close message if the
     *         server is closing; or {@code null} if no batch was available (non-blocking
     *         mode) or the wait was interrupted
     */
    @Override // backtype.storm.messaging.IConnection
    public Iterator<TaskMessage> recv(int i, int i2) {
        if (this.closing) {
            return this.closeMessage.iterator();
        }
        int queueId = i2 % this.queueCount;
        ArrayList<TaskMessage> batch;
        if ((i & 1) == 1) {
            batch = this.message_queue[queueId].poll();
        } else {
            try {
                batch = this.message_queue[queueId].take();
                LOG.debug("request to be processed: {}", batch);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here. Restoring it could
                // make a caller that loops on recv() spin on an immediately-failing take();
                // confirm the caller's loop before changing this.
                LOG.info("exception within msg receiving", (Throwable) e);
                batch = null;
            }
        }
        if (null != batch) {
            return batch.iterator();
        }
        return null;
    }

    /**
     * Registers a channel so that {@link #close()} will close it. Ignored after the server
     * has been closed.
     *
     * @param channel channel to track
     */
    public void addChannel(Channel channel) {
        // Snapshot the volatile field: close() sets it to null.
        ChannelGroup group = this.allChannels;
        if (null != group) {
            group.add(channel);
        }
    }

    /**
     * Closes the given channel, waits for the close to complete, and stops tracking it.
     *
     * @param channel channel to close
     */
    public void closeChannel(Channel channel) {
        channel.close().awaitUninterruptibly();
        // Snapshot the volatile field: close() sets it to null.
        ChannelGroup group = this.allChannels;
        if (null != group) {
            group.remove(channel);
        }
    }

    /**
     * Closes every tracked channel and releases the channel factory's resources.
     * Calling it again after the first call is a no-op.
     */
    @Override // backtype.storm.messaging.IConnection
    public synchronized void close() {
        if (this.allChannels != null) {
            this.allChannels.close().awaitUninterruptibly();
            this.factory.releaseExternalResources();
            this.allChannels = null;
        }
    }

    /**
     * Unsupported: a server connection only receives.
     *
     * @throws RuntimeException always
     */
    @Override // backtype.storm.messaging.IConnection
    public void send(int i, byte[] bArr) {
        throw new RuntimeException("Server connection should not send any messages");
    }

    /**
     * Unsupported: a server connection only receives.
     *
     * @throws RuntimeException always
     */
    @Override // backtype.storm.messaging.IConnection
    public void send(Iterator<TaskMessage> it) {
        throw new RuntimeException("Server connection should not send any messages");
    }

    /**
     * @return the display name of this server, which includes the bound port; also used as
     *         the prefix for the boss and worker thread names
     */
    public String name() {
        return "Netty-server-localhost-" + this.port;
    }
}
