lib/multiwoven/integrations/protocol/protocol.rb



# frozen_string_literal: true

module Multiwoven
  module Integrations::Protocol
    module Types
      include Dry.Types()
    end

    SyncMode = Types::String.enum("full_refresh", "incremental")
    SyncStatus = Types::String.enum("started", "running", "complete", "incomplete")
    DestinationSyncMode = Types::String.enum("insert", "upsert")
    ConnectorType = Types::String.enum("source", "destination")
    ConnectorQueryType = Types::String.enum("raw_sql", "soql")
    ModelQueryType = Types::String.enum("raw_sql", "dbt", "soql", "table_selector")
    ConnectionStatusType = Types::String.enum("succeeded", "failed")
    StreamType = Types::String.enum("static", "dynamic")
    StreamAction = Types::String.enum("fetch", "create", "update", "delete")
    MultiwovenMessageType = Types::String.enum(
      "record", "log", "connector_spec",
      "connection_status", "catalog", "control",
      "tracking"
    )
    ControlMessageType = Types::String.enum(
      "rate_limit", "connection_config", "full_refresh"
    )
    LogLevel = Types::String.enum("fatal", "error", "warn", "info", "debug", "trace")
    RequestRateLimitingUnit = Types::String.default("minute").enum("minute", "hour", "day")
    SchemaMode = Types::String.enum("schema", "schemaless")
    FileFormatType = Types::String.enum("csv")
    CompressionType = Types::String.enum("un_compressed", "zip")

    class ProtocolModel < Dry::Struct
      extend Multiwoven::Integrations::Core::Utils
      class << self
        def from_json(json_string)
          data = JSON.parse(json_string)
          new(keys_to_symbols(data))
        end
      end
    end

    class ConnectionStatus < ProtocolModel
      attribute :status, ConnectionStatusType
      attribute? :message, Types::String.optional

      def to_multiwoven_message
        MultiwovenMessage.new(
          type: MultiwovenMessageType["connection_status"],
          connection_status: self
        )
      end
    end

    class ConnectorSpecification < ProtocolModel
      attribute? :documentation_url, Types::String.optional
      attribute? :changelog_url, Types::String.optional
      attribute :connection_specification, Types::Hash
      attribute :supports_normalization, Types::Bool.default(false)
      attribute :supports_dbt, Types::Bool.default(false)
      attribute :stream_type, StreamType
      attribute? :supported_destination_sync_modes, Types::Array.of(DestinationSyncMode).optional
      attribute? :connector_query_type, ConnectorQueryType

      def to_multiwoven_message
        MultiwovenMessage.new(
          type: MultiwovenMessageType["connector_spec"],
          connector_spec: self
        )
      end
    end

    class Connector < ProtocolModel
      attribute :name, Types::String
      attribute :type, ConnectorType
      attribute :connection_specification, Types::Hash
      attribute :query_type, Types::String.default("raw_sql").enum(*ConnectorQueryType.values)
    end

    class LogMessage < ProtocolModel
      attribute :level, LogLevel
      attribute :message, Types::String
      attribute? :name, Types::String.optional
      attribute? :stack_trace, Types::String.optional

      def to_multiwoven_message
        MultiwovenMessage.new(
          type: MultiwovenMessageType["log"],
          log: self
        )
      end
    end

    class Model < ProtocolModel
      attribute? :name, Types::String.optional
      attribute :query, Types::String
      attribute :query_type, ModelQueryType
      attribute :primary_key, Types::String
    end

    class RecordMessage < ProtocolModel
      attribute :data, Types::Hash
      attribute :emitted_at, Types::Integer

      def to_multiwoven_message
        MultiwovenMessage.new(
          type: MultiwovenMessageType["record"],
          record: self
        )
      end
    end

    class Stream < ProtocolModel
      # Common
      attribute :name, Types::String
      attribute? :action, StreamAction
      attribute :json_schema, Types::Hash
      attribute? :supported_sync_modes, Types::Array.of(SyncMode).optional.default(["incremental"])

      # Applicable for database streams
      attribute :source_defined_cursor, Types::Bool.default(false)
      attribute? :default_cursor_field, Types::Array.of(Types::String).optional
      attribute? :source_defined_primary_key, Types::Array.of(Types::Array.of(Types::String)).optional
      attribute? :namespace, Types::String.optional
      # Applicable for API streams
      attribute? :url, Types::String.optional
      attribute? :request_method, Types::String.optional
      attribute :batch_support, Types::Bool.default(false)
      attribute :batch_size, Types::Integer.default(1)
      # Rate limits
      attribute? :request_rate_limit, Types::Integer
      attribute? :request_rate_limit_unit, RequestRateLimitingUnit
      attribute? :request_rate_concurrency, Types::Integer

      def rate_limit_unit_seconds
        case request_rate_limit_unit
        when "minute"
          60 # Seconds in a minute
        when "hour"
          3600 # Seconds in an hour
        when "day"
          86_400 # Seconds in a day
        else
          1 # Default case, consider as seconds or handle as error
        end
      end
    end

    class Catalog < ProtocolModel
      attribute :streams, Types::Array.of(Stream)

      # Rate limits
      attribute? :request_rate_limit, Types::Integer.default(60)
      attribute? :request_rate_limit_unit, RequestRateLimitingUnit
      attribute? :request_rate_concurrency, Types::Integer.default(10)
      attribute? :schema_mode, Types::String.optional.default("schema")
      attribute :source_defined_cursor, Types::Bool.default(false)
      attribute? :default_cursor_field, Types::Array.of(Types::String).optional

      def to_multiwoven_message
        MultiwovenMessage.new(
          type: MultiwovenMessageType["catalog"],
          catalog: self
        )
      end
    end

    class SyncConfig < ProtocolModel
      attr_accessor :offset, :limit, :sync_run_id

      attribute :source, Connector
      attribute :destination, Connector
      attribute :model, Model
      attribute :stream, Stream
      attribute :sync_mode, SyncMode
      attribute? :cursor_field, Types::String.optional
      attribute? :current_cursor_field, Types::String.optional
      attribute :destination_sync_mode, DestinationSyncMode
      # reference ids
      attribute :sync_id, Types::String.default("unknown")
    end

    class ControlMessage < ProtocolModel
      attribute :type, ControlMessageType
      attribute :emitted_at, Types::Integer
      attribute? :status, ConnectionStatusType.optional
      attribute? :meta, Types::Hash

      def to_multiwoven_message
        MultiwovenMessage.new(
          type: MultiwovenMessageType["control"],
          control: self
        )
      end
    end

    class TrackingMessage < ProtocolModel
      attribute :success, Types::Integer.default(0)
      attribute :failed, Types::Integer.default(0)
      attribute? :meta, Types::Hash
      attribute? :logs, Types::Array.of(LogMessage)

      def to_multiwoven_message
        MultiwovenMessage.new(
          type: MultiwovenMessageType["tracking"],
          tracking: self
        )
      end
    end

    class MultiwovenMessage < ProtocolModel
      attribute :type, MultiwovenMessageType
      attribute? :log, LogMessage.optional
      attribute? :connection_status, ConnectionStatus.optional
      attribute? :connector_spec, ConnectorSpecification.optional
      attribute? :catalog, Catalog.optional
      attribute? :record, RecordMessage.optional
      attribute? :control, ControlMessage.optional
      attribute? :tracking, TrackingMessage.optional
    end
  end
end