module OpenTelemetry::Instrumentation::Que::Patches::QueJob::ClassMethods
def bulk_enqueue(**_kwargs, &block)
def bulk_enqueue(**_kwargs, &block) tracer = Que::Instrumentation.instance.tracer otel_config = Que::Instrumentation.instance.config tracer.in_span('publish', kind: :producer) do |span| super do yield job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] unless job_attrs.empty? span.name = "#{job_attrs.first[:job_class]} publish" span.add_attributes(QueJob.job_attributes(job_attrs.first)) end if otel_config[:propagation_style] != :none job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options] job_options[:tags] ||= [] OpenTelemetry.propagation.inject(job_options[:tags], setter: TagSetter) end end end end
def enqueue(*args, job_options: {}, **arg_opts)
def enqueue(*args, job_options: {}, **arg_opts) # In Que version 2.1.0 `bulk_enqueue` was introduced. # In that case, the span is created inside the `bulk_enqueue` method. return super(*args, **arg_opts) if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert] tracer = Que::Instrumentation.instance.tracer otel_config = Que::Instrumentation.instance.config tracer.in_span('publish', kind: :producer) do |span| # Que doesn't have a good place to store metadata. There are # basically two options: the job payload and the job tags. # # Using the job payload is very brittle. We'd have to modify # existing Hash arguments or add a new argument when there are # no arguments we can modify. If the server side is not using # this instrumentation yet (e.g. old jobs before the # instrumentation was added or when instrumentation is being # added to client side first) then the server can error out due # to unexpected payload. # # The second option (which we are using here) is to use tags. # They also are not meant for tracing information but they are # much safer to use than modifying the payload. tags = job_options[:tags] if otel_config[:propagation_style] != :none tags ||= [] OpenTelemetry.propagation.inject(tags, setter: TagSetter) end job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts) job_attrs = job.que_attrs span.name = "#{job_attrs[:job_class]} publish" span.add_attributes(QueJob.job_attributes(job_attrs)) job end end
def gem_version
def gem_version @gem_version ||= Gem.loaded_specs['que'].version end