/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;

public class TestBlockTokenWithDFS {
    /** Block size in bytes; equals the 1024 that this class passes as the block size. */
    private static final int BLOCK_SIZE = 1024;
    /** Length in bytes of the random payload held in {@link #rawData}. */
    private static final int FILE_SIZE = 2048;
    /** Path of the file used by the read test. */
    private static final String FILE_TO_READ = "/fileToRead.dat";
    /** Path of the file used by the write test. */
    private static final String FILE_TO_WRITE = "/fileToWrite.dat";
    /** Path of the file used by the append test. */
    private static final String FILE_TO_APPEND = "/fileToAppend.dat";
    /** Random payload that the tests write to files and later compare against. */
    private final byte[] rawData = new byte[FILE_SIZE];

    /**
     * Sets the {@link DFSClient} logger to {@link Level#ALL} and fills
     * {@link #rawData} with random bytes.
     */
    public TestBlockTokenWithDFS() {
        ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
        new Random().nextBytes(rawData);
    }

    /**
     * Creates {@code filename} on {@code fs} and writes the whole of
     * {@link #rawData} into it.
     *
     * @param fs file system on which the file is created
     * @param filename path of the file to create
     * @throws IOException if creating, writing or closing the file fails
     */
    private void createFile(FileSystem fs, Path filename) throws IOException {
        // try-with-resources closes the stream even when write() throws,
        // so a failed write no longer leaks the open output stream.
        try (FSDataOutputStream out = fs.create(filename)) {
            out.write(this.rawData);
        }
    }

    /**
     * Reads from the stream's current position with sequential reads until the
     * buffer is full or a read returns a non-positive count, then compares the
     * bytes read with {@link #rawData}.
     *
     * <p>The assertion fails if fewer bytes than the buffer length were read.
     *
     * @param in stream to read from; it is not closed by this method
     * @return {@code true} if the bytes read equal {@link #rawData};
     *         {@code false} if they differ or an {@link IOException} occurred
     */
    private boolean checkFile1(FSDataInputStream in) {
        final byte[] buffer = new byte[FILE_SIZE];
        int filled = 0;
        try {
            while (true) {
                final int count = in.read(buffer, filled, buffer.length - filled);
                if (count <= 0) {
                    break;
                }
                filled += count;
            }
        }
        catch (IOException e) {
            // A read failure is reported to the caller as "file does not match".
            return false;
        }
        Assert.assertEquals("Cannot read file.", buffer.length, filled);
        return this.checkFile(buffer);
    }

    /**
     * Reads the file with a single positional read starting at offset 0 and
     * compares the bytes read with {@link #rawData}.
     *
     * <p>The assertion fails if the read returns fewer bytes than the buffer
     * length.
     *
     * @param in stream to read from; it is not closed by this method
     * @return {@code true} if the bytes read equal {@link #rawData};
     *         {@code false} if they differ or an {@link IOException} occurred
     */
    private boolean checkFile2(FSDataInputStream in) {
        final byte[] buffer = new byte[FILE_SIZE];
        final int count;
        try {
            count = in.read(0L, buffer, 0, buffer.length);
        }
        catch (IOException e) {
            // A read failure is reported to the caller as "file does not match".
            return false;
        }
        Assert.assertEquals("Cannot read file", buffer.length, count);
        return this.checkFile(buffer);
    }

    /**
     * Compares {@code fileToCheck} with {@link #rawData} byte by byte.
     *
     * @param fileToCheck bytes read back from a file
     * @return {@code true} only if both arrays have the same length and the
     *         same content
     */
    private boolean checkFile(byte[] fileToCheck) {
        final byte[] expected = this.rawData;
        boolean matches = fileToCheck.length == expected.length;
        // The loop stops at the first mismatching byte.
        for (int pos = 0; matches && pos < fileToCheck.length; pos++) {
            matches = fileToCheck[pos] == expected[pos];
        }
        return matches;
    }

