class Karafka::Cli::Server
Server Karafka Cli action
def call
def call validate! puts 'Starting Karafka server' cli.info if cli.options[:daemon] FileUtils.mkdir_p File.dirname(cli.options[:pid]) daemonize end # We assign active topics on a server level, as only server is expected to listen on # part of the topics Karafka::Server.consumer_groups = cli.options[:consumer_groups] # Remove pidfile on stop, just before the server instance is going to be GCed # We want to delay the moment in which the pidfile is removed as much as we can, # so instead of removing it after the server stops running, we rely on the gc moment # when this object gets removed (it is a bit later), so it is closer to the actual # system process end. We do that, so monitoring and deployment tools that rely on pids # won't alarm or start new system process up until the current one is finished ObjectSpace.define_finalizer(self, proc { send(:clean) }) Karafka::Server.run end
def clean
def clean FileUtils.rm_f(cli.options[:pid]) if cli.options[:pid] end
def daemonize
def daemonize ::Process.daemon(true) File.open( cli.options[:pid], 'w' ) { |file| file.write(::Process.pid) } end
def validate!
Checks the server cli configuration
def validate! result = Schemas::ServerCliOptions.call(cli.options) return if result.success? raise Errors::InvalidConfiguration, result.errors end