package io.hops.hopsworks.vectordb;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.http.client.config.RequestConfig;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.QueryStringQueryBuilder;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;

/* loaded from: input_file:io/hops/hopsworks/vectordb/OpensearchVectorDatabase.class */
public class OpensearchVectorDatabase implements VectorDatabase {
    /** Client used for every request; may be null until a constructor or {@link #init} supplies one. */
    private RestHighLevelClient client;
    /** Serializes document maps to JSON strings in the writeMap / batchWriteMap methods. */
    private ObjectMapper objectMapper;
    /** Value handed to {@code TimeValue(long)} for request/master timeouts (presumably milliseconds). */
    private Integer requestTimeout;
    /** Value handed to {@code RequestConfig.setSocketTimeout}; the constructors keep it 1000 above requestTimeout. */
    private Integer socketTimeout;
    /** Upper bound on the number of attempts made by {@link #retry}. */
    private final Integer maxRetry;
    private static final Logger LOGGER = Logger.getLogger(OpensearchVectorDatabase.class.getName());
    /**
     * Upper-case source type name to OpenSearch field type, as looked up by {@link #getDataType}.
     * The explicit {@code <String, String>} witness is required: without it the builder is inferred as
     * {@code Builder<Object, Object>} and the result is not assignable to {@code Map<String, String>}.
     */
    private static final Map<String, String> dataTypeMap = ImmutableMap.<String, String>builder()
        .put("BOOLEAN", "byte")
        .put("TINYINT", "byte")
        .put("INT", "integer")
        .put("SMALLINT", "short")
        .put("BIGINT", "long")
        .put("FLOAT", "float")
        .put("DOUBLE", "double")
        .put("TIMESTAMP", "date")
        .put("DATE", "date")
        .put("STRING", "text")
        .put("ARRAY", "binary")
        .put("STRUCT", "binary")
        .put("BINARY", "binary")
        .put("MAP", "binary")
        .build();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/hops/hopsworks/vectordb/OpensearchVectorDatabase$OperationResult.class */
    /**
     * Outcome of a single attempt executed through {@code retry}: a success flag plus an optional payload.
     *
     * @param <T> type of the payload carried on success
     */
    public static class OperationResult<T> {
        private final Boolean success;
        private final T result;

