/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.json.JSONObject;

/**
 * Helper for sources that discover cloud objects through S3 event notifications
 * delivered to an SQS queue.
 *
 * <p>Provides the queue plumbing used by such sources: creating the SQS client,
 * reading the approximate queue depth, receiving a bounded batch of messages,
 * turning an event record into file attributes, and deleting processed messages.
 */
public class CloudObjectsSelector {
    public static final List<String> ALLOWED_S3_EVENT_PREFIX = Collections.singletonList("ObjectCreated");
    public static final String S3_PREFIX = "s3://";
    // Left public and non-final to stay interface-compatible with code outside this file
    // that may read or reassign it.
    public static volatile Logger log = LogManager.getLogger(CloudObjectsSelector.class);
    public static final String SQS_ATTR_APPROX_MESSAGES = "ApproximateNumberOfMessages";
    static final String SQS_MODEL_MESSAGE = "Message";
    static final String SQS_MODEL_EVENT_RECORDS = "Records";
    static final String SQS_MODEL_EVENT_NAME = "eventName";
    static final String S3_MODEL_EVENT_TIME = "eventTime";
    static final String S3_FILE_SIZE = "fileSize";
    static final String S3_FILE_PATH = "filePath";
    // Number of entries sent per DeleteMessageBatch call; SQS accepts at most 10 per request.
    private static final int SQS_DELETE_BATCH_SIZE = 10;
    public final String queueUrl;
    public final int longPollWait;
    public final int maxMessagesPerRequest;
    public final int maxMessagePerBatch;
    public final int visibilityTimeout;
    public final TypedProperties props;
    public final String fsName;
    private final String regionName;

