/*
 * Decompiled with CFR 0.152.
 */
package com.predic8.membrane.core.exchangestore;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.predic8.membrane.annot.MCAttribute;
import com.predic8.membrane.annot.MCElement;
import com.predic8.membrane.core.exchange.AbstractExchange;
import com.predic8.membrane.core.exchange.Exchange;
import com.predic8.membrane.core.exchange.snapshots.AbstractExchangeSnapshot;
import com.predic8.membrane.core.exchange.snapshots.DynamicAbstractExchangeSnapshot;
import com.predic8.membrane.core.exchangestore.AbstractExchangeStore;
import com.predic8.membrane.core.http.Request;
import com.predic8.membrane.core.interceptor.Interceptor;
import com.predic8.membrane.core.rules.Rule;
import com.predic8.membrane.core.rules.RuleKey;
import com.predic8.membrane.core.rules.StatisticCollector;
import com.predic8.membrane.core.transport.http.HttpClient;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@MCElement(name="elasticSearchExchangeStore")
public class ElasticSearchExchangeStore
extends AbstractExchangeStore {
    HttpClient client;
    static Logger log = LoggerFactory.getLogger(ElasticSearchExchangeStore.class);
    int updateIntervalMs = 1000;
    Map<Long, AbstractExchangeSnapshot> shortTermMemoryForBatching = new HashMap<Long, AbstractExchangeSnapshot>();
    Cache<Long, AbstractExchangeSnapshot> cacheToWaitForElasticSearchIndex = CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.SECONDS).build();
    Thread updateJob;
    String index = "membrane";
    String type = "exchanges";
    ObjectMapper mapper;
    String location = "http://localhost:9200";
    private String documentPrefix;
    private long startTime;
    boolean init = false;

    @Override
    public void init() {
        super.init();
        if (this.client == null) {
            this.client = new HttpClient();
        }
        if (this.mapper == null) {
            this.mapper = new ObjectMapper();
        }
        if (this.documentPrefix == null) {
            this.documentPrefix = ElasticSearchExchangeStore.getLocalHostname();
        }
        this.documentPrefix = this.documentPrefix.toLowerCase();
        this.startTime = System.nanoTime();
        this.updateJob = new Thread(() -> {
            try {
                while (true) {
                    List<AbstractExchangeSnapshot> exchanges;
                    Map<Long, AbstractExchangeSnapshot> map = this.shortTermMemoryForBatching;
                    synchronized (map) {
                        exchanges = this.shortTermMemoryForBatching.values().stream().collect(Collectors.toList());
                        this.shortTermMemoryForBatching.values().stream().forEach(exc -> this.cacheToWaitForElasticSearchIndex.put((Object)exc.getId(), exc));
                        this.shortTermMemoryForBatching.clear();
                    }
                    if (exchanges.size() > 0) {
                        this.sendToElasticSearch(exchanges);
                        continue;
                    }
                    Thread.sleep(this.updateIntervalMs);
                }
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        this.updateJob.start();
        this.init = true;
    }

    private void sendToElasticSearch(List<AbstractExchangeSnapshot> exchanges) throws Exception {
        StringBuilder data = exchanges.stream().map(exchange -> this.wrapForBulkOperationElasticSearch(this.index, this.type, this.getLocalMachineNameWithSuffix() + "-" + exchange.getId(), this.collectExchangeDataFrom((AbstractExchangeSnapshot)exchange))).collect(StringBuilder::new, (sb, str) -> sb.append((String)str), (sb1, sb2) -> sb1.append((CharSequence)sb2));
        Exchange elasticSearchExc = new Request.Builder().post(this.location + "/_bulk").header("Content-Type", "application/x-ndjson").body(data.toString()).buildExchange();
        this.client.call(elasticSearchExc);
    }

    private static String getLocalHostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            try {
                return IOUtils.toString((InputStream)Runtime.getRuntime().exec("hostname").getInputStream());
            }
            catch (IOException e1) {
                e1.printStackTrace();
                return "localhost";
            }
        }
    }

    private String getLocalMachineNameWithSuffix() {
        return this.documentPrefix + "-" + this.startTime;
    }

    public String wrapForBulkOperationElasticSearch(String index, String type, String id, String value) {
        return "{ \"index\" : { \"_index\" : \"" + index + "\", \"_type\" : \"" + type + "\", \"_id\" : \"" + id + "\" } }\n" + value + "\n";
    }

    @Override
    public void snap(AbstractExchange exc, Interceptor.Flow flow) {
        AbstractExchangeSnapshot excCopy = null;
        try {
            if (flow == Interceptor.Flow.REQUEST) {
                excCopy = new DynamicAbstractExchangeSnapshot(exc, this::addForElasticSearch);
                this.addForElasticSearch(excCopy);
            } else {
                excCopy = this.getExchangeDtoById((int)exc.getId());
                DynamicAbstractExchangeSnapshot.addObservers(exc, excCopy, this::addForElasticSearch);
                excCopy = excCopy.updateFrom(exc);
                this.addForElasticSearch(excCopy);
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addForElasticSearch(AbstractExchangeSnapshot exc) {
        Map<Long, AbstractExchangeSnapshot> map = this.shortTermMemoryForBatching;
        synchronized (map) {
            this.shortTermMemoryForBatching.put(exc.getId(), exc);
        }
    }

    private String collectExchangeDataFrom(AbstractExchangeSnapshot exc) {
        try {
            Map value = (Map)this.mapper.readValue(this.mapper.writeValueAsString((Object)exc), Map.class);
            value.put("issuer", this.documentPrefix);
            return this.mapper.writeValueAsString((Object)value);
        }
        catch (IOException e) {
            e.printStackTrace();
            return "";
        }
    }

    public AbstractExchangeSnapshot getExchangeDtoById(int id) {
        Long idBox = id;
        if (this.shortTermMemoryForBatching.get(idBox) != null) {
            return this.shortTermMemoryForBatching.get(idBox);
        }
        if (this.cacheToWaitForElasticSearchIndex.getIfPresent((Object)idBox) != null) {
            return (AbstractExchangeSnapshot)this.cacheToWaitForElasticSearchIndex.getIfPresent((Object)idBox);
        }
        return this.getFromElasticSearchById(id);
    }

    private AbstractExchangeSnapshot getFromElasticSearchById(int id) {
        try {
            Exchange exc = new Request.Builder().post(this.getElasticSearchExchangesPath() + "_search").body("{\n  \"query\": {\n    \"bool\": {\n      \"must\": [\n        {\n          \"wildcard\": {\n            \"issuer\": \"" + this.documentPrefix + "\"\n          }\n        },\n        {\n          \"match\": {\n            \"id\": \"" + id + "\"\n          }\n        }\n      ]\n    }\n  }\n}").header("Content-Type", "application/json").buildExchange();
            exc = this.client.call(exc);
            Map res = this.responseToMap(exc);
            Map excJson = this.getSourceElementFromElasticSearchResponse(res).get(0);
            return (AbstractExchangeSnapshot)this.mapper.readValue(this.mapper.writeValueAsString((Object)excJson), AbstractExchangeSnapshot.class);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String getElasticSearchExchangesPath() {
        return this.location + "/" + this.index + "/" + this.type + "/";
    }

    public List<Map> getSourceElementFromElasticSearchResponse(Map response) {
        return this.getSourceElementFromHitsElement(this.getHitsElementFromElasticSearchResponse(response));
    }

    public List getHitsElementFromElasticSearchResponse(Map response) {
        return (List)((Map)response.get("hits")).get("hits");
    }

    public List<Map> getSourceElementFromHitsElement(List hits) {
        return hits.stream().map(hit -> ((Map)hit).get("_source")).collect(Collectors.toList());
    }

    @Override
    public AbstractExchange getExchangeById(int id) {
        return this.getFromElasticSearchById(id).toAbstractExchange();
    }

    @Override
    public void remove(AbstractExchange exchange) {
        try {
            this.removeFromElasticSearchById(exchange.getId());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void removeFromElasticSearchById(long id) throws Exception {
        Exchange exc = new Request.Builder().delete(this.getElasticSearchExchangesPath() + this.getLocalMachineNameWithSuffix() + "-" + id).buildExchange();
        this.client.call(exc);
    }

    @Override
    public void removeAllExchanges(Rule rule) {
        String name = rule.toString();
        try {
            Exchange exc = new Request.Builder().post(this.getElasticSearchExchangesPath() + "_delete_by_query").body("{\n  \"query\": {\n    \"bool\": {\n      \"must\": [\n        {\n          \"wildcard\": {\n            \"issuer\": \"" + this.documentPrefix + "\"\n          }\n        },\n        {\n          \"match\": {\n            \"rule.name\": \"" + name + "\"\n          }\n        }\n      ]\n    }\n  }\n}").header("Content-Type", "application/json").buildExchange();
            this.client.call(exc);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void removeAllExchanges(AbstractExchange[] exchanges) {
        StringBuilder sb = Stream.of(exchanges).map(exc -> exc.getId()).collect(() -> {
            StringBuilder acc = new StringBuilder();
            acc.append("[");
            return acc;
        }, (acc, id) -> acc.append(id).append(","), (acc1, acc2) -> acc1.append(",").append((CharSequence)acc2));
        sb.deleteCharAt(sb.length() - 1);
        sb.append("]");
        String exchangeIdsAsJsonArray = sb.toString();
        try {
            Exchange exc2 = new Request.Builder().post(this.getElasticSearchExchangesPath() + "_delete_by_query").body("{\n  \"query\": {\n    \"bool\": {\n      \"must\": [\n        {\n          \"wildcard\": {\n            \"issuer\": \"" + this.documentPrefix + "\"\n          }\n        },\n        {\n          \"terms\": {\n            \"id\": \"" + exchangeIdsAsJsonArray + "\"\n          }\n        }\n      ]\n    }\n  }\n}").header("Content-Type", "application/json").buildExchange();
            this.client.call(exc2);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public AbstractExchange[] getExchanges(RuleKey ruleKey) {
        int port = ruleKey.getPort();
        try {
            Exchange exc = new Request.Builder().post(this.getElasticSearchExchangesPath() + "_search").body("{\n  \"query\": {\n    \"bool\": {\n      \"must\": [\n        {\n          \"wildcard\": {\n            \"issuer\": \"" + this.documentPrefix + "\"\n          }\n        },\n        {\n          \"match\": {\n            \"rule.port\": \"" + port + "\"\n          }\n        }\n      ]\n    }\n  }\n}").header("Content-Type", "application/json").buildExchange();
            exc = this.client.call(exc);
            List<Map> source = this.getSourceElementFromElasticSearchResponse(this.responseToMap(exc));
            AbstractExchangeSnapshot[] snapshots = (AbstractExchangeSnapshot[])this.mapper.readValue(this.mapper.writeValueAsString(source), AbstractExchangeSnapshot[].class);
            return Stream.of(snapshots).map(snapshot -> snapshot.toAbstractExchange()).collect(Collectors.toList()).toArray(new AbstractExchange[0]);
        }
        catch (Exception e) {
            e.printStackTrace();
            return new AbstractExchange[0];
        }
    }

    @Override
    public int getNumberOfExchanges(RuleKey ruleKey) {
        return this.getExchanges(ruleKey).length;
    }

    @Override
    public StatisticCollector getStatistics(RuleKey ruleKey) {
        StatisticCollector statistics = new StatisticCollector(false);
        List<AbstractExchange> exchangesList = Arrays.asList(this.getExchanges(ruleKey));
        if (exchangesList == null || exchangesList.isEmpty()) {
            return statistics;
        }
        for (int i = 0; i < exchangesList.size(); ++i) {
            statistics.collectFrom(exchangesList.get(i));
        }
        return statistics;
    }

    @Override
    public Object[] getAllExchanges() {
        return this.getAllExchangesAsList().toArray();
    }

    @Override
    public List<AbstractExchange> getAllExchangesAsList() {
        try {
            Exchange exc = new Request.Builder().post(this.getElasticSearchExchangesPath() + "_search").header("Content-Type", "application/json").body("{\n  \"query\": {\n    \"wildcard\": {\n      \"issuer\": \"" + this.documentPrefix + "\"\n    }\n  }\n}").buildExchange();
            exc = this.client.call(exc);
            if (!exc.getResponse().isOk()) {
                return new ArrayList<AbstractExchange>();
            }
            List<Map> sources = this.getSourceElementFromElasticSearchResponse(this.responseToMap(exc));
            return sources.stream().map(source -> {
                try {
                    return ((AbstractExchangeSnapshot)this.mapper.readValue(this.mapper.writeValueAsString(source), AbstractExchangeSnapshot.class)).toAbstractExchange();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).collect(Collectors.toList());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Map responseToMap(Exchange exc) throws IOException {
        return (Map)this.mapper.readValue(exc.getResponse().getBodyAsStringDecoded(), Map.class);
    }

    public HttpClient getClient() {
        return this.client;
    }

    @MCAttribute
    public void setClient(HttpClient client) {
        this.client = client;
    }

    public int getUpdateIntervalMs() {
        return this.updateIntervalMs;
    }

    @MCAttribute
    public void setUpdateIntervalMs(int updateIntervalMs) {
        this.updateIntervalMs = updateIntervalMs;
    }

    public String getLocation() {
        return this.location;
    }

    @MCAttribute
    public void setLocation(String location) {
        this.location = location;
    }

    public String getDocumentPrefix() {
        return this.documentPrefix;
    }

    @MCAttribute
    public void setDocumentPrefix(String documentPrefix) {
        this.documentPrefix = documentPrefix;
    }
}

