/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.upload;

import com.google.common.annotations.VisibleForTesting;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.upload.FlowInfo;
import io.hops.hopsworks.common.upload.ResumableInfoStorage;
import io.hops.hopsworks.common.upload.StagingManager;
import io.hops.hopsworks.exceptions.DatasetException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
public class UploadController {
    private static final Logger LOGGER = Logger.getLogger(UploadController.class.getName());
    @EJB
    private StagingManager stagingManager;
    @EJB
    private DistributedFsService dfs;
    @EJB
    private ResumableInfoStorage storage;

    public UploadController() {
    }

    @VisibleForTesting
    public UploadController(StagingManager stagingManager, DistributedFsService dfs, ResumableInfoStorage storage) {
        this.stagingManager = stagingManager;
        this.dfs = dfs;
        this.storage = storage;
    }

    public boolean upload(InputStream uploadedInputStream, FlowInfo flowInfo, String hdfsPath, String username) throws DatasetException, AccessControlException {
        LOGGER.log(Level.FINE, "Uploading:- chunk: {0}, id: {1}", new Object[]{flowInfo.getChunkNumber(), flowInfo.getIdentifier()});
        try {
            return this.hdfsStagingUpload(uploadedInputStream, flowInfo, hdfsPath, username);
        }
        catch (AccessControlException ex) {
            throw new AccessControlException("Permission denied: You can not upload to this folder. ");
        }
        catch (IOException e) {
            LOGGER.log(Level.WARNING, "Failed to upload file: {0} {1}", new Object[]{flowInfo.getFilename(), e.getMessage()});
            throw new DatasetException(RESTCodes.DatasetErrorCode.UPLOAD_ERROR, Level.FINE, "Failed to upload. " + e.getMessage(), e.getMessage());
        }
    }

    public boolean uploaded(FlowInfo flowInfo, String hdfsPath) {
        flowInfo.setFilePath(this.getTmpStagingDir(flowInfo.getFilename(), hdfsPath).toString());
        return this.storage.uploaded(flowInfo.hashCode(), flowInfo.getChunkNumber());
    }

