package org.culturegraph.cluster.source;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.culturegraph.cluster.pipe.HBaseResultDecoder;
import org.culturegraph.cluster.pipe.HBaseScannerDecoder;
import org.culturegraph.cluster.util.Column;
import org.culturegraph.metastream.annotation.Description;
import org.culturegraph.metastream.annotation.In;
import org.culturegraph.metastream.annotation.Out;
import org.culturegraph.metastream.framework.DefaultSender;
import org.culturegraph.metastream.framework.ObjectReceiver;
import org.culturegraph.metastream.framework.StreamReceiver;
import org.culturegraph.util.CulturegraphUtilException;

/**
 * Reads from an HBase table and forwards the decoded results to the downstream
 * {@link StreamReceiver}.
 *
 * <p>Each processed string is a query of the form {@code TABLE/QUERY}, where
 * {@code QUERY} is one of:
 * <ul>
 *   <li>{@code scanner} - scan the table, optionally bounded with
 *       {@code scanner?startRow=A&stopRow=B} (either parameter may be omitted);</li>
 *   <li>{@code row/ROW_ID} - fetch the single row {@code ROW_ID}.</li>
 * </ul>
 * Only the {@link Column.Family#PROPERTY} column family is requested in both modes.
 */
@Description("reads from an HBase table. Use TABLE/scanner to scan and TABLE/row/ROW_ID to access a single row.")
@Out(StreamReceiver.class)
@In(String.class)
public final class HBaseOpener extends DefaultSender<StreamReceiver> implements ObjectReceiver<String> {
    /** Default number of rows requested per scanner round trip (see {@link Scan#setCaching(int)}). */
    private static final int CACHED_ROWS = 500;

    /** Scanner query; group 3 captures the startRow value, group 5 the stopRow value. */
    private static final Pattern SCANNER_PATTERN = Pattern.compile("scanner(\\?(startRow=([^&]*))?&?(stopRow=([^&]*))?)?");

    /** Prefix that marks a single-row query. */
    private static final String ROW = "row/";

    private int cachedRows;
    private final Configuration conf = HBaseConfiguration.create();

    /**
     * Creates an opener that connects through the given ZooKeeper quorum.
     *
     * @param str value stored under {@link HConstants#ZOOKEEPER_QUORUM} in the HBase configuration
     */
    public HBaseOpener(String str) {
        this.conf.set(HConstants.ZOOKEEPER_QUORUM, str);
        this.conf.set("hbase.regionserver.lease.period", "320000");
        this.cachedRows = CACHED_ROWS;
    }

    /**
     * Sets the scanner caching value applied to subsequent scans.
     *
     * @param i value passed to {@link Scan#setCaching(int)}
     */
    public void setCachedRows(int i) {
        this.cachedRows = i;
    }

    /**
     * Parses a {@code TABLE/QUERY} string and runs either a scan or a single-row lookup.
     *
     * @param str the query string
     * @throws IllegalArgumentException if {@code str} contains no {@code '/'} or the query
     *         part is neither a scanner query nor a {@code row/ROW_ID} query
     * @throws CulturegraphUtilException if reading from HBase fails
     */
    @Override // org.culturegraph.metastream.framework.ObjectReceiver
    public void process(String str) {
        final int separator = str.indexOf('/');
        if (separator < 0) {
            throw new IllegalArgumentException("input '" + str + "' is not a valid query");
        }
        final String table = str.substring(0, separator);
        final String query = str.substring(separator + 1);

        final Matcher matcher = SCANNER_PATTERN.matcher(query);
        // matches() rather than find(): find() would also accept "row/<id containing 'scanner'>"
        // and silently turn a single-row lookup into a scan of the whole table.
        if (matcher.matches()) {
            processScanner(table, matcher.group(3), matcher.group(5));
        } else if (query.startsWith(ROW)) {
            processRow(table, query.substring(ROW.length()));
        } else {
            throw new IllegalArgumentException("input '" + str + "' is not a valid query");
        }
    }

    /**
     * Fetches one row and, if it is non-empty, sends it through a fresh
     * {@link HBaseResultDecoder} to the configured receiver.
     */
    private void processRow(String tableName, String rowId) {
        try {
            final HBaseResultDecoder decoder = new HBaseResultDecoder();
            decoder.setReceiver(getReceiver());

            final HTable table = new HTable(this.conf, tableName);
            try {
                final Get get = new Get(rowId.getBytes(Charsets.UTF_8));
                get.addFamily(Column.Family.PROPERTY);
                final Result result = table.get(get);
                if (!result.isEmpty()) {
                    decoder.process(result);
                }
            } finally {
                // Release the table even when the lookup or the downstream receiver fails.
                table.close();
            }
        } catch (IOException e) {
            throw new CulturegraphUtilException("reading row '" + rowId + "' from table '" + tableName + "' failed", e);
        }
    }

    /**
     * Scans the table, optionally bounded by start and stop row, and pipes the scanner
     * through {@link HBaseScannerDecoder} and {@link HBaseResultDecoder} to the receiver.
     *
     * @param tableName name of the table to scan
     * @param startRow first row of the scan, or {@code null} for no lower bound
     * @param stopRow stop row of the scan, or {@code null} for no upper bound
     */
    private void processScanner(String tableName, String startRow, String stopRow) {
        try {
            final HTable table = new HTable(this.conf, tableName);
            try {
                final Scan scan = new Scan();
                scan.setCacheBlocks(false);
                scan.setCaching(this.cachedRows);
                if (startRow != null) {
                    scan.setStartRow(startRow.getBytes(Charsets.UTF_8));
                }
                if (stopRow != null) {
                    scan.setStopRow(stopRow.getBytes(Charsets.UTF_8));
                }
                scan.addFamily(Column.Family.PROPERTY);

                final HBaseResultDecoder resultDecoder = new HBaseResultDecoder();
                resultDecoder.setReceiver(getReceiver());
                final HBaseScannerDecoder scannerDecoder = new HBaseScannerDecoder();
                scannerDecoder.setReceiver(resultDecoder);

                final ResultScanner scanner = table.getScanner(scan);
                try {
                    scannerDecoder.process(scanner);
                } finally {
                    // NOTE(review): HBaseScannerDecoder may already close the scanner; closing
                    // here guarantees the scanner is released if decoding fails part-way.
                    scanner.close();
                }
            } finally {
                table.close();
            }
        } catch (IOException e) {
            throw new CulturegraphUtilException("scanning table '" + tableName + "' failed", e);
        }
    }
}
