/*
 * Decompiled with CFR 0.152.
 */
package com.dedicatedcode.reitti.service.processing;

import com.dedicatedcode.reitti.event.LocationProcessEvent;
import com.dedicatedcode.reitti.event.TriggerProcessingEvent;
import com.dedicatedcode.reitti.model.geo.RawLocationPoint;
import com.dedicatedcode.reitti.model.security.User;
import com.dedicatedcode.reitti.repository.PreviewRawLocationPointJdbcService;
import com.dedicatedcode.reitti.repository.RawLocationPointJdbcService;
import com.dedicatedcode.reitti.repository.UserJdbcService;
import com.dedicatedcode.reitti.service.ImportStateHolder;
import com.dedicatedcode.reitti.service.processing.UnifiedLocationProcessingService;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class ProcessingPipelineTrigger {
    private static final Logger log = LoggerFactory.getLogger(ProcessingPipelineTrigger.class);
    private final ImportStateHolder stateHolder;
    private final RawLocationPointJdbcService rawLocationPointJdbcService;
    private final PreviewRawLocationPointJdbcService previewRawLocationPointJdbcService;
    private final UserJdbcService userJdbcService;
    private final UnifiedLocationProcessingService unifiedLocationProcessingService;
    private final int batchSize;
    private final ThreadPoolExecutor executorService = (ThreadPoolExecutor)Executors.newFixedThreadPool(1);

    public ProcessingPipelineTrigger(ImportStateHolder stateHolder, RawLocationPointJdbcService rawLocationPointJdbcService, PreviewRawLocationPointJdbcService previewRawLocationPointJdbcService, UserJdbcService userJdbcService, UnifiedLocationProcessingService unifiedLocationProcessingService, @Value(value="${reitti.import.batch-size:100}") int batchSize) {
        this.stateHolder = stateHolder;
        this.rawLocationPointJdbcService = rawLocationPointJdbcService;
        this.previewRawLocationPointJdbcService = previewRawLocationPointJdbcService;
        this.userJdbcService = userJdbcService;
        this.unifiedLocationProcessingService = unifiedLocationProcessingService;
        this.batchSize = batchSize;
    }

    @Scheduled(cron="${reitti.process-data.schedule}")
    public void start() {
        if (this.stateHolder.isImportRunning() || !this.isIdle()) {
            log.warn("Data Import is currently running, wil skip this run");
            return;
        }
        for (User user : this.userJdbcService.findAll()) {
            this.handleDataForUser(user, null, UUID.randomUUID().toString(), false);
        }
    }

    public void start(User user) {
        this.handleDataForUser(user, null, UUID.randomUUID().toString(), false);
    }

    public void handle(TriggerProcessingEvent event, boolean immediate) {
        Optional byUsername = this.userJdbcService.findByUsername(event.getUsername());
        if (byUsername.isPresent()) {
            this.handleDataForUser((User)byUsername.get(), event.getPreviewId(), event.getTraceId(), immediate);
        } else {
            log.warn("No user found for username: {}", (Object)event.getUsername());
        }
    }

    private void handleDataForUser(User user, String previewId, String traceId, boolean immediate) {
        int totalProcessed = 0;
        while (true) {
            this.stateHolder.importStarted();
            try {
                List currentBatch = previewId == null ? this.rawLocationPointJdbcService.findByUserAndProcessedIsFalseOrderByTimestampWithLimit(user, this.batchSize, 0) : this.previewRawLocationPointJdbcService.findByUserAndProcessedIsFalseOrderByTimestampWithLimit(user, previewId, this.batchSize, 0);
                if (currentBatch.isEmpty()) break;
                Instant earliest = ((RawLocationPoint)currentBatch.getFirst()).getTimestamp();
                Instant latest = ((RawLocationPoint)currentBatch.getLast()).getTimestamp();
                log.debug("Scheduling stay detection event for user [{}] and points between [{}] and [{}]", new Object[]{user.getId(), earliest, latest});
                currentBatch.forEach(RawLocationPoint::markProcessed);
                if (previewId == null) {
                    this.rawLocationPointJdbcService.bulkUpdateProcessedStatus(currentBatch);
                } else {
                    this.previewRawLocationPointJdbcService.bulkUpdateProcessedStatus(currentBatch);
                }
                if (!immediate) {
                    this.executorService.submit(() -> this.unifiedLocationProcessingService.processLocationEvent(new LocationProcessEvent(user.getUsername(), earliest, latest, previewId, traceId)));
                } else {
                    this.unifiedLocationProcessingService.processLocationEvent(new LocationProcessEvent(user.getUsername(), earliest, latest, previewId, traceId));
                }
                totalProcessed += currentBatch.size();
            }
            catch (Exception e) {
                log.error("Error processing batch for user [{}]", (Object)user.getId(), (Object)e);
            }
        }
        this.stateHolder.importFinished();
        log.debug("Processed [{}] unprocessed points for user [{}]", (Object)totalProcessed, (Object)user.getId());
    }

    public boolean isIdle() {
        return this.executorService.getQueue().isEmpty() && this.executorService.getActiveCount() == 0;
    }

    public int getPendingCount() {
        return this.executorService.getActiveCount() + this.executorService.getQueue().size();
    }
}