    private void checkPermission(DistributedFileSystemOps udfso, FlowInfo flowInfo, String path, Path stagingDir) throws IOException, DatasetException {
        String fileName = flowInfo.getFilename();
        Path dest = new Path(path, fileName);
        if (flowInfo.getChunkNumber() == 1) {
            try {
                if (udfso.exists(dest)) {
                    throw new DatasetException(RESTCodes.DatasetErrorCode.DESTINATION_EXISTS, Level.FINE, "Destination already exists. path: " + dest);
                }
                this.createStagingDirIfNotExist(udfso, stagingDir);
                udfso.touchz(dest);
            }
            catch (AccessControlException ex) {
                throw new AccessControlException("Permission denied: You can not upload to this folder. ");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void saveChunk(DistributedFileSystemOps dfsOps, InputStream uploadedInputStream, FlowInfo info) throws IOException {
        Path location = new Path(info.getFilePath(), String.valueOf(info.getChunkNumber()));
        FSDataOutputStream out = null;
        try {
            out = dfsOps.create(location);
            IOUtils.copy((InputStream)uploadedInputStream, (OutputStream)out);
        }
        finally {
            IOUtils.closeQuietly((InputStream)uploadedInputStream);
            IOUtils.closeQuietly((OutputStream)out);
        }
    }

    private boolean markAsUploadedAndCheckFinished(DistributedFileSystemOps dfsOps, FlowInfo info) throws IOException, DatasetException {
        if (this.storage.addChunkAndCheckIfFinished(info, info.getChunkNumber(), info.getCurrentChunkSize())) {
            this.mergeChunks(info, dfsOps);
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void mergeChunks(FlowInfo info, DistributedFileSystemOps dfsOps) throws IOException, DatasetException {
        block7: {
            Path location = new Path(info.getFilePath());
            if (!dfsOps.exists(location) || !dfsOps.getFileStatus(location).isDirectory()) break block7;
            Path collected = this.fromTemp(location);
            FSDataOutputStream out = null;
            FSDataInputStream in = null;
            try {
                out = dfsOps.create(collected);
                for (int i = 1; i <= info.getTotalChunks(); ++i) {
                    try {
                        Path chunk = new Path(info.getFilePath(), String.valueOf(i));
                        if (!dfsOps.exists(chunk) || !dfsOps.getFileStatus(chunk).isFile()) {
                            throw new DatasetException(RESTCodes.DatasetErrorCode.UPLOAD_ERROR, Level.SEVERE, "Could not find chunk: " + i);
                        }
                        in = dfsOps.open(chunk);
                        IOUtils.copy((InputStream)in, (OutputStream)out);
                    }
                    catch (Throwable throwable) {
                        IOUtils.closeQuietly(in);
                        throw throwable;
                    }
                    IOUtils.closeQuietly((InputStream)in);
                }
                dfsOps.rm(location, true);
            }
            finally {
                IOUtils.closeQuietly((OutputStream)out);
            }
        }
    }

    private void copyToHdfs(DistributedFileSystemOps dfsOps, FlowInfo info, String hdfsPath) throws IOException {
        try {
            Path location = new Path(hdfsPath, info.getFilename());
            Path stagingFilePath = this.fromTemp(info.getFilePath());
            if (!stagingFilePath.equals((Object)location)) {
                dfsOps.moveWithinHdfs(stagingFilePath, location, true);
                LOGGER.log(Level.INFO, "Move within Hdfs. {0}", new Object[]{stagingFilePath, location});
            }
            dfsOps.setPermission(location, dfsOps.getParentPermission(location));
            LOGGER.log(Level.INFO, "Copied to HDFS. {0}", location);
        }
        catch (AccessControlException ex) {
            throw new AccessControlException("Permission denied: You can not upload to this folder. ");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean hdfsStagingUpload(InputStream uploadedInputStream, FlowInfo flowInfo, String hdfsPath, String username) throws DatasetException, IOException {
        boolean finished;
        DistributedFileSystemOps dfsOps = null;
        try {
            dfsOps = this.dfs.getDfsOps(username);
            Path stagingDir = this.getTmpStagingDir(hdfsPath, flowInfo.getFilename());
            this.checkPermission(dfsOps, flowInfo, hdfsPath, stagingDir);
            this.checkStagingDir(dfsOps, flowInfo, hdfsPath, stagingDir);
            flowInfo.setFilePath(stagingDir.toString());
            this.storage.put(flowInfo);
            finished = this.saveChunkAndCheckFinished(uploadedInputStream, dfsOps, flowInfo, hdfsPath);
            if (finished) {
                this.copyToHdfs(dfsOps, flowInfo, hdfsPath);
            }
        }
        finally {
            if (dfsOps != null) {
                this.dfs.closeDfsClient(dfsOps);
            }
        }
        return finished;
    }

    private boolean saveChunkAndCheckFinished(InputStream uploadedInputStream, DistributedFileSystemOps dfsOps, FlowInfo flowInfo, String hdfsPath) throws IOException, DatasetException {
        try {
            this.saveChunk(dfsOps, uploadedInputStream, flowInfo);
            return this.markAsUploadedAndCheckFinished(dfsOps, flowInfo);
        }
        catch (Exception e) {
            if (flowInfo.getChunkNumber() == 1) {
                LOGGER.log(Level.INFO, "Failed on chunk 1. Cleaning up temporary placeholder.");
                dfsOps.rm(new Path(hdfsPath, flowInfo.getFilename()), false);
            }
            throw e;
        }
    }

    private void checkStagingDir(DistributedFileSystemOps dfsOps, FlowInfo flowInfo, String hdfsPath, Path stagingDir) throws DatasetException {
        Path dest = new Path(hdfsPath, flowInfo.getFilename());
        try {
            if (!dfsOps.exists(dest)) {
                this.createStagingDirIfNotExist(dfsOps, stagingDir);
            } else if (!dfsOps.exists(stagingDir)) {
                throw new DatasetException(RESTCodes.DatasetErrorCode.DESTINATION_EXISTS, Level.FINE, "Destination already exists. path: " + dest);
            }
            if (dfsOps.exists(stagingDir) && !dfsOps.getFileStatus(stagingDir).isDirectory()) {
                throw new DatasetException(RESTCodes.DatasetErrorCode.DESTINATION_EXISTS, Level.FINE, "Failed to create upload staging dir. path: " + stagingDir);
            }
        }
        catch (IOException e) {
            LOGGER.log(Level.WARNING, "Failed to create upload staging dir. Path: {0}", stagingDir.toString());
            throw new DatasetException(RESTCodes.DatasetErrorCode.UPLOAD_ERROR, Level.SEVERE, "Failed to create upload staging dir", e.getMessage(), (Throwable)e);
        }
    }

    private void createStagingDirIfNotExist(DistributedFileSystemOps dfsOps, Path stagingDir) throws IOException {
        boolean created;
        if (!dfsOps.exists(stagingDir) && !(created = dfsOps.mkdirs(stagingDir, dfsOps.getParentPermission(stagingDir)))) {
            LOGGER.log(Level.INFO, "Failed to create upload staging dir. Path: {0}", stagingDir.toString());
        }
    }

    private Path getTmpStagingDir(String hdfsPath, String filename) {
        String baseDir = this.stagingManager.getStagingPath() + hdfsPath;
        return this.toTemp(baseDir, filename);
    }

    private Path toTemp(String baseDir, String filename) {
        return new Path(baseDir, filename + ".temp");
    }

    private Path fromTemp(Path path) {
        return this.fromTemp(path.toString());
    }

    private Path fromTemp(String path) {
        int start = path.lastIndexOf(".temp");
        return new Path(path.substring(0, start));
    }
}

