There are several Kafka clients for Ruby, but we recommend rdkafka-ruby.
It is a wrapper around the excellent librdkafka and supports all the features of the latest Kafka release. The main feature we need from a client library is support for authentication with SASL/SCRAM, since this is what CloudKarafka uses.
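To give a feel for the rdkafka-ruby API, here is a minimal producer sketch. It assumes the `CLOUDKARAFKA_*` environment variables hold your broker list and credentials (the variable names and the `test` topic are our convention for this example, not mandated by the gem); the configuration keys are plain librdkafka properties.

```ruby
require 'rdkafka'

# librdkafka configuration: SASL/SCRAM over TLS, as CloudKarafka requires
config = Rdkafka::Config.new(
  "bootstrap.servers" => ENV['CLOUDKARAFKA_BROKERS'],
  "security.protocol" => "SASL_SSL",
  "sasl.mechanisms"   => "SCRAM-SHA-256",
  "sasl.username"     => ENV['CLOUDKARAFKA_USERNAME'],
  "sasl.password"     => ENV['CLOUDKARAFKA_PASSWORD']
)

producer = config.producer
# produce is asynchronous; wait blocks until the broker acknowledges delivery
producer.produce(topic: "test", payload: "hello from rdkafka-ruby").wait
producer.close
```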
For a complete code example that lets you get going quickly, see: https://github.com/CloudKarafka/ruby-kafka-example
https://github.com/karafka/karafka
Karafka is a framework for building Apache Kafka-based Ruby and Rails applications. It lets you capture everything that happens in your systems at large scale, providing a seamless and stable core for consuming and processing this data, without having to focus on things outside your business domain. Karafka not only handles incoming messages but also provides tools for building complex data-flow applications that receive and send messages.
Note: the library Karafka and the service CloudKarafka share similar names, but there is no relation between them.
Karafka has an example app here:
https://github.com/karafka/example-app
This example only requires a few small changes to work with CloudKarafka.
Which changes are needed depends on whether you are running a dedicated or a shared plan on CloudKarafka.
If you are running a dedicated cluster, you only need to edit the Kafka config in the setup phase of Karafka like this:
config.kafka.seed_brokers = ENV['CLOUDKARAFKA_BROKERS']&.split(",")&.map { |b| "kafka://#{b}" }
config.kafka.sasl_scram_username = ENV['CLOUDKARAFKA_USERNAME']
config.kafka.sasl_scram_password = ENV['CLOUDKARAFKA_PASSWORD']
config.kafka.sasl_scram_mechanism = "sha256"
config.kafka.ssl_ca_certs_from_system = true
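The `seed_brokers` line above simply turns the comma-separated `host:port` list from the environment variable into `kafka://` URIs. A standalone sketch of that transformation (the host names are made up):

```ruby
# Hypothetical broker list, in the same comma-separated format CloudKarafka provides
brokers = "host1.cloudkarafka.com:9094,host2.cloudkarafka.com:9094"

# The safe-navigation operators (&.) keep this nil-safe if the variable is unset
seed_brokers = brokers&.split(",")&.map { |b| "kafka://#{b}" }
# => ["kafka://host1.cloudkarafka.com:9094", "kafka://host2.cloudkarafka.com:9094"]
```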
If you are running a shared cluster, edit karafka.rb to look like the following:
# frozen_string_literal: true

# Non Ruby on Rails setup
ENV['RACK_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] ||= ENV['RACK_ENV']
Bundler.require(:default, ENV['KARAFKA_ENV'])
Karafka::Loader.load(Karafka::App.root)

require 'active_support/core_ext/hash'

class CloudKarafkaTopicMapper
  def initialize(prefix)
    @prefix = "#{prefix}-"
  end

  # Strip the account prefix (only at the start of the name) from incoming topics
  def incoming(topic)
    topic.to_s.sub(/\A#{Regexp.escape(@prefix)}/, '')
  end

  # Add the account prefix to outgoing topics
  def outgoing(topic)
    "#{@prefix}#{topic}"
  end
end

class App < Karafka::App
  setup do |config|
    config.topic_mapper = CloudKarafkaTopicMapper.new(ENV['CLOUDKARAFKA_USERNAME'])
    config.consumer_mapper = proc { |name| "#{ENV['CLOUDKARAFKA_USERNAME']}-#{name}" }
    config.kafka.seed_brokers = ENV['CLOUDKARAFKA_BROKERS']&.split(",")&.map { |b| "kafka://#{b}" }
    config.kafka.sasl_scram_username = ENV['CLOUDKARAFKA_USERNAME']
    config.kafka.sasl_scram_password = ENV['CLOUDKARAFKA_PASSWORD']
    config.kafka.sasl_scram_mechanism = "sha256"
    config.kafka.ssl_ca_certs_from_system = true
    config.client_id = "example_app"

    # Enable these 2 lines if you use Rails and want to use a hash with
    # indifferent access for Karafka params
    # require 'active_support/hash_with_indifferent_access'
    # config.params_base_class = HashWithIndifferentAccess
  end

  after_init do
    WaterDrop.setup { |config| config.deliver = !Karafka.env.test? }
  end
end

Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)

# Consumer groups defined with the 0.6+ routing style (recommended)
App.consumer_groups.draw do
  consumer_group :batched_group do
    batch_fetching true

    topic :xml_data do
      consumer XmlMessagesConsumer
      batch_consuming false
      parser XmlParser
    end

    topic :inline_batch_data do
      consumer InlineBatchConsumer
      batch_consuming true
    end

    topic :callbacked_data do
      consumer CallbackedConsumer
      batch_consuming true
    end
  end

  # A ping-pong implementation using the karafka-sidekiq backend
  # @note The backend is totally optional; if you disable it, the game
  #   will still work
  consumer_group :async_pong do
    topic :ping do
      consumer Pong::PingConsumer
      backend :sidekiq
    end

    topic :pong do
      consumer Pong::PongConsumer
      backend :sidekiq
    end
  end
end

App.boot!
What we have done is add a new topic mapper, CloudKarafkaTopicMapper, and the Kafka configuration needed to connect to CloudKarafka using SASL/SCRAM.
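To see what such a topic mapper does, here is a standalone sketch (the username "myuser" is made up; this version strips only a leading prefix):

```ruby
class CloudKarafkaTopicMapper
  def initialize(prefix)
    @prefix = "#{prefix}-"
  end

  # Strip the account prefix from topic names coming in from the cluster
  def incoming(topic)
    topic.to_s.sub(/\A#{Regexp.escape(@prefix)}/, '')
  end

  # Add the account prefix to topic names going out to the cluster
  def outgoing(topic)
    "#{@prefix}#{topic}"
  end
end

mapper = CloudKarafkaTopicMapper.new("myuser")
mapper.outgoing(:ping)          # => "myuser-ping"
mapper.incoming("myuser-ping")  # => "ping"
```

This way the rest of the application can keep referring to topics by their unprefixed names.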
You also need to edit the file lib/tasks/sender.rake and add the topic prefix to all topics. The topic prefix is your username followed by a dash, because all topics on shared clusters are prefixed with your username.
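For example (the username "myuser" is hypothetical; in the real task it would come from ENV['CLOUDKARAFKA_USERNAME']):

```ruby
username = "myuser"               # ENV['CLOUDKARAFKA_USERNAME'] in the real task
topic_prefix = "#{username}-"

# A topic referenced as :ping in sender.rake must be sent to:
prefixed_topic = "#{topic_prefix}ping"
# => "myuser-ping"
```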
If you are using Docker, the setting config.kafka.ssl_ca_certs_from_system will not work. You need to explicitly set the CA certificate to use with the setting config.kafka.ssl_ca_cert_file_path.
Download the CA certificate here: https://www.cloudkarafka.com/certs/cloudkarafka.ca
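For example, if the certificate file is copied into the app's config directory during the Docker build (the path below is an assumption; adjust it to wherever you put the file), the setup block could read:

```ruby
# Inside the Karafka setup block:
config.kafka.ssl_ca_cert_file_path = File.join(Karafka::App.root, 'config', 'cloudkarafka.ca')
```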