/*
 * Decompiled with CFR 0.152.
 */
package com.dedicatedcode.reitti.service;

import com.dedicatedcode.reitti.dto.LocationPoint;
import com.dedicatedcode.reitti.event.TriggerProcessingEvent;
import com.dedicatedcode.reitti.model.security.User;
import com.dedicatedcode.reitti.service.ImportProcessor;
import com.dedicatedcode.reitti.service.processing.LocationDataIngestPipeline;
import com.dedicatedcode.reitti.service.processing.ProcessingPipelineTrigger;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class DefaultImportProcessor
implements ImportProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultImportProcessor.class);
    private final LocationDataIngestPipeline locationDataIngestPipeline;
    private final int batchSize;
    private final int processingIdleStartTime;
    private final ProcessingPipelineTrigger processingPipelineTrigger;
    private final ScheduledExecutorService scheduler;
    private final ConcurrentHashMap<String, ScheduledFuture<?>> pendingTriggers;
    private final ThreadPoolExecutor importExecutors = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

    public DefaultImportProcessor(LocationDataIngestPipeline locationDataIngestPipeline, @Value(value="${reitti.import.batch-size:10000}") int batchSize, @Value(value="${reitti.import.processing-idle-start-time:15}") int processingIdleStartTime, ProcessingPipelineTrigger processingPipelineTrigger) {
        this.locationDataIngestPipeline = locationDataIngestPipeline;
        this.batchSize = batchSize;
        this.processingIdleStartTime = processingIdleStartTime;
        this.processingPipelineTrigger = processingPipelineTrigger;
        this.scheduler = Executors.newScheduledThreadPool(2);
        this.pendingTriggers = new ConcurrentHashMap();
    }

    public void processBatch(User user, List<LocationPoint> batch) {
        logger.debug("Sending batch of [{}] locations for user [{}] into executor queue", (Object)batch.size(), (Object)user.getUsername());
        ArrayList<LocationPoint> points = new ArrayList<LocationPoint>(batch);
        this.importExecutors.submit(() -> {
            logger.trace("Sending batch of {} locations for storing", (Object)points.size());
            this.locationDataIngestPipeline.processLocationData(user.getUsername(), points);
            logger.trace("Sending batch of {} locations for processing", (Object)points.size());
            this.scheduleProcessingTrigger(user.getUsername());
        });
    }

    public void scheduleProcessingTrigger(String username) {
        ScheduledFuture existingTrigger = (ScheduledFuture)this.pendingTriggers.get(username);
        if (existingTrigger != null && !existingTrigger.isDone()) {
            existingTrigger.cancel(false);
        }
        ScheduledFuture<?> newTrigger = this.scheduler.schedule(() -> {
            try {
                logger.debug("Triggered processing for user: {}", (Object)username);
                TriggerProcessingEvent triggerEvent = new TriggerProcessingEvent(username, null, UUID.randomUUID().toString());
                this.processingPipelineTrigger.handle(triggerEvent, false);
                this.pendingTriggers.remove(username);
            }
            catch (Exception e) {
                logger.error("Failed to trigger processing for user: {}", (Object)username, (Object)e);
            }
        }, (long)this.processingIdleStartTime, TimeUnit.SECONDS);
        this.pendingTriggers.put(username, newTrigger);
    }

    public boolean isIdle() {
        return this.importExecutors.getQueue().isEmpty() && this.pendingTriggers.isEmpty() || this.pendingTriggers.values().stream().allMatch(Future::isDone);
    }

    @PreDestroy
    public void shutdown() {
        this.importExecutors.shutdown();
        this.scheduler.shutdown();
        try {
            if (!this.importExecutors.awaitTermination(30L, TimeUnit.SECONDS)) {
                this.importExecutors.shutdownNow();
            }
            if (!this.scheduler.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.scheduler.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            this.importExecutors.shutdownNow();
            this.scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public int getPendingTaskCount() {
        return this.importExecutors.getQueue().size();
    }
}

