#!/usr/bin/env ruby
# frozen_string_literal: true

require 'bundler/setup'
require 'redis_cluster_client'

# Helpers for a manual debugging script that exercises sharded Pub/Sub
# (SPUBLISH / SSUBSCRIBE) through a cluster client and prints what happens.
module PubSubDebug
  # Seconds used as the pause after each publish attempt, the pause after each
  # subscribe attempt, and the timeout handed to +next_event+.
  WAIT_SEC = 2.0

  # Width each logged field is right-aligned to.
  LOG_FIELD_WIDTH = 15

  module_function

  # Starts a thread that publishes an incrementing counter to +channel+ with
  # SPUBLISH, pausing WAIT_SEC after every attempt.
  #
  # @param client [#call] client used to send the command
  # @param channel [String] channel name to publish to
  # @return [Thread] the publisher thread; it logs +:dead+ and re-raises when
  #   an error escapes +handle_errors+
  def spawn_publisher(client, channel)
    Thread.new(client, channel) do |cli, chan|
      role = 'Publisher'
      i = 0

      loop do
        handle_errors(role) do
          cli.call('spublish', chan, i)
          log(role, :spublish, chan, i)
          # Only reached when the publish succeeded, so a swallowed error
          # retries the same counter value on the next iteration.
          i += 1
        end
      ensure
        # Pace the loop after every attempt, successful or not.
        sleep WAIT_SEC
      end
    rescue StandardError => e
      log(role, :dead, e.class, e.message)
      raise
    end
  end

  # Starts a thread that subscribes to +channel+ with SSUBSCRIBE, logs every
  # received event and re-subscribes whenever an +sunsubscribe+ event arrives.
  #
  # @param client [#pubsub] client that can open a Pub/Sub connection
  # @param channel [String] channel name to subscribe to
  # @return [Thread] the subscriber thread; on a fatal error it logs +:dead+,
  #   closes the Pub/Sub connection and re-raises
  def spawn_subscriber(client, channel) # rubocop:disable Metrics/AbcSize
    Thread.new(client, channel) do |cli, chan|
      role = 'Subscriber'
      ps = nil

      # Keep trying until the initial subscription succeeds.
      loop do
        ps = cli.pubsub
        ps.call('ssubscribe', chan)
        break
      rescue StandardError => e
        log(role, :init, e.class, e.message)
        ps&.close
      ensure
        # NOTE: +ensure+ also runs on +break+, so one WAIT_SEC pause follows a
        # successful subscription as well as each failed attempt.
        sleep WAIT_SEC
      end

      loop do
        handle_errors(role) do
          event = ps.next_event(WAIT_SEC)
          log(role, *event) unless event.nil?
          # Subscribe again when an unsubscribe notification arrives.
          ps.call('ssubscribe', chan) if event&.first == 'sunsubscribe'
        end
      end
    rescue StandardError, SignalException => e
      log(role, :dead, e.class, e.message)
      ps&.close
      raise
    end
  end

  # Runs the given block and filters the errors it raises.
  #
  # * Connection errors, initial setup errors and "node might be down" errors
  #   are logged (class only) and swallowed.
  # * Command errors are logged and swallowed only when the message starts
  #   with +CLUSTERDOWN+; otherwise they are re-raised.
  # * Any other StandardError is logged and re-raised.
  #
  # @param role [String] label used as the first log field
  # @return [Object, nil] the block's value, or nil when an error was swallowed
  def handle_errors(role)
    yield
  rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
    log(role, e.class)
  rescue RedisClient::CommandError => e
    log(role, e.class, e.message)
    raise unless e.message.start_with?('CLUSTERDOWN')
  rescue StandardError => e
    log(role, e.class, e.message)
    raise
  end

  # Prints the given values on one line, each right-aligned to
  # LOG_FIELD_WIDTH columns and separated by ": ". Prints nothing when called
  # without arguments.
  #
  # @param texts [Array<#to_s>] values to print
  # @return [nil]
  def log(*texts)
    return if texts.empty?

    message = texts.map { |text| text.to_s.rjust(LOG_FIELD_WIDTH) }.join(': ')
    # One print call with the newline embedded, presumably so lines written by
    # concurrent threads are less likely to interleave.
    print "#{message}\n"
  end
end

# Endpoints of the six local nodes, ports 6379 through 6384.
first_port = 6379
nodes = Array.new(6) { |offset| "redis://127.0.0.1:#{first_port + offset}" }.freeze

# One independent cluster client per worker thread (three subscribers, three publishers).
cluster_options = { nodes: nodes, connect_with_original_config: true }
clients = Array.new(6) { RedisClient.cluster(**cluster_options).new_client }.freeze
threads = []

# On Ctrl-C: stop the workers, close every client, say goodbye and exit cleanly.
Signal.trap('INT') do
  threads.each(&:kill)
  clients.each(&:close)
  PubSubDebug.log("\nBye bye")
  exit(0)
end

# The first three clients subscribe, the last three publish; the n-th
# subscriber and the n-th publisher share the n-th channel.
subscriber_clients = clients.first(3)
publisher_clients = clients.last(3)

%w[chan1 chan2 chan3].zip(subscriber_clients, publisher_clients) do |channel, sub_client, pub_client|
  threads << PubSubDebug.spawn_subscriber(sub_client, channel)
  threads << PubSubDebug.spawn_publisher(pub_client, channel)
end

# Block the main thread until every worker has finished.
threads.each(&:join)
