/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.security.sasl.SaslException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.ipc.AsyncCall;
import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
import org.apache.hadoop.hbase.ipc.AsyncServerResponseHandler;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.ConnectionId;
import org.apache.hadoop.hbase.ipc.FatalConnectionException;
import org.apache.hadoop.hbase.ipc.IPCUtil;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.SaslClientHandler;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.hudi.com.google.protobuf.Descriptors;
import org.apache.hudi.com.google.protobuf.Message;
import org.apache.hudi.com.google.protobuf.RpcCallback;

@InterfaceAudience.Private
public class AsyncRpcChannel {
    private static final Log LOG = LogFactory.getLog((String)AsyncRpcChannel.class.getName());
    private static final int MAX_SASL_RETRIES = 5;
    protected static final Map<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends TokenIdentifier>> tokenHandlers = new HashMap<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends TokenIdentifier>>();
    final AsyncRpcClient client;
    private Channel channel;
    String name;
    final User ticket;
    final String serviceName;
    final InetSocketAddress address;
    private int ioFailureCounter = 0;
    private int connectFailureCounter = 0;
    boolean useSasl;
    AuthMethod authMethod;
    private int reloginMaxBackoff;
    private Token<? extends TokenIdentifier> token;
    private String serverPrincipal;
    private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
    private boolean connected = false;
    private boolean closed = false;
    private Timeout cleanupTimer;
    private final TimerTask timeoutTask = new TimerTask(){

        public void run(Timeout timeout) throws Exception {
            AsyncRpcChannel.this.cleanupCalls();
        }
    };

    public AsyncRpcChannel(Bootstrap bootstrap, AsyncRpcClient client, User ticket, String serviceName, InetSocketAddress address) {
        this.client = client;
        this.ticket = ticket;
        this.serviceName = serviceName;
        this.address = address;
        this.channel = this.connect(bootstrap).channel();
        this.name = "IPC Client (" + this.channel.hashCode() + ") to " + address.toString() + (ticket == null ? " from unknown user" : " from " + ticket.getName());
    }

