elsa v0.12.3 Elsa.Supervisor

Top-level supervisor that orchestrates all other components of the Elsa library. It provides a single point of integration into your application's supervision tree and is configured through a series of nested keyword lists.

Components not needed by a running application (for example, producers, if your application only consumes messages from Kafka and never produces back to it) can be safely omitted from the configuration.
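As a rough sketch of the consume-only case, the supervisor can be placed in an application's supervision tree with the :producer key left out. MyApp.Application, MyApp.MessageHandler, the :my_app_conn connection name, and the "events" topic are hypothetical names chosen for illustration; the option keys are the ones documented for start_link/1.

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Consume-only setup: the :producer key is omitted entirely.
      {Elsa.Supervisor,
       [
         endpoints: [localhost: 9092],
         connection: :my_app_conn,
         consumer: [
           topic: "events",
           begin_offset: :earliest,
           # Hypothetical module implementing the Elsa.Consumer.MessageHandler behaviour.
           handler: MyApp.MessageHandler
         ]
       ]}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```

The `{Elsa.Supervisor, opts}` tuple relies on child_spec/1, so the options list is handed to start_link/1 unchanged.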

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

dynamic_supervisor(registry)

Callback implementation for Supervisor.init/1.

registry(connection)

registry(String.t() | atom()) :: atom()

Defines a connection for locating the Elsa Registry process.

start_link(args)

start_link(keyword()) :: GenServer.on_start()

Starts the top-level Elsa supervisor and links it to the current process. By default it starts a brod client and a custom process registry, then conditionally starts and supervises any brod group-based consumer or producer processes defined in the configuration.

Options

  • :endpoints - Required. Keyword list of Kafka brokers, e.g. [localhost: 9092].

  • :connection - Required. Atom used to track the Kafka connection.

  • :config - Optional. Client configuration options passed to brod.

  • :producer - Optional. Can be a single producer configuration or multiple configurations in a list.

  • :group_consumer - Optional. Group consumer configuration.

  • :consumer - Optional. Simple topic consumer configuration.

Producer Config

  • :topic - Required. Producer will be started for configured topic.

  • :poll - Optional. If set to a number of milliseconds, will poll for new partitions at that interval and start up producers on the fly.

  • :config - Optional. Producer configuration options passed to brod_producer.
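The producer options above can be combined as in the following sketch, which passes two producer configurations in a list. The topic names "orders" and "audit-log" and the 10-second poll interval are illustrative assumptions, not values taken from the library.

```elixir
# Hypothetical topic names; the option keys are the ones listed above.
producer_config = [
  [topic: "orders"],
  # Check every 10 seconds for new partitions on this topic.
  [topic: "audit-log", poll: 10_000]
]

Elsa.Supervisor.start_link(
  endpoints: [localhost: 9092],
  connection: :conn,
  producer: producer_config
)
```

A single producer needs no outer list: `producer: [topic: "orders"]`.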

Group Consumer Config

  • :group - Required. Name of consumer group.

  • :topics - Required. List of topics to subscribe to.

  • :handler - Required. Module that implements Elsa.Consumer.MessageHandler behaviour.

  • :handler_init_args - Optional. Any args to be passed to init function in handler module.

  • :assignment_received_handler - Optional. Arity-4 function that will be called with each partition assignment. Arguments are group, topic, partition, generation_id. Return :ok for the assignment to be subscribed to, or {:error, reason} to stop the subscription.

  • :assignments_revoked_handler - Optional. Zero-arity function that will be called when assignments are revoked. All workers will be shut down before the callback is invoked. The callback must return :ok.

  • :config - Optional. Consumer configuration options passed to brod_consumer.
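To illustrate the two assignment callbacks described above, here is a minimal group consumer configuration sketch. The callback bodies only print a message and are placeholders; ExampleHandler is assumed to be a module implementing the Elsa.Consumer.MessageHandler behaviour.

```elixir
group_consumer = [
  group: "example-group",
  topics: ["topic1"],
  handler: ExampleHandler,
  # Arity 4: returning :ok lets the assignment be subscribed to.
  assignment_received_handler: fn group, topic, partition, generation_id ->
    IO.puts("assigned #{topic}/#{partition} in #{group} (generation #{generation_id})")
    :ok
  end,
  # Arity 0: called after all workers have been shut down; must return :ok.
  assignments_revoked_handler: fn ->
    IO.puts("assignments revoked")
    :ok
  end
]
```

This keyword list would be passed as the :group_consumer option to start_link/1.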

Consumer Config

  • :topic - Required. Topic to subscribe to.

  • :begin_offset - Required. Where to begin consuming from. Must be either :earliest, :latest, or a valid offset integer.

  • :handler - Required. Module that implements Elsa.Consumer.MessageHandler behaviour.

  • :partition - Optional. Topic partition to subscribe to. If nil, will default to all partitions.

  • :handler_init_args - Optional. Any args to be passed to init function in handler module.

  • :poll - Optional. If set to a number of milliseconds, will poll for new partitions at that interval and start up consumers on the fly.

Example

  Elsa.Supervisor.start_link([
    endpoints: [localhost: 9092],
    connection: :conn,
    producer: [topic: "topic1"],
    consumer: [
      topic: "topic2",
      partition: 0,
      begin_offset: :earliest,
      handler: ExampleHandler
    ],
    group_consumer: [
      group: "example-group",
      topics: ["topic1"],
      handler: ExampleHandler,
      config: [
        begin_offset: :earliest,
        offset_reset_policy: :reset_to_earliest
      ]
    ]
  ])

via_name(registry, name)