class Lutaml::ModelTransformations::FormatRegistry
parser = parser_class.new
parser_class = registry.parser_for_extension(".xmi")
@example Get parser for extension
registry.register(".custom", MyCustomParser)
registry = FormatRegistry.new
@example Register a parser
by allowing new parsers to be registered without modifying existing code.
location for managing format parsers. It follows the Open/Closed Principle
This class implements the Registry pattern to provide a centralized
Format Registry manages parser registration and discovery.
def self.with_defaults
-
(FormatRegistry)- New registry instance
# Builds a new registry and loads the default parsers into it.
#
# @return [FormatRegistry] the freshly created registry
def self.with_defaults
  new.tap { |registry| registry.load_default_parsers }
end
def all_extensions
-
(Array<String>)- List of registered extensions
# Returns the extensions currently tracked by the registry.
#
# @return [Array<String>] a shallow copy, so callers cannot mutate the
#   registry's own list
def all_extensions
  @extensions.dup
end
def all_parsers
-
(Hash{String => Class})- Map of extensions to parser classes
# Returns the extension-to-parser mapping.
#
# Default parsers are NOT loaded implicitly here; only what has been
# registered so far is reported.
#
# @return [Hash] a shallow copy mapping each extension to its parser class
def all_parsers
  @parsers.dup
end
def auto_register_from_parser(parser_class)
-
(void)-
Parameters:
-
parser_class(Class) -- Parser class inherited from BaseParser
# Registers a parser class under the extensions it declares itself.
#
# The class is instantiated to read +supported_extensions+. When the class
# does not define that instance method, an empty string is handed to
# +register+ instead.
#
# @param parser_class [Class] parser class to register
# @return [void]
def auto_register_from_parser(parser_class)
  declared =
    if parser_class.method_defined?(:supported_extensions)
      parser_class.new.supported_extensions
    else
      ""
    end

  register(declared, parser_class)
end
def best_parser_for_file(file_path, use_content_detection: true)
-
(Class, nil)- Best parser class for the file
Parameters:
-
use_content_detection(Boolean) -- Whether to use content detection -
file_path(String) -- Path to the file
# Picks a parser for a file, preferring its extension and optionally
# falling back to inspecting the file's content.
#
# @param file_path [String] path to the file
# @param use_content_detection [Boolean] whether to try content detection
#   when the extension lookup finds nothing
# @return [Class, nil] the parser class, or nil when nothing matches
def best_parser_for_file(file_path, use_content_detection: true)
  by_extension = parser_for_file(file_path)
  return by_extension if by_extension
  return nil unless use_content_detection

  # A falsy detection result is always reported as nil.
  detect_by_content(file_path) || nil
end
def clear
-
(void)-
# Empties the registry and marks the default parsers as not loaded.
#
# Both containers are cleared in place rather than replaced.
#
# @return [void]
def clear
  [@extensions, @parsers].each(&:clear)
  @default_parsers_loaded = false
end
def constantize_parser_class(class_name)
-
(NameError)- if class cannot be found
Returns:
-
(Class)- The class constant
Parameters:
-
class_name(String) -- Fully qualified class name
# Resolves a "::"-separated constant path to the constant itself.
#
# The lookup starts at Object and descends one path segment at a time.
#
# @param class_name [String] fully qualified class name
# @return [Class] the resolved constant
# @raise [NameError] if any segment cannot be resolved
def constantize_parser_class(class_name)
  class_name.split("::").reduce(Object) do |scope, segment|
    scope.const_get(segment)
  end