    /**
     * Builds a selector from the given properties.
     *
     * <p>The queue URL and queue region properties are required; the file system name,
     * long-poll wait, batch size and visibility timeout fall back to
     * {@code "s3"}, {@code 20}, {@code 5} and {@code 30} respectively.
     *
     * @param props configuration holding the {@link Config} keys
     */
    public CloudObjectsSelector(TypedProperties props) {
        DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.S3_SOURCE_QUEUE_URL, Config.S3_SOURCE_QUEUE_REGION));
        this.props = props;
        this.queueUrl = props.getString(Config.S3_SOURCE_QUEUE_URL);
        this.regionName = props.getString(Config.S3_SOURCE_QUEUE_REGION);
        // Locale.ROOT keeps the scheme stable regardless of the JVM default locale
        // (e.g. Turkish dotless-i lowercasing).
        this.fsName = props.getString(Config.S3_SOURCE_QUEUE_FS, "s3").toLowerCase(Locale.ROOT);
        this.longPollWait = props.getInteger(Config.S3_QUEUE_LONG_POLL_WAIT, 20);
        this.maxMessagePerBatch = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH, 5);
        this.visibilityTimeout = props.getInteger(Config.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT, 30);
        this.maxMessagesPerRequest = 10;
    }

    /**
     * Fetches the {@value #SQS_ATTR_APPROX_MESSAGES} attribute of a queue.
     *
     * @param sqsClient client used to query the queue
     * @param queueUrl  URL of the queue to inspect
     * @return the attribute map returned by SQS for the request
     */
    protected Map<String, String> getSqsQueueAttributes(AmazonSQS sqsClient, String queueUrl) {
        GetQueueAttributesRequest request =
            new GetQueueAttributesRequest(queueUrl).withAttributeNames(SQS_ATTR_APPROX_MESSAGES);
        GetQueueAttributesResult queueAttributesResult = sqsClient.getQueueAttributes(request);
        return queueAttributesResult.getAttributes();
    }

    /**
     * Extracts event time, file size and file path from a single event record.
     *
     * <p>The bucket name and object key are URL-decoded (UTF-8) and combined with
     * {@link #fsName} into a path of the form {@code <fsName>://<bucket>/<key>}.
     *
     * @param record event record containing {@code eventTime} and the nested
     *               {@code s3.bucket.name}, {@code s3.object.key}, {@code s3.object.size} fields
     * @return map with {@code eventTime} (epoch millis, {@code Long}), {@code fileSize}
     *         ({@code Long}) and {@code filePath} ({@code String})
     * @throws UnsupportedEncodingException declared by {@link URLDecoder#decode(String, String)}
     */
    protected Map<String, Object> getFileAttributesFromRecord(JSONObject record) throws UnsupportedEncodingException {
        Map<String, Object> fileRecord = new HashMap<>();
        String eventTimeStr = record.getString(S3_MODEL_EVENT_TIME);
        // Instant.parse uses the ISO-8601 instant format, e.g. 2007-12-03T10:15:30.00Z.
        long eventTime = Instant.parse(eventTimeStr).toEpochMilli();
        JSONObject s3Node = record.getJSONObject("s3");
        JSONObject s3Object = s3Node.getJSONObject("object");
        String bucket = URLDecoder.decode(s3Node.getJSONObject("bucket").getString("name"), "UTF-8");
        String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8");
        String filePath = this.fsName + "://" + bucket + "/" + key;
        fileRecord.put(S3_MODEL_EVENT_TIME, eventTime);
        fileRecord.put(S3_FILE_SIZE, s3Object.getLong("size"));
        fileRecord.put(S3_FILE_PATH, filePath);
        return fileRecord;
    }

    /**
     * Creates an SQS client bound to the configured queue region.
     *
     * @return a new {@link AmazonSQS} client
     */
    public AmazonSQS createAmazonSqsClient() {
        return AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(this.regionName)).build();
    }

    /**
     * Receives up to {@code maxMessagePerBatch} messages from the queue.
     *
     * <p>The target count is the smaller of the queue's approximate depth and
     * {@code maxMessagePerBatch}. Messages are pulled in at most
     * {@code ceil(target / maxMessagesPerRequest)} receive calls; each call asks for no
     * more than what is still missing, so the returned list never exceeds the target.
     * Polling stops early when a call returns no messages.
     *
     * @param sqsClient             client used to talk to the queue
     * @param queueUrl              URL of the queue to read from
     * @param longPollWait          wait time in seconds passed to each receive call
     * @param visibilityTimeout     visibility timeout in seconds passed to each receive call
     * @param maxMessagePerBatch    upper bound on the number of messages returned
     * @param maxMessagesPerRequest upper bound on the number of messages requested per call
     * @return the received messages, possibly empty
     */
    protected List<Message> getMessagesToProcess(AmazonSQS sqsClient, String queueUrl, int longPollWait, int visibilityTimeout, int maxMessagePerBatch, int maxMessagesPerRequest) {
        List<Message> messagesToProcess = new ArrayList<>();
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
            .withQueueUrl(queueUrl)
            .withWaitTimeSeconds(longPollWait)
            .withVisibilityTimeout(visibilityTimeout);
        Map<String, String> queueAttributesResult = this.getSqsQueueAttributes(sqsClient, queueUrl);
        long approxMessagesAvailable = Long.parseLong(queueAttributesResult.get(SQS_ATTR_APPROX_MESSAGES));
        log.info("Approximately " + approxMessagesAvailable + " messages available in queue.");
        long numMessagesToProcess = Math.min(approxMessagesAvailable, (long) maxMessagePerBatch);
        int maxRequests = (int) Math.ceil((double) numMessagesToProcess / (double) maxMessagesPerRequest);
        long remaining = numMessagesToProcess;
        for (int i = 0; i < maxRequests && remaining > 0; ++i) {
            // Cap the request at the remaining budget so the batch limit is honoured even
            // when it is not a multiple of (or is smaller than) maxMessagesPerRequest.
            receiveMessageRequest.setMaxNumberOfMessages((int) Math.min((long) maxMessagesPerRequest, remaining));
            List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
            log.debug("Number of messages: " + messages.size());
            messagesToProcess.addAll(messages);
            if (messages.isEmpty()) {
                break;
            }
            remaining -= messages.size();
        }
        return messagesToProcess;
    }

    /**
     * Splits a list into consecutive partitions of at most {@code eachBatchSize} elements.
     *
     * @param singleList    list to split
     * @param eachBatchSize maximum size of each partition
     * @return independent copies of each partition in order; empty when the input is empty
     *         or {@code eachBatchSize < 1}
     */
    protected List<List<Message>> createListPartitions(List<Message> singleList, int eachBatchSize) {
        List<List<Message>> listPartitions = new ArrayList<>();
        if (singleList.isEmpty() || eachBatchSize < 1) {
            return listPartitions;
        }
        int total = singleList.size();
        int start = 0;
        while (start < total) {
            // Computed from the remaining length so start + eachBatchSize can never overflow.
            int end = start + Math.min(eachBatchSize, total - start);
            listPartitions.add(new ArrayList<>(singleList.subList(start, end)));
            start = end;
        }
        return listPartitions;
    }

    /**
     * Deletes the given messages from the queue with a single batch request and logs the outcome.
     *
     * @param sqs                 client used to talk to the queue
     * @param queueUrl            URL of the queue to delete from
     * @param messagesToBeDeleted messages to delete; nothing is sent when empty
     */
    protected void deleteBatchOfMessages(AmazonSQS sqs, String queueUrl, List<Message> messagesToBeDeleted) {
        if (messagesToBeDeleted.isEmpty()) {
            // An empty batch request would be rejected by SQS, so there is nothing to do.
            return;
        }
        DeleteMessageBatchRequest deleteBatchReq = new DeleteMessageBatchRequest().withQueueUrl(queueUrl);
        List<DeleteMessageBatchRequestEntry> deleteEntries = deleteBatchReq.getEntries();
        for (Message message : messagesToBeDeleted) {
            deleteEntries.add(new DeleteMessageBatchRequestEntry()
                .withId(message.getMessageId())
                .withReceiptHandle(message.getReceiptHandle()));
        }
        DeleteMessageBatchResult deleteResult = sqs.deleteMessageBatch(deleteBatchReq);
        List<String> deleteFailures = deleteResult.getFailed().stream()
            .map(BatchResultErrorEntry::getId)
            .collect(Collectors.toList());
        if (!deleteFailures.isEmpty()) {
            log.warn("Failed to delete " + deleteFailures.size() + " messages out of " + deleteEntries.size() + " from queue.");
        } else {
            log.info("Successfully deleted " + deleteEntries.size() + " messages from queue.");
        }
    }

    /**
     * Deletes processed messages from the queue in batches of at most ten.
     *
     * @param sqs               client used to talk to the queue
     * @param queueUrl          URL of the queue to delete from
     * @param processedMessages messages to delete; nothing happens when empty
     */
    public void deleteProcessedMessages(AmazonSQS sqs, String queueUrl, List<Message> processedMessages) {
        if (processedMessages.isEmpty()) {
            return;
        }
        for (List<Message> deleteBatch : this.createListPartitions(processedMessages, SQS_DELETE_BATCH_SIZE)) {
            this.deleteBatchOfMessages(sqs, queueUrl, deleteBatch);
        }
    }

    /** Property keys read by {@link CloudObjectsSelector}. */
    public static class Config {
        private static final String HOODIE_DELTASTREAMER_S3_SOURCE = "hoodie.deltastreamer.s3.source";
        /** URL of the SQS queue (required). */
        public static final String S3_SOURCE_QUEUE_URL = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.url";
        /** Region name of the SQS queue (required). */
        public static final String S3_SOURCE_QUEUE_REGION = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.region";
        /** File system scheme used when building file paths. */
        public static final String S3_SOURCE_QUEUE_FS = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.fs";
        /** Long-poll wait passed to receive calls. */
        public static final String S3_QUEUE_LONG_POLL_WAIT = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.long.poll.wait";
        /** Maximum number of messages per batch. */
        public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.batch";
        /** Visibility timeout passed to receive calls. */
        public static final String S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.visibility.timeout";
        public static final String SOURCE_INPUT_SELECTOR = "hoodie.deltastreamer.source.input.selector";
    }
}

