/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.allocator.FreeSlotFunction;
import org.apache.flink.runtime.scheduler.adaptive.allocator.IsSlotAvailableAndFreeFunction;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReserveSlotFunction;
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelismWithSlotSharing;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.Preconditions;

public class SlotSharingSlotAllocator
implements SlotAllocator {
    private final ReserveSlotFunction reserveSlotFunction;
    private final FreeSlotFunction freeSlotFunction;
    private final IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction;

    private SlotSharingSlotAllocator(ReserveSlotFunction reserveSlot, FreeSlotFunction freeSlotFunction, IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
        this.reserveSlotFunction = reserveSlot;
        this.freeSlotFunction = freeSlotFunction;
        this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
    }

    public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(ReserveSlotFunction reserveSlot, FreeSlotFunction freeSlotFunction, IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
        return new SlotSharingSlotAllocator(reserveSlot, freeSlotFunction, isSlotAvailableAndFreeFunction);
    }

    @Override
    public ResourceCounter calculateRequiredSlots(Iterable<JobInformation.VertexInformation> vertices) {
        int numTotalRequiredSlots = 0;
        for (Integer requiredSlots : SlotSharingSlotAllocator.getMaxParallelismForSlotSharingGroups(vertices).values()) {
            numTotalRequiredSlots += requiredSlots.intValue();
        }
        return ResourceCounter.withResource(ResourceProfile.UNKNOWN, numTotalRequiredSlots);
    }

    private static Map<SlotSharingGroupId, Integer> getMaxParallelismForSlotSharingGroups(Iterable<JobInformation.VertexInformation> vertices) {
        HashMap<SlotSharingGroupId, Integer> maxParallelismForSlotSharingGroups = new HashMap<SlotSharingGroupId, Integer>();
        for (JobInformation.VertexInformation vertex : vertices) {
            maxParallelismForSlotSharingGroups.compute(vertex.getSlotSharingGroup().getSlotSharingGroupId(), (slotSharingGroupId, currentMaxParallelism) -> currentMaxParallelism == null ? vertex.getParallelism() : Math.max(currentMaxParallelism, vertex.getParallelism()));
        }
        return maxParallelismForSlotSharingGroups;
    }

    public Optional<VertexParallelismWithSlotSharing> determineParallelism(JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) {
        int slotsPerSlotSharingGroup = freeSlots.size() / jobInformation.getSlotSharingGroups().size();
        if (slotsPerSlotSharingGroup == 0) {
            return Optional.empty();
        }
        Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
        ArrayList<ExecutionSlotSharingGroupAndSlot> assignments = new ArrayList<ExecutionSlotSharingGroupAndSlot>();
        HashMap<JobVertexID, Integer> allVertexParallelism = new HashMap<JobVertexID, Integer>();
        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
            List<JobInformation.VertexInformation> containedJobVertices = slotSharingGroup.getJobVertexIds().stream().map(jobInformation::getVertexInformation).collect(Collectors.toList());
            Map<JobVertexID, Integer> vertexParallelism = SlotSharingSlotAllocator.determineParallelism(containedJobVertices, slotsPerSlotSharingGroup);
            Iterable<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment = SlotSharingSlotAllocator.createExecutionSlotSharingGroups(vertexParallelism);
            for (ExecutionSlotSharingGroup executionSlotSharingGroup : sharedSlotToVertexAssignment) {
                SlotInfo slotInfo = slotIterator.next();
                assignments.add(new ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
            }
            allVertexParallelism.putAll(vertexParallelism);
        }
        return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
    }

    private static Map<JobVertexID, Integer> determineParallelism(Collection<JobInformation.VertexInformation> containedJobVertices, int availableSlots) {
        HashMap<JobVertexID, Integer> vertexParallelism = new HashMap<JobVertexID, Integer>();
        for (JobInformation.VertexInformation jobVertex : containedJobVertices) {
            int parallelism = Math.min(jobVertex.getParallelism(), availableSlots);
            vertexParallelism.put(jobVertex.getJobVertexID(), parallelism);
        }
        return vertexParallelism;
    }

    private static Iterable<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(Map<JobVertexID, Integer> containedJobVertices) {
        HashMap<Integer, Set> sharedSlotToVertexAssignment = new HashMap<Integer, Set>();
        for (Map.Entry<JobVertexID, Integer> jobVertex : containedJobVertices.entrySet()) {
            for (int i = 0; i < jobVertex.getValue(); ++i) {
                sharedSlotToVertexAssignment.computeIfAbsent(i, ignored -> new HashSet()).add(new ExecutionVertexID(jobVertex.getKey(), i));
            }
        }
        return sharedSlotToVertexAssignment.values().stream().map(ExecutionSlotSharingGroup::new).collect(Collectors.toList());
    }

    @Override
    public Optional<ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism) {
        Preconditions.checkArgument(vertexParallelism instanceof VertexParallelismWithSlotSharing, String.format("%s expects %s as argument.", SlotSharingSlotAllocator.class.getSimpleName(), VertexParallelismWithSlotSharing.class.getSimpleName()));
        VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing = (VertexParallelismWithSlotSharing)vertexParallelism;
        Collection<AllocationID> expectedSlots = this.calculateExpectedSlots(vertexParallelismWithSlotSharing.getAssignments());
        if (this.areAllExpectedSlotsAvailableAndFree(expectedSlots)) {
            HashMap<ExecutionVertexID, LogicalSlot> assignedSlots = new HashMap<ExecutionVertexID, LogicalSlot>();
            for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup : vertexParallelismWithSlotSharing.getAssignments()) {
                SharedSlot sharedSlot = this.reserveSharedSlot(executionSlotSharingGroup.getSlotInfo());
                for (ExecutionVertexID executionVertexId : executionSlotSharingGroup.getExecutionSlotSharingGroup().getContainedExecutionVertices()) {
                    LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
                    assignedSlots.put(executionVertexId, logicalSlot);
                }
            }
            return Optional.of(ReservedSlots.create(assignedSlots));
        }
        return Optional.empty();
    }

    @Nonnull
    private Collection<AllocationID> calculateExpectedSlots(Iterable<? extends ExecutionSlotSharingGroupAndSlot> assignments) {
        ArrayList<AllocationID> requiredSlots = new ArrayList<AllocationID>();
        for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroupAndSlot : assignments) {
            requiredSlots.add(executionSlotSharingGroupAndSlot.getSlotInfo().getAllocationId());
        }
        return requiredSlots;
    }

    private boolean areAllExpectedSlotsAvailableAndFree(Iterable<? extends AllocationID> requiredSlots) {
        for (AllocationID allocationID : requiredSlots) {
            if (this.isSlotAvailableAndFreeFunction.isSlotAvailableAndFree(allocationID)) continue;
            return false;
        }
        return true;
    }

    private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
        PhysicalSlot physicalSlot = this.reserveSlotFunction.reserveSlot(slotInfo.getAllocationId(), ResourceProfile.UNKNOWN);
        return new SharedSlot(new SlotRequestId(), physicalSlot, slotInfo.willBeOccupiedIndefinitely(), () -> this.freeSlotFunction.freeSlot(slotInfo.getAllocationId(), null, System.currentTimeMillis()));
    }

    static class ExecutionSlotSharingGroupAndSlot {
        private final ExecutionSlotSharingGroup executionSlotSharingGroup;
        private final SlotInfo slotInfo;

        public ExecutionSlotSharingGroupAndSlot(ExecutionSlotSharingGroup executionSlotSharingGroup, SlotInfo slotInfo) {
            this.executionSlotSharingGroup = executionSlotSharingGroup;
            this.slotInfo = slotInfo;
        }

        public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() {
            return this.executionSlotSharingGroup;
        }

        public SlotInfo getSlotInfo() {
            return this.slotInfo;
        }
    }

    static class ExecutionSlotSharingGroup {
        private final Set<ExecutionVertexID> containedExecutionVertices;

        public ExecutionSlotSharingGroup(Set<ExecutionVertexID> containedExecutionVertices) {
            this.containedExecutionVertices = containedExecutionVertices;
        }

        public Collection<ExecutionVertexID> getContainedExecutionVertices() {
            return this.containedExecutionVertices;
        }
    }
}