        /**
         * @param succeeded whether the attempt is to be treated as successful
         * @param payload value produced by the attempt; may be null
         */
        public OperationResult(Boolean succeeded, T payload) {
            result = payload;
            success = succeeded;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * A single attempt of an operation, in the shape expected by {@code retry}.
     * Implementations report their outcome through the returned {@link OperationResult}.
     */
    @FunctionalInterface
    /* loaded from: input_file:io/hops/hopsworks/vectordb/OpensearchVectorDatabase$OperationSupplier.class */
    public interface OperationSupplier {
        /**
         * Runs the operation once.
         *
         * @return the outcome of this attempt
         * @throws IOException declared so that client I/O errors can propagate to the caller
         * @throws OpenSearchStatusException declared so that status errors can propagate to the caller
         * @throws VectorDatabaseException declared so that implementations may fail with it directly
         */
        OperationResult perform() throws IOException, OpenSearchStatusException, VectorDatabaseException;
    }

    /**
     * Maps a source type name to the OpenSearch field type registered in {@code dataTypeMap}.
     * Only the part before the first {@code '<'} is considered, and the lookup is case-insensitive.
     *
     * @param str source type name, possibly carrying a {@code <...>} suffix
     * @return the mapped OpenSearch type, or null when {@code str} is null/empty or has no mapping
     */
    public static String getDataType(String str) {
        if (Strings.isNullOrEmpty(str)) {
            return null;
        }
        // indexOf/substring instead of split("<")[0]: split() drops trailing empty strings, so an input
        // such as "<" produces an empty array and indexing it would throw.
        int parameterStart = str.indexOf('<');
        String baseType = parameterStart >= 0 ? str.substring(0, parameterStart) : str;
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
        return dataTypeMap.get(baseType.toUpperCase(Locale.ROOT));
    }

    /**
     * Creates an instance without a client (one can be supplied later through {@code init}),
     * with a request timeout of 60000, a socket timeout of 61000 and at most 3 attempts per operation.
     */
    public OpensearchVectorDatabase() {
        maxRetry = 3;
        requestTimeout = 60000;
        socketTimeout = 61000;
        objectMapper = new ObjectMapper();
        client = null;
    }

    /**
     * Creates an instance with the same defaults as the no-argument constructor, bound to the given client.
     *
     * @param restHighLevelClient client used for all subsequent requests
     */
    public OpensearchVectorDatabase(RestHighLevelClient restHighLevelClient) {
        this();
        client = restHighLevelClient;
    }

    /**
     * Creates an instance bound to the given client with a custom request timeout.
     * The socket timeout is set to the request timeout plus 1000.
     *
     * @param restHighLevelClient client used for all subsequent requests
     * @param num request timeout; must not be null
     * @throws NullPointerException if {@code num} is null
     */
    public OpensearchVectorDatabase(RestHighLevelClient restHighLevelClient, Integer num) {
        // Reuse the single-argument constructor for the shared defaults instead of repeating them.
        this(restHighLevelClient);
        // Fail with an explicit message rather than an anonymous NPE from unboxing.
        Objects.requireNonNull(num, "request timeout must not be null");
        this.requestTimeout = num;
        this.socketTimeout = num + 1000;
    }

    /**
     * Replaces the client used by this instance.
     *
     * @param restHighLevelClient client to use from now on
     */
    public void init(RestHighLevelClient restHighLevelClient) {
        client = restHighLevelClient;
    }

    /**
     * Returns the client currently held by this instance (null if none has been set).
     *
     * @throws VectorDatabaseException never thrown by this implementation; declared in the signature
     */
    protected RestHighLevelClient getClient() throws VectorDatabaseException {
        return client;
    }

    /**
     * Creates an index from a JSON source, going through {@code retry}.
     *
     * @param index index to create; only its name is used
     * @param str JSON source passed to the create-index request
     * @param bool when true and an index of that name is already found, nothing is done;
     *             null is treated like false
     * @throws VectorDatabaseException if the existence check or the creation fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void createIndex(Index index, String str, Boolean bool) throws VectorDatabaseException {
        // Boolean.TRUE.equals avoids a NullPointerException from unboxing a null flag.
        if (Boolean.TRUE.equals(bool) && getIndex(index.getName()).isPresent()) {
            return;
        }
        retry(() -> {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(index.getName());
            createIndexRequest.setTimeout(new TimeValue(this.requestTimeout.intValue()));
            createIndexRequest.setMasterTimeout(new TimeValue(this.requestTimeout.intValue()));
            createIndexRequest.source(str, XContentType.JSON);
            boolean acknowledged = getClient().indices().create(createIndexRequest, getRequestOptions()).isAcknowledged();
            // An unacknowledged response is reported as a failed attempt so that retry() may try again.
            return new OperationResult<>(acknowledged, null);
        }, "create index", Sets.newHashSet(RestStatus.OK, RestStatus.CREATED));
    }

    /**
     * Looks up an index by name through {@code retry}.
     * The first key of the returned mappings is wrapped in an {@link Index}; a response without any
     * mapping key is reported as an unsuccessful attempt. OK and NOT_FOUND are the accepted statuses.
     *
     * @param str index name passed to the get-index request
     * @return the index if one was returned, otherwise empty
     * @throws VectorDatabaseException if the lookup fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public Optional<Index> getIndex(String str) throws VectorDatabaseException {
        Set<RestStatus> acceptedStatuses = Sets.newHashSet(RestStatus.OK, RestStatus.NOT_FOUND);
        return retry(() -> {
            GetIndexRequest lookup = new GetIndexRequest(str);
            Optional<Index> firstMatch = getClient().indices().get(lookup, RequestOptions.DEFAULT)
                .getMappings()
                .keySet()
                .stream()
                .map(Index::new)
                .findFirst();
            if (firstMatch.isPresent()) {
                return new OperationResult<>(true, firstMatch.get());
            }
            return new OperationResult<>(false, null);
        }, "get index", acceptedStatuses);
    }

    /**
     * Lists all indices by requesting the pattern {@code "*"} through {@code retry} and wrapping every
     * returned mapping key in an {@link Index}.
     *
     * @return the indices found, or a new empty set when retry yields no value
     * @throws VectorDatabaseException if the request fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public Set<Index> getAllIndices() throws VectorDatabaseException {
        Optional<Set<Index>> indices = retry(() -> {
            GetIndexRequest everyIndex = new GetIndexRequest("*");
            Set<Index> found = getClient().indices().get(everyIndex, RequestOptions.DEFAULT)
                .getMappings()
                .keySet()
                .stream()
                .map(Index::new)
                .collect(Collectors.toSet());
            return new OperationResult<>(true, found);
        }, "get all indices", Sets.newHashSet(RestStatus.OK));
        return indices.orElseGet(Sets::newHashSet);
    }

    /**
     * Deletes an index through {@code retry}. OK and NOT_FOUND are the accepted statuses, so a
     * NOT_FOUND status error does not fail the call.
     *
     * @param index index to delete; only its name is used
     * @throws VectorDatabaseException if the deletion fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void deleteIndex(Index index) throws VectorDatabaseException {
        retry(() -> {
            TimeValue timeout = new TimeValue((long) this.requestTimeout.intValue());
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index.getName());
            deleteIndexRequest.timeout(timeout);
            deleteIndexRequest.masterNodeTimeout(timeout);
            // Use the shared helper rather than rebuilding identical socket-timeout options inline,
            // consistent with createIndex and deleteByQuery.
            boolean acknowledged = getClient().indices().delete(deleteIndexRequest, getRequestOptions()).isAcknowledged();
            return new OperationResult<>(acknowledged, null);
        }, "delete index", Sets.newHashSet(RestStatus.OK, RestStatus.NOT_FOUND));
    }

    /**
     * Adds fields to an existing index by sending a put-mapping request.
     *
     * @param index index to update; only its name is used
     * @param str JSON mapping source passed to the put-mapping request
     * @throws VectorDatabaseException if the request fails or is not acknowledged
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void addFields(Index index, String str) throws VectorDatabaseException {
        PutMappingRequest putMappingRequest = new PutMappingRequest(index.getName());
        putMappingRequest.source(str, XContentType.JSON);
        boolean acknowledged;
        try {
            acknowledged = getClient().indices().putMapping(putMappingRequest, RequestOptions.DEFAULT).isAcknowledged();
        } catch (IOException | OpenSearchException e) {
            // OpenSearchException is wrapped as well (as write() does) so that callers only have to
            // handle the declared VectorDatabaseException.
            throw new VectorDatabaseException("Failed to add fields to opensearch index: " + index.getName() + ". Err: " + e);
        }
        if (!acknowledged) {
            throw new VectorDatabaseException("Failed to add fields to opensearch index: " + index.getName());
        }
    }

    /**
     * Reads the fields of an index from the {@code "properties"} entry of its mapping.
     * Each entry becomes a {@link Field} built from the property name and its mapping value.
     *
     * @param index index to inspect; only its name is used
     * @return one field per mapped property; a new empty list when the mapping has no properties
     * @throws VectorDatabaseException if the request fails or the response has no mapping for the index name
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public List<Field> getSchema(Index index) throws VectorDatabaseException {
        String indexName = index.getName();
        MappingMetadata mapping;
        try {
            mapping = (MappingMetadata) getClient().indices()
                .get(new GetIndexRequest(indexName), RequestOptions.DEFAULT)
                .getMappings()
                .get(indexName);
        } catch (IOException | OpenSearchException e) {
            // Wrap client-side runtime failures too, matching write().
            throw new VectorDatabaseException("Failed to get schema from opensearch index: " + indexName + ". Err: " + e);
        }
        if (mapping == null) {
            // The response is keyed by index name; a missing key would otherwise surface as a bare NPE.
            throw new VectorDatabaseException("Failed to get schema from opensearch index: " + indexName
                + ". Err: response contains no mapping for this index name");
        }
        Object properties = mapping.getSourceAsMap().get("properties");
        if (properties == null) {
            return Lists.newArrayList();
        }
        Map<?, ?> fieldsByName = (Map<?, ?>) properties;
        return fieldsByName.entrySet().stream()
            .map(entry -> new Field((String) entry.getKey(), entry.getValue()))
            .collect(Collectors.toList());
    }

    /**
     * Indexes a single JSON document.
     *
     * @param index target index; only its name is used
     * @param str JSON document source
     * @param str2 document id, or null to send the request without an explicit id
     * @throws VectorDatabaseException if the request fails or the response status is neither CREATED nor OK
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void write(Index index, String str, String str2) throws VectorDatabaseException {
        try {
            IndexRequest request = makeIndexRequest(index.getName(), str, str2);
            IndexResponse response = getClient().index(request, RequestOptions.DEFAULT);
            boolean stored = response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK);
            if (!stored) {
                throw new VectorDatabaseException("Cannot index data. Status: " + response.status());
            }
        } catch (IOException | OpenSearchException failure) {
            throw new VectorDatabaseException("Cannot index data. Err: " + failure);
        }
    }

    /**
     * Builds an index request for one JSON document.
     *
     * @param str name of the target index
     * @param str2 JSON document source
     * @param str3 document id; when null no id is set on the request
     * @return the prepared request
     */
    private IndexRequest makeIndexRequest(String str, String str2, String str3) {
        IndexRequest request = new IndexRequest(str);
        request.source(str2, XContentType.JSON);
        if (str3 == null) {
            return request;
        }
        request.id(str3);
        return request;
    }

    /**
     * Indexes a single JSON document without an explicit document id.
     *
     * @param index target index
     * @param str JSON document source
     * @throws VectorDatabaseException if indexing fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void write(Index index, String str) throws VectorDatabaseException {
        final String noExplicitId = null;
        write(index, str, noExplicitId);
    }

    /**
     * Indexes several JSON documents in one bulk request, none of them with an explicit id.
     *
     * @param index target index; only its name is used
     * @param list JSON document sources
     * @throws VectorDatabaseException if the bulk request fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void batchWrite(Index index, List<String> list) throws VectorDatabaseException {
        BulkRequest bulk = new BulkRequest();
        for (String document : list) {
            bulk.add(makeIndexRequest(index.getName(), document, null));
        }
        bulkRequest(bulk);
    }

    /**
     * Indexes several JSON documents in one bulk request, using each map key as the document id.
     *
     * @param index target index; only its name is used
     * @param map document id to JSON document source
     * @throws VectorDatabaseException if the bulk request fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void batchWrite(Index index, Map<String, String> map) throws VectorDatabaseException {
        BulkRequest bulk = new BulkRequest();
        map.forEach((documentId, document) -> bulk.add(makeIndexRequest(index.getName(), document, documentId)));
        bulkRequest(bulk);
    }

    /**
     * Returns the source maps of up to {@code i} documents in which every given field exists.
     * The search runs through {@code retry}; no request is sent when {@code set} is empty.
     *
     * @param index index to search; only its name is used
     * @param set fields that must all exist in a returned document
     * @param i size set on the search request
     * @return the source map of every hit; a new empty list when {@code set} is empty or retry yields no value
     * @throws VectorDatabaseException if the search fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public List<Map<String, Object>> preview(Index index, Set<Field> set, int i) throws VectorDatabaseException {
        if (set.isEmpty()) {
            return Lists.newArrayList();
        }
        Optional<List<Map<String, Object>>> rows = retry(() -> {
            // One exists-clause per field, all combined with "must".
            BoolQueryBuilder allFieldsExist = QueryBuilders.boolQuery();
            for (Field field : set) {
                allFieldsExist.must(QueryBuilders.existsQuery(field.getName()));
            }
            SearchSourceBuilder source = new SearchSourceBuilder();
            source.query(allFieldsExist);
            source.size(i);
            SearchRequest search = new SearchRequest(index.getName());
            search.source(source);
            List<Map<String, Object>> documents = Lists.newArrayList();
            for (SearchHit hit : getClient().search(search, RequestOptions.DEFAULT).getHits().getHits()) {
                documents.add(hit.getSourceAsMap());
            }
            return new OperationResult<>(true, documents);
        }, "preview", Sets.newHashSet(RestStatus.OK));
        return rows.orElseGet(ArrayList::new);
    }

    /**
     * Sends a bulk request and fails if any item in the response failed.
     * A request without any action is skipped.
     *
     * @param bulkRequest the prepared bulk request
     * @throws VectorDatabaseException if the request fails or the response reports item failures
     */
    private void bulkRequest(BulkRequest bulkRequest) throws VectorDatabaseException {
        if (bulkRequest.numberOfActions() == 0) {
            // An empty bulk request is rejected by request validation ("no requests added") with an
            // unchecked exception; writing nothing is simply a no-op.
            return;
        }
        BulkResponse response;
        try {
            response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (IOException | OpenSearchException e) {
            // Wrap client-side runtime failures too, matching write().
            throw new VectorDatabaseException("Cannot index data. Err: " + e);
        }
        if (response.hasFailures()) {
            throw new VectorDatabaseException(String.format("Index data failed partially. Response status %d", Integer.valueOf(response.status().getStatus())));
        }
    }

    /**
     * Serializes a map and indexes it as a single document without an explicit document id.
     *
     * @param index target index
     * @param map document content
     * @throws VectorDatabaseException if serialization or indexing fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void writeMap(Index index, Map<String, Object> map) throws VectorDatabaseException {
        final String noExplicitId = null;
        writeMap(index, map, noExplicitId);
    }

    /**
     * Serializes a map with the object mapper and indexes the result as a single document.
     *
     * @param index target index
     * @param map document content
     * @param str document id, or null to index without an explicit id
     * @throws VectorDatabaseException if the map cannot be serialized or indexing fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void writeMap(Index index, Map<String, Object> map, String str) throws VectorDatabaseException {
        String document;
        try {
            document = this.objectMapper.writeValueAsString(map);
        } catch (IOException e) {
            // Keep the serializer's own description in the message; it names the offending value.
            throw new VectorDatabaseException("Failed to index data because data cannot be written to String. Err: " + e);
        }
        write(index, document, str);
    }

    /**
     * Serializes each map with the object mapper and indexes all of them in one bulk request,
     * none of them with an explicit id.
     *
     * @param index target index
     * @param list document contents
     * @throws VectorDatabaseException if a map cannot be serialized or the bulk request fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void batchWriteMap(Index index, List<Map<String, Object>> list) throws VectorDatabaseException {
        List<String> documents = new ArrayList<>(list.size());
        try {
            for (Map<String, Object> content : list) {
                documents.add(this.objectMapper.writeValueAsString(content));
            }
        } catch (IOException e) {
            // Keep the serializer's own description in the message; it names the offending value.
            throw new VectorDatabaseException("Failed to index data because data cannot be written to String. Err: " + e);
        }
        batchWrite(index, documents);
    }

    /**
     * Serializes each map value with the object mapper and indexes all of them in one bulk request,
     * using the corresponding key as the document id.
     *
     * @param index target index
     * @param map document id to document content
     * @throws VectorDatabaseException if a value cannot be serialized or the bulk request fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void batchWriteMap(Index index, Map<String, Map<String, Object>> map) throws VectorDatabaseException {
        // Linked map: the bulk items are added in the iteration order of the caller's map.
        Map<String, String> documentsById = Maps.newLinkedHashMap();
        try {
            for (Map.Entry<String, Map<String, Object>> entry : map.entrySet()) {
                documentsById.put(entry.getKey(), this.objectMapper.writeValueAsString(entry.getValue()));
            }
        } catch (IOException e) {
            // Keep the serializer's own description in the message; it names the offending value.
            throw new VectorDatabaseException("Failed to index data because data cannot be written to String. Err: " + e);
        }
        batchWrite(index, documentsById);
    }

    /**
     * Deletes all documents matching a query-string query, going through {@code retry}.
     * A response that lists bulk failures is reported as an unsuccessful attempt.
     *
     * @param index index to delete from; only its name is used
     * @param str query string handed to {@code QueryStringQueryBuilder}
     * @throws VectorDatabaseException if the deletion fails
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void deleteByQuery(Index index, String str) throws VectorDatabaseException {
        Set<RestStatus> acceptedStatuses = Sets.newHashSet(RestStatus.OK);
        retry(() -> {
            DeleteByQueryRequest request = new DeleteByQueryRequest(index.getName());
            request.setQuery(new QueryStringQueryBuilder(str));
            request.setTimeout(new TimeValue(this.requestTimeout.intValue()));
            boolean withoutFailures = getClient().deleteByQuery(request, getRequestOptions()).getBulkFailures().isEmpty();
            return new OperationResult<>(withoutFailures, null);
        }, "delete by query", acceptedStatuses);
    }

    /**
     * Caps a back-off delay at 5000.
     *
     * @param j requested delay
     * @return {@code j} if it is below 5000, otherwise 5000
     */
    private long getDelayMillis(long j) {
        final long ceiling = 5000L;
        return j < ceiling ? j : ceiling;
    }

    /**
     * Runs an operation up to {@code maxRetry} times with a doubling delay between attempts
     * (starting at 1000 and capped by {@code getDelayMillis}).
     *
     * <ul>
     *   <li>A successful result ends the loop and its payload is returned.</li>
     *   <li>An {@link OpenSearchStatusException} whose status is in {@code set} ends the loop with an
     *       empty result.</li>
     *   <li>Any other {@link OpenSearchStatusException} fails immediately, unless its detailed message
     *       contains {@code process_cluster_event_timeout_exception}, in which case the attempt counts
     *       as failed and may be retried.</li>
     *   <li>An {@link IOException} fails immediately.</li>
     *   <li>An unsuccessful result may be retried.</li>
     * </ul>
     *
     * {@code startRetry()} is called once before the first delay, and {@code doneRetry()} exactly once
     * on the way out if {@code startRetry()} was called. If the thread is interrupted while waiting,
     * its interrupt status is restored and no further attempt is made.
     *
     * @param operationSupplier the operation to run
     * @param str operation name used in log and error messages
     * @param set statuses of an {@link OpenSearchStatusException} that are accepted instead of failing
     * @param <T> payload type of the operation result
     * @return the payload of the successful attempt (empty if it was null), or empty for an accepted status
     * @throws VectorDatabaseException if the operation fails, is interrupted, or does not succeed within
     *         the allowed attempts
     */
    protected <T> Optional<T> retry(OperationSupplier operationSupplier, String str, Set<RestStatus> set) throws VectorDatabaseException {
        long delayMillis = 1000L;
        boolean retryStarted = false;
        try {
            for (int attempt = 0; attempt < this.maxRetry.intValue(); attempt++) {
                try {
                    // OperationSupplier returns the raw type; the payload type is fixed by the caller.
                    @SuppressWarnings("unchecked")
                    OperationResult<T> outcome = operationSupplier.perform();
                    if (outcome.success.booleanValue()) {
                        return Optional.ofNullable(outcome.result);
                    }
                } catch (OpenSearchStatusException e) {
                    if (set.contains(e.status())) {
                        return Optional.empty();
                    }
                    if (!e.getDetailedMessage().contains("process_cluster_event_timeout_exception")) {
                        throw new VectorDatabaseException(String.format("Failed to %s index: %s", str, e.getDetailedMessage()));
                    }
                    // Cluster-event timeouts are the only status errors worth another attempt.
                    LOGGER.log(Level.INFO, String.format("Failed to %s: %s", str, e.getDetailedMessage()));
                } catch (IOException e) {
                    throw new VectorDatabaseException(String.format("Failed to %s index: %s", str, e.getMessage()));
                }
                // The attempt failed in a retryable way: stop if this was the last one or retrying is vetoed.
                if (attempt >= this.maxRetry.intValue() - 1 || !shouldRetry().booleanValue()) {
                    break;
                }
                if (!retryStarted) {
                    startRetry();
                    retryStarted = true;
                }
                Thread.sleep(getDelayMillis(delayMillis));
                delayMillis *= 2;
            }
        } catch (InterruptedException e) {
            // Restore the flag for callers and give up: continuing to retry would ignore the interrupt.
            Thread.currentThread().interrupt();
            LOGGER.log(Level.INFO, String.format("Retry %s interrupted.", str));
        } finally {
            // Single exit hook, so doneRetry() pairs with startRetry() exactly once on every path.
            if (retryStarted) {
                doneRetry();
            }
        }
        throw new VectorDatabaseException(String.format("Operation %s failed after retries.", str));
    }

    /**
     * Consulted by {@code retry} before waiting for another attempt.
     *
     * @return always true in this class
     */
    protected Boolean shouldRetry() {
        return Boolean.TRUE;
    }

    /**
     * Hook called by {@code retry} before its first back-off wait. Does nothing in this class.
     */
    protected void startRetry() {
    }

    /**
     * Hook called by {@code retry} on its way out once {@code startRetry()} has been called.
     * Does nothing in this class.
     */
    protected void doneRetry() {
    }

    /**
     * Builds request options derived from {@code RequestOptions.DEFAULT} whose request config carries
     * this instance's socket timeout.
     *
     * @return the request options
     */
    private RequestOptions getRequestOptions() {
        RequestConfig socketConfig = RequestConfig.custom()
            .setSocketTimeout(this.socketTimeout.intValue())
            .build();
        RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder();
        options.setRequestConfig(socketConfig);
        return options.build();
    }

    /**
     * Closes the client, if one is set, and clears the reference once closing succeeded.
     *
     * @throws OpenSearchException if closing the client fails; the underlying I/O error is attached as cause
     */
    @Override // io.hops.hopsworks.vectordb.VectorDatabase
    public void close() {
        if (this.client == null) {
            return;
        }
        try {
            this.client.close();
            this.client = null;
        } catch (IOException e) {
            // Pass the IOException as cause so the reason for the failed shutdown is not lost.
            throw new OpenSearchException("Error while shuting down client", e, new Object[0]);
        }
    }
}
