package org.culturegraph.cluster.job.ingest;

import com.google.common.base.Charsets;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.culturegraph.cluster.sink.ComplexPutWriter;
import org.culturegraph.cluster.util.AbstractJobLauncher;
import org.culturegraph.cluster.util.Column;
import org.culturegraph.cluster.util.ConfigConst;
import org.culturegraph.metamorph.core.Metamorph;
import org.culturegraph.metamorph.core.MetamorphErrorHandler;
import org.culturegraph.metastream.framework.DefaultSender;
import org.culturegraph.metastream.framework.StreamReceiver;
import org.culturegraph.metastream.reader.MultiFormatReader;
import org.culturegraph.metastream.reader.Reader;

/**
 * Map-only Hadoop job that ingests line-oriented bibliographic input into HBase.
 *
 * <p>Each non-empty line of {@code ConfigConst.INPUT_PATH} is parsed by a {@link MultiFormatReader}
 * for {@code ConfigConst.FORMAT}, transformed with the Metamorph definition {@code
 * ConfigConst.MORPH_DEF}, turned into a {@link Put} by a {@link ComplexPutWriter} and written to the
 * table {@code ConfigConst.OUTPUT_TABLE}. No file output is produced ({@link NullOutputFormat}).
 */
public final class BibIngest extends AbstractJobLauncher {

    /**
     * Mapper that converts one input line into one HBase {@link Put}.
     *
     * <p>The processing chain is: reader -> Metamorph -> (optional {@link PrefixAdder}) ->
     * {@link ComplexPutWriter}. Puts are written directly to an {@link HTable}; nothing is emitted
     * through the Hadoop context, so {@code K} and {@code V} are unused.
     *
     * <p>The mapper is also the Metamorph error handler, so mapping errors are logged, counted and
     * recorded in the current record.
     *
     * @param <K> unused map output key type
     * @param <V> unused map output value type
     */
    static final class IngestMapper<K, V> extends Mapper<LongWritable, Text, K, V>
            implements MetamorphErrorHandler {
        private static final String ERROR_IN_ROW = "Error in row %1$s: %2$s %3$s";
        private static final Log LOG = LogFactory.getLog(BibIngest.class);
        private static final String INGEST = "Ingest";
        private static final String EXCEPTION = "cg:mappingError";
        /** Client-side write buffer of the HTable: 16 MiB (16 * 1024 * 1024 bytes). */
        private static final long WRITE_BUFFER = 16777216;
        private Reader reader;
        private final ComplexPutWriter collector = new ComplexPutWriter();
        private HTable htable;
        /** UTF-8 bytes of the input format name, used as the qualifier of the raw-data column. */
        private byte[] format;
        private boolean storeRawData;
        /** Context of the map() call in progress; error() uses it for counters and status. */
        private Mapper<LongWritable, Text, K, V>.Context currentContext;

        IngestMapper() {
        }

        /**
         * Builds the reader/Metamorph/collector pipeline and opens the output table.
         *
         * @param context task context providing the job configuration
         * @throws IOException if the output table cannot be opened
         * @throws InterruptedException propagated from {@link Mapper#setup}
         */
        @Override
        public void setup(Mapper<LongWritable, Text, K, V>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            final Configuration configuration = context.getConfiguration();
            final String formatName = configuration.get(ConfigConst.FORMAT);

            this.storeRawData = configuration.getBoolean(ConfigConst.STORE_RAW_DATA, false);
            // Explicit charset: the column qualifier must not depend on the JVM default encoding.
            this.format = formatName.getBytes(Charsets.UTF_8);
            this.reader = new MultiFormatReader(formatName);

            final Metamorph metamorph = new Metamorph(configuration.get(ConfigConst.MORPH_DEF));
            metamorph.setErrorHandler(this);
            this.reader.setReceiver(metamorph);

            final String prefix = configuration.get(ConfigConst.INGEST_PREFIX, "");
            if (prefix.isEmpty()) {
                metamorph.setReceiver(this.collector);
            } else {
                // Insert a stage that prepends the configured prefix to every record id.
                final PrefixAdder prefixAdder = new PrefixAdder(prefix);
                metamorph.setReceiver(prefixAdder);
                prefixAdder.setReceiver(this.collector);
            }

            this.htable = new HTable(configuration, configuration.get(ConfigConst.OUTPUT_TABLE));
            // Buffer puts on the client side; they are flushed in cleanup().
            this.htable.setAutoFlush(false);
            this.htable.setWriteBufferSize(WRITE_BUFFER);
        }

        /**
         * Flushes buffered puts and closes the output table.
         *
         * @param context task context
         * @throws IOException if flushing or closing the table fails
         * @throws InterruptedException propagated from {@link Mapper#cleanup}
         */
        @Override
        public void cleanup(Mapper<LongWritable, Text, K, V>.Context context)
                throws IOException, InterruptedException {
            try {
                super.cleanup(context);
                this.htable.flushCommits();
            } finally {
                // Release the table even if the superclass cleanup or the flush failed.
                this.htable.close();
            }
        }

        /**
         * Transforms one input line into a Put and hands it to the HTable write buffer.
         *
         * <p>Empty lines and Puts without content are only counted. Lines for which the
         * collector produces no Put are skipped silently. A {@link RuntimeException} raised while
         * processing is reported via {@link #error(Exception)}, and the record is not written.
         *
         * @param longWritable byte offset of the line (unused)
         * @param text the input line
         * @param context task context used for counters
         * @throws IOException if the HTable rejects the put
         */
        @Override
        public void map(LongWritable longWritable, Text text,
                Mapper<LongWritable, Text, K, V>.Context context) throws IOException {
            this.currentContext = context;
            final String line = text.toString();
            if (line.isEmpty()) {
                context.getCounter(INGEST, "empty input lines").increment(1L);
                return;
            }
            try {
                this.collector.reset();
                this.reader.read(line);
                final Put currentPut = this.collector.getCurrentPut();
                if (currentPut == null) {
                    return;
                }
                if (this.storeRawData) {
                    // Explicit charset so the stored raw bytes do not depend on the JVM default.
                    currentPut.add(Column.Family.RAW, this.format, line.getBytes(Charsets.UTF_8));
                }
                if (currentPut.isEmpty()) {
                    context.getCounter(INGEST, "records without content").increment(1L);
                } else {
                    this.htable.put(currentPut);
                    context.getCounter(INGEST, "records ingested").increment(1L);
                }
            } catch (RuntimeException e) {
                error(e);
            }
        }

        /**
         * Handles a mapping error.
         *
         * <p>If a Put is currently being built, the error is recorded in it as a
         * {@code cg:mappingError} literal. The error is also logged, counted under its exception
         * class name and shown as the task status.
         *
         * @param exc the error raised while processing the current record
         */
        @Override
        public void error(Exception exc) {
            final String type = exc.getClass().getSimpleName();
            final String message = exc.getMessage();
            final String row;
            if (this.collector.getCurrentPut() == null) {
                row = "";
            } else {
                this.collector.literal(EXCEPTION, type + ":" + message);
                row = new String(this.collector.getCurrentPut().getRow(), Charsets.UTF_8);
            }
            LOG.warn(String.format(ERROR_IN_ROW, row, type, message));
            this.currentContext.getCounter(INGEST, type).increment(1L);
            this.currentContext.setStatus("Last exception: " + type + " " + message);
        }
    }

