package io.hops.hopsworks.common.upload;

import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.exceptions.DatasetException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;

@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/upload/UploadController.class */
public class UploadController {
    private static final Logger LOGGER = Logger.getLogger(UploadController.class.getName());

    @EJB
    private StagingManager stagingManager;

    @EJB
    private DistributedFsService dfs;

    @EJB
    private ResumableInfoStorage storage;

    public void checkPermission(FlowInfo flowInfo, String str, String str2) throws IOException, DatasetException {
        if (str2 != null) {
            DistributedFileSystemOps distributedFileSystemOps = null;
            try {
                distributedFileSystemOps = this.dfs.getDfsOps(str2);
                checkPermission(distributedFileSystemOps, flowInfo, str);
                this.dfs.closeDfsClient(distributedFileSystemOps);
            } catch (Throwable th) {
                this.dfs.closeDfsClient(distributedFileSystemOps);
                throw th;
            }
        }
    }

    public boolean upload(InputStream inputStream, FlowInfo flowInfo, String str, String str2) throws DatasetException, AccessControlException {
        LOGGER.log(Level.FINE, "Uploading:- chunk: {0}, id: {1}", new Object[]{Integer.valueOf(flowInfo.getChunkNumber()), flowInfo.getIdentifier()});
        try {
            return hdfsStagingUpload(inputStream, flowInfo, str, str2);
        } catch (IOException e) {
            LOGGER.log(Level.WARNING, "Failed to upload file: {0}", flowInfo.getFilename());
            throw new DatasetException(RESTCodes.DatasetErrorCode.UPLOAD_ERROR, Level.FINE, "Failed to upload." + e.getMessage(), e.getMessage());
        } catch (AccessControlException e2) {
            throw new AccessControlException("Permission denied: You can not upload to this folder. ");
        }
    }

    public boolean isUploaded(FlowInfo flowInfo, String str) {
        flowInfo.setFilePath(getTmpStagingDir(flowInfo.getFilename(), str).toString());
        return this.storage.isUploaded(Integer.valueOf(flowInfo.hashCode()), Integer.valueOf(flowInfo.getChunkNumber()));
    }

    private void checkPermission(DistributedFileSystemOps distributedFileSystemOps, FlowInfo flowInfo, String str) throws IOException, DatasetException {
        Path path = new Path(str, flowInfo.getFilename());
        if (flowInfo.getChunkNumber() == 1) {
            try {
                if (distributedFileSystemOps.exists(path)) {
                    throw new DatasetException(RESTCodes.DatasetErrorCode.DESTINATION_EXISTS, Level.FINE, "Destination already exists. path: " + path);
                }
                distributedFileSystemOps.touchz(path);
                distributedFileSystemOps.rm(path, false);
            } catch (AccessControlException e) {
                throw new AccessControlException("Permission denied: You can not upload to this folder. ");
            }
        }
    }

    private void saveChunk(DistributedFileSystemOps distributedFileSystemOps, InputStream inputStream, FlowInfo flowInfo, int i) throws IOException {
        OutputStream outputStream = null;
        try {
            outputStream = distributedFileSystemOps.create(new Path(flowInfo.getFilePath(), String.valueOf(i)));
            IOUtils.copy(inputStream, outputStream);
            IOUtils.closeQuietly(inputStream);
            IOUtils.closeQuietly(outputStream);
        } catch (Throwable th) {
            IOUtils.closeQuietly(inputStream);
            IOUtils.closeQuietly(outputStream);
            throw th;
        }
    }

    private boolean markAsUploadedAndCheckFinished(DistributedFileSystemOps distributedFileSystemOps, FlowInfo flowInfo, int i, long j) throws IOException {
        FileStatus[] listStatus;
        if (!this.storage.addChunkAndCheckIfFinished(flowInfo, i, j)) {
            return false;
        }
        Path path = new Path(flowInfo.getFilePath());
        if (!distributedFileSystemOps.exists(path) || !distributedFileSystemOps.getFileStatus(path).isDirectory() || (listStatus = distributedFileSystemOps.listStatus(path)) == null || listStatus.length <= 0) {
            return true;
        }
        if (listStatus.length > 1) {
            Arrays.sort(listStatus, Comparator.comparingInt(fileStatus -> {
                return Integer.parseInt(fileStatus.getPath().getName());
            }));
        }
        OutputStream outputStream = null;
        InputStream inputStream = null;
        try {
            outputStream = distributedFileSystemOps.create(fromTemp(path));
            for (FileStatus fileStatus2 : listStatus) {
                try {
                    inputStream = distributedFileSystemOps.open(fileStatus2.getPath());
                    IOUtils.copy(inputStream, outputStream);
                    IOUtils.closeQuietly(inputStream);
                } catch (Throwable th) {
                    IOUtils.closeQuietly(inputStream);
                    throw th;
                }
            }
            distributedFileSystemOps.rm(path, true);
            IOUtils.closeQuietly(outputStream);
            return true;
        } catch (Throwable th2) {
            IOUtils.closeQuietly(outputStream);
            throw th2;
        }
    }

