/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeMap;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferIndexAndChannel;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.apache.flink.shaded.guava30.com.google.common.collect.PeekingIterator;
import org.apache.flink.util.Preconditions;

public class HsSpillingStrategyUtils {
    public static TreeMap<Integer, List<BufferIndexAndChannel>> getBuffersByConsumptionPriorityInOrder(List<Integer> nextBufferIndexToConsume, TreeMap<Integer, Deque<BufferIndexAndChannel>> subpartitionToAllBuffers, int expectedSize) {
        if (expectedSize <= 0) {
            return new TreeMap<Integer, List<BufferIndexAndChannel>>();
        }
        PriorityQueue<BufferConsumptionPriorityIterator> heap = new PriorityQueue<BufferConsumptionPriorityIterator>();
        subpartitionToAllBuffers.forEach((subpartitionId, buffers) -> {
            if (!buffers.isEmpty()) {
                heap.add(new BufferConsumptionPriorityIterator((Deque<BufferIndexAndChannel>)buffers, (Integer)nextBufferIndexToConsume.get((int)subpartitionId)));
            }
        });
        TreeMap<Integer, List<BufferIndexAndChannel>> subpartitionToHighPriorityBuffers = new TreeMap<Integer, List<BufferIndexAndChannel>>();
        for (int i = 0; i < expectedSize && !heap.isEmpty(); ++i) {
            BufferConsumptionPriorityIterator bufferConsumptionPriorityIterator = (BufferConsumptionPriorityIterator)heap.poll();
            BufferIndexAndChannel bufferIndexAndChannel = bufferConsumptionPriorityIterator.next();
            subpartitionToHighPriorityBuffers.computeIfAbsent(bufferIndexAndChannel.getChannel(), ArrayList::new).add(bufferIndexAndChannel);
            if (!bufferConsumptionPriorityIterator.hasNext()) continue;
            heap.add(bufferConsumptionPriorityIterator);
        }
        subpartitionToHighPriorityBuffers.values().forEach(Collections::reverse);
        return subpartitionToHighPriorityBuffers;
    }

    private static class BufferConsumptionPriorityIterator
    implements Comparable<BufferConsumptionPriorityIterator>,
    Iterator<BufferIndexAndChannel> {
        private final int consumptionProgress;
        private final PeekingIterator<BufferIndexAndChannel> bufferIterator;

        public BufferConsumptionPriorityIterator(Deque<BufferIndexAndChannel> bufferQueue, int consumptionProgress) {
            this.consumptionProgress = consumptionProgress;
            this.bufferIterator = Iterators.peekingIterator(bufferQueue.descendingIterator());
        }

        @Override
        public BufferIndexAndChannel next() {
            return this.bufferIterator.next();
        }

        @Override
        public boolean hasNext() {
            return this.bufferIterator.hasNext();
        }

        @Override
        public int compareTo(BufferConsumptionPriorityIterator that) {
            return Integer.compare(Preconditions.checkNotNull(that.bufferIterator.peek()).getBufferIndex() - that.consumptionProgress, Preconditions.checkNotNull(this.bufferIterator.peek()).getBufferIndex() - this.consumptionProgress);
        }
    }
}

