module ElasticAPM::Spies::SQSSpy::Ext
def self.prepended(mod)
# Sends a message via +super+, recording an APM span and propagating the
# active trace through the message attributes.
#
# When there is no current transaction the call is forwarded to +super+
# untouched. Otherwise the call runs inside an "SQS SEND" span and every
# header yielded by the trace context is added to +:message_attributes+ as
# a String attribute.
#
# The caller's +params+ and its nested +:message_attributes+ hash are never
# modified: headers are written to copies. This keeps frozen hashes working
# and stops a shared/reused attributes hash from carrying an old trace
# header into a later, unrelated call.
#
# @param params [Hash] request parameters (+:queue_url+,
#   +:message_attributes+, ...)
# @param options [Hash] forwarded to +super+ unchanged
# @return [Object] whatever +super+ returns
def send_message(params = {}, options = {})
  transaction = ElasticAPM.current_transaction
  return super(params, options) unless transaction

  queue_name = ElasticAPM::Spies::SQSSpy.queue_name(params)
  span_name = queue_name ? "SQS SEND to #{queue_name}" : 'SQS SEND'
  region = ElasticAPM::Spies::SQSSpy.region_from_url(params[:queue_url])
  context = ElasticAPM::Spies::SQSSpy.span_context(
    queue_name,
    # Fall back to the client's configured region when the URL gives none.
    region || config.region
  )

  ElasticAPM.with_span(
    span_name,
    TYPE,
    subtype: SUBTYPE,
    action: 'send',
    context: context
  ) do |span|
    # with_span may yield no span; use the transaction's context then.
    trace_context = span&.trace_context || transaction.trace_context

    headers = {}
    trace_context.apply_headers { |key, value| headers[key] = value }

    traced_params = params
    unless headers.empty?
      # Copy before writing so the caller's hashes stay untouched.
      attributes = (params[:message_attributes] || {}).dup
      headers.each do |key, value|
        attributes[key] = (attributes[key] || {}).merge(
          string_value: value,
          data_type: 'String'
        )
      end
      traced_params = params.merge(message_attributes: attributes)
    end

    # NOTE(review): presumably keeps the underlying HTTP request from being
    # recorded as a second span -- confirm in SQSSpy.without_net_http.
    ElasticAPM::Spies::SQSSpy.without_net_http do
      super(traced_params, options)
    end
  end
end
# Sends a batch of messages via +super+, recording an APM span and
# propagating the active trace through each entry's message attributes.
#
# When there is no current transaction the call is forwarded to +super+
# untouched. Otherwise the call runs inside an "SQS SEND_BATCH" span and
# every header yielded by the trace context is added to each entry's
# +:message_attributes+ as a String attribute.
#
# The caller's +params+, its +:entries+ array and the entry hashes are never
# modified: headers are written to copies. When +:entries+ is absent the
# params are passed through as-is so that +super+ reports the problem
# instead of this wrapper raising NoMethodError.
#
# @param params [Hash] request parameters (+:queue_url+, +:entries+, ...)
# @param options [Hash] forwarded to +super+ unchanged
# @return [Object] whatever +super+ returns
def send_message_batch(params = {}, options = {})
  transaction = ElasticAPM.current_transaction
  return super(params, options) unless transaction

  queue_name = ElasticAPM::Spies::SQSSpy.queue_name(params)
  span_name =
    queue_name ? "SQS SEND_BATCH to #{queue_name}" : 'SQS SEND_BATCH'
  region = ElasticAPM::Spies::SQSSpy.region_from_url(params[:queue_url])
  context = ElasticAPM::Spies::SQSSpy.span_context(
    queue_name,
    # Fall back to the client's configured region when the URL gives none.
    region || config.region
  )

  ElasticAPM.with_span(
    span_name,
    TYPE,
    subtype: SUBTYPE,
    action: 'send_batch',
    context: context
  ) do |span|
    # with_span may yield no span; use the transaction's context then.
    trace_context = span&.trace_context || transaction.trace_context

    headers = {}
    trace_context.apply_headers { |key, value| headers[key] = value }

    entries = params[:entries]
    traced_params = params
    if entries && !headers.empty?
      # Build fresh entry hashes so the caller's data stays untouched.
      traced_entries = entries.map do |entry|
        attributes = (entry[:message_attributes] || {}).dup
        headers.each do |key, value|
          attributes[key] = (attributes[key] || {}).merge(
            string_value: value,
            data_type: 'String'
          )
        end
        entry.merge(message_attributes: attributes)
      end
      traced_params = params.merge(entries: traced_entries)
    end

    # NOTE(review): presumably keeps the underlying HTTP request from being
    # recorded as a second span -- confirm in SQSSpy.without_net_http.
    ElasticAPM::Spies::SQSSpy.without_net_http do
      super(traced_params, options)
    end
  end
