0
votes

I have 2 mix projects one is called server which publishes factorials to kafka and the other is a consumer who is supposed to solve the factorials, but when i start the consumer it keeps crashing.

server.exs

defmodule Server do
 alias KafkaEx.Protocol.Produce.Request
 alias KafkaEx.Protocol.CreateTopics.TopicRequest

 def create_topic() do
   KafkaEx.create_topics([%TopicRequest{topic: "factorials-to-be-calculated", num_partitions: 1, replication_factor: 1}])
 end

 def delete_topic() do
   KafkaEx.delete_topics("factorials-to-be-calculated")
 end

 def generate_number(max, min \\ 0) do
   number = :rand.uniform(max - min) + min
   message = %KafkaEx.Protocol.Produce.Message{value: Integer.to_string(number)}
   IO.puts(number)
   request = %{%Request{topic: "factorials-to-be-calculated", required_acks: 1} | messages: [message]}
   {:ok, offset} = KafkaEx.produce(request)
 end
end

factorial_consumer.exs

defmodule Consumer.FactorialConsumer do
  use KafkaEx.GenConsumer
  require Logger
  alias KafkaEx.Protocol.Fetch.Message
  alias KafkaEx.Protocol.Produce.Request

  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "message: " <> inspect(message) end)
    end
    {:async_commit, state}
  end

  def factorial(0), do: 1
  def factorial(n), do: n * factorial(n-1)

end

application.exs (consumer)

defmodule Consumer.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application
  import Supervisor.Spec
  @impl true
  def start(_type, _args) do

    gen_consumer_impl = Consumer.FactorialConsumer
    consumer_group_name = "Factorials"
    topic_names = ["factorials-to-be-calculated"]
    consumer_group_opts = []

    children = [
      supervisor(
        KafkaEx.ConsumerGroup,
        [gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]
      )
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Consumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

this is the error that i get when running iex -S mix run

Thank you for any help you can offer me

error description

edit: the link to the library i am using (KafkaEx) https://hexdocs.pm/kafka_ex/KafkaEx.html

stacktrace in plaintext: 17:07:13.790 [error] GenServer #PID<0.220.0> terminating ** (CaseClauseError) no case clause matching: {:error, {{:EXIT, {{:case_clause, {:error, {:undef, [{Consumer.FactorialConsumer, :init, ["factorials-to-be-calculated", 0, nil], []}, {KafkaEx.GenConsumer, :init, 1, [file: 'lib/kafka_ex/gen_consumer.ex', line: 545]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 417]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 385]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]}}}, [{KafkaEx.GenConsumer.Supervisor, :"-start_workers/3-fun-0-", 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 100]}, {Enum, :"-each/2-lists^foreach/1-0-", 2, [file: 'lib/enum.ex', line: 786]}, {KafkaEx.GenConsumer.Supervisor, :start_workers, 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 99]}, {KafkaEx.GenConsumer.Supervisor, :start_link, 4, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 57]}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 385]}, {:supervisor, :do_start_child, 2, [file: 'supervisor.erl', line: 371]}, {:supervisor, :handle_start_child, 2, [file: 'supervisor.erl', line: 677]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 426]}]}}, {:child, :undefined, :consumer, {KafkaEx.GenConsumer.Supervisor, :start_link, [{KafkaEx.GenConsumer, Consumer.FactorialConsumer}, "Factorials", [{"factorials-to-be-calculated", 0}], [commit_interval: 1000, generation_id: 224, member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd"]]}, :permanent, :infinity, :supervisor, [KafkaEx.GenConsumer.Supervisor]}}} (kafka_ex 0.11.0) lib/kafka_ex/consumer_group.ex:340: KafkaEx.ConsumerGroup.start_consumer/5 (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:479: KafkaEx.ConsumerGroup.Manager.start_consumer/2 (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:204: KafkaEx.ConsumerGroup.Manager.handle_info/2 (stdlib 3.13.2) gen_server.erl:680: :gen_server.try_dispatch/4 (stdlib 3.13.2) gen_server.erl:756: :gen_server.handle_msg/6 (stdlib 3.13.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3 Last message: {:EXIT, #PID<0.224.0>, {:shutdown, :rebalance}} State: %KafkaEx.ConsumerGroup.Manager.State{assignments: [], consumer_module: Consumer.FactorialConsumer, consumer_opts: [commit_interval: 1000], consumer_supervisor_pid: #PID<0.225.0>, gen_consumer_module: KafkaEx.GenConsumer, generation_id: 223, group_name: "Factorials", heartbeat_interval: 1000, heartbeat_timer: #PID<0.224.0>, leader_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", members: nil, partition_assignment_callback: &KafkaEx.ConsumerGroup.PartitionAssignment.round_robin/2, session_timeout: 30000, session_timeout_padding: 10000, supervisor_pid: #PID<0.219.0>, topics: ["factorials-to-be-calculated"], worker_name: #PID<0.221.0>}

1
Looks like it expects FactorialConsumer to implement init/? callback. The link to the library you use and the error stack trace posted as a plain text could have helped to answer more precisely.Aleksei Matiushkin
I just tried adding a init callback but this did not solve the issue, i also added the stacktrace and library in my postrafaelBackx

1 Answers

0
votes

I just fixed it by creating a new mix project