/*
 * Decompiled with CFR 0.152.
 */
package com.opcfy.logger.kafka;

import com.opcfy.logger.kafka.OpcUaKafkaSourceConnectorConfig;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class OpcUaKafkaSourceConnectorTask
extends SourceTask {
    // Connector configuration; created and validated in start().
    private OpcUaKafkaSourceConnectorConfig config;
    // Wrapped OPC UA Producer child process (started in startOvlProcess()).
    private Process process;
    // Daemon thread that reads the producer's merged stdout/stderr line by line.
    private Thread executor;
    // Lines read from the producer; drained into source records by poll().
    private BlockingQueue<String> stdoutLines;
    // HA node manager child process (started in startHaProcess() when HA is enabled).
    private Process haProcess;
    // Daemon thread that reads the HA process output line by line.
    private Thread haExecutor;
    // Lines read from the HA process; drained into source records by poll().
    private BlockingQueue<String> haStdoutLines;
    // Set to true once startProcesses() has completed in start(); set to false by stop().
    private volatile boolean running = false;
    // Set once a line matching isHaReadyLine() has been seen.
    private volatile boolean haStarted = false;
    // Set once a line matching isOvlReadyLine() has been seen.
    private volatile boolean ovlStarted = false;
    // Set by stop(); makes both output reader loops exit after their next line.
    private volatile boolean shutdownFlag = false;
    // System.currentTimeMillis() of the last HTTP healthcheck attempt (0 = never).
    private long lastHealthcheck = 0L;
    // Schema of the value of every record emitted by poll(); all fields are strings.
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().name("com.opcfy.logger.kafka.Record").field("application", Schema.STRING_SCHEMA).field("cluster", Schema.STRING_SCHEMA).field("node", Schema.STRING_SCHEMA).field("message", Schema.STRING_SCHEMA).field("timestamp", Schema.STRING_SCHEMA).build();
    // Incremented by performHealthcheckIfNeeded() each time a started child is found dead;
    // reset in startProcesses(). More than 3 failures raises a ConnectException.
    private int failureCounter = 0;
    // Number of poll() calls so far; only used to log the first three polls.
    private int pollCounter = 0;

    /**
     * Reports the version of this task implementation.
     *
     * @return the fixed version string {@code "1.0.0"}
     */
    public String version() {
        final String taskVersion = "1.0.0";
        return taskVersion;
    }

    /**
     * Tells whether {@link #findPidsByExecutableName(String)} reports at least one pid
     * for the given executable name.
     *
     * @param processName executable name to look for
     * @return {@code true} when at least one pid was found
     * @throws IOException declared for callers; not thrown by the current lookup
     */
    public boolean isProcessRunning(String processName) throws IOException {
        return !this.findPidsByExecutableName(processName).isEmpty();
    }

    /**
     * Terminates the process with the given pid: requests a normal termination first and,
     * if the process has not exited within the grace period, kills it forcibly.
     *
     * @param pid             pid of the process to terminate
     * @param gracefulTimeout grace period in milliseconds to wait after the normal
     *                        termination request; {@code null} is treated as 0
     * @return {@code false} when no process with that pid exists; otherwise whether the
     *         process is known to have exited (or, when normal termination could not be
     *         requested, whether the forcible kill was requested successfully)
     */
    public static boolean terminateProcess(long pid, Long gracefulTimeout) {
        Optional<ProcessHandle> lookup = ProcessHandle.of(pid);
        if (lookup.isEmpty()) {
            return false;
        }
        ProcessHandle ph = lookup.get();
        if (!ph.destroy()) {
            // Normal termination could not be requested; go straight to the forcible kill.
            return ph.destroyForcibly();
        }
        // A null timeout previously surfaced as a swallowed NullPointerException; make it explicit.
        long graceMillis = gracefulTimeout == null ? 0L : gracefulTimeout.longValue();
        try {
            ph.onExit().get(graceMillis, TimeUnit.MILLISECONDS);
            return true;
        }
        catch (TimeoutException te) {
            if (!ph.destroyForcibly()) {
                return !ph.isAlive();
            }
            try {
                // Give the forcibly killed process a moment to disappear.
                ph.onExit().get(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException ie) {
                // Preserve the interruption for the caller instead of losing it.
                Thread.currentThread().interrupt();
            }
            catch (Exception ignored) {
                // Best effort: the final isAlive() check below decides the result.
            }
            return !ph.isAlive();
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return !ph.isAlive();
        }
        catch (Exception e) {
            return !ph.isAlive();
        }
    }

    /**
     * Waits until no {@code ogamma-logger} process is reported by
     * {@link #isProcessRunning(String)}, polling once per second for at most
     * {@code config.processExitWaitTimeout()} milliseconds.
     *
     * <p>The check is skipped (and {@code true} returned) when
     * {@code config.checkIfProcessRunningAtStart()} is false.
     *
     * @return {@code true} when no such process is running (or the check is disabled);
     *         {@code false} when it is still running after the timeout, when the check
     *         failed, or when the wait was interrupted
     */
    private boolean waitUntilOvlStops() {
        if (!this.config.checkIfProcessRunningAtStart()) {
            return true;
        }
        String processName = "ogamma-logger";
        long timeout = this.config.processExitWaitTimeout();
        try {
            if (!this.isProcessRunning(processName)) {
                return true;
            }
            System.out.println("OpcUaConnector: " + processName + " is running, waiting for it to finish...");
            long startTime = System.currentTimeMillis();
            while (this.isProcessRunning(processName)) {
                long elapsed = System.currentTimeMillis() - startTime;
                if (elapsed > timeout) {
                    System.out.println("Timeout reached, process did not complete in " + timeout + " milliseconds.");
                    break;
                }
                Thread.sleep(1000L);
            }
            if (!this.isProcessRunning(processName)) {
                return true;
            }
            System.out.println("OpcUaConnector: the process " + processName + " from other task is still running.");
            return false;
        }
        catch (IOException ex) {
            System.out.println("OpcUaConnector: failed to check if the process " + processName + " is running, error: " + String.valueOf(ex));
        }
        catch (InterruptedException ex) {
            System.out.println("OpcUaConnector: failed to check if the process " + processName + " is running, error: " + String.valueOf(ex));
            // Restore the flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return false;
    }

    /**
     * Checks whether a TCP port can be bound on this host by briefly opening a
     * {@link ServerSocket} on it.
     *
     * @param port TCP port number to probe
     * @return {@code true} when the port could be bound; {@code false} when binding
     *         (or any other step of the probe) failed, e.g. because the port is in use
     */
    public boolean isTcpPortAvailable(int port) {
        // The socket must be created inside the try: a busy port makes the constructor throw,
        // and that is exactly the "not available" answer this method has to return.
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            serverSocket.setReuseAddress(true);
            return true;
        }
        catch (Exception e) {
            return false;
        }
    }

    /**
     * Starts the task: builds and validates the configuration, waits for the producer's TCP
     * port to become free and for any previous producer process to exit, prepares the work
     * directory and finally launches the child processes.
     *
     * @param props task configuration properties
     * @throws RuntimeException when the port does not become free within
     *         {@code config.processExitWaitTimeout()} milliseconds, when the wait is
     *         interrupted, when another producer process is still running, or when the
     *         child processes cannot be started
     */
    public void start(Map<String, String> props) {
        this.haStarted = false;
        this.ovlStarted = false;
        this.shutdownFlag = false;
        System.out.println("OpcUaConnector: Starting ...");
        this.config = new OpcUaKafkaSourceConnectorConfig(props);
        this.config.validate();
        long startTime = System.currentTimeMillis();
        int checkCounter = 0;
        while (!this.isTcpPortAvailable(Integer.parseInt(this.config.ovlPort()))) {
            if (++checkCounter == 1) {
                System.out.println("OpcUaConnector: TCP port " + this.config.ovlPort() + " is not free, waiting until it is available ...");
            }
            long elapsed = System.currentTimeMillis() - startTime;
            if (elapsed > this.config.processExitWaitTimeout()) {
                System.out.println("OpcUaConnector: TCP port " + this.config.ovlPort() + " is not free");
                throw new RuntimeException("OpcUaConnector: TCP port " + this.config.ovlPort() + " is not free");
            }
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // An interrupt means the worker wants this thread back: stop waiting instead of
                // silently looping until the timeout, and keep the flag set for the caller.
                Thread.currentThread().interrupt();
                throw new RuntimeException("OpcUaConnector: interrupted while waiting for TCP port " + this.config.ovlPort() + " to become free", e);
            }
        }
        if (!this.waitUntilOvlStops()) {
            System.out.println("OpcUaConnector: another OPC UA Producer process is still running, which is not allowed.");
            throw new RuntimeException("OpcUaConnector: another OPC UA Producer process is still running, which is not allowed.");
        }
        this.stdoutLines = new LinkedBlockingQueue<String>();
        this.haStdoutLines = new LinkedBlockingQueue<String>();
        try {
            this.prepareWorkDir();
            System.out.println("OpcUaConnector: work folder have been prepared to run");
            System.out.println("OpcUaConnector: starting child processess...");
            this.startProcesses();
            this.running = true;
            System.out.println("OpcUaConnector: starting child processess completed ...");
        }
        catch (Exception e) {
            System.err.println("OpcUaConnector:: failed to start wrapped OPC UA Producer process" + String.valueOf(e));
            throw new RuntimeException("OpcUaConnector: failed to start wrapped OPC UA Producer process", e);
        }
    }

    /**
     * Ensures the work directory exists and contains the extracted producer files.
     *
     * <p>If the configured executable is already present the directory is assumed to be
     * fully extracted and nothing else is done. Otherwise the embedded ZIP is extracted and
     * the four known executables are given {@code rwxr-xr-x} permissions.
     *
     * @throws IOException when the directory cannot be created, extraction fails, or the
     *         permissions cannot be applied to one of the executables
     */
    private void prepareWorkDir() throws IOException {
        Path workDir = Paths.get(this.config.workDir()).toAbsolutePath();
        Files.createDirectories(workDir);
        Path executablePath = workDir.resolve(this.config.executableName());
        if (Files.exists(executablePath)) {
            System.out.println("OpcUaConnector: files already extracted");
            return;
        }
        System.out.println("OpcUaConnector: extracting OPC UA Producer files in folder " + workDir.toString());
        this.extractEmbeddedZip(workDir);
        Path[] executables = new Path[]{
            executablePath,
            workDir.resolve("ogamma-logger"),
            workDir.resolve("ha_node"),
            workDir.resolve("start-ha-node-manager.sh")
        };
        try {
            // rwxr-xr-x; the element type must be the enum itself, not an array of it.
            EnumSet<PosixFilePermission> perms = EnumSet.of(
                PosixFilePermission.OWNER_READ,
                PosixFilePermission.OWNER_WRITE,
                PosixFilePermission.OWNER_EXECUTE,
                PosixFilePermission.GROUP_READ,
                PosixFilePermission.GROUP_EXECUTE,
                PosixFilePermission.OTHERS_READ,
                PosixFilePermission.OTHERS_EXECUTE);
            for (Path file : executables) {
                Files.setPosixFilePermissions(file, perms);
            }
        }
        catch (UnsupportedOperationException e) {
            // File systems without POSIX attributes: report and carry on.
            System.err.println("OpcUaConnector: exception thrown when attempted to set permissoins for OPC UA executable files: " + String.valueOf(e));
        }
    }

    /**
     * Names the platform whose embedded ZIP should be extracted.
     *
     * @return always {@code "linux"}; no detection is performed
     */
    private String detectOsForZip() {
        final String platform = "linux";
        return platform;
    }

    /**
     * Extracts the producer distribution ZIP bundled on the classpath into {@code workDir}.
     *
     * <p>Every entry is resolved against the normalized work directory and rejected if it
     * would land outside of it (zip-slip protection). Existing files are overwritten.
     *
     * @param workDir directory to extract into
     * @throws FileNotFoundException when the ZIP resource is not on the classpath
     * @throws IOException when an entry points outside {@code workDir} or a file cannot
     *         be written
     */
    private void extractEmbeddedZip(Path workDir) throws IOException {
        String osName = this.detectOsForZip();
        String resourceName = "windows".equals(osName) ? "opcfy/ogamma-logger-windows-4.2.2.zip" : "opcfy/ogamma-logger-oraclelinux-4.2.2.zip";
        InputStream zipStream = this.getClass().getClassLoader().getResourceAsStream(resourceName);
        if (zipStream == null) {
            throw new FileNotFoundException("Embedded ZIP not found in JAR: " + resourceName);
        }
        // Normalize the base as well: comparing a normalized target against a base that still
        // contains "." or ".." segments would wrongly reject every entry.
        Path baseDir = workDir.toAbsolutePath().normalize();
        try (ZipInputStream zis = new ZipInputStream(zipStream)) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                Path targetPath = baseDir.resolve(entry.getName()).normalize();
                if (!targetPath.startsWith(baseDir)) {
                    throw new IOException("Zip entry outside target dir: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(targetPath);
                } else {
                    Files.createDirectories(targetPath.getParent());
                    try (OutputStream os = Files.newOutputStream(targetPath, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
                        // Copies the current entry only: the zip stream reports end-of-stream at the entry boundary.
                        zis.transferTo(os);
                    }
                }
                zis.closeEntry();
            }
        }
    }

    /**
     * Observes a process for up to {@code grace}, polling every 50 ms.
     *
     * <p>First waits (within the grace period) for the process to be alive; if it is not
     * alive after that, returns {@code false}. Otherwise keeps watching until the process
     * dies or the same grace period, measured from the start of the call, has elapsed.
     *
     * @param p     process to observe
     * @param grace maximum observation time
     * @return whether the process is alive at the end of the observation
     */
    private static boolean waitAlive(Process p, Duration grace) {
        final Instant begin = Instant.now();
        // Phase 1: wait for the process to show up as alive.
        while (true) {
            boolean withinGrace = Duration.between(begin, Instant.now()).compareTo(grace) <= 0;
            if (!withinGrace || p.isAlive()) {
                break;
            }
            OpcUaKafkaSourceConnectorTask.sleepQuietly(50L);
        }
        if (!p.isAlive()) {
            return false;
        }
        // Phase 2: make sure it stays alive for the remainder of the grace period.
        while (p.isAlive()) {
            if (Duration.between(begin, Instant.now()).compareTo(grace) > 0) {
                break;
            }
            OpcUaKafkaSourceConnectorTask.sleepQuietly(50L);
        }
        return p.isAlive();
    }

    /**
     * Sleeps for the given time without propagating {@link InterruptedException}.
     *
     * <p>If the thread is interrupted the sleep ends early and the interrupt flag is set
     * again, so callers and code further up the stack can still detect the interruption.
     *
     * @param millis time to sleep in milliseconds
     */
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException interrupted) {
            // Thread.sleep clears the flag when it throws; put it back rather than losing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Recognizes the HA process output line that marks it as started.
     *
     * @param line one line of HA process output
     * @return {@code true} when the line contains {@code "High Availability Node Manager"}
     */
    private boolean isHaReadyLine(String line) {
        final String marker = "High Availability Node Manager";
        return line.indexOf(marker) >= 0;
    }

    /**
     * Recognizes the producer output line that marks it as started.
     *
     * @param line one line of producer output
     * @return {@code true} when the line contains {@code "Visual Logger for OPC"}
     */
    private boolean isOvlReadyLine(String line) {
        final String marker = "Visual Logger for OPC";
        return line.indexOf(marker) >= 0;
    }

    /**
     * Launches the HA node manager script and waits up to 10 seconds for it to print its
     * ready line.
     *
     * <p>The script {@code start-ha-node-manager.sh} is run from the work directory via
     * {@code /bin/sh -c}, with stderr merged into stdout. Non-empty HA settings from the
     * configuration are passed as {@code HA_*} environment variables. A daemon thread
     * copies the process output to the console and into {@code haStdoutLines}.
     *
     * @throws FileNotFoundException when the script is missing from the work directory
     * @throws IOException when the process cannot be started, exits before becoming ready,
     *         does not print its ready line in time, or the wait is interrupted
     */
    private void startHaProcess() throws IOException {
        Path workDir = Paths.get(this.config.workDir()).toAbsolutePath();
        Path executable = workDir.resolve("start-ha-node-manager.sh");
        if (!Files.exists(executable)) {
            String msg = "OpcUaConnector: failed to start HA service, executable not found at " + String.valueOf(executable);
            System.err.println(msg);
            throw new FileNotFoundException(msg);
        }
        System.out.println("OpcUaConnector: starting HA process with executable at " + executable.toString());
        // NOTE(review): with "sh -c" the path is parsed by the shell, so a work directory
        // containing spaces or shell metacharacters will not start correctly.
        ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", executable.toString()).directory(workDir.toFile());
        pb.redirectErrorStream(true);
        pb.redirectOutput(ProcessBuilder.Redirect.PIPE);
        Map<String, String> env = pb.environment();

        // Optional settings: exported only when configured with a non-empty value.
        String[][] optionalSettings = new String[][]{
            {"HA_CLUSTER_ID", this.config.haDbClusterId()},
            {"HA_NODE_ID", this.config.haNodeId()},
            {"HA_PRIORITY", this.config.haPriority()},
            {"HA_DB_HOSTNAME", this.config.haDbHost()},
            {"HA_DB_PORT", this.config.haDbPort()},
            {"HA_DB_USER", this.config.haDbUser()},
            {"HA_DB_PASSWORD", this.config.haDbPassword()},
            {"HA_DB_NAME", this.config.haDbName()},
            {"HA_DB_TABLENAME", this.config.haDbTableName()},
            {"HA_HEALTH_CHECK_INTERVAL", this.config.haHealthCheckInterval()},
            {"HA_HEALTH_CHECK_TIMEOUT", this.config.haHealthCheckTimeout()},
            {"HA_MAX_HEALTH_CHECK_FAILURES", this.config.haMaxHealthCheckFailures()},
            {"HA_HEARTBEAT_INTERVAL", this.config.haHeartbeatInterval()},
            {"HA_HEARTBEAT_TIMEOUT", this.config.haHeartbeatTimeout()}
        };
        for (String[] setting : optionalSettings) {
            if (!setting[1].isEmpty()) {
                env.put(setting[0], setting[1]);
            }
        }
        // The health check always targets the local producer endpoint.
        env.put("HA_HEALTH_CHECK_ENABLED", "ON");
        env.put("HA_HEALTH_CHECK_URL", this.config.ovlProtocol() + "://localhost:" + this.config.ovlPort() + "/health");

        for (Map.Entry<String, String> entry : env.entrySet()) {
            String key = entry.getKey();
            // Never print credentials; "PWD" covers keys such as OVL_CONFIGDB_PWD
            // (and, harmlessly, the shell's own PWD/OLDPWD).
            if (key.contains("PASSWORD") || key.contains("PWD")) {
                continue;
            }
            System.out.println("OpcUaConnector: HA Environment variable " + key + "=" + entry.getValue());
        }

        this.haExecutor = new Thread(() -> {
            System.out.println("OpcUaConnector: HA output redirect thread started");
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(this.haProcess.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null && !this.shutdownFlag) {
                    System.out.println("OpcUaConnector: HA output: " + line);
                    if (!this.haStarted && this.isHaReadyLine(line)) {
                        this.haStarted = true;
                    }
                    this.haStdoutLines.offer(line);
                }
            }
            catch (IOException e) {
                // A read error while the task is not running is not reported.
                if (this.running) {
                    System.err.println("OpcUaConnector: unexpected error in HA output redirect thread: " + String.valueOf(e));
                }
            }
        });
        this.haExecutor.setDaemon(true);
        // The process must exist before the reader thread starts, since the thread reads this.haProcess.
        this.haProcess = pb.start();
        this.haExecutor.start();
        this.haProcess.onExit().thenRun(() -> System.out.println("OpcUaConnector: HA process has finished with exit code: " + this.haProcess.exitValue()));

        Duration readyTimeout = Duration.ofSeconds(10L);
        if (!this.haProcess.isAlive()) {
            System.err.println("OpcUaConnector: HA service exited immediately after starting");
            throw new IOException("OpcUaConnector: HA service exited immediately after starting");
        }
        System.out.println("OpcUaConnector: OPC UA HA Service process started. Waiting for console output ...");
        Instant currentTime = Instant.now();
        while (Duration.between(currentTime, Instant.now()).compareTo(readyTimeout) <= 0) {
            if (!this.haProcess.isAlive()) {
                System.err.println("OpcUaConnector: HA service already stopped");
                throw new IOException("OpcUaConnector: HA service already stopped");
            }
            if (this.haStarted) {
                System.out.println("OpcUaConnector: OPC UA HA Service ready!");
                return;
            }
            if (Thread.currentThread().isInterrupted()) {
                // Do not keep polling for the full timeout once the thread has been interrupted.
                throw new IOException("OpcUaConnector: interrupted while waiting for HA service to become ready");
            }
            OpcUaKafkaSourceConnectorTask.sleepQuietly(100L);
        }
        System.out.println("OpcUaConnector: HA process running is " + this.haProcess.isAlive());
        System.out.println("OpcUaConnector: HA service did not log messages to the console, considering it as failed.");
        throw new IOException("OpcUaConnector: HA service did not log messages to the console, considering it as failed.");
    }

    /**
     * Launches the wrapped OPC UA Producer executable and waits up to 30 seconds for it to
     * print its ready line.
     *
     * <p>The executable named by {@code config.executableName()} is run from the work
     * directory with stderr merged into stdout. Non-empty producer settings from the
     * configuration are passed as {@code OVL_*} environment variables. A daemon thread
     * copies the process output to the console and into {@code stdoutLines}.
     *
     * @throws FileNotFoundException when the executable is missing from the work directory
     * @throws IOException when the process cannot be started, exits before becoming ready,
     *         does not print its ready line in time, or the wait is interrupted
     */
    private void startOvlProcess() throws IOException {
        Path workDir = Paths.get(this.config.workDir()).toAbsolutePath();
        Path executable = workDir.resolve(this.config.executableName());
        if (!Files.exists(executable)) {
            String msg = "OpcUaConnector: failed to start wrapped connector process, executable not found at: " + String.valueOf(executable);
            System.out.println(msg);
            throw new FileNotFoundException(msg);
        }
        ProcessBuilder pb = new ProcessBuilder(executable.toString()).directory(workDir.toFile());
        pb.redirectErrorStream(true);
        Map<String, String> env = pb.environment();

        // Optional settings: exported only when configured with a non-empty value.
        String[][] optionalSettings = new String[][]{
            {"OVL_CONFIGDB_TYPE", this.config.ovlConfigDbType()},
            {"OVL_CONFIGDB_HOST", this.config.ovlConfigDbHost()},
            {"OVL_CONFIGDB_PORT", this.config.ovlConfigDbPort()},
            {"OVL_CONFIGDB_USERNAME", this.config.ovlConfigDbUsername()},
            {"OVL_CONFIGDB_PWD", this.config.ovlConfigDbPwd()},
            {"OVL_CONFIGDB_PATH", this.config.ovlConfigDbPath()},
            {"OVL_PROTOCOL", this.config.ovlProtocol()},
            {"OVL_PORT", this.config.ovlPort()}
        };
        for (String[] setting : optionalSettings) {
            if (!setting[1].isEmpty()) {
                env.put(setting[0], setting[1]);
            }
        }

        for (Map.Entry<String, String> entry : env.entrySet()) {
            String key = entry.getKey();
            // Never print credentials. The producer's password variable is OVL_CONFIGDB_PWD,
            // which a "PASSWORD"-only filter lets through, so "PWD" is filtered too
            // (this also hides the shell's own PWD/OLDPWD, which is harmless).
            if (key.contains("PASSWORD") || key.contains("PWD")) {
                continue;
            }
            System.out.println("OpcUaConnector: OPC UA Producer Environment variable " + key + "=" + entry.getValue());
        }

        this.executor = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(this.process.getInputStream()))) {
                String line;
                System.out.println("OpcUaConnector: OPC UA Producer output redirect thread has started");
                while ((line = reader.readLine()) != null && !this.shutdownFlag) {
                    System.out.println("OpcUaConnector: OPC UA Producer output: " + line);
                    if (!this.ovlStarted && this.isOvlReadyLine(line)) {
                        this.ovlStarted = true;
                    }
                    this.stdoutLines.offer(line);
                }
            }
            catch (IOException e) {
                // A read error while the task is not running is not reported.
                if (this.running) {
                    System.out.println("OpcUaConnector: unexpected error in the OPC UA Producer output redirect thread: " + String.valueOf(e));
                }
            }
        });
        this.executor.setDaemon(true);
        // The process must exist before the reader thread starts, since the thread reads this.process.
        this.process = pb.start();
        this.executor.start();
        this.process.onExit().thenRun(() -> System.out.println("OpcUaConnector: OPC UA Producer process has finished with exit code: " + this.process.exitValue()));

        Duration readyTimeout = Duration.ofSeconds(30L);
        if (!this.process.isAlive()) {
            System.out.println("OpcUaConnector: OPC UA Producer process exited shortly after starting.");
            throw new IOException("OpcUaConnector: OPC UA Producer process exited shortly after starting.");
        }
        Instant currentTime = Instant.now();
        while (Duration.between(currentTime, Instant.now()).compareTo(readyTimeout) < 0) {
            if (!this.process.isAlive()) {
                System.out.println("OpcUaConnector: OPC UA Producer already stopped");
                throw new IOException("OpcUaConnector: OPC UA Producer already stopped");
            }
            if (this.ovlStarted) {
                System.out.println("OpcUaConnector: OPC UA Producer is ready!");
                return;
            }
            if (Thread.currentThread().isInterrupted()) {
                // Do not keep polling for the full timeout once the thread has been interrupted.
                throw new IOException("OpcUaConnector: interrupted while waiting for OPC UA Producer to become ready");
            }
            OpcUaKafkaSourceConnectorTask.sleepQuietly(100L);
        }
        System.out.println("OpcUaConnector: OPC UA Producer process running is " + this.process.isAlive());
        System.out.println("OpcUaConnector: OPC UA Producer process did not log messages to the console, considering it as failed.");
        throw new IOException("OpcUaConnector: OPC UA Producer process did not log messages to the console, considering it as failed.");
    }

    /**
     * Resets the failure counter and starts the child processes: the producer always,
     * the HA service only when {@code config.haEnabled()} is true.
     *
     * @throws IOException when either process fails to start
     */
    private void startProcesses() throws IOException {
        this.failureCounter = 0;
        this.startOvlProcess();
        boolean haEnabled = this.config.haEnabled().booleanValue();
        if (!haEnabled) {
            System.out.println("OpcUaConnector: HA service not enabled, not starting it.");
            return;
        }
        this.startHaProcess();
    }

    /**
     * Drains the output captured from both child processes into source records.
     *
     * <p>Producer lines go to {@code config.topic()}, HA lines to {@code config.haTopic()}.
     * Each record carries the application name, cluster id, node id, the output line and
     * the current time. After draining, the healthcheck is run if due. When nothing was
     * produced the call sleeps for {@code config.pollIntervalMs()} before returning.
     *
     * @return the records built from the lines available at call time (possibly empty)
     * @throws InterruptedException when interrupted during the idle sleep
     */
    public List<SourceRecord> poll() throws InterruptedException {
        ArrayList<SourceRecord> batch = new ArrayList<SourceRecord>();

        // Producer output.
        for (String producerLine = this.stdoutLines.poll(); producerLine != null; producerLine = this.stdoutLines.poll()) {
            if (!this.ovlStarted && this.isOvlReadyLine(producerLine)) {
                this.ovlStarted = true;
            }
            Map<String, String> partition = Collections.singletonMap("opcua-connector", "default");
            Map<String, String> offset = Collections.singletonMap("timestamp", Instant.now().toString());
            Struct payload = new Struct(VALUE_SCHEMA);
            payload.put("application", "OPC UA Connector");
            payload.put("cluster", this.config.haDbClusterId());
            payload.put("node", this.config.haNodeId());
            payload.put("message", producerLine);
            payload.put("timestamp", Instant.now().toString());
            batch.add(new SourceRecord(partition, offset, this.config.topic(), VALUE_SCHEMA, payload));
        }

        // HA service output.
        for (String haLine = this.haStdoutLines.poll(); haLine != null; haLine = this.haStdoutLines.poll()) {
            if (!this.haStarted && this.isHaReadyLine(haLine)) {
                this.haStarted = true;
            }
            Map<String, String> partition = Collections.singletonMap("opcua-ha-service", "default");
            Map<String, String> offset = Collections.singletonMap("timestamp", Instant.now().toString());
            Struct payload = new Struct(VALUE_SCHEMA);
            payload.put("application", "HA Service for OPC UA Connector");
            payload.put("cluster", this.config.haDbClusterId());
            payload.put("node", this.config.haNodeId());
            payload.put("message", haLine);
            payload.put("timestamp", Instant.now().toString());
            batch.add(new SourceRecord(partition, offset, this.config.haTopic(), VALUE_SCHEMA, payload));
        }

        this.performHealthcheckIfNeeded();

        // Only the first three polls are logged.
        boolean logThisPoll = this.pollCounter < 3;
        if (logThisPoll) {
            System.out.println("OpcUaConnector: polling " + this.pollCounter + ", produced " + batch.size() + " records");
        }
        this.pollCounter++;

        if (batch.isEmpty()) {
            Thread.sleep(this.config.pollIntervalMs());
        }
        return batch;
    }

    /**
     * Checks that the started child processes are still alive and, at most once per
     * {@code config.healthcheckIntervalMs()}, queries the producer's {@code /health}
     * endpoint on localhost and logs the response.
     *
     * <p>Does nothing when {@code config.healthcheckEnabled()} is false. Each call that
     * finds a previously started child process dead increments the failure counter. A
     * failing HTTP request is only logged; it does not count as a failure.
     *
     * @throws ConnectException when a dead child process has been observed more than
     *         three times
     */
    private void performHealthcheckIfNeeded() {
        if (!this.config.healthcheckEnabled()) {
            return;
        }
        if (this.ovlStarted && this.running && this.process != null && !this.process.isAlive()) {
            ++this.failureCounter;
            if (this.failureCounter > 3) {
                throw new ConnectException("OpcUaConnector: Producer process is not running");
            }
        }
        if (this.config.haEnabled().booleanValue() && this.running && this.haStarted && this.haProcess != null && !this.haProcess.isAlive()) {
            ++this.failureCounter;
            if (this.failureCounter > 3) {
                throw new ConnectException("OpcUaConnector: HA process is not running");
            }
        }
        long now = System.currentTimeMillis();
        if (now - this.lastHealthcheck < this.config.healthcheckIntervalMs()) {
            return;
        }
        this.lastHealthcheck = now;
        HttpURLConnection conn = null;
        String endpointUrl = String.format("%s://localhost:%s/health", this.config.ovlProtocol(), this.config.ovlPort());
        try {
            URL url = new URL(endpointUrl);
            if ("https".equalsIgnoreCase(url.getProtocol())) {
                // NOTE(review): certificate and hostname verification are disabled for this
                // request. The target is always localhost, but this still accepts any
                // certificate; confirm this is intended.
                TrustManager[] trustAll = new TrustManager[]{new X509TrustManager(){

                    @Override
                    public void checkClientTrusted(X509Certificate[] chain, String authType) {
                    }

                    @Override
                    public void checkServerTrusted(X509Certificate[] chain, String authType) {
                    }

                    @Override
                    public X509Certificate[] getAcceptedIssuers() {
                        return new X509Certificate[0];
                    }
                }};
                SSLContext sslContext = SSLContext.getInstance("TLS");
                sslContext.init(null, trustAll, new SecureRandom());
                HttpsURLConnection https = (HttpsURLConnection)url.openConnection();
                https.setSSLSocketFactory(sslContext.getSocketFactory());
                https.setHostnameVerifier((hostname, session) -> true);
                conn = https;
            } else {
                conn = (HttpURLConnection)url.openConnection();
            }
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(3000);
            conn.setReadTimeout(3000);
            int status = conn.getResponseCode();
            System.err.println("OpcUaConnector:: Healthcheck returned status " + status + " on " + endpointUrl);
            // getErrorStream() returns null when the server sent no error body.
            InputStream body = status >= 400 ? conn.getErrorStream() : conn.getInputStream();
            StringBuilder sb = new StringBuilder();
            if (body != null) {
                try (BufferedReader br = new BufferedReader(new InputStreamReader(body, java.nio.charset.StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        sb.append(line).append('\n');
                    }
                }
            }
            System.out.println("OpcUaConnector: Healthcheck response is: \n" + sb.toString());
        }
        catch (Exception e) {
            System.err.println("OpcUaConnector: Healthcheck failed on " + endpointUrl + ": " + String.valueOf(e));
        }
        finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    /**
     * Finds pids related to this task's producer process; see
     * {@link #findPidsByExecutableName(String, ProcessHandle)} with a {@code null} parent.
     *
     * @param name executable name to match (case-insensitive substring)
     * @return the matching pids, possibly empty
     */
    public List<Long> findPidsByExecutableName(String name) {
        return this.findPidsByExecutableName(name, null);
    }

    /**
     * Collects the pids of all descendants of a parent process whose command or command
     * line contains {@code name} (compared in lower case), plus the pid of this task's own
     * producer process when it is still alive.
     *
     * @param name executable name to match (case-insensitive substring)
     * @param pp   parent whose descendants are searched; when {@code null}, this task's
     *             producer process is used
     * @return the collected pids; empty when no parent is available
     */
    public List<Long> findPidsByExecutableName(String name, ProcessHandle pp) {
        String target = name.toLowerCase();
        ProcessHandle parentProcess = pp;
        if (parentProcess == null && this.process != null) {
            parentProcess = this.process.toHandle();
        }
        List<Long> result = new ArrayList<Long>();
        if (parentProcess == null) {
            System.out.println("OpcUaConnector: the producer main process is not running yet.");
            return result;
        }
        // A Stream can be consumed only once, so take a snapshot before counting and filtering.
        List<ProcessHandle> descendants = parentProcess.descendants().collect(Collectors.toList());
        if (!descendants.isEmpty()) {
            System.out.println("OpcUaConnector: this task's producer main process has " + descendants.size() + " child processes");
            for (ProcessHandle ph : descendants) {
                ProcessHandle.Info info = ph.info();
                String cmd = info.command().orElse("").toLowerCase();
                String cmdLine = info.commandLine().orElse("").toLowerCase();
                if (cmd.contains(target) || cmdLine.contains(target)) {
                    result.add(ph.pid());
                }
            }
        } else {
            System.out.println("OpcUaConnector: this task's producer main process has no child processes");
        }
        if (this.process != null) {
            ProcessHandle.Info info = this.process.info();
            String cmd = info.command().orElse("").toLowerCase();
            String cmdLine = info.commandLine().orElse("").toLowerCase();
            if (cmd.contains(target) || cmdLine.contains(target)) {
                System.out.println("OpcUaConnector: main producer process name is " + name);
            } else {
                System.out.println("OpcUaConnector: main producer process command: " + String.valueOf(info.commandLine()) + ", info: " + info.toString());
            }
            // The main process is reported regardless of its name, but only while it is alive:
            // the pid of an exited process must not make callers believe it is still running.
            if (this.process.isAlive()) {
                result.add(this.process.pid());
            }
        }
        return result;
    }

    /**
     * Stops the task: signals the reader threads to finish, terminates the producer
     * process (and the pids reported by {@link #findPidsByExecutableName(String)}),
     * then the HA process and any of its descendants that are still alive, and waits a
     * bounded time for both output reader threads.
     *
     * <p>Processes are first asked to terminate normally and are killed forcibly when they
     * do not exit in time. If the calling thread is interrupted while waiting, the
     * interrupt flag is restored and shutdown continues with the remaining steps.
     */
    public void stop() {
        System.out.println("OpcUaConnector: stopping task ...");
        this.shutdownFlag = true;
        this.running = false;
        // The reader threads are daemon threads blocked in readLine(); if something still holds
        // the child's output pipe open they would never finish, so never wait for them forever.
        final long readerJoinTimeoutMs = 5000L;
        if (this.process != null) {
            System.out.println("OpcUaConnector: stopping OPC UA Producer process ...");
            String processName = "ogamma-logger";
            try {
                List<Long> pids = this.findPidsByExecutableName(processName);
                if (pids.size() < 1) {
                    System.out.println("OpcUaConnector: No process found with the name: " + processName + " to stop, probably already stopped.");
                } else {
                    for (Long pid : pids) {
                        try {
                            System.out.println("OpcUaConnector: stopping process " + processName + " with pid = " + pid + " ...");
                            OpcUaKafkaSourceConnectorTask.terminateProcess(pid, this.config.processExitWaitTimeout());
                            System.out.println("OpcUaConnector: stopped process with id " + pid);
                        }
                        catch (Exception ex) {
                            System.out.println("OpcUaConnector: failed to stop process with id " + pid);
                        }
                    }
                }
            }
            catch (Exception e) {
                System.out.println("OpcUaConnector: producer process stopping fired error: " + e.getMessage());
                e.printStackTrace();
            }
            this.process.destroy();
            try {
                if (!this.process.waitFor(this.config.processExitWaitTimeout(), TimeUnit.MILLISECONDS)) {
                    System.out.println("OpcUaConnector: OPC UA Producer process did not stop within " + this.config.processExitWaitTimeout() + " milliseconds, killing......");
                    this.process.destroyForcibly();
                    System.out.println("OpcUaConnector: OPC UA Producer process stopped forcibly");
                }
            }
            catch (InterruptedException e) {
                System.out.println("OpcUaConnector: OPC UA Producer process could not be stopped gracefully");
                Thread.currentThread().interrupt();
            }
        }
        if (this.executor != null) {
            try {
                System.out.println("OpcUaConnector: OPC UA Producer: waiting for the output redirect thread stop");
                this.executor.join(readerJoinTimeoutMs);
                if (this.executor.isAlive()) {
                    System.out.println("OpcUaConnector: OPC UA Producer output redirect thread did not stop within " + readerJoinTimeoutMs + " milliseconds, abandoning it");
                } else {
                    System.out.println("OpcUaConnector: OPC UA Producer output redirect thread stopped");
                }
            }
            catch (InterruptedException e) {
                System.out.println("OpcUaConnector: OPC UA Producer output redirect thread could not be stopped gracefully");
                Thread.currentThread().interrupt();
            }
            this.executor = null;
        }
        if (this.haProcess != null) {
            System.out.println("OpcUaConnector: stopping HA process ...");
            // The HA service is launched through a shell wrapper. Remember its descendants before
            // the wrapper is stopped, so that children it leaves behind can still be terminated.
            List<ProcessHandle> haDescendants = new ArrayList<ProcessHandle>();
            try {
                haDescendants = this.haProcess.descendants().collect(Collectors.toList());
            }
            catch (Exception e) {
                System.out.println("OpcUaConnector: failed to list HA child processes: " + String.valueOf(e));
            }
            this.haProcess.destroy();
            try {
                if (!this.haProcess.waitFor(30L, TimeUnit.SECONDS)) {
                    System.out.println("OpcUaConnector: HA process did not stop withint 30 seconds, killing......");
                    this.haProcess.destroyForcibly();
                    System.out.println("OpcUaConnector: HA process stopped by force.");
                }
            }
            catch (InterruptedException e) {
                System.out.println("OpcUaConnector: HA process could not be stopped gracefully");
                Thread.currentThread().interrupt();
            }
            for (ProcessHandle child : haDescendants) {
                if (!child.isAlive()) {
                    continue;
                }
                System.out.println("OpcUaConnector: stopping leftover HA child process with pid = " + child.pid() + " ...");
                try {
                    OpcUaKafkaSourceConnectorTask.terminateProcess(child.pid(), 5000L);
                }
                catch (Exception ex) {
                    System.out.println("OpcUaConnector: failed to stop HA child process with id " + child.pid());
                }
            }
            System.out.println("OpcUaConnector: HA process stopped.");
        }
        if (this.haExecutor != null) {
            try {
                System.out.println("OpcUaConnector: HA: waiting for the output redirect thread stop");
                this.haExecutor.join(readerJoinTimeoutMs);
                if (this.haExecutor.isAlive()) {
                    System.out.println("OpcUaConnector: HA output redirect thread did not stop within " + readerJoinTimeoutMs + " milliseconds, abandoning it");
                } else {
                    System.out.println("OpcUaConnector: HA output redirect thread stopped");
                }
            }
            catch (InterruptedException e) {
                System.out.println("OpcUaConnector: HA output redirect thread could not be stopped gracefully");
                Thread.currentThread().interrupt();
            }
            this.haExecutor = null;
        }
    }
}