end
# Receives messages via +super+, wrapping the call in an APM span when a
# transaction is in progress.
#
# With no current transaction the call is forwarded to +super+ as-is.
# Otherwise it runs inside a span named "SQS RECEIVE from <queue>" (or just
# "SQS RECEIVE" when no queue name is found) with action 'receive'.
#
# @param params [Hash] request parameters (+:queue_url+ is read here)
# @param options [Hash] forwarded to +super+ unchanged
# @return [Object] whatever +super+ returns
def receive_message(params = {}, options = {})
  return super(params, options) unless ElasticAPM.current_transaction

  spy = ElasticAPM::Spies::SQSSpy
  queue = spy.queue_name(params)
  label =
    if queue
      "SQS RECEIVE from #{queue}"
    else
      'SQS RECEIVE'
    end

  # Prefer the region found in the queue URL; otherwise use the client's.
  span_region = spy.region_from_url(params[:queue_url]) || config.region
  span_context = spy.span_context(queue, span_region)

  ElasticAPM.with_span(
    label, TYPE,
    subtype: SUBTYPE, action: 'receive', context: span_context
  ) do
    # NOTE(review): presumably keeps the underlying HTTP request from being
    # recorded as a second span -- confirm in SQSSpy.without_net_http.
    spy.without_net_http { super(params, options) }
  end
end
# Deletes a message via +super+, wrapping the call in an APM span when a
# transaction is in progress.
#
# With no current transaction the call is forwarded to +super+ as-is.
# Otherwise it runs inside a span named "SQS DELETE from <queue>" (or just
# "SQS DELETE" when no queue name is found) with action 'delete'.
#
# @param params [Hash] request parameters (+:queue_url+ is read here)
# @param options [Hash] forwarded to +super+ unchanged
# @return [Object] whatever +super+ returns
def delete_message(params = {}, options = {})
  return super(params, options) unless ElasticAPM.current_transaction

  sqs_spy = ElasticAPM::Spies::SQSSpy
  target_queue = sqs_spy.queue_name(params)
  title = 'SQS DELETE'
  title = "SQS DELETE from #{target_queue}" if target_queue

  url_region = sqs_spy.region_from_url(params[:queue_url])
  # Prefer the region found in the queue URL; otherwise use the client's.
  span_details = sqs_spy.span_context(target_queue, url_region || config.region)

  span_options = { subtype: SUBTYPE, action: 'delete', context: span_details }
  ElasticAPM.with_span(title, TYPE, **span_options) do
    # NOTE(review): presumably keeps the underlying HTTP request from being
    # recorded as a second span -- confirm in SQSSpy.without_net_http.
    sqs_spy.without_net_http do
      super(params, options)
    end
  end
end
# Deletes a batch of messages via +super+, wrapping the call in an APM span
# when a transaction is in progress.
#
# With no current transaction the call is forwarded to +super+ as-is.
# Otherwise it runs inside a span named "SQS DELETE_BATCH from <queue>" (or
# just "SQS DELETE_BATCH" when no queue name is found) with action
# 'delete_batch'.
#
# @param params [Hash] request parameters (+:queue_url+ is read here)
# @param options [Hash] forwarded to +super+ unchanged
# @return [Object] whatever +super+ returns
def delete_message_batch(params = {}, options = {})
  if ElasticAPM.current_transaction
    helper = ElasticAPM::Spies::SQSSpy
    queue = helper.queue_name(params)
    heading =
      if queue
        "SQS DELETE_BATCH from #{queue}"
      else
        'SQS DELETE_BATCH'
      end

    # Prefer the region found in the queue URL; otherwise use the client's.
    resolved_region = helper.region_from_url(params[:queue_url]) || config.region
    details = helper.span_context(queue, resolved_region)

    ElasticAPM.with_span(
      heading, TYPE,
      subtype: SUBTYPE, action: 'delete_batch', context: details
    ) do
      # NOTE(review): presumably keeps the underlying HTTP request from
      # being recorded as a second span -- confirm in
      # SQSSpy.without_net_http.
      helper.without_net_http { super(params, options) }
    end
  else
    super(params, options)
  end
end
end