moduleActiveRecord::Import::PostgreSQLAdapterincludeActiveRecord::Import::ImportSupportincludeActiveRecord::Import::OnDuplicateKeyUpdateSupportMIN_VERSION_FOR_UPSERT=90_500definsert_many(sql,values,options={},*args)# :nodoc:number_of_inserts=1returned_values=[]ids=[]results=[]base_sql,post_sql=ifsql.is_a?(String)[sql,'']elsifsql.is_a?(Array)[sql.shift,sql.join(' ')]endsql2insert=base_sql+values.join(',')+post_sqlcolumns=returning_columns(options)ifcolumns.blank?||(options[:no_returning]&&!options[:recursive])insert(sql2insert,*args)elsereturned_values=ifcolumns.size>1# Select composite columnsselect_rows(sql2insert,*args)elseselect_values(sql2insert,*args)endquery_cache.clearifquery_cache_enabledendifoptions[:returning].blank?ids=returned_valueselsifoptions[:primary_key].blank?results=returned_valueselse# split primary key and returning columnsids,results=split_ids_and_results(returned_values,columns,options)endActiveRecord::Import::Result.new([],number_of_inserts,ids,results)enddefsplit_ids_and_results(values,columns,options)ids=[]results=[]id_indexes=Array(options[:primary_key]).map{|key|columns.index(key)}returning_indexes=Array(options[:returning]).map{|key|columns.index(key)}values.eachdo|value|value_array=Array(value)ids<<id_indexes.map{|i|value_array[i]}results<<returning_indexes.map{|i|value_array[i]}endids.map!(&:first)ifid_indexes.size==1results.map!(&:first)ifreturning_indexes.size==1[ids,results]enddefnext_value_for_sequence(sequence_name)%{nextval('#{sequence_name}')}enddefpost_sql_statements(table_name,options)# :nodoc:sql=[]ifsupports_on_duplicate_key_update?# Options :recursive and :on_duplicate_key_ignore are mutually exclusiveif(options[:ignore]||options[:on_duplicate_key_ignore])&&!options[:on_duplicate_key_update]&&!options[:recursive]sql<<sql_for_on_duplicate_key_ignore(table_name,options[:on_duplicate_key_ignore])endelsiflogger&&options[:on_duplicate_key_ignore]&&!options[:on_duplicate_key_update]logger.warn"Ignoring on_duplicate_key_ignore because it is not supported by the database."endsql+=super(table_name,options)columns=returning_columns(options)unlesscolumns.blank?||(options[:no_returning]&&!options[:recursive])sql<<" RETURNING \"#{columns.join('", "')}\""endsqlenddefreturning_columns(options)columns=[]columns+=Array(options[:primary_key])ifoptions[:primary_key].present?columns|=Array(options[:returning])ifoptions[:returning].present?columnsend# Add a column to be updated on duplicate key updatedefadd_column_for_on_duplicate_key_update(column,options={})# :nodoc:arg=options[:on_duplicate_key_update]ifarg.is_a?(Hash)columns=arg.fetch(:columns){arg[:columns]=[]}casecolumnswhenArraythencolumns<<column.to_symunlesscolumns.include?(column.to_sym)whenHashthencolumns[column.to_sym]=column.to_symendelsifarg.is_a?(Array)arg<<column.to_symunlessarg.include?(column.to_sym)endend# Returns a generated ON CONFLICT DO NOTHING statement given the passed# in +args+.defsql_for_on_duplicate_key_ignore(table_name,*args)# :nodoc:arg=args.firstconflict_target=sql_for_conflict_target(arg)ifarg.is_a?(Hash)" ON CONFLICT #{conflict_target}DO NOTHING"end# Returns a generated ON CONFLICT DO UPDATE statement given the passed# in +args+.defsql_for_on_duplicate_key_update(table_name,*args)# :nodoc:arg,primary_key,locking_column=argsarg={columns: arg}ifarg.is_a?(Array)||arg.is_a?(String)returnunlessarg.is_a?(Hash)sql=' ON CONFLICT 'conflict_target=sql_for_conflict_target(arg)columns=arg.fetch(:columns,[])condition=arg[:condition]ifcolumns.respond_to?(:empty?)&&columns.empty?returnsql<<"#{conflict_target}DO NOTHING"endconflict_target||=sql_for_default_conflict_target(table_name,primary_key)unlessconflict_targetraiseArgumentError,'Expected :conflict_target or :constraint_name to be specified'endsql<<"#{conflict_target}DO UPDATE SET "ifcolumns.is_a?(Array)sql<<sql_for_on_duplicate_key_update_as_array(table_name,locking_column,columns)elsifcolumns.is_a?(Hash)sql<<sql_for_on_duplicate_key_update_as_hash(table_name,locking_column,columns)elsifcolumns.is_a?(String)sql<<columnselseraiseArgumentError,'Expected :columns to be an Array or Hash'endsql<<" WHERE #{condition}"ifcondition.present?sqlenddefsql_for_on_duplicate_key_update_as_array(table_name,locking_column,arr)# :nodoc:results=arr.mapdo|column|qc=quote_column_name(column)"#{qc}=EXCLUDED.#{qc}"endincrement_locking_column!(table_name,results,locking_column)results.join(',')enddefsql_for_on_duplicate_key_update_as_hash(table_name,locking_column,hsh)# :nodoc:results=hsh.mapdo|column1,column2|qc1=quote_column_name(column1)qc2=quote_column_name(column2)"#{qc1}=EXCLUDED.#{qc2}"endincrement_locking_column!(table_name,results,locking_column)results.join(',')enddefsql_for_conflict_target(args={})constraint_name=args[:constraint_name]conflict_target=args[:conflict_target]index_predicate=args[:index_predicate]ifconstraint_name.present?"ON CONSTRAINT #{constraint_name} "elsifconflict_target.present?'('<<Array(conflict_target).reject(&:blank?).join(', ')<<') '.tapdo|sql|sql<<"WHERE #{index_predicate} "ifindex_predicateendendenddefsql_for_default_conflict_target(table_name,primary_key)conflict_target=Array(primary_key).join(', ')"(#{conflict_target}) "ifconflict_target.present?end# Return true if the statement is a duplicate key record errordefduplicate_key_update_error?(exception)# :nodoc:exception.is_a?(ActiveRecord::StatementInvalid)&&exception.to_s.include?('duplicate key')enddefsupports_on_duplicate_key_update?database_version>=MIN_VERSION_FOR_UPSERTenddefsupports_setting_primary_key_of_imported_objects?trueendprivatedefdatabase_versiondefined?(postgresql_version)?postgresql_version:superendend