end
def detect_by_content(file_path) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/MethodLength,Metrics/PerceivedComplexity
-
(Class, nil)- Parser class based on content detection
Parameters:
-
file_path(String) -- Path to the file
# Guesses the parser for a file by inspecting the start of its content.
#
# Up to the first 1024 bytes are read in binary mode. Registered parsers
# that define +content_patterns+ are consulted first, in registration
# order; after that the built-in XML/XMI and SQLite (QEA) signatures are
# checked, loading the default parsers on demand.
#
# @param file_path [String] path to the file
# @return [Class, nil] parser class, or nil when nothing matches or the
#   file is empty
# @raise [ArgumentError] if the file does not exist
def detect_by_content(file_path) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/MethodLength,Metrics/PerceivedComplexity
  raise ArgumentError, "#{file_path} does not exist!" unless File.exist?(file_path)

  File.open(file_path, "rb") do |io|
    head = io.read(1024)
    return nil if head.nil? || head.empty?

    # Parser-declared patterns take precedence over built-in signatures.
    @parsers.each do |ext, parser_class|
      next unless parser_class.method_defined?(:content_patterns)

      matched = parser_class.new.content_patterns.any? do |pattern|
        head.match?(pattern)
      end
      return parser_for_extension(ext) if matched
    end

    # XML declaration together with an xmi: namespace prefix.
    if head.include?("<?xml") && head.include?("xmi:")
      ensure_default_parsers_loaded!
      return parser_for_extension(".xmi")
    end

    # 16-byte SQLite file signature (QEA files).
    if head.start_with?("SQLite format 3\0")
      ensure_default_parsers_loaded!
      return parser_for_extension(".qea")
    end
  end

  nil
end
def ensure_default_parsers_loaded!
-
(void)-
# Loads the default parsers unless they were already loaded.
#
# @return [void]
def ensure_default_parsers_loaded!
  return if @default_parsers_loaded

  load_default_parsers
end
def export_configuration # rubocop:disable Metrics/MethodLength
# Exports a snapshot describing every registered parser.
#
# One entry is produced per registered extension, so a parser class that
# is registered under several extensions appears several times. Each
# parser class is instantiated once per entry to read its metadata.
#
# @return [Hash] with +:exported_at+ (Time of the export) and +:parsers+
#   (Array of Hashes with +:parser_class+, +:extensions+, +:priority+ and
#   +:format+)
def export_configuration # rubocop:disable Metrics/MethodLength
  exported_at = Time.now

  entries = @parsers.map do |_ext, parser_class|
    # Build a single instance instead of one per attribute.
    parser = parser_class.new
    {
      parser_class: parser_class,
      extensions: parser.supported_extensions,
      priority: parser.priority,
      format: parser.format_name,
    }
  end

  { exported_at: exported_at, parsers: entries }
end
def initialize
# Creates an empty registry: no parsers, no extensions, and the default
# parsers not yet loaded.
def initialize
  @default_parsers_loaded = false
  @extensions = []
  @parsers = {}
end
def load_default_parsers # rubocop:disable Metrics/MethodLength
-
(void)-
# Registers the built-in XMI and QEA parsers, at most once per registry.
#
# A parser whose registration raises LoadError is skipped. The registry is
# marked as loaded afterwards either way.
#
# @return [void]
def load_default_parsers # rubocop:disable Metrics/MethodLength
  return if @default_parsers_loaded

  # Lambdas defer constant resolution so one unavailable parser does not
  # prevent the other from being registered.
  default_registrations = [
    -> { register(".xmi", Parsers::XmiParser) },
    -> { register(".qea", Parsers::QeaParser) },
  ]

  default_registrations.each do |registration|
    begin
      registration.call
    rescue LoadError
      # Parser not available, skip
    end
  end

  @default_parsers_loaded = true
end
def load_from_configuration(configuration)
-
(void)-
Parameters:
-
configuration(Configuration) -- Configuration whose enabled_parsers entries supply a parser_class name and an extension
# Registers every parser listed in +configuration.enabled_parsers+.
#
# Each entry's +parser_class+ name is resolved to a constant and registered
# under the entry's +extension+. Entries that raise NameError are reported
# through +warn+ and skipped; the remaining entries are still processed.
#
# @param configuration [Configuration] object responding to
#   +enabled_parsers+
# @return [void]
def load_from_configuration(configuration)
  configuration.enabled_parsers.each do |entry|
    begin
      resolved = constantize_parser_class(entry.parser_class)
      register(entry.extension, resolved)
    rescue NameError => e
      warn "Warning: Could not load parser " \
           "#{entry.parser_class}: #{e.message}"
    end
  end
