Skip to content

sandbox cpeerd_workers

tora edited this page Jan 30, 2012 · 1 revision
      # Worker that takes one ticket at a time from its pipeline, parses the
      # command carried by the ticket's channel, checks it against the current
      # server status, and routes the ticket to the next pipeline stage.
      class CommandProcessor < Worker
        # Raised internally to drop a ticket without sending any response.
        # It derives from Exception (not StandardError), so the generic
        # `rescue => e` in #serve never catches it; only `rescue Ignore` does.
        class Ignore < Exception ; end

        # Maps the command names accepted on the wire to their symbols.
        @@Commands = {
          'NOP'      => :NOP,
          'GET'      => :GET,
          'CREATE'   => :CREATE,
          'CLONE'    => :CLONE,
          'DELETE'   => :DELETE,
          'CANCEL'   => :CANCEL,
          'FINALIZE' => :FINALIZE,
          'INSERT'   => :INSERT,
          'DROP'     => :DROP,
          'ALIVE'    => :ALIVE,
        }

        # pipeline: the queue-like object this worker dequeues tickets from.
        # The bare `super` forwards `pipeline` to Worker#initialize.
        def initialize( pipeline )
          @pipeline = pipeline
          super
          @hostname = Configurations.instance.HostnameForClient
        end

        # Processes a single ticket taken from the pipeline.
        #
        # Outcomes:
        # * GET            -> handled by #do_get
        # * NOP            -> an empty Hash is pushed and the ticket is sent
        #                     to ResponseSenderPL
        # * CREATE, DELETE, FINALIZE, CANCEL
        #                  -> forwarded to BasketStatusQueryDatabasePL
        # * INSERT, DROP, ALIVE, CLONE
        #                  -> the ticket is deleted without a response (Ignore)
        # * any StandardError (unknown command, insufficient server status, ...)
        #                  -> the exception is pushed onto the ticket and the
        #                     ticket is sent to ResponseSenderPL
        def serve
          ticket = @pipeline.deq
          ticket.mark
          command, args = ticket.channel.parse
          ticket.command, ticket.args, = command, args
          basket_text = args[ 'basket' ]
          # NOTE(review): basket stays nil when the request has no 'basket'
          # argument; a GET would then fail in #do_get with NoMethodError,
          # which the generic rescue below turns into an error response.
          basket = Basket.new_from_text( basket_text ) if basket_text
          ticket.host, ticket.basket = @hostname, basket

          command_sym = @@Commands[ command ] or raise BadRequestError, "Unknown command: #{command}"

          # Minimum server status required to serve each command.
          threshold = case command_sym
                      when :CREATE            ; ServerStatus::ONLINE
                      when :DELETE            ; ServerStatus::DEL_REP
                      when :FINALIZE, :CANCEL ; ServerStatus::FIN_REP
                      #                       ; ServerStatus::REP
                      when :GET, :NOP         ; ServerStatus::READONLY
                      else    # :INSERT, :DROP, :ALIVE, :CLONE
                        raise Ignore
                      end

          unless ServerStatus.instance.equal_or_greater_than? threshold
            Log.warning( "#{command_sym}: ServerStatusError server status: #{ServerStatus.instance.status_name}: #{basket}" )
            # A GET that did not arrive over TCP is dropped silently; every
            # other request gets an explicit error response.
            if command_sym == :GET && !ticket.channel.tcp?
              raise Ignore
            else
              raise ServerStatusError, "server status: #{ServerStatus.instance.status_name}"
            end
          end

          case command_sym
          when :GET
            do_get ticket, basket, args['island']
          when :NOP
            ticket.push Hash[]
            ResponseSenderPL.instance.enq ticket
          else   # :CREATE, :DELETE, :FINALIZE, :CANCEL
            ticket.command_sym = command_sym
            BasketStatusQueryDatabasePL.instance.enq ticket
          end

        rescue Ignore
          CommandReceiverTicketPool.instance.delete( ticket )
        rescue => e
          ticket.push e
          ResponseSenderPL.instance.enq ticket
        end

        # Answers a GET request for +basket+.
        #
        # ticket: the request ticket being served.
        # basket: the requested basket; must respond to #path_a and #to_s.
        # island: optional value echoed back under the 'island' key when set.
        #
        # If the basket's path exists, the response hash is pushed onto the
        # ticket, the ticket is sent to ResponseSenderPL, and an 'INSERT'
        # command describing the basket is enqueued on MulticastCommandSenderPL.
        # Otherwise NotFoundError is raised for TCP channels, and for other
        # channels the ticket is finished and deleted without a response.
        def do_get ticket, basket, island
          path_a = basket.path_a
          # Computed before branching so the not-found log lines below can
          # identify the basket as well.
          basket_text = basket.to_s
          if File.exist? path_a
            # Build the response explicitly: attaching a do...end block to
            # `ticket.push Hash[...].tap` would bind the block to `push`,
            # leaving `tap` without a block (LocalJumpError).
            response = Hash[ 'basket', basket_text, 'paths', { ticket.host => path_a } ]
            response['island'] = island if island
            ticket.push response
            ResponseSenderPL.instance.enq ticket
            t = MulticastCommandSenderTicketPool.instance.create_ticket
            t.push( 'INSERT', Hash[ 'basket', basket_text, 'host', ticket.host, 'path', path_a ] )
            MulticastCommandSenderPL.instance.enq t
          else
            ticket.mark
            if ticket.channel.tcp?
              raise NotFoundError, path_a
            else
              ticket.finish
              Log.debug( "Get received, but not found: #{basket_text}" ) if $DEBUG
              Log.debug( sprintf( "%s %.1fms [%s] %s is not found", ticket.command.slice(0,3), ticket.duration * 1000,
                                  ( ticket.durations.map { |x| "%.1f" % (x * 1000) } ).join(', '), basket_text ) ) if $DEBUG
              CommandReceiverTicketPool.instance.delete( ticket )
            end
          end
        end
      end