package org.apache.flink.table.store.table;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.sink.MemoryTableWrite;
import org.apache.flink.table.store.table.sink.SequenceGenerator;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.class */
public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
    private static final String KEY_FIELD_PREFIX = "_KEY_";
    private static final long serialVersionUID = 1;
    private final KeyValueFileStore store;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChangelogWithKeyFileStoreTable(Path path, SchemaManager schemaManager, TableSchema tableSchema) {
        super(path, tableSchema);
        MergeFunction partialUpdateMergeFunction;
        RowType logicalRowType = tableSchema.logicalRowType();
        Configuration fromMap = Configuration.fromMap(tableSchema.options());
        CoreOptions.MergeEngine mergeEngine = (CoreOptions.MergeEngine) fromMap.get(CoreOptions.MERGE_ENGINE);
        switch (mergeEngine) {
            case DEDUPLICATE:
                partialUpdateMergeFunction = new DeduplicateMergeFunction();
                break;
            case PARTIAL_UPDATE:
                List children = logicalRowType.getChildren();
                RowData.FieldGetter[] fieldGetterArr = new RowData.FieldGetter[children.size()];
                for (int i = 0; i < children.size(); i++) {
                    fieldGetterArr[i] = RowDataUtils.createNullCheckingFieldGetter((LogicalType) children.get(i), i);
                }
                partialUpdateMergeFunction = new PartialUpdateMergeFunction(fieldGetterArr);
                break;
            default:
                throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
        }
        this.store = new KeyValueFileStore(schemaManager, tableSchema.id(), new CoreOptions(fromMap), tableSchema.logicalPartitionType(), addKeyNamePrefix(tableSchema.logicalBucketKeyType()), addKeyNamePrefix(tableSchema.logicalTrimmedPrimaryKeysType()), logicalRowType, partialUpdateMergeFunction);
    }

    private RowType addKeyNamePrefix(RowType rowType) {
        return new RowType((List) rowType.getFields().stream().map(rowField -> {
            return new RowType.RowField(KEY_FIELD_PREFIX + rowField.getName(), rowField.getType(), (String) rowField.getDescription().orElse(null));
        }).collect(Collectors.toList()));
    }

    @Override // org.apache.flink.table.store.table.FileStoreTable
    public TableScan newScan() {
        final KeyValueFileStoreScan newScan = this.store.newScan();
        return new TableScan(newScan, this.tableSchema, this.store.pathFactory()) { // from class: org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable.1
            @Override // org.apache.flink.table.store.table.source.TableScan
            protected SplitGenerator splitGenerator(FileStorePathFactory fileStorePathFactory) {
                return new MergeTreeSplitGenerator(ChangelogWithKeyFileStoreTable.this.store.newKeyComparator(), ChangelogWithKeyFileStoreTable.this.store.options().splitTargetSize(), ChangelogWithKeyFileStoreTable.this.store.options().splitOpenFileCost());
            }

            @Override // org.apache.flink.table.store.table.source.TableScan
            protected void withNonPartitionFilter(Predicate predicate) {
                List<Predicate> pickTransformFieldMapping = PredicateBuilder.pickTransformFieldMapping(PredicateBuilder.splitAnd(predicate), ChangelogWithKeyFileStoreTable.this.tableSchema.fieldNames(), ChangelogWithKeyFileStoreTable.this.tableSchema.trimmedPrimaryKeys());
                if (pickTransformFieldMapping.size() > 0) {
                    newScan.withKeyFilter(PredicateBuilder.and(pickTransformFieldMapping));
                }
            }
        };
    }

    @Override // org.apache.flink.table.store.table.FileStoreTable
    public TableRead newRead() {
        List<String> trimmedPrimaryKeys = this.tableSchema.trimmedPrimaryKeys();
        final Set set = (Set) this.tableSchema.fieldNames().stream().filter(str -> {
            return !trimmedPrimaryKeys.contains(str);
        }).collect(Collectors.toSet());
        return new KeyValueTableRead(this.store.newRead()) { // from class: org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable.2
            @Override // org.apache.flink.table.store.table.source.TableRead
            public TableRead withFilter(Predicate predicate) {
                ArrayList arrayList = new ArrayList();
                for (Predicate predicate2 : PredicateBuilder.splitAnd(predicate)) {
                    if (!PredicateBuilder.containsFields(predicate2, set)) {
                        arrayList.add(predicate2);
                    }
                }
                if (arrayList.size() > 0) {
                    this.read.withFilter(PredicateBuilder.and(arrayList));
                }
                return this;
            }

            @Override // org.apache.flink.table.store.table.source.TableRead
            public TableRead withProjection(int[][] iArr) {
                this.read.withValueProjection(iArr);
                return this;
            }

            @Override // org.apache.flink.table.store.table.source.KeyValueTableRead
            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(RecordReader.RecordIterator<KeyValue> recordIterator) {
                return new ValueContentRowDataRecordIterator(recordIterator);
            }
        };
    }

    @Override // org.apache.flink.table.store.table.FileStoreTable
    public TableWrite newWrite() {
        SinkRecordConverter sinkRecordConverter = new SinkRecordConverter(this.store.options().bucket(), this.tableSchema);
        final SequenceGenerator sequenceGenerator = (SequenceGenerator) this.store.options().sequenceField().map(str -> {
            return new SequenceGenerator(str, schema().logicalRowType());
        }).orElse(null);
        return new MemoryTableWrite<KeyValue>(this.store.newWrite(), sinkRecordConverter, this.store.options()) { // from class: org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable.3
            private final KeyValue kv = new KeyValue();

            @Override // org.apache.flink.table.store.table.sink.AbstractTableWrite
            protected void writeSinkRecord(SinkRecord sinkRecord, RecordWriter<KeyValue> recordWriter) throws Exception {
                recordWriter.write(this.kv.replace(sinkRecord.primaryKey(), sequenceGenerator == null ? -1L : sequenceGenerator.generate(sinkRecord.row()), sinkRecord.row().getRowKind(), sinkRecord.row()));
            }
        };
    }

    @Override // org.apache.flink.table.store.table.AbstractFileStoreTable
    public KeyValueFileStore store() {
        return this.store;
    }
}