end
def normalize_extension(extension)
-
(String)- Normalized extension
Parameters:
-
extension(String) -- File extension
# Converts an extension to its canonical form: lower-case with a leading
# dot.
#
# @param extension [#to_s] file extension, with or without the dot
# @return [String] normalized extension
def normalize_extension(extension)
  lowered = extension.to_s.downcase
  lowered.start_with?(".") ? lowered : ".#{lowered}"
end
def parser_for_extension(extension)
-
(Class, nil)- Parser class, or nil if not found
Parameters:
-
extension(String) -- File extension (e.g., ".xmi", ".qea")
# Looks up the parser registered for an extension.
#
# The extension is normalized before the lookup. Default parsers are not
# loaded implicitly.
#
# @param extension [String] file extension (e.g., ".xmi", ".qea")
# @return [Class, nil] parser class, or nil if none is registered
def parser_for_extension(extension)
  @parsers[normalize_extension(extension)]
end
def parser_for_file(file_path)
-
(Class, nil)- Parser class, or nil if not found
Parameters:
-
file_path(String) -- Path to the file
# Looks up the parser for a file using only the extension of its path.
#
# @param file_path [String] path to the file
# @return [Class, nil] parser class, or nil if none is registered
def parser_for_file(file_path)
  parser_for_extension(File.extname(file_path))
end
def parsers_by_priority
-
(Array<Array>)- List of [extension, parser_class]
# Lists the registered parsers ordered from highest to lowest priority.
#
# A parser class that does not define +priority+ is ranked as 100;
# otherwise the class is instantiated to read its priority.
#
# @return [Array<Array>] list of [extension, parser_class] pairs
def parsers_by_priority
  ascending = @parsers.sort_by do |_ext, parser_class|
    parser_class.method_defined?(:priority) ? parser_class.new.priority : 100
  end

  ascending.reverse
end
def register(extension, parser_class) # rubocop:disable Metrics/MethodLength
-
(ArgumentError)- if extension or parser_class is invalid
Parameters:
-
parser_class(Class) -- Parser class implementing BaseParser -
extension(String) -- File extension (e.g., ".xmi", ".qea")
# Registers a parser class for one extension or for each extension in an
# Array.
#
# The extension and the parser class are validated, the extension is
# normalized, and any parser previously registered under it is replaced.
#
# @param extension [String, Array<String>] file extension(s)
#   (e.g., ".xmi", ".qea")
# @param parser_class [Class] parser class to associate with them
# @return [void]
# @raise [ArgumentError] if extension or parser_class is invalid
def register(extension, parser_class) # rubocop:disable Metrics/MethodLength
  if extension.is_a?(Array)
    extension.each { |single| register(single, parser_class) }
    return
  end

  validate_extension!(extension)
  validate_parser_class!(parser_class)

  key = normalize_extension(extension)
  @parsers[key] = parser_class
  # Keep the extension list free of duplicates on re-registration.
  @extensions << key unless @extensions.include?(key)
end
def statistics # rubocop:disable Metrics/MethodLength
-
(Hash)- Statistics hash
# Summarizes the registry contents.
#
# Each distinct parser class is instantiated once to read its metadata.
#
# @return [Hash] with +:total_parsers+ (distinct parser classes),
#   +:total_extensions+, +:extensions_per_parser+ (rounded to two decimals,
#   0.0 for an empty registry) and +:parser_details+ (Array of Hashes with
#   +:parser+, +:extensions+, +:priority+ and +:format_name+)
def statistics # rubocop:disable Metrics/MethodLength
  parser_classes = @parsers.values.uniq
  parser_count = parser_classes.size
  extension_count = @extensions.size

  # Guard the ratio: dividing by zero parsers would yield NaN.
  per_parser =
    if parser_count.zero?
      0.0
    else
      (extension_count.to_f / parser_count).round(2)
    end

  details = parser_classes.map do |parser_class|
    # Build a single instance instead of one per attribute.
    parser = parser_class.new
    {
      parser: parser_class,
      extensions: parser.supported_extensions,
      priority: parser.priority,
      format_name: parser.format_name,
    }
  end

  {
    total_parsers: parser_count,
    total_extensions: extension_count,
    extensions_per_parser: per_parser,
    parser_details: details,
  }
