class LogStash::Inputs::Beats::MessageListener

def codec(ctx)

def codec(ctx)
  return if connections_list[ctx].nil?
  connections_list[ctx].codec
end

def extract_target_field(hash)

def extract_target_field(hash)
  if from_filebeat?(hash)
    hash.delete(FILEBEAT_LOG_LINE_FIELD).to_s
  elsif from_logstash_forwarder?(hash)
    hash.delete(LSF_LOG_LINE_FIELD).to_s
  end
end

def flush_buffer(ctx)

def flush_buffer(ctx)
  return if codec(ctx).nil?
  transformer = EventTransformCommon.new(@input)
  codec(ctx).flush do |event|
    transformer.transform(event)
    @queue << event
  end
end

def from_filebeat?(hash)

def from_filebeat?(hash)
  !hash[FILEBEAT_LOG_LINE_FIELD].nil?
end

def from_logstash_forwarder?(hash)

def from_logstash_forwarder?(hash)
  !hash[LSF_LOG_LINE_FIELD].nil?
end

def initialize(queue, input)

def initialize(queue, input)
  @connections_list = ThreadSafe::Hash.new
  @queue = queue
  @logger = input.logger
  @input = input
  @nocodec_transformer = RawEventTransform.new(@input)
  @codec_transformer = DecodedEventTransform.new(@input)
end

def onChannelInitializeException(ctx, cause)

def onChannelInitializeException(ctx, cause)
  # This is mostly due to a bad certificate or keys, running Logstash in debug mode will show more information
  if cause.is_a?(Java::JavaLang::IllegalArgumentException)
    if input.logger.debug?
      input.logger.error("Looks like you either have an invalid key or your private key was not in PKCS8 format.")
    else
      input.logger.error("Looks like you either have an invalid key or your private key was not in PKCS8 format.", :exception => cause)
    end
  else
    input.logger.warn("Error when creating a connection", :exception => cause.to_s)
  end
end

def onConnectionClose(ctx)

def onConnectionClose(ctx)
  unregister_connection(ctx)
end

def onException(ctx, cause)

def onException(ctx, cause)
  unregister_connection(ctx) unless connections_list[ctx].nil?
end

def onNewConnection(ctx)

def onNewConnection(ctx)
  register_connection(ctx)
end

def onNewMessage(ctx, message)

def onNewMessage(ctx, message)
  hash = message.getData()
  target_field = extract_target_field(hash)
  if target_field.nil?
    event = LogStash::Event.new(hash)
    @nocodec_transformer.transform(event)
    @queue << event
  else
    codec(ctx).accept(CodecCallbackListener.new(target_field,
                                                hash,
                                                message.getIdentityStream(),
                                                @codec_transformer,
                                                @queue))
  end
end

def register_connection(ctx)

def register_connection(ctx)
  connections_list[ctx] = ConnectionState.new(ctx, input.codec.dup)
end

def unregister_connection(ctx)

def unregister_connection(ctx)
  flush_buffer(ctx)
  connections_list.delete(ctx)
end