module ZuoraConnect
require "uri"
require 'aws-sigv4'
require 'aws-sdk-s3'
require 'aws-sdk-ses'
require 'aws-sdk-kms'
class AppInstanceBase < ActiveRecord::Base
default_scope {select(ZuoraConnect::AppInstance.column_names.delete_if {|x| ["catalog_mapping", "catalog"].include?(x) }) }
after_initialize :init
after_create :initialize_redis_placeholders
before_destroy :prune_data
self.table_name = "zuora_connect_app_instances"
attr_accessor :options, :mode, :logins, :task_data, :last_refresh, :username, :password, :s3_client, :api_version, :drop_message, :new_session_message, :connect_user, :logitems, :user_timezone
@@telegraf_host = nil
REFRESH_TIMEOUT = 2.minute #Used to determine how long to wait on current refresh call before executing another
INSTANCE_REFRESH_WINDOW = 1.hours #Used to set how how long till app starts attempting to refresh cached task connect data
INSTANCE_REDIS_CACHE_PERIOD = 24.hours #Used to determine how long to cached task data will live for
API_LIMIT_TIMEOUT = 2.minutes #Used to set the default for expiring timeout when api rate limiting is in effect
BLANK_OBJECT_ID_LOOKUP = 'BlankValueSupplied'
HOLDING_PATTERN_SLEEP = 5.seconds
CONNECT_APPLICATION_ID = 0
CONNECT_COMMUNICATION_SLEEP = Rails.env.test? ? 0.seconds : 5.seconds
CATALOG_LOOKUP_PAGE_SIZE = 10_000
CATALOG_LOOKUP_CACHE_TIME_KEY = 'CatalogCachedAt'
CATALOG_LOOKUP_TTL = 60.seconds
CATALOG_LOOKUP_CACHE_RESULT_KEY = 'CatalogCache'
TIMEZONE_LOG_RATE_LIMIT_KEY = 'TimezoneLoggedAt'
TIMEZONE_LOG_PERIOD = 4.hours
IGNORED_LOCALS = ['fr', 'ja', 'es', 'zh', 'de']
INTERNAL_HOSTS = []
LOGIN_TENANT_DESTINATION = 'target_login'
AWS_AUTH_ERRORS = [
Aws::Sigv4::Errors::MissingCredentialsError,
Aws::Errors::MissingCredentialsError,
Aws::S3::Errors::AccessDenied,
Aws::SES::Errors::AccessDenied,
Aws::KMS::Errors::AccessDeniedException
].freeze
AWS_AUTH_ERRORS_MSG = "AWS Auth Errors".freeze
def init
self.connect_user = 'Nobody'
self.logitems = {}
self.task_data = {}
self.options = Hash.new
self.logins = Hash.new
self.api_version = "v2"
self.attr_builder("timezone", ZuoraConnect.configuration.default_time_zone)
self.attr_builder("locale", ZuoraConnect.configuration.default_locale)
PaperTrail.whodunnit = "Backend" if defined?(PaperTrail)
if defined?(ElasticAPM) && ElasticAPM.running?
ElasticAPM.set_user("Backend")
if ElasticAPM.respond_to?(:set_label)
ElasticAPM.set_label(:app_instance, self.id)
else
ElasticAPM.set_label(:app_instance, self.id)
end
end
if INSTANCE_REFRESH_WINDOW > INSTANCE_REDIS_CACHE_PERIOD
raise "The instance refresh window cannot be greater than the instance cache period"
end
self.apartment_switch(nil, false)
if ZuoraConnect.logger.is_a?(Ougai::Logger)
ZuoraConnect.logger.with_fields.merge!(default_ougai_items)
end
if Rails.logger.is_a?(Ougai::Logger)
Rails.logger.with_fields.merge!(default_ougai_items)
end
end
def initialize_redis_placeholders
if defined?(Redis.current)
unless Redis.current.zscore("AppInstance:Deleted", "placeholder").present? # O(1)
Redis.current.zadd("AppInstance:Deleted", 9_999_999_999, "placeholder") # O(log(N))
end
if self.id.present?
if Redis.current.zscore("AppInstance:Deleted", self.id).present? # O(1)
Redis.current.zrem("AppInstance:Deleted", self.id) # O(log(N))
end
end
unless Redis.current.zscore("APILimits", "placeholder").present? # O(1)
Redis.current.zadd("APILimits", 9_999_999_999, "placeholder") # O(log(N))
end
unless Redis.current.zscore("InstanceRefreshing", "placeholder").present? # O(1)
Redis.current.zadd("InstanceRefreshing", 9_999_999_999, "placeholder") # O(log(N))
end
end
if defined?(Resque.redis)
unless Resque.redis.zscore("PauseQueue", "placeholder").present? # O(1)
Resque.redis.zadd("PauseQueue", 9_999_999_999, "placeholder") # O(log(N))
end
end
true
end
def prune_data
if defined?(Redis.current)
Redis.current.zadd("AppInstance:Deleted", Time.now.to_i, self.id)
Redis.current.del("AppInstance:#{self.id}")
Redis.current.zrem("APILimits", self.id)
Redis.current.zrem("InstanceRefreshing", self.id)
end
if defined?(Resque.redis)
Resque.redis.zrange("PauseQueue", 0, -1).each do |key|
Resque.redis.zrem("PauseQueue", key) if key.split("__").first.to_i == self.id
end
end
return true
end
def apartment_switch(method = nil, migrate = false)
switch_count ||= 0
if self.persisted?
begin
Apartment::Tenant.switch!(self.id)
rescue Apartment::TenantNotFound => ex
sleep(2)
begin
Apartment::Tenant.create(self.id.to_s)
rescue Apartment::TenantExists => ex
end
if (switch_count += 1) < 2
retry
else
raise
end
end
if migrate
Apartment::Migrator.migrate(self.id)
end
end
Thread.current[:appinstance] = self
end
def default_ougai_items
return {app_instance_id: self.id, tenant_ids: self.zuora_tenant_ids, organization: self.organizations, environment: self.environment}
end
def new_session(session: self.data_lookup, username: self.access_token, password: self.refresh_token, holding_pattern: false, **args)
self.api_version = "v2"
self.username = username
self.password = password
self.last_refresh = session["#{self.id}::last_refresh"]
self.connect_user = session["#{self.id}::user::email"] if session["#{self.id}::user::email"].present?
PaperTrail.whodunnit = self.connect_user if defined?(PaperTrail)
ElasticAPM.set_user(self.connect_user) if defined?(ElasticAPM) && ElasticAPM.running?
recoverable_session = false
## DEV MODE TASK DATA MOCKUP
if ZuoraConnect.configuration.mode != "Production"
mock_task_data = {
"id" => ZuoraConnect.configuration.dev_mode_appinstance,
"mode" => ZuoraConnect.configuration.dev_mode_mode,
"name" => "Developer Instance"
}
case ZuoraConnect.configuration.dev_mode_options.class
when Hash
self.options = ZuoraConnect.configuration.dev_mode_options
when Array
mock_task_data["options"] = ZuoraConnect.configuration.dev_mode_options
end
ZuoraConnect.configuration.dev_mode_logins.each do |k,v|
v = v.merge({"entities": [] }) if !v.keys.include?("entities")
mock_task_data[k] = v
end
self.build_task(task_data: mock_task_data, session: session)
self.set_backup_creds if !self['zuora_logins'].present?
self.last_refresh = Time.now.to_i
else
time_expire = (session["#{self.id}::last_refresh"] || Time.now).to_i - INSTANCE_REFRESH_WINDOW.ago.to_i
if session.empty?
self.new_session_message = "REFRESHING - Session Empty"
ZuoraConnect.logger.debug(self.new_session_message)
raise ZuoraConnect::Exceptions::HoldingPattern if holding_pattern && !self.mark_for_refresh
self.refresh(session: session)
elsif (self.id != session["appInstance"].to_i)
self.new_session_message = "REFRESHING - AppInstance ID(#{self.id}) does not match session id(#{session["appInstance"].to_i})"
ZuoraConnect.logger.debug(self.new_session_message)
raise ZuoraConnect::Exceptions::HoldingPattern if holding_pattern && !self.mark_for_refresh
self.refresh(session: session)
elsif session["#{self.id}::task_data"].blank?
self.new_session_message = "REFRESHING - Task Data Blank"
ZuoraConnect.logger.debug(self.new_session_message)
raise ZuoraConnect::Exceptions::HoldingPattern if holding_pattern && !self.mark_for_refresh
self.refresh(session: session)
elsif session["#{self.id}::last_refresh"].blank?
self.new_session_message = "REFRESHING - No Time on Cookie"
recoverable_session = true
ZuoraConnect.logger.debug(self.new_session_message)
raise ZuoraConnect::Exceptions::HoldingPattern if holding_pattern && !self.mark_for_refresh
self.refresh(session: session)
# If the cache is expired and we can aquire a refresh lock
elsif (session["#{self.id}::last_refresh"].to_i < INSTANCE_REFRESH_WINDOW.ago.to_i) && self.mark_for_refresh
self.new_session_message = "REFRESHING - Session Old by #{time_expire.abs} second"
recoverable_session = true
ZuoraConnect.logger.debug(self.new_session_message)
self.refresh(session: session)
else
if time_expire < 0
self.new_session_message = ["REBUILDING - Expired by #{time_expire} seconds", self.marked_for_refresh? ? " cache updating as of #{self.reset_mark_refreshed_at} seconds ago" : nil].compact.join(',')
else
self.new_session_message = "REBUILDING - Expires in #{time_expire} seconds"
end
ZuoraConnect.logger.debug(self.new_session_message, self.default_ougai_items)
self.build_task(task_data: session["#{self.id}::task_data"], session: session)
end
end
return self
rescue ZuoraConnect::Exceptions::HoldingPattern => ex
while self.marked_for_refresh?
ZuoraConnect.logger.info("Holding - Expires in #{self.reset_mark_expires_at}. '#{self.new_session_message}'", self.default_ougai_items)
sleep(HOLDING_PATTERN_SLEEP)
end
self.reload_attributes([:refresh_token, :oauth_expires_at, :access_token])
session = self.data_lookup(session: session)
retry
rescue ZuoraConnect::Exceptions::MissMatch => ex
self.delete_app_instance
session = {}
ZuoraConnect.logger.error(ex, self.default_ougai_items.merge({app_instance_id_new: self.task_data['id']}))
retry
rescue ZuoraConnect::Exceptions::InvalidCredentialSet => ex
raise
rescue => ex
if recoverable_session
ZuoraConnect.logger.warn("REBUILDING - Using backup expired cache", ex, self.default_ougai_items)
self.build_task(task_data: session["#{self.id}::task_data"], session: session)
return self
else
ZuoraConnect.logger.error("Failed new session", ex, self.default_ougai_items)
raise
end
ensure
begin
I18n.locale = self.locale
rescue I18n::InvalidLocale => ex
ZuoraConnect.logger.error(ex) if !IGNORED_LOCALS.include?(ex.locale.to_s.downcase)
end
self.set_timezone
if self.task_data.present?
tenants = self.task_data.fetch('tenant_ids', [])
organizations = self.task_data.fetch('organizations', [])
if defined?(ElasticAPM) && ElasticAPM.running?
if ElasticAPM.respond_to?(:set_label)
ElasticAPM.set_label(:tenant_id, tenants.first)
ElasticAPM.set_label(:organization, organizations.first)
else
ElasticAPM.set_label(:tenant_id, tenants.first)
ElasticAPM.set_label(:organization, organizations.first)
end
end
params = {
name: self.task_data.dig('name'),
zuora_entity_ids: (self.task_data.dig(LOGIN_TENANT_DESTINATION,'entities') || []).select {|entity| !entity['skip'].to_bool}.map{|e| e['id']}.uniq,
zuora_global_tenant_id: task_data.dig(LOGIN_TENANT_DESTINATION, 'entities', 0, 'globalEntityId').to_i, # tenant id of the global/parent entity, 0 if nil
zuora_tenant_ids: tenants.map(&:to_s).uniq,
organizations: organizations
}
if self.methods.include?(LOGIN_TENANT_DESTINATION.to_sym)
client = self.send(LOGIN_TENANT_DESTINATION).client
if defined?(client.rest_domain)
ZuoraConnect::RequestIdMiddleware.zuora_rest_domain = client.rest_domain
params.merge!({zuora_domain: client.rest_domain, environment: client.environment })
end
end
# makes it safe to add elements to params which don't correspond to existing columns in an app's schema
# rejects those elements which do not correspond to model attributes for your app
params = params.reject{|k,v| !self.attributes.keys.member?(k.to_s) || self[k] == v}
self.update_columns(params) if params.present?
end
end
def set_timezone(timezone: self.timezone, type: :default)
if timezone.blank?
timezone = self.timezone
end
if type == :default
Time.zone = timezone
elsif type == :user
begin
sql = <<-eos
SELECT zuora_users.zuora_identity_response FROM "#{self.id}".zuora_users ORDER BY zuora_users.updated_at DESC LIMIT 1;
eos
user = ActiveRecord::Base.connection.execute(sql).to_a.first
if user.present?
zuora_identity_response = JSON.parse(user.fetch('zuora_identity_response', '{}'))
self.user_timezone = zuora_identity_response.values.first&.dig('timeZone')
else
if (Redis.current.hget(TIMEZONE_LOG_RATE_LIMIT_KEY, self.id).to_i + TIMEZONE_LOG_PERIOD.to_i) <= Time.now.to_i
Rails.logger.error('Cannot find any user to set the timezone', app_instance_id: self.id)
Redis.current.hset(TIMEZONE_LOG_RATE_LIMIT_KEY, self.id, Time.now.to_i)
end
end
rescue => ex
Rails.logger.error('There is an error while getting timezone users', ex)
end
if self.user_timezone.present?
# connect instance which has a custom timezone
if !self.auto_deployed? && (
ActiveSupport::TimeZone[self.task_data.dig('user_settings', 'timezone') || '']&.utc_offset !=
ActiveSupport::TimeZone[self.user_timezone]&.utc_offset
)
if self.environment == 'Production' &&
(Redis.current.hget(TIMEZONE_LOG_RATE_LIMIT_KEY, self.id).to_i + TIMEZONE_LOG_PERIOD.to_i) <= Time.now.to_i
ZuoraConnect.logger.error(
"Instance and user timezones are different. User has '#{self.user_timezone}' and " \
"instance has '#{self.task_data.dig('user_settings', 'timezone')}'",
app_instance_id: self.id
)
Redis.current.hset(TIMEZONE_LOG_RATE_LIMIT_KEY, self.id, Time.now.to_i)
end
self.user_timezone = nil
Time.zone = timezone
else
begin
Time.zone = self.user_timezone
rescue ArgumentError
Rails.logger.error('Malformed user timezone', app_instance_id: self.id)
Time.zone = timezone
end
end
else
Time.zone = timezone
end
end
rescue => e
Rails.logger.error('Malformed timezone used', e, app_instance_id: self.id)
Time.zone = self.timezone
end
def auto_deployed?
self.id >= 25000000
end
def refresh(session: {})
refresh_count ||= 0
skip_connect ||= ZuoraConnect.configuration.skip_connect
begin
#Check how app was deployed
if !self.auto_deployed? && !skip_connect
self.check_oauth_state
response = HTTParty.get(ZuoraConnect.configuration.url + "/api/#{self.api_version}/tools/tasks/#{self.id}.json",:body => {:access_token => self.access_token})
if response.code == 200
begin
parsed_json = JSON.parse(response.body)
rescue JSON::ParserError => ex
raise ZuoraConnect::Exceptions::ConnectCommunicationError.new("JSON parse error", response.body, response.code)
end
self.build_task(task_data: parsed_json, session: session)
self.set_backup_creds
self.save(validate: false) if self.changed?
else
raise ZuoraConnect::Exceptions::ConnectCommunicationError.new("Error Communicating with Connect", response.body, response.code)
end
else
self.build_task(task_data: self.zuora_logins, session: session)
end
self.last_refresh = Time.now.to_i
self.cache_app_instance
self.reset_mark_for_refresh
rescue *(ZuoraAPI::Login::CONNECTION_EXCEPTIONS + ZuoraAPI::Login::CONNECTION_READ_EXCEPTIONS) => ex
refresh_count += 1
if refresh_count < 3
sleep(10)
ZuoraConnect.logger.debug("REFRESH TASK - Connection Failure Retrying(#{refresh_count})", ex, self.default_ougai_items)
retry
else
ZuoraConnect.logger.fatal("REFRESH TASK - Connection Failed", ex)
raise
end
rescue ZuoraConnect::Exceptions::ConnectCommunicationError => ex
refresh_count += 1
if refresh_count < 3
ZuoraConnect.logger.debug("REFRESH TASK - Communication Failure Retrying(#{refresh_count})", ex, self.default_ougai_items)
self.refresh_oauth if ex.code == 401
retry
else
ZuoraConnect.logger.fatal("REFRESH TASK - Communication Failed #{ex.code}", ex, self.default_ougai_items)
raise
end
end
rescue => ex
refresh_count += 1
if self['zuora_logins'].present? && refresh_count < 3
ZuoraConnect.logger.warn("REFRESH TASK - Fallback to local encrypted store", ex, self.default_ougai_items)
skip_connect = true
retry
end
raise
end
def aws_secrets
(Rails.application.secrets.aws || {}).transform_keys { |key| key.to_s }
end
#### START KMS ENCRYPTION Methods ####
def set_backup_creds
if self.kms_key.present? && self.kms_key.match(/^arn:aws:.*/) && self.task_data.present?
self.zuora_logins = self.strip_cache_data(object: self.task_data.dup, keys: ['applications', 'tokens', 'user_settings'])
end
end
def zuora_logins=(val)
write_attribute(:zuora_logins, kms_encrypt(val.to_json))
rescue Aws::KMS::Errors::ValidationException, Aws::KMS::Errors::NotFoundException, *AWS_AUTH_ERRORS => ex
Rails.logger.warn(AWS_AUTH_ERRORS_MSG, ex)
end
def zuora_logins
raise ZuoraConnect::Exceptions::ConnectCommunicationError.new("Zuora Logins is blank, cannot decrypt.") if super.blank?
return JSON.parse(kms_decrypt(super))
end
def kms_decrypt(value)
kms_tries ||= 0
kms_client = Aws::KMS::Client.new({region: aws_secrets['AWS_REGION'], credentials: self.aws_auth_client}.delete_if { |k, v| v.blank? })
resp = kms_client.decrypt({ciphertext_blob: [value].pack("H*") })
return resp.plaintext
rescue *AWS_AUTH_ERRORS => ex
if (kms_tries += 1) < 3
Rails.logger.warn(AWS_AUTH_ERRORS_MSG, ex)
retry
else
Rails.logger.error(AWS_AUTH_ERRORS_MSG, ex)
raise
end
end
def kms_encrypt(value)
kms_tries ||= 0
kms_client = Aws::KMS::Client.new({region: aws_secrets['AWS_REGION'], credentials: self.aws_auth_client}.delete_if {|k,v| v.blank? })
resp = kms_client.encrypt({key_id: kms_key, plaintext: value})
return resp.ciphertext_blob.unpack('H*').first
rescue *AWS_AUTH_ERRORS => ex
if (kms_tries += 1) < 3
Rails.logger.warn(AWS_AUTH_ERRORS_MSG, ex)
retry
else
Rails.logger.error(AWS_AUTH_ERRORS_MSG, ex)
raise
end
end
def kms_key
return ENV['AWS_KMS_ARN'] || aws_secrets['AWS_KMS_ARN']
end
def aws_auth_client
if Rails.env.to_s == 'development'
return Aws::Credentials.new(aws_secrets['AWS_ACCESS_KEY_ID'], aws_secrets['AWS_SECRET_ACCESS_KEY'])
else
return nil
end
end
#### END KMS ENCRYPTION Methods ####
#### START Metrics Methods ####
def logitem(item: {}, reset: false)
self.logitems = {} if self.logitems.class != Hash
if item.class == Hash
self.logitems = reset ? item : self.logitems.merge(item)
end
Thread.current[:appinstance] = self
end
def self.write_to_telegraf(*args)
if ZuoraConnect.configuration.enable_metrics && !defined?(Prometheus)
@@telegraf_host = ZuoraConnect::Telegraf.new() if @@telegraf_host == nil
unicorn_stats = ZuoraObservability::Metrics.unicorn_listener if defined?(Unicorn) && Unicorn.respond_to?(:listener_names)
@@telegraf_host.write(direction: 'Raindrops', tags: {}, values: unicorn_stats) unless unicorn_stats.blank?
return @@telegraf_host.write(*args)
end
end
#### END Task Methods ####
#### START Task Methods ####
def build_task(task_data: {}, session: {})
session = {} if session.blank?
self.task_data = task_data
self.mode = self.task_data["mode"]
if task_data['id'].to_s != self.id.to_s
raise ZuoraConnect::Exceptions::MissMatch.new("Wrong Instance Identifier/Lookup")
end
self.task_data.each do |k,v|
if k.match(/^(.*)_login$/)
tmp = ZuoraConnect::Login.new(v)
if v["tenant_type"] == "Zuora"
if tmp.entities.size > 0
tmp.entities.each do |value|
entity_id = value["id"]
tmp.client(entity_id).current_session = session["#{self.id}::#{k}::#{entity_id}:current_session"] if session["#{self.id}::#{k}::#{entity_id}:current_session"]
tmp.client(entity_id).bearer_token = session["#{self.id}::#{k}::#{entity_id}:bearer_token"] if session["#{self.id}::#{k}::#{entity_id}:bearer_token"]
tmp.client(entity_id).oauth_session_expires_at = session["#{self.id}::#{k}::#{entity_id}:oauth_session_expires_at"] if session["#{self.id}::#{k}::#{entity_id}:oauth_session_expires_at"]
end
else
tmp.client.current_session = session["#{self.id}::#{k}:current_session"] if session["#{self.id}::#{k}:current_session"]
tmp.client.bearer_token = session["#{self.id}::#{k}:bearer_token"] if session["#{self.id}::#{k}:bearer_token"] && tmp.client.respond_to?(:bearer_token) ## need incase session id goes from basic to aouth in same redis store
tmp.client.oauth_session_expires_at = session["#{self.id}::#{k}:oauth_session_expires_at"] if session["#{self.id}::#{k}:oauth_session_expires_at"] && tmp.client.respond_to?(:oauth_session_expires_at)
end
end
self.logins[k] = tmp
self.attr_builder(k, @logins[k])
elsif k == "options"
v.each do |opt|
self.options[opt["config_name"]] = opt
end
elsif k == "user_settings"
self.timezone = v["timezone"]
self.locale = v["local"]
end
end
rescue ZuoraConnect::Exceptions::MissMatch => ex
raise
rescue ZuoraConnect::Exceptions::InvalidCredentialSet => ex
raise
rescue => ex
ZuoraConnect.logger.error("Build Task Error", ex)
ZuoraConnect.logger.error("Task Data: #{task_data}") if task_data.present?
if session.present?
ZuoraConnect.logger.error("Task Session: #{session.to_h}") if session.methods.include?(:to_h)
ZuoraConnect.logger.error("Task Session: #{session.to_hash}") if session.methods.include?(:to_hash)
end
raise
end
def updateOption(optionId, value)
response = HTTParty.get(ZuoraConnect.configuration.url + "/api/#{self.api_version}/tools/application_options/#{optionId}/edit?value=#{value}",:body => {:access_token => self.username})
end
#This can update an existing login, add a new login, change to another existing login
#EXAMPLE: {"name": "ftp_login_14","username": "ftplogin7","tenant_type": "Custom","password": "test2","url": "www.ftp.com","custom_data": { "path": "/var/usr/test"}}
def update_logins(options)
update_login_count ||= 0
response = HTTParty.post(ZuoraConnect.configuration.url + "/api/#{self.api_version}/tools/tasks/#{self.id}/logins",:body => {:access_token => self.username}.merge(options))
parsed_json = JSON.parse(response.body)
if response.code == 200
if defined?(Redis.current)
self.build_task(task_data: parsed_json, session: self.data_lookup)
self.last_refresh = Time.now.to_i
self.cache_app_instance
end
return parsed_json
elsif response.code == 400
raise ZuoraConnect::Exceptions::APIError.new(message: parsed_json['errors'].join(' '), response: response)
else
raise ZuoraConnect::Exceptions::ConnectCommunicationError.new("Error Communicating with Connect", response.body, response.code)
end
rescue *(ZuoraAPI::Login::CONNECTION_EXCEPTIONS + ZuoraAPI::Login::CONNECTION_READ_EXCEPTIONS) => ex
if (update_login_count += 1) < 3
retry
else
raise
end
rescue ZuoraConnect::Exceptions::ConnectCommunicationError => ex
if (update_login_count += 1) < 3
if ex.code == 401
self.refresh_oauth
end
retry
else
raise
end
end
def fetch_org_details(debug: false)
details_count ||= 0
self.refresh if !defined?(self.target_login)
response = HTTParty.get("#{ZuoraConnect.configuration.url}/api/#{self.api_version}/tenants/search?hostname=#{self.target_login.client.hostname}&node_id=#{self.zuora_entity_ids.first}")
if response.success?
parsed_json = JSON.parse(response.body)
#Set Org
if self.auto_deployed? && parsed_json['organization'].present?
login_cache = self.zuora_logins
login_cache.delete('organization')
self.zuora_logins = login_cache.merge({'organizations' => [parsed_json['organization']]})
end
if defined?(ZuoraConnect::AppInstance::CONNECT_APPLICATION_ID)
downloads = parsed_json.fetch('downloads',[]).select{|a| a['applicationId'] == ZuoraConnect::AppInstance::CONNECT_APPLICATION_ID }.map { |h| h.slice('Name', 'provisionState') }
Rails.logger.info("Instance Downloads: #{downloads.to_s}") if debug
if downloads.size > 1
self.provision_status = 'MultipleProvisioningRecords'
self.provisioned_app = downloads.map {|d| d['Name']}.to_s
elsif downloads.size == 1
self.provision_status = downloads.first['provisionState']
self.provisioned_app = downloads.first['Name']
else
self.provision_status = 'NoProvisioningRecords'
self.provisioned_app = nil
end
end
if self.changed?
self.save(:validate => false)
self.refresh
end
return parsed_json
end
rescue *(ZuoraAPI::Login::CONNECTION_EXCEPTIONS + ZuoraAPI::Login::CONNECTION_READ_EXCEPTIONS) => ex
if (details_count += 1) < 3
retry
else
raise
end
end
def update_task(options)
update_task_count ||= 0
response = HTTParty.post(ZuoraConnect.configuration.url + "/api/#{self.api_version}/tools/tasks/#{self.id}/update_task",:body => {:access_token => self.username}.merge(options))
parsed_json = JSON.parse(response.body)
if response.code == 200
return parsed_json
elsif response.code == 400
raise ZuoraConnect::Exceptions::APIError.new(message: parsed_json['errors'].join(' '), response: response)
else
raise ZuoraConnect::Exceptions::ConnectCommunicationError.new("Error Communicating with Connect", response.body, response.code)
end
rescue *(ZuoraAPI::Login::CONNECTION_EXCEPTIONS + ZuoraAPI::Login::CONNECTION_READ_EXCEPTIONS) => ex
if (update_task_count += 1) < 3
retry
else
raise
end
rescue ZuoraConnect::Exceptions::ConnectCommunicationError => ex
if (update_task_count += 1) < 3
if ex.code == 401
self.refresh_oauth
end
retry
else
raise
end
end
#### END Task Methods ####
#### START Connect OAUTH Methods ####
def check_oauth_state(method=nil)
#Refresh token if already expired
if self.oauth_expired?
ZuoraConnect.logger.debug("Before '#{method}' method, Oauth expired")
self.refresh_oauth
end
end
def oauth_expired?
return self.oauth_expires_at.present? ? (self.oauth_expires_at < Time.now.utc) : true
end
def refresh_oauth
refresh_oauth_count ||= 0
response = HTTParty.post("#{ZuoraConnect.configuration.url}/oauth/token", body: {
:grant_type => "refresh_token",
:redirect_uri => ZuoraConnect.configuration.oauth_client_redirect_uri,
:refresh_token => self.refresh_token
})
if response.code == 200
response_body = JSON.parse(response.body)
self.refresh_token = response_body["refresh_token"]
self.access_token = response_body["access_token"]
self.oauth_expires_at = Time.at(response_body["created_at"].to_i) + response_body["expires_in"].seconds
self.save(:validate => false)
else
raise ZuoraConnect::Exceptions::ConnectCommunicationError.new("Error Refreshing Access Token", response.body, response.code)
end
rescue *(ZuoraAPI::Login::CONNECTION_EXCEPTIONS + ZuoraAPI::Login::CONNECTION_READ_EXCEPTIONS) => ex
if (refresh_oauth_count += 1) < 3
sleep(CONNECT_COMMUNICATION_SLEEP)
ZuoraConnect.logger.debug("REFRESH OAUTH - Connection Failure Retrying(#{refresh_oauth_count})", ex, self.default_ougai_items)
retry
else
Rails.logger.fatal("REFRESH OAUTH - Connection Failed", ex, self.default_ougai_items)
raise
end
rescue ZuoraConnect::Exceptions::ConnectCommunicationError => ex
sleep(CONNECT_COMMUNICATION_SLEEP)
self.reload_attributes([:refresh_token, :oauth_expires_at, :access_token]) #Reload only the refresh token for retry
#After reload, if nolonger expired return
return if !self.oauth_expired?
if (refresh_oauth_count += 1) < 3
ZuoraConnect.logger.debug("REFRESH OAUTH - Communication Failure Retrying(#{refresh_oauth_count})", ex, self.default_ougai_items)
retry
else
ZuoraConnect.logger.fatal("REFRESH OAUTH - Communication Failed #{ex.code}", ex, self.default_ougai_items)
raise
end
end
#### END Connect OAUTH Methods ####
#### START AppInstance Temporary Persistance Methods ####
def marked_for_refresh?
if defined?(Redis.current)
Redis.current.zremrangebyscore("InstanceRefreshing", "0", "(#{Time.now.to_i}")
return Redis.current.zscore("InstanceRefreshing", self.id).present?
else
return false
end
end
def reset_mark_for_refresh
Redis.current.zrem("InstanceRefreshing", self.id) if defined?(Redis.current)
end
def reset_mark_refreshed_at
return defined?(Redis.current) ? REFRESH_TIMEOUT.to_i - reset_mark_expires_at : 0
end
def reset_mark_expires_at
if defined?(Redis.current)
refresh_time = Redis.current.zscore("InstanceRefreshing", self.id)
return refresh_time.present? ? (refresh_time - Time.now.to_i).round(0) : 0
else
return 0
end
end
def mark_for_refresh
return defined?(Redis.current) ? Redis.current.zadd("InstanceRefreshing", Time.now.to_i + REFRESH_TIMEOUT.to_i, self.id, {:nx => true}) : true
end
def data_lookup(session: {})
if defined?(Redis.current)
begin
redis_get_command ||= 0
cached_instance = Redis.current.get("AppInstance:#{self.id}")
rescue *(ZuoraAPI::Login::CONNECTION_EXCEPTIONS + ZuoraAPI::Login::CONNECTION_READ_EXCEPTIONS) => ex
if (redis_get_command += 1) < 3
retry
else
raise
end
end
if cached_instance.blank?
ZuoraConnect.logger.debug("Cached AppInstance Missing", self.default_ougai_items)
return session
else
ZuoraConnect.logger.debug("Cached AppInstance Found", self.default_ougai_items)
return decrypt_data(data: cached_instance, rescue_return: session).merge(session)
end
else
return session
end
end
def delete_app_instance
Redis.current.del("AppInstance:#{self.id}")
end
def cache_app_instance
if defined?(Redis.current)
#Task data must be present and the last refresh cannot be old. We dont want to overwite new cache data with old
if self.task_data.present? && (self.last_refresh.to_i > INSTANCE_REFRESH_WINDOW.ago.to_i)
ZuoraConnect.logger.debug("Caching AppInstance", self.default_ougai_items)
Redis.current.setex("AppInstance:#{self.id}", INSTANCE_REDIS_CACHE_PERIOD.to_i, self.encrypt_data(data: self.save_data))
end
end
end
def save_data(session = Hash.new)
self.logins.each do |key, login|
if login.tenant_type == "Zuora"
if login.available_entities.size > 1 && Rails.application.config.session_store != ActionDispatch::Session::CookieStore
login.available_entities.each do |entity_key|
session["#{self.id}::#{key}::#{entity_key}:current_session"] = login.client(entity_key).current_session if login.client.respond_to?(:current_session)
session["#{self.id}::#{key}::#{entity_key}:bearer_token"] = login.client(entity_key).bearer_token if login.client.respond_to?(:bearer_token)
session["#{self.id}::#{key}::#{entity_key}:oauth_session_expires_at"] = login.client(entity_key).oauth_session_expires_at if login.client.respond_to?(:oauth_session_expires_at)
end
else
session["#{self.id}::#{key}:current_session"] = login.client.current_session if login.client.respond_to?(:current_session)
session["#{self.id}::#{key}:bearer_token"] = login.client.bearer_token if login.client.respond_to?(:bearer_token)
session["#{self.id}::#{key}:oauth_session_expires_at"] = login.client.oauth_session_expires_at if login.client.respond_to?(:oauth_session_expires_at)
end
end
end
session["#{self.id}::task_data"] = self.task_data
#Redis is not defined strip out old data
if !defined?(Redis.current)
strip_cache_data(object: session["#{self.id}::task_data"])
end
session["#{self.id}::last_refresh"] = self.last_refresh
session["appInstance"] = self.id
return session
end
def strip_cache_data(object: {}, keys: ['applications', 'tokens','tenant_ids', 'organizations','user_settings'] )
keys.each {|key| object.delete(key) }
object.select {|k,v| k.include?('login') && v['tenant_type'] == 'Zuora'}.each do |login_key, login_data|
object[login_key]['entities'] = login_data.fetch('entities',[]).map {|entity| entity.slice('id', 'tenantId', 'entityId', 'displayName', 'identifier')}
end
return object
end
def encryptor
# Default values for Rails 4 apps
key_iter_num, key_size, salt, signed_salt = [1000, 64, "encrypted cookie", "signed encrypted cookie"]
raise ZuoraConnect::Exceptions::Error.new("'secret_key_base' is not set for rails environment '#{Rails.env}'. Please set in secrets file.") if Rails.application.secrets.secret_key_base.blank?
key_generator = ActiveSupport::KeyGenerator.new(Rails.application.secrets.secret_key_base, iterations: key_iter_num)
secret, sign_secret = [key_generator.generate_key(salt, 32), key_generator.generate_key(signed_salt)]
return ActiveSupport::MessageEncryptor.new(secret, sign_secret)
end
def decrypt_data(data: nil, rescue_return: nil, log_fatal: true)
return data if data.blank?
if Rails.env == 'development'
begin
return JSON.parse(data)
rescue JSON::ParserError => ex
return data
end
else
begin
return JSON.parse(encryptor.decrypt_and_verify(CGI::unescape(data)))
rescue ActiveSupport::MessageVerifier::InvalidSignature => ex
ZuoraConnect.logger.error("Error Decrypting", ex, self.default_ougai_items) if log_fatal
return rescue_return
rescue JSON::ParserError => ex
ZuoraConnect.logger.error("JSON Parse Error", ex, self.default_ougai_items) if log_fatal
return encryptor.decrypt_and_verify(CGI::unescape(data))
end
end
end
def encrypt_data(data: nil)
return data if data.blank?
if Rails.env == 'development'
return data.to_json
else
return encryptor.encrypt_and_sign(data.to_json)
end
end
#### END AppInstance Temporary Persistance Methods ####
### START Resque Helping Methods ####
def api_limit(start: true, time: API_LIMIT_TIMEOUT.to_i)
if start
Redis.current.zadd("APILimits", Time.now.to_i + time, self.id)
else
Redis.current.zrem("APILimits", self.id)
end
end
def api_limit?
Redis.current.zremrangebyscore("APILimits", "0", "(#{Time.now.to_i}")
return Redis.current.zscore("APILimits", self.id).present?
end
def queue_paused?
Resque.redis.zremrangebyscore("PauseQueue", "0", "(#{Time.now.to_i}")
return Resque.redis.zrange("PauseQueue", 0, -1).map {|key| key.split("__")[0]}.include?(self.id.to_s)
end
def queue_pause(time: nil, current_user: 'Default')
key = "#{self.id}__#{current_user}"
if time.present?
raise "Time must be integer of seconds instead of #{time.class}." if !['Integer', 'Fixnum'].include?(time.class.to_s)
Resque.redis.zadd("PauseQueue", Time.now.to_i + time, key)
else
Resque.redis.zadd("PauseQueue", 9999999999, key)
end
end
def queue_start(current_user: 'Default')
paused_user = Resque.redis.zrange("PauseQueue", 0, -1).map {|key| key.split("__")[0] == "#{self.id}" ? key.split("__")[1] : nil}.compact.first
if paused_user == current_user || paused_user.blank?
Resque.redis.zrem("PauseQueue", "#{self.id}__#{paused_user}")
else
raise "Can only unpause for user #{paused_user}."
end
end
### END Resque Helping Methods ####
### START Catalog Helping Methods #####
def get_catalog(page_size: 5, zuora_login: self.login_lookup(type: "Zuora").first, entity_id: nil)
self.update_column(:catalog_update_attempt_at, Time.now.utc)
entity_reference = entity_id.blank? ? 'Default' : entity_id
ZuoraConnect.logger.debug("Fetch Catalog")
ZuoraConnect.logger.debug("Zuora Entity: #{entity_id.blank? ? 'default' : entity_id}")
login = zuora_login.client(entity_id)
old_logger = ActiveRecord::Base.logger
ActiveRecord::Base.logger = nil
ActiveRecord::Base.connection.execute('UPDATE "public"."zuora_connect_app_instances" SET "catalog" = jsonb_set("catalog", \'{tmp}\', \'{}\'), "catalog_mapping" = jsonb_set("catalog_mapping", \'{tmp}\', \'{}\') where "id" = %{id}' % {:id => self.id})
response = {'nextPage' => login.rest_endpoint("catalog/products?pageSize=#{page_size}")}
while !response["nextPage"].blank?
url = login.rest_endpoint(response["nextPage"].split('/v1/').last)
ZuoraConnect.logger.debug("Fetch Catalog URL #{url}")
output_json, response = login.rest_call(:debug => false, :url => url, :timeout_retry => true)
output_json["products"].each do |product|
ActiveRecord::Base.connection.execute('UPDATE "public"."zuora_connect_app_instances" SET "catalog_mapping" = jsonb_set("catalog_mapping", \'{tmp, %s}\', \'%s\') where "id" = %s' % [product["id"], {"productId" => product["id"]}.to_json.gsub("'", "''"), self.id])
rateplans = {}
product["productRatePlans"].each do |rateplan|
ActiveRecord::Base.connection.execute('UPDATE "public"."zuora_connect_app_instances" SET "catalog_mapping" = jsonb_set("catalog_mapping", \'{tmp, %s}\', \'%s\') where "id" = %s' % [rateplan["id"], {"productId" => product["id"], "productRatePlanId" => rateplan["id"]}.to_json.gsub("'", "''"), self.id])
charges = {}
rateplan["productRatePlanCharges"].each do |charge|
ActiveRecord::Base.connection.execute('UPDATE "public"."zuora_connect_app_instances" SET "catalog_mapping" = jsonb_set("catalog_mapping", \'{tmp, %s}\', \'%s\') where "id" = %s' % [charge["id"], {"productId" => product["id"], "productRatePlanId" => rateplan["id"], "productRatePlanChargeId" => charge["id"]}.to_json.gsub("'", "''"), self.id])
charges[charge["id"]] = charge.merge({"productId" => product["id"], "productName" => product["name"], "productRatePlanId" => rateplan["id"], "productRatePlanName" => rateplan["name"] })
end
rateplan["productRatePlanCharges"] = charges
rateplans[rateplan["id"]] = rateplan.merge({"productId" => product["id"], "productName" => product["name"]})
end
product["productRatePlans"] = rateplans
ActiveRecord::Base.connection.execute('UPDATE "public"."zuora_connect_app_instances" SET "catalog" = jsonb_set("catalog", \'{tmp, %s}\', \'%s\') where "id" = %s' % [product["id"], product.to_json.gsub("'", "''"), self.id])
end
end
# Move from tmp to actual
ActiveRecord::Base.connection.execute('UPDATE "public"."zuora_connect_app_instances" SET "catalog" = jsonb_set("catalog", \'{%{entity}}\', "catalog" #> \'{tmp}\'), "catalog_mapping" = jsonb_set("catalog_mapping", \'{%{entity}}\', "catalog_mapping" #> \'{tmp}\') where "id" = %{id}' % {:entity => entity_reference, :id => self.id})
if defined?(Redis.current)
catalog_keys = Redis.current.smembers("Catalog:#{self.id}:Keys")
Redis.current.del(catalog_keys.push("Catalog:#{self.id}:Keys"))
end
# Clear tmp holder
ActiveRecord::Base.connection.execute('UPDATE "public"."zuora_connect_app_instances" SET "catalog" = jsonb_set("catalog", \'{tmp}\', \'{}\'), "catalog_mapping" = jsonb_set("catalog_mapping", \'{tmp}\', \'{}\') where "id" = %{id}' % {:id => self.id})
ActiveRecord::Base.logger = old_logger
self.update_column(:catalog_updated_at, Time.now.utc)
self.touch
# DO NOT RETURN CATALOG. THIS IS NOT SCALABLE WITH LARGE CATALOGS. USE THE CATALOG_LOOKUP method provided
return true
end
def catalog_outdated?(time: Time.now - 12.hours)
return self.catalog_updated_at.blank? || (self.catalog_updated_at < time)
end
def catalog_loaded?
return ActiveRecord::Base.connection.execute('SELECT id FROM "public"."zuora_connect_app_instances" WHERE "id" = %s AND catalog = \'{}\' LIMIT 1' % [self.id]).first.nil?
end
# Catalog lookup provides method to lookup zuora catalog efficiently.
# entity_id: If the using catalog json be field to store multiple entity product catalogs.
# object: The Object class desired to be returned. Available [:product, :rateplan, :charge]
# object_id: The id or id's of the object/objects to be returned.
# child_objects: Whether to include child objects of the object in question.
# cache: Store individual "1" object lookup in redis for caching.
def catalog_lookup(entity_id: nil, object: :product, object_id: nil, child_objects: false, cache: false, source: 'DB')
entity_reference = entity_id.blank? ? 'Default' : entity_id
if object_id.present? && ![Array, String].include?(object_id.class)
raise "Object Id can only be a string or an array of strings"
end
if source == 'API'
catalog_container = {}
if (Redis.current.hget(CATALOG_LOOKUP_CACHE_TIME_KEY, self.id).to_i + CATALOG_LOOKUP_TTL.to_i) > Time.now.to_i
begin
catalog_container = JSON.parse(Redis.current.hget(CATALOG_LOOKUP_CACHE_RESULT_KEY, self.id))
rescue JSON::ParserError => ex
Rails.logger.warn('Failed to parse catalog cache', ex)
end
else
zuora_login = self.login_lookup(type: 'Zuora').first
login = zuora_login.client(entity_reference)
response = {
'nextPage' => login.rest_endpoint("catalog/products?pageSize=#{CATALOG_LOOKUP_PAGE_SIZE}")
}
while response['nextPage'].present?
url = login.rest_endpoint(response['nextPage'].split('/v1/').last)
output_json, response = login.rest_call(debug: false, url: url, timeout_retry: true)
case object
when :product
output_json.fetch('products', []).each do |product|
rate_plans = {}
product['productRatePlans'].each do |rate_plan|
charges = {}
rate_plan['productRatePlanCharges'].each do |charge|
charges[charge['id']] = charge.merge(
{
'productId' => product['id'],
'productName' => product['name'],
'productRatePlanId' => rate_plan['id'],
'productRatePlanName' => rate_plan['name'],
}
)
end
rate_plan['productRatePlanCharges'] = charges
rate_plans[rate_plan['id']] = rate_plan.merge(
{
'productId' => product['id'],
'productName' => product['name']
}
)
end
product['productRatePlans'] = rate_plans
catalog_container[product['id']] = product
end
else
raise "Available objects include [:product]"
end
end
Redis.current.hset(CATALOG_LOOKUP_CACHE_RESULT_KEY, self.id, catalog_container.to_json)
Redis.current.hset(CATALOG_LOOKUP_CACHE_TIME_KEY, self.id, Time.now.to_i)
end
if object_id.nil?
catalog_container.transform_values! { |v| v.except('productRatePlans') }
elsif object_id.is_a?(String)
catalog_container = catalog_container[object_id]
end
return catalog_container || {}
end
if defined?(Redis.current) && object_id.present? && object_id.class == String && object_id.present?
stub_catalog = cache ? decrypt_data(data: Redis.current.get("Catalog:#{self.id}:#{object_id}:Children:#{child_objects}")) : nil
object_hierarchy = decrypt_data(data: Redis.current.get("Catalog:#{self.id}:#{object_id}:Hierarchy"))
end
if defined?(object_hierarchy)
object_hierarchy ||= (JSON.parse(ActiveRecord::Base.connection.execute('SELECT catalog_mapping #> \'{%s}\' AS item FROM "public"."zuora_connect_app_instances" WHERE "id" = %s LIMIT 1' % [entity_reference, self.id]).first["item"] || "{}") [object_id] || {"productId" => "SAFTEY", "productRatePlanId" => "SAFTEY", "productRatePlanChargeId" => "SAFTEY"})
end
case object
when :product
if object_id.nil?
string =
"SELECT "\
"json_object_agg(product_id, product #{child_objects ? '' : '- \'productRatePlans\''}) AS item "\
"FROM "\
"\"public\".\"zuora_connect_app_instances\", "\
"jsonb_each((\"public\".\"zuora_connect_app_instances\".\"catalog\" #> '{%s}' )) AS e(product_id, product) "\
"WHERE "\
"\"id\" = %s" % [entity_reference, self.id]
else
if object_id.class == String
string =
"SELECT "\
"(catalog #> '{%s, %s}') #{child_objects ? '' : '- \'productRatePlans\''} AS item "\
"FROM "\
"\"public\".\"zuora_connect_app_instances\" "\
"WHERE "\
"\"id\" = %s" % [entity_reference, object_id.blank? ? BLANK_OBJECT_ID_LOOKUP : object_id, self.id]
elsif object_id.class == Array
string =
"SELECT "\
"json_object_agg(product_id, product #{child_objects ? '' : '- \'productRatePlans\''}) AS item "\
"FROM "\
"\"public\".\"zuora_connect_app_instances\", "\
"jsonb_each((\"public\".\"zuora_connect_app_instances\".\"catalog\" #> '{%s}' )) AS e(product_id, product) "\
"WHERE "\
"\"product_id\" IN (\'%s\') AND "\
"\"id\" = %s" % [entity_reference, object_id.join("\',\'"), self.id]
end
end
when :rateplan
if object_id.nil?
string =
"SELECT "\
"json_object_agg(rateplan_id, rateplan #{child_objects ? '' : '- \'productRatePlanCharges\''}) AS item "\
"FROM "\
"\"public\".\"zuora_connect_app_instances\", "\
"jsonb_each((\"public\".\"zuora_connect_app_instances\".\"catalog\" #> '{%s}' )) AS e(product_id, product), "\
"jsonb_each(product #> '{productRatePlans}') AS ee(rateplan_id, rateplan) "\
"WHERE "\
"\"id\" = %s" % [entity_reference, self.id]
else
if object_id.class == String
string =
"SELECT "\
"(catalog #> '{%s, %s, productRatePlans, %s}') #{child_objects ? '' : '- \'productRatePlanCharges\''} AS item "\
"FROM "\
"\"public\".\"zuora_connect_app_instances\" "\
"WHERE "\
"\"id\" = %s" % [entity_reference, object_hierarchy['productId'], object_id.blank? ? BLANK_OBJECT_ID_LOOKUP : object_id, self.id]
elsif object_id.class == Array
string =
"SELECT "\
"json_object_agg(rateplan_id, rateplan #{child_objects ? '' : '- \'productRatePlanCharges\''}) AS item "\
"FROM "\
"\"public\".\"zuora_connect_app_instances\", "\
"jsonb_each((\"public\".\"zuora_connect_app_instances\".\"catalog\" #> '{%s}' )) AS e(product_id, product), "\
"jsonb_each(product #> '{productRatePlans}') AS ee(rateplan_id, rateplan) "\
"WHERE "\
"\"rateplan_id\" IN (\'%s\') AND "\
"\"id\" = %s" % [entity_reference, object_id.join("\',\'"), self.id]
end
end
when :charge
if object_id.nil?
string =
"SELECT "\
"json_object_agg(charge_id, charge) as item "\
"FROM "\
"\"public\".\"zuora_connect_app_instances\", "\
"jsonb_each((\"public\".\"zuora_connect_app_instances\".\"catalog\" #> '{%s}' )) AS e(product_id, product), "\
"jsonb_each(product #> '{productRatePlans}') AS ee(rateplan_id, rateplan), "\
"jsonb_each(rateplan #> '{productRatePlanCharges}') AS eee(charge_id, charge) "\
"WHERE "\
"\"id\" = %s" % [entity_reference, self.id]
else
if object_id.class == String
string =
"SELECT "\
"catalog #> '{%s, %s, productRatePlans, %s, productRatePlanCharges, %s}' AS item "\
"FROM "\
"\"public\".\"zuora_connect_app_instances\" "\
"WHERE "\
"\"id\" = %s" % [entity_reference, object_hierarchy['productId'], object_hierarchy['productRatePlanId'], object_id.blank? ? BLANK_OBJECT_ID_LOOKUP : object_id , self.id]
elsif object_id.class == Array
string =
"SELECT "\
"json_object_agg(charge_id, charge) AS item "\
"FROM "\
"\"public\".\"zuora_connect_app_instances\", "\
"jsonb_each((\"public\".\"zuora_connect_app_instances\".\"catalog\" #> '{%s}' )) AS e(product_id, product), "\
"jsonb_each(product #> '{productRatePlans}') AS ee(rateplan_id, rateplan), "\
"jsonb_each(rateplan #> '{productRatePlanCharges}') AS eee(charge_id, charge) "\
"WHERE "\
"\"charge_id\" IN (\'%s\') AND "\
"\"id\" = %s" % [entity_reference, object_id.join("\',\'"), self.id]
end
end
else
raise "Available objects include [:product, :rateplan, :charge]"
end
stub_catalog ||= JSON.parse(ActiveRecord::Base.connection.execute(string).first["item"] || "{}")
if defined?(Redis.current) && object_id.present? && object_id.class == String && object_id.present?
if cache
Redis.current.sadd("Catalog:#{self.id}:Keys", ["Catalog:#{self.id}:#{object_id}:Hierarchy", "Catalog:#{self.id}:#{object_id}:Children:#{child_objects}"])
Redis.current.set("Catalog:#{self.id}:#{object_id}:Hierarchy", encrypt_data(data: object_hierarchy))
Redis.current.set("Catalog:#{self.id}:#{object_id}:Children:#{child_objects}", encrypt_data(data: stub_catalog))
else
Redis.current.sadd("Catalog:#{self.id}:Keys", ["Catalog:#{self.id}:#{object_id}:Hierarchy"])
Redis.current.set("Catalog:#{self.id}:#{object_id}:Hierarchy", encrypt_data(data: object_hierarchy))
end
end
return stub_catalog
end
### END Catalog Helping Methods #####
### START S3 Helping Methods #####
def s3_client
require 'aws-sdk-s3'
if ZuoraConnect.configuration.mode == "Development"
@s3_client ||= Aws::S3::Resource.new(region: ZuoraConnect.configuration.aws_region,access_key_id: ZuoraConnect.configuration.dev_mode_access_key_id,secret_access_key: ZuoraConnect.configuration.dev_mode_secret_access_key)
else
@s3_client ||= Aws::S3::Resource.new(region: ZuoraConnect.configuration.aws_region)
end
end
def upload_to_s3(local_file,s3_path = nil)
s3_path = local_file.split("/").last if s3_path.nil?
obj = self.s3_client.bucket(ZuoraConnect.configuration.s3_bucket_name).object("#{ZuoraConnect.configuration.s3_folder_name}/#{self.id.to_s}/#{s3_path}}")
obj.upload_file(local_file, :server_side_encryption => 'AES256')
end
def get_s3_file_url(key)
require 'aws-sdk-s3'
signer = Aws::S3::Presigner.new(client: self.s3_client)
url = signer.presigned_url(:get_object, bucket: ZuoraConnect.configuration.s3_bucket_name, key: "#{ZuoraConnect.configuration.s3_folder_name}/#{self.id.to_s}/#{key}")
end
### END S3 Helping Methods #####
### START Aggregate Grouping Helping Methods ####
# Traverse entire database and run a query on table(table_name) with where clause(where_clause)
# Data from each schema will be loaded into table(aggregate_name) into the public schema
def self.refresh_aggregate_table(aggregate_name: 'all_tasks_processing', table_name: 'tasks', where_clause: "where status in ('Processing', 'Queued')", index_table: true, ignore_indexes: [])
self.update_functions
sql_result = ActiveRecord::Base.connection.execute <<-eos
SELECT pid, relname, mode
FROM pg_locks l
JOIN pg_class t ON l.relation = t.oid AND t.relkind = 'r'
WHERE t.relname = '#{aggregate_name}' AND l.mode ='AccessExclusiveLock';
eos
raise ZuoraConnect::Exceptions::Error.new("An existing lock detected while dropping table '#{aggregate_name}'") if sql_result.count > 0
if index_table
ActiveRecord::Base.connection.execute('SELECT "shared_extensions".refresh_aggregate_table(\'%s\', \'%s\', %s, \'Index\', \'{%s}\');' % [aggregate_name, table_name, ActiveRecord::Base.connection.quote(where_clause), ignore_indexes.map { |index| "\"#{index}\"" }.join(',')])
else
ActiveRecord::Base.connection.execute('SELECT "shared_extensions".refresh_aggregate_table(\'%s\', \'%s\', %s, \'NO\',\'{}\');' % [aggregate_name, table_name, ActiveRecord::Base.connection.quote(where_clause)])
end
end
# Load a psql script as a function in a transaction lock
def self.update_functions
ActiveRecord::Base.transaction do
ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{Zlib.crc32('refresh_aggregate_table')})")
ActiveRecord::Base.connection.execute(File.read("#{Gem.loaded_specs["zuora_connect"].gem_dir}/app/views/sql/refresh_aggregate_table.txt"))
end
end
### END Aggregate Grouping Helping Methods #####
# Overide this method to avoid the new session call for api requests that use the before filter authenticate_app_api_request.
# This can be usefull for apps that dont need connect metadata call, or credentials, to operate for api requests
def new_session_for_api_requests(params: {})
return true
end
# Overide this method to avoid the new session call for ui requests that use the before filter authenticate_connect_app_request.
# This can be usefull for apps that dont need connect metadata call, or credentials, to operate for ui requests
def new_session_for_ui_requests(params: {})
return true
end
#Method for overiding droping of an app instance
def drop_instance
self.drop_message = 'Ok to drop'
return true
end
def reload_attributes(selected_attributes)
raise "Attibutes must be array" unless selected_attributes.is_a?(Array)
selected_attributes.push(:organizations, :environment, :zuora_tenant_ids)
selected_attributes.uniq!
value_attributes = self.class.unscoped.where(id: id).select(selected_attributes).first.attributes
value_attributes.each do |key, value|
next if key == "id" && value.blank?
self.send(:write_attribute, key, value)
end
self
end
def instance_failure(failure)
raise failure
end
def send_email
end
def login_lookup(type: "Zuora")
results = []
self.logins.each do |name, login|
results << login if login.tenant_type == type
end
return results
end
def self.decrypt_response(resp)
OpenSSL::PKey::RSA.new(ZuoraConnect.configuration.private_key).private_decrypt(resp)
end
def attr_builder(field,val)
singleton_class.class_eval { attr_accessor "#{field}" }
send("#{field}=", val)
end
def method_missing(method_sym, *arguments, &block)
if method_sym.to_s.include?("login")
ZuoraConnect.logger.fatal("Method Missing #{method_sym}. Instance Data: #{self.task_data} Instance Logins: #{self.logins}")
end
super
end
def self.read_master_db
if self.connection.respond_to?(:stick_to_primary!)
self.connection.stick_to_primary!(false)
yield
Makara::Context.release_all
else
yield
end
end
def self.without_sticking
if self.connection.respond_to?(:without_sticking)
self.connection.without_sticking do
yield
end
else
yield
end
end
method_hook :updateOption, :update_logins, :before => :check_oauth_state
method_hook :new_session, :refresh, :build_task, :after => :apartment_switch
end
end