    /**
     * Creates {@code name} on {@code fileSys}, overwriting any existing file,
     * and returns the open output stream.
     *
     * <p>The buffer size is read from the file system's configuration key
     * {@code io.file.buffer.size}, falling back to 4096.
     *
     * @param fileSys file system on which the file is created
     * @param name path of the file to create
     * @param repl replication factor passed to {@code create}
     * @param blockSize block size passed to {@code create}
     * @return the open stream; the caller is responsible for closing it
     * @throws IOException if the file cannot be created
     */
    private static FSDataOutputStream writeFile(FileSystem fileSys, Path name, short repl, long blockSize) throws IOException {
        final int bufferSize = fileSys.getConf().getInt("io.file.buffer.size", 4096);
        return fileSys.create(name, true, bufferSize, repl, blockSize);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Tries to build a {@link BlockReader} for {@code lblock} against the first
     * of its locations, using the block token carried by {@code lblock}, and
     * asserts on the outcome.
     *
     * <p>Any reader that was built is closed before the assertions run.
     *
     * @param conf configuration used for the client settings, the socket
     *        factory and the tracer
     * @param lblock block to read; its first location is the target
     * @param shouldSucceed if {@code true}, a reader must have been built;
     *        if {@code false}, building must have failed with an
     *        {@link InvalidBlockTokenException}
     */
    private static void tryRead(final Configuration conf, LocatedBlock lblock, boolean shouldSucceed) {
        final ExtendedBlock block = lblock.getBlock();
        IOException failure = null;
        BlockReader reader = null;
        try {
            final DatanodeInfo[] locations = lblock.getLocations();
            final DatanodeInfo firstNode = locations[0];
            final InetSocketAddress targetAddr = NetUtils.createSocketAddr(firstNode.getXferAddr());

            // Peer factory that opens a plain socket to the requested address.
            final RemotePeerFactory peerFactory = new RemotePeerFactory() {
                @Override
                public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException {
                    final String factoryClass = conf.get("dfs.client.xceiver.socket.factory.class", "org.apache.hadoop.net.StandardSocketFactory");
                    final Socket sock = NetUtils.getSocketFactoryFromProperty(conf, factoryClass).createSocket();
                    Peer peer = null;
                    try {
                        sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
                        sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
                        peer = TcpPeerServer.peerFromSocket(sock);
                    }
                    finally {
                        // No peer was produced, so the socket would otherwise be left open.
                        if (peer == null) {
                            IOUtils.closeSocket(sock);
                        }
                    }
                    return peer;
                }
            };

            reader = new BlockReaderFactory(new DfsClientConf(conf))
                    .setFileName(BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId()))
                    .setExtendedBlock(block)
                    .setBlockToken(lblock.getBlockToken())
                    .setInetSocketAddress(targetAddr)
                    .setStartOffset(0L)
                    .setLength(-1L)
                    .setVerifyChecksum(true)
                    .setClientName("TestBlockTokenWithDFS")
                    .setDatanodeInfo(firstNode)
                    .setCachingStrategy(CachingStrategy.newDefaultStrategy())
                    .setClientCacheContext(ClientContext.getFromConf(conf))
                    .setConfiguration(conf)
                    .setTracer(FsTracer.get(conf))
                    .setRemotePeerFactory(peerFactory)
                    .build();
        }
        catch (IOException ex) {
            // Remembered so the "should fail" branch can inspect its type.
            failure = ex;
        }
        finally {
            if (reader != null) {
                try {
                    reader.close();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        if (shouldSucceed) {
            Assert.assertNotNull("OP_READ_BLOCK: access token is invalid, when it is expected to be valid", reader);
            return;
        }
        Assert.assertNotNull("OP_READ_BLOCK: access token is valid, when it is expected to be invalid", failure);
        Assert.assertTrue("OP_READ_BLOCK failed due to reasons other than access token: ", failure instanceof InvalidBlockTokenException);
    }

    /**
     * Builds the configuration shared by the read, write and append tests.
     *
     * @param numDataNodes value stored under {@code dfs.replication}
     * @return a new configuration with block access tokens enabled
     */
    private static Configuration getConf(int numDataNodes) {
        final Configuration result = new Configuration();

        // Block access tokens on; block size and checksum chunk both 1024.
        result.setBoolean("dfs.block.access.token.enable", true);
        result.setLong("dfs.blocksize", BLOCK_SIZE);
        result.setInt("io.bytes.per.checksum", BLOCK_SIZE);

        result.setInt("dfs.heartbeat.interval", 1);
        result.setInt("dfs.replication", numDataNodes);

        // NOTE(review): these retry settings look chosen so that client
        // failures surface quickly instead of being retried -- confirm.
        result.setInt("ipc.client.connect.max.retries", 0);
        result.setInt("dfs.client.retry.window.base", 10);
        result.setInt("dfs.client.failover.max.attempts", 1);
        result.set("dfs.client.retry.policy.spec", "1,1");
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends to a file, waits for the block token held by the output stream
     * to expire, stops one datanode, writes the remaining data and then checks
     * that the file content equals {@link #rawData}.
     *
     * @throws Exception if the cluster or any file operation fails, or if the
     *         thread is interrupted while waiting for the token to expire
     */
    @Test
    public void testAppend() throws Exception {
        MiniDFSCluster cluster = null;
        int numDataNodes = 2;
        Configuration conf = TestBlockTokenWithDFS.getConf(numDataNodes);
        try {
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();
            Assert.assertEquals(numDataNodes, cluster.getDataNodes().size());

            NameNode nn = cluster.getNameNode();
            BlockManager bm = nn.getNamesystem().getBlockManager();
            BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
            // Short token lifetime (1000, presumably milliseconds -- confirm)
            // so the wait loop below finishes quickly.
            SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);

            Path fileToAppend = new Path(FILE_TO_APPEND);
            DistributedFileSystem fs = cluster.getFileSystem();

            // Write a single byte, close, then reopen the file for append.
            FSDataOutputStream stm = TestBlockTokenWithDFS.writeFile(fs, fileToAppend, (short) numDataNodes, BLOCK_SIZE);
            stm.write(this.rawData, 0, 1);
            stm.close();
            stm = fs.append(fileToAppend);

            // Append everything except the last byte and flush it out.
            int mid = this.rawData.length - 1;
            stm.write(this.rawData, 1, mid - 1);
            stm.hflush();

            // Wait until the token held by the stream has expired. An interrupt
            // now propagates and fails the test instead of being swallowed.
            Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm);
            while (!SecurityTestUtil.isBlockTokenExpired(token)) {
                Thread.sleep(10L);
            }

            // NOTE(review): stopping a datanode presumably forces the client to
            // rebuild its write pipeline with the expired token -- confirm.
            cluster.stopDataNode(0);

            // Write the remaining byte(s) and close the file.
            stm.write(this.rawData, mid, this.rawData.length - mid);
            stm.close();

            // Read the file back; the stream is closed once the check is done.
            try (FSDataInputStream in5 = fs.open(fileToAppend)) {
                Assert.assertTrue(this.checkFile1(in5));
            }
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes most of a file, waits for the block token held by the output
     * stream to expire, stops one datanode, writes the remaining data and then
     * checks that the file content equals {@link #rawData}.
     *
     * @throws Exception if the cluster or any file operation fails, or if the
     *         thread is interrupted while waiting for the token to expire
     */
    @Test
    public void testWrite() throws Exception {
        MiniDFSCluster cluster = null;
        int numDataNodes = 2;
        Configuration conf = TestBlockTokenWithDFS.getConf(numDataNodes);
        try {
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();
            Assert.assertEquals(numDataNodes, cluster.getDataNodes().size());

            NameNode nn = cluster.getNameNode();
            BlockManager bm = nn.getNamesystem().getBlockManager();
            BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
            // Short token lifetime (1000, presumably milliseconds -- confirm)
            // so the wait loop below finishes quickly.
            SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);

            Path fileToWrite = new Path(FILE_TO_WRITE);
            DistributedFileSystem fs = cluster.getFileSystem();

            // Write everything except the last byte and flush it out.
            FSDataOutputStream stm = TestBlockTokenWithDFS.writeFile(fs, fileToWrite, (short) numDataNodes, BLOCK_SIZE);
            int mid = this.rawData.length - 1;
            stm.write(this.rawData, 0, mid);
            stm.hflush();

            // Wait until the token held by the stream has expired. An interrupt
            // now propagates and fails the test instead of being swallowed.
            Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm);
            while (!SecurityTestUtil.isBlockTokenExpired(token)) {
                Thread.sleep(10L);
            }

            // NOTE(review): stopping a datanode presumably forces the client to
            // rebuild its write pipeline with the expired token -- confirm.
            cluster.stopDataNode(0);

            // Write the remaining byte(s) and close the file.
            stm.write(this.rawData, mid, this.rawData.length - mid);
            stm.close();

            // Read the file back; the stream is closed once the check is done.
            try (FSDataInputStream in4 = fs.open(fileToWrite)) {
                Assert.assertTrue(this.checkFile1(in4));
            }
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises block-token checks on the read path.
     *
     * <p>The test first uses {@link #tryRead} to check that a datanode accepts
     * a valid READ token and rejects an expired token, a token for a different
     * block and a token without READ access. It then checks that three already
     * open input streams can still read the file after their cached tokens
     * have expired, and across several datanode and namenode restarts.
     *
     * @throws Exception if the cluster or any file operation fails, or if the
     *         thread is interrupted while waiting for the token to expire
     */
    @Test
    public void testRead() throws Exception {
        MiniDFSCluster cluster = null;
        FSDataInputStream in1 = null;
        FSDataInputStream in2 = null;
        FSDataInputStream in3 = null;
        int numDataNodes = 2;
        Configuration conf = TestBlockTokenWithDFS.getConf(numDataNodes);
        try {
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();
            Assert.assertEquals(numDataNodes, cluster.getDataNodes().size());

            NameNode nn = cluster.getNameNode();
            NamenodeProtocols nnProto = nn.getRpcServer();
            BlockManager bm = nn.getNamesystem().getBlockManager();
            BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
            // Short token lifetime (1000, presumably milliseconds -- confirm)
            // so tokens expire during the test.
            SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);

            Path fileToRead = new Path(FILE_TO_READ);
            DistributedFileSystem fs = cluster.getFileSystem();
            this.createFile(fs, fileToRead);

            // Open three streams up front; each caches its own block tokens.
            in1 = fs.open(fileToRead);
            Assert.assertTrue(this.checkFile1(in1));
            in2 = fs.open(fileToRead);
            Assert.assertTrue(this.checkFile1(in2));
            in3 = fs.open(fileToRead);
            Assert.assertTrue(this.checkFile2(in3));

            // A client is created against the namenode and closed again right
            // away. The decompiled form assigned to the resource variable,
            // which does not compile and would never have closed the client.
            try (DFSClient client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf)) {
                // Nothing to do: the client is only opened and closed.
            }

            List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(FILE_TO_READ, 0L, FILE_SIZE).getLocatedBlocks();
            LocatedBlock lblock = locatedBlocks.get(0);
            Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();

            // A fresh token is accepted.
            Assert.assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
            TestBlockTokenWithDFS.tryRead(conf, lblock, true);

            // Wait for the token to expire. An interrupt now propagates and
            // fails the test instead of being swallowed.
            while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
                Thread.sleep(10L);
            }

            // The expired token is rejected.
            Assert.assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken));
            TestBlockTokenWithDFS.tryRead(conf, lblock, false);

            // A newly generated READ token for the same block is accepted.
            lblock.setBlockToken(sm.generateToken(lblock.getBlock(), EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
            TestBlockTokenWithDFS.tryRead(conf, lblock, true);

            // A READ token generated for a different block id is rejected.
            ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock().getBlockPoolId(), lblock.getBlock().getBlockId() + 1L);
            lblock.setBlockToken(sm.generateToken(wrongBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
            TestBlockTokenWithDFS.tryRead(conf, lblock, false);

            // A token for the right block but without READ access is rejected.
            lblock.setBlockToken(sm.generateToken(lblock.getBlock(), EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE, BlockTokenIdentifier.AccessMode.COPY, BlockTokenIdentifier.AccessMode.REPLACE)));
            TestBlockTokenWithDFS.tryRead(conf, lblock, false);

            // Lengthen the lifetime for tokens handed out from here on.
            SecurityTestUtil.setBlockTokenLifetime(sm, 600000L);

            // The tokens cached in each open stream are expired by now, yet
            // each stream must still be able to read the whole file.
            List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
            for (LocatedBlock blk : lblocks) {
                Assert.assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
            }
            in1.seek(0L);
            Assert.assertTrue(this.checkFile1(in1));

            List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
            for (LocatedBlock blk : lblocks2) {
                Assert.assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
            }
            Assert.assertTrue(in2.seekToNewSource(0L));
            Assert.assertTrue(this.checkFile1(in2));

            List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
            for (LocatedBlock blk : lblocks3) {
                Assert.assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
            }
            Assert.assertTrue(this.checkFile2(in3));

            // Restart the datanodes, then take the namenode down. The tokens
            // now cached in the streams are unexpired, and reads must succeed
            // without a namenode.
            // NOTE(review): restartDataNodes(true) presumably keeps the
            // datanode ports -- confirm.
            Assert.assertTrue(cluster.restartDataNodes(true));
            cluster.waitActive();
            Assert.assertEquals(numDataNodes, cluster.getDataNodes().size());
            cluster.shutdownNameNode(0);

            lblocks = DFSTestUtil.getAllBlocks(in1);
            for (LocatedBlock blk : lblocks) {
                Assert.assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
            }
            in1.seek(0L);
            Assert.assertTrue(this.checkFile1(in1));

            lblocks2 = DFSTestUtil.getAllBlocks(in2);
            for (LocatedBlock blk : lblocks2) {
                Assert.assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
            }
            in2.seekToNewSource(0L);
            Assert.assertTrue(this.checkFile1(in2));

            lblocks3 = DFSTestUtil.getAllBlocks(in3);
            for (LocatedBlock blk : lblocks3) {
                Assert.assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
            }
            Assert.assertTrue(this.checkFile2(in3));

            // Restart the namenode and take it down again; reads still succeed.
            cluster.restartNameNode(0);
            cluster.shutdownNameNode(0);
            in1.seek(0L);
            Assert.assertTrue(this.checkFile1(in1));
            in2.seekToNewSource(0L);
            Assert.assertTrue(this.checkFile1(in2));
            Assert.assertTrue(this.checkFile2(in3));

            // Restart the namenode and the datanodes, then take the namenode
            // down. Reads through the open streams are now expected to fail.
            cluster.restartNameNode(0);
            Assert.assertTrue(cluster.restartDataNodes(true));
            cluster.waitActive();
            Assert.assertEquals(numDataNodes, cluster.getDataNodes().size());
            cluster.shutdownNameNode(0);
            in1.seek(0L);
            Assert.assertFalse(this.checkFile1(in1));
            Assert.assertFalse(this.checkFile2(in3));

            // With the namenode back, all three streams can read again.
            cluster.restartNameNode(0);
            in1.seek(0L);
            Assert.assertTrue(this.checkFile1(in1));
            in2.seekToNewSource(0L);
            Assert.assertTrue(this.checkFile1(in2));
            Assert.assertTrue(this.checkFile2(in3));

            // Restart the datanodes with the flag set to false; reads must
            // still succeed.
            // NOTE(review): false presumably lets the datanodes come up on
            // different ports -- confirm.
            Assert.assertTrue(cluster.restartDataNodes(false));
            cluster.waitActive();
            Assert.assertEquals(numDataNodes, cluster.getDataNodes().size());
            in1.seek(0L);
            Assert.assertTrue(this.checkFile1(in1));
            in2.seekToNewSource(0L);
            Assert.assertTrue(this.checkFile1(in2));
            Assert.assertTrue(this.checkFile2(in3));
        }
        finally {
            // Release the input streams before tearing the cluster down.
            // NOTE(review): this relies on IOUtils.closeStream tolerating null
            // for streams that were never opened -- confirm.
            IOUtils.closeStream(in1);
            IOUtils.closeStream(in2);
            IOUtils.closeStream(in3);
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Runs the {@link TestBalancer} integration test with block access tokens
     * enabled.
     *
     * @throws Exception if the balancer integration test fails
     */
    @Test
    public void testEnd2End() throws Exception {
        final Configuration tokenConf = new Configuration();
        tokenConf.setBoolean("dfs.block.access.token.enable", true);
        final TestBalancer balancerTest = new TestBalancer();
        balancerTest.integrationTest(tokenConf);
    }
}