    private ChannelFuture connect(final Bootstrap bootstrap) {
        return bootstrap.remoteAddress((SocketAddress)this.address).connect().addListener((GenericFutureListener)new GenericFutureListener<ChannelFuture>(){

            public void operationComplete(ChannelFuture f) throws Exception {
                if (!f.isSuccess()) {
                    if (f.cause() instanceof SocketException) {
                        AsyncRpcChannel.this.retryOrClose(bootstrap, AsyncRpcChannel.this.connectFailureCounter++, f.cause());
                    } else {
                        AsyncRpcChannel.this.retryOrClose(bootstrap, AsyncRpcChannel.this.ioFailureCounter++, f.cause());
                    }
                    return;
                }
                AsyncRpcChannel.this.channel = f.channel();
                AsyncRpcChannel.this.setupAuthorization();
                ByteBuf b = AsyncRpcChannel.this.channel.alloc().directBuffer(6);
                AsyncRpcChannel.this.createPreamble(b, AsyncRpcChannel.this.authMethod);
                AsyncRpcChannel.this.channel.writeAndFlush((Object)b).addListener((GenericFutureListener)ChannelFutureListener.CLOSE_ON_FAILURE);
                if (AsyncRpcChannel.this.useSasl) {
                    UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
                    if (AsyncRpcChannel.this.authMethod == AuthMethod.KERBEROS && ticket != null && ticket.getRealUser() != null) {
                        ticket = ticket.getRealUser();
                    }
                    if (ticket == null) {
                        throw new FatalConnectionException("ticket/user is null");
                    }
                    final UserGroupInformation realTicket = ticket;
                    SaslClientHandler saslHandler = (SaslClientHandler)((Object)ticket.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<SaslClientHandler>(){

                        @Override
                        public SaslClientHandler run() throws IOException {
                            return AsyncRpcChannel.this.getSaslHandler(realTicket, bootstrap);
                        }
                    }));
                    if (saslHandler != null) {
                        AsyncRpcChannel.this.channel.pipeline().addFirst(new ChannelHandler[]{saslHandler});
                    } else {
                        AsyncRpcChannel.this.authMethod = AuthMethod.SIMPLE;
                        AsyncRpcChannel.this.useSasl = false;
                    }
                } else {
                    AsyncRpcChannel.this.startHBaseConnection(f.channel());
                }
            }
        });
    }

    private void startHBaseConnection(Channel ch) {
        ch.pipeline().addLast("frameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        ch.pipeline().addLast(new ChannelHandler[]{new AsyncServerResponseHandler(this)});
        try {
            this.writeChannelHeader(ch).addListener((GenericFutureListener)new GenericFutureListener<ChannelFuture>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void operationComplete(ChannelFuture future) throws Exception {
                    ArrayList callsToWrite;
                    if (!future.isSuccess()) {
                        AsyncRpcChannel.this.close(future.cause());
                        return;
                    }
                    Map map = AsyncRpcChannel.this.pendingCalls;
                    synchronized (map) {
                        AsyncRpcChannel.this.connected = true;
                        callsToWrite = new ArrayList(AsyncRpcChannel.this.pendingCalls.values());
                    }
                    for (AsyncCall call : callsToWrite) {
                        AsyncRpcChannel.this.writeRequest(call);
                    }
                }
            });
        }
        catch (IOException e) {
            this.close(e);
        }
    }

    private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, final Bootstrap bootstrap) throws IOException {
        return new SaslClientHandler(realTicket, this.authMethod, this.token, this.serverPrincipal, this.client.fallbackAllowed, this.client.conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()), new SaslClientHandler.SaslExceptionHandler(){

            @Override
            public void handle(int retryCount, Random random, Throwable cause) {
                try {
                    AsyncRpcChannel.this.handleSaslConnectionFailure(retryCount, cause, realTicket);
                    AsyncRpcChannel.this.client.newTimeout(new TimerTask(){

                        public void run(Timeout timeout) throws Exception {
                            AsyncRpcChannel.this.connect(bootstrap);
                        }
                    }, random.nextInt(AsyncRpcChannel.this.reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
                }
                catch (IOException | InterruptedException e) {
                    AsyncRpcChannel.this.close(e);
                }
            }
        }, new SaslClientHandler.SaslSuccessfulConnectHandler(){

            @Override
            public void onSuccess(Channel channel) {
                AsyncRpcChannel.this.startHBaseConnection(channel);
            }
        });
    }

    private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
        if (connectCounter < this.client.maxRetries) {
            this.client.newTimeout(new TimerTask(){

                public void run(Timeout timeout) throws Exception {
                    AsyncRpcChannel.this.connect(bootstrap);
                }
            }, this.client.failureSleep, TimeUnit.MILLISECONDS);
        } else {
            this.client.failedServers.addToFailedServers(this.address);
            this.close(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Promise<Message> callMethod(Descriptors.MethodDescriptor method, PayloadCarryingRpcController controller, Message request, Message responsePrototype, MetricsConnection.CallStats callStats) {
        final AsyncCall call = new AsyncCall(this.channel.eventLoop(), this.client.callIdCnt.getAndIncrement(), method, request, controller, responsePrototype, callStats);
        controller.notifyOnCancel(new RpcCallback<Object>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run(Object parameter) {
                Map map = AsyncRpcChannel.this.pendingCalls;
                synchronized (map) {
                    AsyncRpcChannel.this.pendingCalls.remove(call.id);
                }
            }
        });
        if (controller.isCanceled()) {
            call.cancel(true);
            return call;
        }
        Map<Integer, AsyncCall> map = this.pendingCalls;
        synchronized (map) {
            if (this.closed) {
                Promise promise = this.channel.eventLoop().newPromise();
                promise.setFailure((Throwable)new ConnectException());
                return promise;
            }
            this.pendingCalls.put(call.id, call);
            if (this.cleanupTimer == null && call.getRpcTimeout() > 0L) {
                this.cleanupTimer = this.client.newTimeout(this.timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
            }
            if (!this.connected) {
                return call;
            }
        }
        this.writeRequest(call);
        return call;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    AsyncCall removePendingCall(int id) {
        Map<Integer, AsyncCall> map = this.pendingCalls;
        synchronized (map) {
            return this.pendingCalls.remove(id);
        }
    }

    private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
        RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder().setServiceName(this.serviceName);
        RPCProtos.UserInformation userInfoPB = this.buildUserInfo(this.ticket.getUGI(), this.authMethod);
        if (userInfoPB != null) {
            headerBuilder.setUserInfo(userInfoPB);
        }
        if (this.client.codec != null) {
            headerBuilder.setCellBlockCodecClass(this.client.codec.getClass().getCanonicalName());
        }
        if (this.client.compressor != null) {
            headerBuilder.setCellBlockCompressorClass(this.client.compressor.getClass().getCanonicalName());
        }
        headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
        RPCProtos.ConnectionHeader header = headerBuilder.build();
        int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
        ByteBuf b = channel.alloc().directBuffer(totalSize);
        b.writeInt(header.getSerializedSize());
        b.writeBytes(header.toByteArray());
        return channel.writeAndFlush((Object)b);
    }

    private void writeRequest(AsyncCall call) {
        try {
            ByteBuffer cellBlock;
            RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader.newBuilder();
            requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName()).setRequestParam(call.param != null);
            if (Trace.isTracing()) {
                Span s = Trace.currentSpan();
                requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder().setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
            }
            if ((cellBlock = this.client.buildCellBlock(call.controller.cellScanner())) != null) {
                RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta.newBuilder();
                cellBlockBuilder.setLength(cellBlock.limit());
                requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
            }
            if (call.controller.getPriority() != 0) {
                requestHeaderBuilder.setPriority(call.controller.getPriority());
            }
            RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
            int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
            if (cellBlock != null) {
                totalSize += cellBlock.remaining();
            }
            ByteBuf b = this.channel.alloc().directBuffer(4 + totalSize);
            try (ByteBufOutputStream out = new ByteBufOutputStream(b);){
                call.callStats.setRequestSizeBytes(IPCUtil.write((OutputStream)out, rh, call.param, cellBlock));
            }
            this.channel.writeAndFlush((Object)b).addListener((GenericFutureListener)new CallWriteListener(this, call.id));
        }
        catch (IOException e) {
            this.close(e);
        }
    }

    private void setupAuthorization() throws IOException {
        SecurityInfo securityInfo = SecurityInfo.getInfo(this.serviceName);
        this.useSasl = this.client.userProvider.isHBaseSecurityEnabled();
        this.token = null;
        if (this.useSasl && securityInfo != null) {
            String serverKey;
            AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
            if (tokenKind != null) {
                TokenSelector<? extends TokenIdentifier> tokenSelector = tokenHandlers.get(tokenKind);
                if (tokenSelector != null) {
                    this.token = tokenSelector.selectToken(new Text(this.client.clusterId), this.ticket.getUGI().getTokens());
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("No token selector found for type " + tokenKind));
                }
            }
            if ((serverKey = securityInfo.getServerPrincipal()) == null) {
                throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
            }
            this.serverPrincipal = SecurityUtil.getServerPrincipal((String)this.client.conf.get(serverKey), (String)this.address.getAddress().getCanonicalHostName().toLowerCase());
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("RPC Server Kerberos principal name for service=" + this.serviceName + " is " + this.serverPrincipal));
            }
        }
        this.authMethod = !this.useSasl ? AuthMethod.SIMPLE : (this.token != null ? AuthMethod.DIGEST : AuthMethod.KERBEROS);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Use " + (Object)((Object)this.authMethod) + " authentication for service " + this.serviceName + ", sasl=" + this.useSasl));
        }
        this.reloginMaxBackoff = this.client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
    }

    private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
        if (ugi == null || authMethod == AuthMethod.DIGEST) {
            return null;
        }
        RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
        if (authMethod == AuthMethod.KERBEROS) {
            userInfoPB.setEffectiveUser(ugi.getUserName());
        } else if (authMethod == AuthMethod.SIMPLE) {
            userInfoPB.setEffectiveUser(ugi.getUserName());
            if (ugi.getRealUser() != null) {
                userInfoPB.setRealUser(ugi.getRealUser().getUserName());
            }
        }
        return userInfoPB.build();
    }

    private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
        byteBuf.writeBytes(HConstants.RPC_HEADER);
        byteBuf.writeByte(0);
        byteBuf.writeByte((int)authMethod.code);
    }

    public void close(final Throwable e) {
        this.client.removeConnection(this);
        this.channel.eventLoop().execute(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                ArrayList toCleanup;
                Map map = AsyncRpcChannel.this.pendingCalls;
                synchronized (map) {
                    if (AsyncRpcChannel.this.closed) {
                        return;
                    }
                    AsyncRpcChannel.this.closed = true;
                    toCleanup = new ArrayList(AsyncRpcChannel.this.pendingCalls.values());
                    AsyncRpcChannel.this.pendingCalls.clear();
                }
                IOException closeException = null;
                if (e != null) {
                    closeException = e instanceof IOException ? (IOException)e : new IOException(e);
                }
                if (LOG.isDebugEnabled() && closeException != null) {
                    LOG.debug((Object)(AsyncRpcChannel.this.name + ": closing ipc connection to " + AsyncRpcChannel.this.address), (Throwable)closeException);
                }
                if (AsyncRpcChannel.this.cleanupTimer != null) {
                    AsyncRpcChannel.this.cleanupTimer.cancel();
                    AsyncRpcChannel.this.cleanupTimer = null;
                }
                for (AsyncCall call : toCleanup) {
                    call.setFailed(closeException != null ? closeException : new ConnectionClosingException("Call id=" + call.id + " on server " + AsyncRpcChannel.this.address + " aborted: connection is closing"));
                }
                AsyncRpcChannel.this.channel.disconnect().addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)(AsyncRpcChannel.this.name + ": closed"));
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanupCalls() {
        ArrayList<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
        long currentTime = EnvironmentEdgeManager.currentTime();
        long nextCleanupTaskDelay = -1L;
        Map<Integer, AsyncCall> map = this.pendingCalls;
        synchronized (map) {
            Iterator<AsyncCall> iter = this.pendingCalls.values().iterator();
            while (iter.hasNext()) {
                AsyncCall call = iter.next();
                long timeout = call.getRpcTimeout();
                if (timeout <= 0L) continue;
                if (currentTime - call.getStartTime() >= timeout) {
                    iter.remove();
                    toCleanup.add(call);
                    continue;
                }
                if (nextCleanupTaskDelay >= 0L && timeout >= nextCleanupTaskDelay) continue;
                nextCleanupTaskDelay = timeout;
            }
            this.cleanupTimer = nextCleanupTaskDelay > 0L ? this.client.newTimeout(this.timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS) : null;
        }
        for (AsyncCall call : toCleanup) {
            call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime=" + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
        }
    }

    public boolean isAlive() {
        return this.channel.isOpen();
    }

    private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        UserGroupInformation realUser = currentUser.getRealUser();
        return this.authMethod == AuthMethod.KERBEROS && loginUser != null && loginUser.hasKerberosCredentials() && (loginUser.equals((Object)currentUser) || loginUser.equals((Object)realUser));
    }

    private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, UserGroupInformation user) throws IOException, InterruptedException {
        user.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

            @Override
            public Void run() throws IOException, InterruptedException {
                if (AsyncRpcChannel.this.shouldAuthenticateOverKrb()) {
                    if (currRetries < 5) {
                        LOG.debug((Object)("Exception encountered while connecting to the server : " + ex));
                        if (UserGroupInformation.isLoginKeytabBased()) {
                            UserGroupInformation.getLoginUser().reloginFromKeytab();
                        } else {
                            UserGroupInformation.getLoginUser().reloginFromTicketCache();
                        }
                        return null;
                    }
                    String msg = "Couldn't setup connection for " + UserGroupInformation.getLoginUser().getUserName() + " to " + AsyncRpcChannel.this.serverPrincipal;
                    LOG.warn((Object)msg);
                    throw (IOException)new IOException(msg).initCause(ex);
                }
                LOG.warn((Object)("Exception encountered while connecting to the server : " + ex));
                if (ex instanceof RemoteException) {
                    throw (RemoteException)ex;
                }
                if (ex instanceof SaslException) {
                    String msg = "SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.";
                    LOG.fatal((Object)msg, ex);
                    throw new RuntimeException(msg, ex);
                }
                throw new IOException(ex);
            }
        });
    }

    public int getConnectionHashCode() {
        return ConnectionId.hashCode(this.ticket, this.serviceName, this.address);
    }

    public int hashCode() {
        return this.getConnectionHashCode();
    }

    public boolean equals(Object obj) {
        if (obj instanceof AsyncRpcChannel) {
            AsyncRpcChannel channel = (AsyncRpcChannel)obj;
            return channel.hashCode() == obj.hashCode();
        }
        return false;
    }

    public String toString() {
        return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
    }

    static {
        tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
    }

    private static final class CallWriteListener
    implements ChannelFutureListener {
        private final AsyncRpcChannel rpcChannel;
        private final int id;

        public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
            this.rpcChannel = asyncRpcChannel;
            this.id = id;
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            AsyncCall call;
            if (!future.isSuccess() && (call = this.rpcChannel.removePendingCall(this.id)) != null) {
                if (future.cause() instanceof IOException) {
                    call.setFailed((IOException)future.cause());
                } else {
                    call.setFailed(new IOException(future.cause()));
                }
            }
        }
    }
}