    /**
     * Stream stage that prepends a fixed prefix to each record id. All other events are forwarded
     * unchanged.
     */
    private static final class PrefixAdder extends DefaultSender<StreamReceiver>
            implements StreamReceiver {
        private final String prefix;

        public PrefixAdder(String str) {
            this.prefix = str;
        }

        @Override
        public void startRecord(String str) {
            getReceiver().startRecord(this.prefix + str);
        }

        @Override
        public void endRecord() {
            getReceiver().endRecord();
        }

        @Override
        public void startEntity(String str) {
            getReceiver().startEntity(str);
        }

        @Override
        public void endEntity() {
            getReceiver().endEntity();
        }

        @Override
        public void literal(String str, String str2) {
            getReceiver().literal(str, str2);
        }
    }

    /**
     * Command-line entry point.
     *
     * @param strArr job arguments, handled by {@link AbstractJobLauncher}
     */
    public static void main(String[] strArr) {
        launch(new BibIngest(), strArr);
    }

    /**
     * Declares the job arguments and derives an HBase-aware configuration from
     * {@code configuration}.
     *
     * @param configuration base configuration
     * @return HBase configuration with job defaults applied
     */
    @Override
    protected Configuration prepareConf(Configuration configuration) {
        addRequiredArguments(ConfigConst.OUTPUT_TABLE, ConfigConst.FORMAT, ConfigConst.INGEST_PREFIX,
                ConfigConst.INPUT_PATH, ConfigConst.MORPH_DEF);
        addOptionalArguments(ConfigConst.STORE_RAW_DATA);
        Configuration create = HBaseConfiguration.create(configuration);
        create.setIfUnset(ConfigConst.STORE_RAW_DATA, "false");
        // The mapper writes to HBase directly instead of through an output committer, so
        // speculative duplicate attempts would issue duplicate writes.
        create.setIfUnset("mapred.map.tasks.speculative.execution", "false");
        setJobName("Ingest " + configuration.get(ConfigConst.INPUT_PATH) + " -> "
                + configuration.get(ConfigConst.OUTPUT_TABLE));
        return create;
    }

    /**
     * Configures a map-only job over {@code INPUT_PATH} that uses {@link IngestMapper}.
     *
     * @param job the job to configure
     * @param configuration prepared configuration
     * @throws IOException if the input path cannot be added
     */
    @Override
    protected void configureJob(Job job, Configuration configuration) throws IOException {
        job.setJarByClass(BibIngest.class);
        FileInputFormat.addInputPath(job, new Path(configuration.get(ConfigConst.INPUT_PATH)));
        job.setMapperClass(IngestMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(NullOutputFormat.class);
    }
}