end
def supported_extensions
-
(Array<String>)- List of supported file extensions
# Lists every extension that currently has a parser, sorted.
#
# Default parsers are not loaded implicitly.
#
# @return [Array<String>] sorted list of supported file extensions
def supported_extensions
  @parsers.keys.sort
end
def supports_extension?(extension)
-
(Boolean)- true if extension is supported
Parameters:
-
extension(String) -- File extension to check
# Tells whether a parser is registered for the given extension.
#
# @param extension [String] file extension to check
# @return [Boolean] true if the extension is supported
def supports_extension?(extension)
  !parser_for_extension(extension).nil?
end
def supports_file?(file_path)
-
(Boolean)- true if file format is supported
Parameters:
-
file_path(String) -- Path to the file
# Tells whether a parser is registered for the file's extension.
#
# @param file_path [String] path to the file
# @return [Boolean] true if the file format is supported
def supports_file?(file_path)
  !parser_for_file(file_path).nil?
end
def unregister(extension)
-
(Class, nil)- The unregistered parser class, or nil if not found
Parameters:
-
extension(String) -- File extension to unregister
# Removes the parser registered for an extension.
#
# The extension is normalized first, then dropped from both the extension
# list and the parser map.
#
# @param extension [String] file extension to unregister
# @return [Class, nil] the removed parser class, or nil if not found
def unregister(extension)
  key = normalize_extension(extension)
  @extensions.delete(key)
  @parsers.delete(key)
end
def validate_extension!(extension)
-
(ArgumentError)- if extension is invalid
Parameters:
-
extension(String) -- Extension to validate
# Checks that an extension is usable as a registry key.
#
# After normalization the extension must consist solely of one or more
# dot-prefixed segments of lower-case letters and digits (e.g. ".xmi",
# ".tar.gz"). The pattern is anchored to the whole string with \A and \z,
# and the segment separator is an escaped dot, so trailing junk,
# whitespace, or additional lines are rejected.
#
# @param extension [String] extension to validate
# @return [void]
# @raise [ArgumentError] if extension is nil, empty, or malformed
def validate_extension!(extension)
  if extension.nil? || extension.empty?
    raise ArgumentError, "Extension cannot be nil or empty"
  end

  normalized = normalize_extension(extension)
  return if normalized.match?(/\A\.[a-z0-9]+(?:\.[a-z0-9]+)*\z/)

  raise ArgumentError, "Invalid extension format: #{extension}"
end
def validate_parser_class!(parser_class) # rubocop:disable Metrics/CyclomaticComplexity,Metrics/MethodLength
-
(ArgumentError)- if parser class is invalid
Parameters:
-
parser_class(Class) -- Parser class to validate
# Checks that a value can be registered as a parser.
#
# The value must be a Class, a strict subclass of Parsers::BaseParser, and
# must define +parse+ as either a public/protected or a private instance
# method.
#
# @param parser_class [Class] parser class to validate
# @return [void]
# @raise [ArgumentError] if the parser class is invalid
def validate_parser_class!(parser_class) # rubocop:disable Metrics/CyclomaticComplexity,Metrics/MethodLength
  raise ArgumentError, "Parser class cannot be nil" if parser_class.nil?
  raise ArgumentError, "Parser must be a class" unless parser_class.is_a?(Class)

  # `<` is nil for unrelated classes and false for BaseParser itself.
  unless parser_class < Parsers::BaseParser
    raise ArgumentError, "Parser class must inherit from BaseParser"
  end

  absent = [:parse].reject do |name|
    parser_class.method_defined?(name) ||
      parser_class.private_method_defined?(name)
  end
  return if absent.empty?

  raise ArgumentError,
        "Parser class must implement methods: #{absent.join(', ')}"
end