    private void copyToHdfs(DistributedFileSystemOps distributedFileSystemOps, FlowInfo flowInfo, String str) throws IOException {
        try {
            Path path = new Path(str, flowInfo.getFilename());
            Path fromTemp = fromTemp(flowInfo.getFilePath());
            if (!fromTemp.equals(path)) {
                distributedFileSystemOps.moveWithinHdfs(fromTemp, path, true);
                LOGGER.log(Level.INFO, "Move within Hdfs. {0}", new Object[]{fromTemp, path});
            }
            distributedFileSystemOps.setPermission(path, distributedFileSystemOps.getParentPermission(path));
            LOGGER.log(Level.INFO, "Copied to HDFS. {0}", path);
        } catch (AccessControlException e) {
            throw new AccessControlException("Permission denied: You can not upload to this folder. ");
        }
    }

    private boolean hdfsStagingUpload(InputStream inputStream, FlowInfo flowInfo, String str, String str2) throws DatasetException, IOException {
        DistributedFileSystemOps distributedFileSystemOps = null;
        try {
            distributedFileSystemOps = this.dfs.getDfsOps(str2);
            checkPermission(distributedFileSystemOps, flowInfo, str);
            flowInfo.setFilePath(getTmpStagingDir(distributedFileSystemOps, flowInfo.getFilename(), str));
            this.storage.put(flowInfo);
            saveChunk(distributedFileSystemOps, inputStream, flowInfo, flowInfo.getChunkNumber());
            boolean markAsUploadedAndCheckFinished = markAsUploadedAndCheckFinished(distributedFileSystemOps, flowInfo, flowInfo.getChunkNumber(), flowInfo.getCurrentChunkSize());
            if (markAsUploadedAndCheckFinished) {
                copyToHdfs(distributedFileSystemOps, flowInfo, str);
            }
            if (distributedFileSystemOps != null) {
                this.dfs.closeDfsClient(distributedFileSystemOps);
            }
            return markAsUploadedAndCheckFinished;
        } catch (Throwable th) {
            if (distributedFileSystemOps != null) {
                this.dfs.closeDfsClient(distributedFileSystemOps);
            }
            throw th;
        }
    }

    private String getTmpStagingDir(DistributedFileSystemOps distributedFileSystemOps, String str, String str2) throws DatasetException {
        Path tmpStagingDir = getTmpStagingDir(str, str2);
        try {
            if (!distributedFileSystemOps.exists(tmpStagingDir) && !distributedFileSystemOps.mkdirs(tmpStagingDir, distributedFileSystemOps.getParentPermission(tmpStagingDir))) {
                LOGGER.log(Level.WARNING, "Failed to create upload staging dir. Path: {0}", tmpStagingDir.toString());
            }
            return tmpStagingDir.toString();
        } catch (IOException e) {
            LOGGER.log(Level.WARNING, "Failed to create upload staging dir. Path: {0}", tmpStagingDir.toString());
            throw new DatasetException(RESTCodes.DatasetErrorCode.UPLOAD_ERROR, Level.SEVERE, "Failed to create upload staging dir", e.getMessage(), e);
        }
    }

    private Path getTmpStagingDir(String str, String str2) {
        return toTemp(this.stagingManager.getStagingPath() + str2, str);
    }

    private Path toTemp(String str, String str2) {
        return new Path(str, str2 + ".temp");
    }

    private Path fromTemp(Path path) {
        return fromTemp(path.toString().replace(".temp", KafkaConst.KAFKA_ENDPOINT_IDENTIFICATION_ALGORITHM));
    }

    private Path fromTemp(String str) {
        return new Path(str.replace(".temp", KafkaConst.KAFKA_ENDPOINT_IDENTIFICATION_ALGORITHM));
    }
}
