diff --git a/.gitignore b/.gitignore index 7327a2dc..41d6cf6c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ Gemfile.bak .bundle vendor .idea +lib/logstash-input-http_jars.rb +build/ +.gradle/ diff --git a/ci/build.sh b/ci/build.sh index 06caffdc..3b1c3018 100755 --- a/ci/build.sh +++ b/ci/build.sh @@ -7,15 +7,15 @@ ######################################################## set -e -echo "Starting build process in: `pwd`" +echo "Starting build process in: $(pwd)" source ./ci/setup.sh if [[ -f "ci/run.sh" ]]; then - echo "Running custom build script in: `pwd`/ci/run.sh" - source ./ci/run.sh + echo "Running custom build script in: $(pwd)/ci/run.sh" + source ./ci/run.sh else - echo "Running default build scripts in: `pwd`/ci/build.sh" - bundle install - bundle exec rake vendor - bundle exec rspec spec + echo "Running default build scripts in: $(pwd)/ci/build.sh" + bundle install + bundle exec rake vendor + bundle exec rspec spec fi diff --git a/ci/setup.sh b/ci/setup.sh index 835fa437..7b15a903 100755 --- a/ci/setup.sh +++ b/ci/setup.sh @@ -8,18 +8,18 @@ set -e if [ "$LOGSTASH_BRANCH" ]; then echo "Building plugin using Logstash source" - BASE_DIR=`pwd` + BASE_DIR=$(pwd) echo "Checking out branch: $LOGSTASH_BRANCH" - git clone -b $LOGSTASH_BRANCH https://github.com/elastic/logstash.git ../../logstash --depth 1 + git clone -b "$LOGSTASH_BRANCH" https://github.com/elastic/logstash.git ../../logstash --depth 1 printf "Checked out Logstash revision: %s\n" "$(git -C ../../logstash rev-parse HEAD)" cd ../../logstash echo "Building plugins with Logstash version:" cat versions.yml echo "---" # We need to build the jars for that specific version - echo "Running gradle assemble in: `pwd`" + echo "Running gradle assemble in: $(pwd)" ./gradlew assemble - cd $BASE_DIR + cd "$BASE_DIR" export LOGSTASH_SOURCE=1 else echo "Building plugin using released gems on rubygems" diff --git a/gradlew b/gradlew index 91a7e269..cc20d8ff 100755 --- a/gradlew +++ b/gradlew @@ -10,27 +10,27 @@ DEFAULT_JVM_OPTS="" APP_NAME="Gradle" -APP_BASE_NAME=`basename "$0"` +APP_BASE_NAME=$(basename "$0") # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" warn ( ) { - echo "$*" + echo "$*" } die ( ) { - echo - echo "$*" - echo - exit 1 + echo + echo "$*" + echo + exit 1 } # OS specific support (must be 'true' or 'false'). cygwin=false msys=false darwin=false -case "`uname`" in +case "$(uname)" in CYGWIN* ) cygwin=true ;; @@ -44,7 +44,7 @@ esac # For Cygwin, ensure paths are in UNIX format before anything is touched. if $cygwin ; then - [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$JAVA_HOME" ] && JAVA_HOME=$(cygpath --unix "$JAVA_HOME") fi # Attempt to set APP_HOME @@ -52,111 +52,111 @@ fi PRG="$0" # Need this for relative symlinks. while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi + ls=$(ls -ld "$PRG") + link=$(expr "$ls" : '.*-> \(.*\)$') + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=$(dirname "$PRG")"/$link" + fi done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >&- -APP_HOME="`pwd -P`" +SAVED="$(pwd)" +cd "$(dirname \"$PRG\")/" >&- +APP_HOME="$(pwd -P)" cd "$SAVED" >&- CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar # Determine the Java command to use to start the JVM. if [ -n "$JAVA_HOME" ] ; then - if [ -x "$JAVA_HOME/jre/sh/java" ] ; then - # IBM's JDK on AIX uses strange locations for the executables - JAVACMD="$JAVA_HOME/jre/sh/java" - else - JAVACMD="$JAVA_HOME/bin/java" - fi - if [ ! -x "$JAVACMD" ] ; then - die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME - -Please set the JAVA_HOME variable in your environment to match the -location of your Java installation." - fi + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + + Please set the JAVA_HOME variable in your environment to match the + location of your Java installation." + fi else - JAVACMD="java" - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + JAVACMD="java" + command -v java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -Please set the JAVA_HOME variable in your environment to match the -location of your Java installation." + Please set the JAVA_HOME variable in your environment to match the + location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then - MAX_FD_LIMIT=`ulimit -H -n` - if [ $? -eq 0 ] ; then - if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then - MAX_FD="$MAX_FD_LIMIT" - fi - ulimit -n $MAX_FD - if [ $? -ne 0 ] ; then - warn "Could not set maximum file descriptor limit: $MAX_FD" - fi - else - warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" +if [ "$cygwin" = "false" ] && [ "$darwin" = "false" ] ; then + MAX_FD_LIMIT=$(ulimit -H -n) + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" ] || [ "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n "$MAX_FD" + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi fi # For Darwin, add options to specify how the application appears in the dock if $darwin; then - GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" fi # For Cygwin, switch paths to Windows format before running java if $cygwin ; then - APP_HOME=`cygpath --path --mixed "$APP_HOME"` - CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + APP_HOME=$(cygpath --path --mixed "$APP_HOME") + CLASSPATH=$(cygpath --path --mixed "$CLASSPATH") # We build the pattern for arguments to be converted via cygpath - ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` - SEP="" + ROOTDIRSRAW=$(find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null) + SEP='' for dir in $ROOTDIRSRAW ; do - ROOTDIRS="$ROOTDIRS$SEP$dir" - SEP="|" + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" done OURCYGPATTERN="(^($ROOTDIRS))" # Add a user-defined pattern to the cygpath arguments if [ "$GRADLE_CYGPATTERN" != "" ] ; then - OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" fi # Now convert the arguments - kludge to limit ourselves to /bin/sh i=0 for arg in "$@" ; do - CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` - CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option - - if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition - eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` - else - eval `echo args$i`="\"$arg\"" - fi - i=$((i+1)) + CHECK=$(echo "$arg"|egrep -c "$OURCYGPATTERN" -) + CHECK2=$(echo "$arg"|egrep -c "^-" ) ### Determine if an option + + if [ "$CHECK" -ne 0 ] && [ "$CHECK2" -eq 0 ] ; then ### Added a condition + eval "$(echo args$i)"=$(cygpath --path --ignore --mixed "$arg") + else + eval "$(echo args$i)"="\"$arg\"" + fi + i=$((i+1)) done case $i in - (0) set -- ;; - (1) set -- "$args0" ;; - (2) set -- "$args0" "$args1" ;; - (3) set -- "$args0" "$args1" "$args2" ;; - (4) set -- "$args0" "$args1" "$args2" "$args3" ;; - (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; esac -fi + fi # Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules function splitJvmOpts() { - JVM_OPTS=("$@") + JVM_OPTS=("$@") } eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" diff --git a/lib/logstash/inputs/http.rb b/lib/logstash/inputs/http.rb index 2179c314..baebfdad 100644 --- a/lib/logstash/inputs/http.rb +++ b/lib/logstash/inputs/http.rb @@ -1,14 +1,13 @@ -# encoding: utf-8 -require "logstash/inputs/base" -require "logstash/namespace" -require "stud/interval" -require "logstash-input-http_jars" +require 'logstash/inputs/base' +require 'logstash/namespace' +require 'stud/interval' +require 'logstash-input-http_jars' -java_import "io.netty.handler.codec.http.HttpUtil" +java_import 'io.netty.handler.codec.http.HttpUtil' # Using this input you can receive single or multiline events over http(s). # Applications can send a HTTP POST request with a body to the endpoint started by this -# input and Logstash will convert it into an event for subsequent processing. Users +# input and Logstash will convert it into an event for subsequent processing. Users # can pass plain text, JSON, or any formatted data and use a corresponding codec with this # input. For Content-Type `application/json` the `json` codec is used, but for all other # data formats, `plain` codec is used. @@ -16,28 +15,28 @@ # This input can also be used to receive webhook requests to integrate with other services # and applications. By taking advantage of the vast plugin ecosystem available in Logstash # you can trigger actionable events right from your application. -# +# # ==== Security # This plugin supports standard HTTP basic authentication headers to identify the requester. # You can pass in an username, password combination while sending data to this input # -# You can also setup SSL and send data securely over https, with an option of validating -# the client's certificate. Currently, the certificate setup is through -# https://docs.oracle.com/cd/E19509-01/820-3503/ggfen/index.html[Java Keystore +# You can also setup SSL and send data securely over https, with an option of validating +# the client's certificate. Currently, the certificate setup is through +# https://docs.oracle.com/cd/E19509-01/820-3503/ggfen/index.html[Java Keystore # format] # class LogStash::Inputs::Http < LogStash::Inputs::Base - require "logstash/inputs/http/tls" + require 'logstash/inputs/http/tls' - config_name "http" + config_name 'http' # Codec used to decode the incoming data. # This codec will be used as a fall-back if the content-type - # is not found in the "additional_codecs" hash - default :codec, "plain" + # is not found in the 'additional_codecs' hash + default :codec, 'plain' # The host or ip to bind - config :host, :validate => :string, :default => "0.0.0.0" + config :host, :validate => :string, :default => '0.0.0.0' # The TCP port to bind to config :port, :validate => :number, :default => 8080 @@ -79,10 +78,10 @@ class LogStash::Inputs::Http < LogStash::Inputs::Base # If the client doesn't provide a certificate, the connection will be closed. # # This option needs to be used with `ssl_certificate_authorities` and a defined list of CAs. - config :ssl_verify_mode, :validate => ["none", "peer", "force_peer"], :default => "none" + config :ssl_verify_mode, :validate => ['none', 'peer', 'force_peer'], :default => 'none' # Time in milliseconds for an incomplete ssl handshake to timeout - config :ssl_handshake_timeout, :validate => :number, :default => 10000 + config :ssl_handshake_timeout, :validate => :number, :default => 10_000 # The minimum TLS version allowed for the encrypted connections. The value must be one of the following: # 1.0 for TLS 1.0, 1.1 for TLS 1.1, 1.2 for TLS 1.2 @@ -98,16 +97,16 @@ class LogStash::Inputs::Http < LogStash::Inputs::Base # Apply specific codecs for specific content types. # The default codec will be applied only after this list is checked # and no codec for the request's content-type is found - config :additional_codecs, :validate => :hash, :default => { "application/json" => "json" } + config :additional_codecs, :validate => :hash, :default => { 'application/json' => 'json' } # specify a custom set of response headers config :response_headers, :validate => :hash, :default => { 'Content-Type' => 'text/plain' } # target field for the client host of the http request - config :remote_host_target_field, :validate => :string, :default => "host" + config :remote_host_target_field, :validate => :string, :default => 'host' # target field for the client host of the http request - config :request_headers_target_field, :validate => :string, :default => "headers" + config :request_headers_target_field, :validate => :string, :default => 'headers' config :threads, :validate => :number, :required => false, :default => ::LogStash::Config::CpuCoreStrategy.maximum @@ -122,11 +121,9 @@ class LogStash::Inputs::Http < LogStash::Inputs::Base config :keystore_password, :validate => :password, :deprecated => "Set 'ssl_key_passphrase' instead." config :verify_mode, :validate => ['none', 'peer', 'force_peer'], :default => 'none', - :deprecated => "Set 'ssl_verify_mode' instead." + :deprecated => "Set 'ssl_verify_mode' instead." - public def register - validate_ssl_settings! if @user && @password then @@ -134,40 +131,40 @@ def register @auth_token = "Basic #{token}" end - @codecs = Hash.new + @codecs = {} @additional_codecs.each do |content_type, codec| - @codecs[content_type] = LogStash::Plugin.lookup("codec", codec).new + @codecs[content_type] = LogStash::Plugin.lookup('codec', codec).new end - require "logstash/inputs/http/message_handler" + require 'logstash/inputs/http/message_handler' message_handler = MessageHandler.new(self, @codec, @codecs, @auth_token) @http_server = create_http_server(message_handler) - end # def register + end def run(queue) @queue = queue - @logger.info("Starting http input listener", :address => "#{@host}:#{@port}", :ssl => "#{@ssl}") - @http_server.run() + @logger.info('Starting http input listener', :address => "#{@host}:#{@port}", :ssl => "#{@ssl}") + @http_server.run end def stop - @http_server.close() rescue nil + @http_server.close rescue nil end def close - @http_server.close() rescue nil + @http_server.close rescue nil end def decode_body(headers, remote_address, body, default_codec, additional_codecs) - content_type = headers.fetch("content_type", "") + content_type = headers.fetch('content_type', '') codec = additional_codecs.fetch(HttpUtil.getMimeType(content_type), default_codec) codec.decode(body) { |event| push_decoded_event(headers, remote_address, event) } codec.flush { |event| push_decoded_event(headers, remote_address, event) } true rescue => e @logger.error( - "unable to process event.", + 'unable to process event.', :message => e.message, :class => e.class.name, :backtrace => e.backtrace @@ -184,33 +181,35 @@ def push_decoded_event(headers, remote_address, event) def validate_ssl_settings! if !@ssl - @logger.warn("SSL Certificate will not be used") if @ssl_certificate - @logger.warn("SSL Key will not be used") if @ssl_key - @logger.warn("SSL Java Key Store will not be used") if @keystore + @logger.warn('SSL Certificate will not be used') if @ssl_certificate + @logger.warn('SSL Key will not be used') if @ssl_key + @logger.warn('SSL Java Key Store will not be used') if @keystore elsif !(ssl_key_configured? || ssl_jks_configured?) - raise LogStash::ConfigurationError, "Certificate or JKS must be configured" + raise LogStash::ConfigurationError, 'Certificate or JKS must be configured' end - if @ssl && (original_params.key?("verify_mode") && original_params.key?("ssl_verify_mode")) - raise LogStash::ConfigurationError, "Both 'ssl_verify_mode' and 'verify_mode' were set. Use only 'ssl_verify_mode'." - elsif original_params.key?("verify_mode") + if @ssl && (original_params.key?('verify_mode') && original_params.key?('ssl_verify_mode')) + raise LogStash::ConfigurationError, "Both 'ssl_verify_mode' and 'verify_mode' were set. Use only 'ssl_verify_mode'." + elsif original_params.key?('verify_mode') @ssl_verify_mode_final = @verify_mode - elsif original_params.key?("ssl_verify_mode") + elsif original_params.key?('ssl_verify_mode') @ssl_verify_mode_final = @ssl_verify_mode else @ssl_verify_mode_final = @ssl_verify_mode end if @ssl && require_certificate_authorities? && !client_authentication? - raise LogStash::ConfigurationError, "Using `ssl_verify_mode` or `verify_mode` set to PEER or FORCE_PEER, requires the configuration of `ssl_certificate_authorities`" + raise LogStash::ConfigurationError, 'Using `ssl_verify_mode` or `verify_mode` set to PEER or FORCE_PEER, requires the configuration of `ssl_certificate_authorities`' elsif @ssl && !require_certificate_authorities? && client_authentication? - raise LogStash::ConfigurationError, "The configuration of `ssl_certificate_authorities` requires setting `ssl_verify_mode` or `verify_mode` to PEER or FORCE_PEER" + raise LogStash::ConfigurationError, 'The configuration of `ssl_certificate_authorities` requires setting `ssl_verify_mode` or `verify_mode` to PEER or FORCE_PEER' end end def create_http_server(message_handler) org.logstash.plugins.inputs.http.NettyHttpServer.new( - @host, @port, message_handler, build_ssl_params(), @threads, @max_pending_requests, @max_content_length) + @host, @port, message_handler, build_ssl_params, + @threads, @max_pending_requests, @max_content_length + ) end def build_ssl_params @@ -223,17 +222,15 @@ def build_ssl_params else begin ssl_builder = org.logstash.plugins.inputs.http.util.SslSimpleBuilder.new(@ssl_certificate, @ssl_key, @ssl_key_passphrase.nil? ? nil : @ssl_key_passphrase.value) - .setCipherSuites(normalized_ciphers) + .setCipherSuites(normalized_ciphers) rescue java.lang.IllegalArgumentException => e raise LogStash::ConfigurationError.new(e) end - if client_authentication? - ssl_builder.setCertificateAuthorities(@ssl_certificate_authorities) - end + ssl_builder.setCertificateAuthorities(@ssl_certificate_authorities) if client_authentication? end - ssl_context = ssl_builder.build() + ssl_context = ssl_builder.build ssl_handler_provider = org.logstash.plugins.inputs.http.util.SslHandlerProvider.new(ssl_context) ssl_handler_provider.setVerifyMode(@ssl_verify_mode_final.upcase) ssl_handler_provider.setProtocols(convert_protocols) @@ -251,11 +248,11 @@ def ssl_jks_configured? end def client_authentication? - @ssl_certificate_authorities && @ssl_certificate_authorities.size > 0 + @ssl_certificate_authorities && !@ssl_certificate_authorities.empty? end def require_certificate_authorities? - @ssl_verify_mode_final == "force_peer" || @ssl_verify_mode_final == "peer" + @ssl_verify_mode_final == 'force_peer' || @ssl_verify_mode_final == 'peer' end def normalized_ciphers @@ -265,5 +262,4 @@ def normalized_ciphers def convert_protocols TLS.get_supported(@tls_min_version..@tls_max_version).map(&:name) end - -end # class LogStash::Inputs::Http +end diff --git a/lib/logstash/inputs/http/message_handler.rb b/lib/logstash/inputs/http/message_handler.rb index aad1d95c..c2882809 100644 --- a/lib/logstash/inputs/http/message_handler.rb +++ b/lib/logstash/inputs/http/message_handler.rb @@ -1,12 +1,11 @@ -# encoding: utf-8 -require "logstash-input-http_jars" +require 'logstash-input-http_jars' java_import org.logstash.plugins.inputs.http.MessageHandler -java_import "io.netty.handler.codec.http.DefaultFullHttpResponse" -java_import "io.netty.handler.codec.http.HttpHeaderNames" -java_import "io.netty.handler.codec.http.HttpVersion" -java_import "io.netty.handler.codec.http.HttpResponseStatus" -java_import "io.netty.buffer.Unpooled" -java_import "io.netty.util.CharsetUtil" +java_import 'io.netty.handler.codec.http.DefaultFullHttpResponse' +java_import 'io.netty.handler.codec.http.HttpHeaderNames' +java_import 'io.netty.handler.codec.http.HttpVersion' +java_import 'io.netty.handler.codec.http.HttpResponseStatus' +java_import 'io.netty.buffer.Unpooled' +java_import 'io.netty.util.CharsetUtil' module LogStash module Inputs class Http class MessageHandler @@ -34,7 +33,7 @@ def onNewMessage(remote_address, headers, body) end def copy - MessageHandler.new(@input, @default_codec.clone, clone_additional_codecs(), @auth_token) + MessageHandler.new(@input, @default_codec.clone, clone_additional_codecs, @auth_token) end def clone_additional_codecs diff --git a/lib/logstash/inputs/http/tls.rb b/lib/logstash/inputs/http/tls.rb index 46c3bc4f..7f7f7df6 100644 --- a/lib/logstash/inputs/http/tls.rb +++ b/lib/logstash/inputs/http/tls.rb @@ -1,4 +1,3 @@ -# encoding: utf-8 module LogStash module Inputs class Http class TLS class TLSOption @@ -16,9 +15,9 @@ def <=>(other) end TLS_PROTOCOL_OPTIONS = [ - TLSOption.new("TLSv1", 1), - TLSOption.new("TLSv1.1", 1.1), - TLSOption.new("TLSv1.2", 1.2) + TLSOption.new('TLSv1', 1), + TLSOption.new('TLSv1.1', 1.1), + TLSOption.new('TLSv1.2', 1.2) ] def self.min @@ -32,7 +31,7 @@ def self.max def self.get_supported(versions) if versions.is_a?(Range) TLS_PROTOCOL_OPTIONS.select { |tls| versions.cover?(tls.version) } - else + else TLS_PROTOCOL_OPTIONS.select { |tls| versions == tls.version } end end diff --git a/lib/logstash/util/http_compressed_requests.rb b/lib/logstash/util/http_compressed_requests.rb index f1eeb002..53470b71 100644 --- a/lib/logstash/util/http_compressed_requests.rb +++ b/lib/logstash/util/http_compressed_requests.rb @@ -16,7 +16,7 @@ def call(env) begin extracted = decode(env['rack.input'], env['HTTP_CONTENT_ENCODING']) rescue Zlib::Error - return [400, {'Content-Type' => 'text/plain'}, ["Failed to decompress body"]] + return [400, { 'Content-Type' => 'text/plain' }, ['Failed to decompress body']] end env.delete('HTTP_CONTENT_ENCODING') @@ -29,11 +29,10 @@ def call(env) def decode(input, content_encoding) case content_encoding - when 'gzip' then - Zlib::GzipReader.new(input).read - when 'deflate' then - Zlib::Inflate.inflate(input.read) + when 'gzip' then + Zlib::GzipReader.new(input).read + when 'deflate' then + Zlib::Inflate.inflate(input.read) end end - end diff --git a/spec/inputs/http_spec.rb b/spec/inputs/http_spec.rb index 8f62aeab..5cefc992 100644 --- a/spec/inputs/http_spec.rb +++ b/spec/inputs/http_spec.rb @@ -1,26 +1,25 @@ -require "logstash/devutils/rspec/spec_helper" -require "logstash/inputs/http" -require "json" -require "manticore" -require "stud/temporary" -require "zlib" -require "stringio" +require 'logstash/devutils/rspec/spec_helper' +require 'logstash/inputs/http' +require 'json' +require 'manticore' +require 'stud/temporary' +require 'zlib' +require 'stringio' -java_import "io.netty.handler.ssl.util.SelfSignedCertificate" +java_import 'io.netty.handler.ssl.util.SelfSignedCertificate' describe LogStash::Inputs::Http do - before do srand(RSpec.configuration.seed) end let(:client) { Manticore::Client.new(client_options) } - let(:client_options) { { } } + let(:client_options) { {} } let(:logstash_queue) { Queue.new } - let(:port) { rand(5000) + 1025 } + let(:port) { rand(1025..6025) } - it_behaves_like "an interruptible input plugin" do - let(:config) { { "port" => port } } + it_behaves_like 'an interruptible input plugin' do + let(:config) { { 'port' => port } } end after :each do @@ -29,12 +28,12 @@ subject.stop end - describe "request handling" do - subject { LogStash::Inputs::Http.new("port" => port) } + describe 'request handling' do + subject { LogStash::Inputs::Http.new('port' => port) } before :each do subject.register - t = Thread.new { subject.run(logstash_queue) } + Thread.new { subject.run(logstash_queue) } ok = false until ok begin @@ -49,29 +48,35 @@ logstash_queue.pop if logstash_queue.size == 1 # pop test event end - describe "handling overflowing requests with a 429" do - let(:logstash_queue_size) { rand(10) + 1 } - let(:max_pending_requests) { rand(5) + 1 } - let(:threads) { rand(4) + 1 } + describe 'handling overflowing requests with a 429' do + let(:logstash_queue_size) { rand(1..10) } + let(:max_pending_requests) { rand(1..5) } + let(:threads) { rand(1..4) } let(:logstash_queue) { SizedQueue.new(logstash_queue_size) } - let(:client_options) { { - "request_timeout" => 0.1, - "connect_timeout" => 3, - "socket_timeout" => 0.1 - } } + let(:client_options) do + { + 'request_timeout' => 0.1, + 'connect_timeout' => 3, + 'socket_timeout' => 0.1 + } + end - subject { described_class.new("port" => port, "threads" => threads, "max_pending_requests" => max_pending_requests) } + subject { + described_class.new('port' => port, + 'threads' => threads, + 'max_pending_requests' => max_pending_requests) + } - context "when sending more requests than queue slots" do - it "should block when the queue is full" do + context 'when sending more requests than queue slots' do + it 'should block when the queue is full' do # these will queue and return 200 - logstash_queue_size.times.each do |i| + logstash_queue_size.times.each do response = client.post("http://127.0.0.1:#{port}", :body => '{}').call expect(response.code).to eq(200) end # these will block - (threads + max_pending_requests).times.each do |i| + (threads + max_pending_requests).times.each do expect { client.post("http://127.0.0.1:#{port}", :body => '{}').call }.to raise_error(Manticore::SocketTimeout) @@ -84,256 +89,261 @@ end end - describe "remote host" do - subject { LogStash::Inputs::Http.new(config.merge("port" => port)) } - context "by default" do + describe 'remote host' do + subject { LogStash::Inputs::Http.new(config.merge('port' => port)) } + context 'by default' do let(:config) { {} } - it "is written to the \"host\" field" do + it 'is written to the "host" field' do client.post("http://localhost:#{port}/meh.json", - :headers => { "content-type" => "text/plain" }, - :body => "hello").call + :headers => { 'content-type' => 'text/plain' }, + :body => 'hello').call event = logstash_queue.pop - expect(event.get("host")).to eq("127.0.0.1") + expect(event.get('host')).to eq('127.0.0.1') end end - context "when using remote_host_target_field" do - let(:config) { { "remote_host_target_field" => "remote_host" } } - it "is written to the value of \"remote_host_target_field\" property" do + context 'when using remote_host_target_field' do + let(:config) { { 'remote_host_target_field' => 'remote_host' } } + it 'is written to the value of "remote_host_target_field" property' do client.post("http://localhost:#{port}/meh.json", - :headers => { "content-type" => "text/plain" }, - :body => "hello").call + :headers => { 'content-type' => 'text/plain' }, + :body => 'hello').call event = logstash_queue.pop - expect(event.get("remote_host")).to eq("127.0.0.1") + expect(event.get('remote_host')).to eq('127.0.0.1') end end end - describe "request headers" do - subject { LogStash::Inputs::Http.new(config.merge("port" => port)) } - context "by default" do + describe 'request headers' do + subject { LogStash::Inputs::Http.new(config.merge('port' => port)) } + context 'by default' do let(:config) { {} } - it "are written to the \"headers\" field" do + it 'are written to the "headers" field' do client.post("http://localhost:#{port}/meh.json", - :headers => { "content-type" => "text/plain" }, - :body => "hello").call + :headers => { 'content-type' => 'text/plain' }, + :body => 'hello').call event = logstash_queue.pop - expect(event.get("headers")).to be_a(Hash) - expect(event.get("headers")).to include("request_method" => "POST") + expect(event.get('headers')).to be_a(Hash) + expect(event.get('headers')).to include('request_method' => 'POST') end end - context "when using request_headers_target_field" do - let(:config) { { "request_headers_target_field" => "request_headers" } } - it "are written to the field set in \"request_headers_target_field\"" do + context 'when using request_headers_target_field' do + let(:config) { { 'request_headers_target_field' => 'request_headers' } } + it 'are written to the field set in "request_headers_target_field"' do client.post("http://localhost:#{port}/meh.json", - :headers => { "content-type" => "text/plain" }, - :body => "hello").call + :headers => { 'content-type' => 'text/plain' }, + :body => 'hello').call event = logstash_queue.pop - expect(event.get("request_headers")).to be_a(Hash) - expect(event.get("request_headers")).to include("request_method" => "POST") + expect(event.get('request_headers')).to be_a(Hash) + expect(event.get('request_headers')).to include('request_method' => 'POST') end end end - it "should include remote host in \"host\" property" do + it 'should include remote host in "host" property' do client.post("http://127.0.0.1:#{port}/meh.json", - :headers => { "content-type" => "text/plain" }, - :body => "hello").call + :headers => { 'content-type' => 'text/plain' }, + :body => 'hello').call event = logstash_queue.pop - expect(event.get("host")).to eq("127.0.0.1") + expect(event.get('host')).to eq('127.0.0.1') end - context "with default codec" do - subject { LogStash::Inputs::Http.new("port" => port) } - context "when receiving a text/plain request" do - it "should process the request normally" do + context 'with default codec' do + subject { LogStash::Inputs::Http.new('port' => port) } + context 'when receiving a text/plain request' do + it 'should process the request normally' do client.post("http://127.0.0.1:#{port}/meh.json", - :headers => { "content-type" => "text/plain" }, - :body => "hello").call + :headers => { 'content-type' => 'text/plain' }, + :body => 'hello').call event = logstash_queue.pop - expect(event.get("message")).to eq("hello") + expect(event.get('message')).to eq('hello') end end - context "when receiving a deflate compressed text/plain request" do - it "should process the request normally" do + context 'when receiving a deflate compressed text/plain request' do + it 'should process the request normally' do client.post("http://127.0.0.1:#{port}/meh.json", - :headers => { "content-type" => "text/plain", "content-encoding" => "deflate" }, - :body => Zlib::Deflate.deflate("hello")).call + :headers => { 'content-type' => 'text/plain', 'content-encoding' => 'deflate' }, + :body => Zlib::Deflate.deflate('hello')).call event = logstash_queue.pop - expect(event.get("message")).to eq("hello") + expect(event.get('message')).to eq('hello') end end - context "when receiving a deflate text/plain request that cannot be decompressed" do + context 'when receiving a deflate text/plain request that cannot be decompressed' do let(:response) do response = client.post("http://127.0.0.1:#{port}/meh.json", - :headers => { "content-type" => "text/plain", "content-encoding" => "deflate" }, - :body => "hello").call + :headers => { 'content-type' => 'text/plain', 'content-encoding' => 'deflate' }, + :body => 'hello').call end - it "should respond with 400" do + it 'should respond with 400' do expect(response.code).to eq(400) end end - context "when receiving a gzip compressed text/plain request" do - it "should process the request normally" do - wio = StringIO.new("w") + context 'when receiving a gzip compressed text/plain request' do + it 'should process the request normally' do + wio = StringIO.new('w') z = Zlib::GzipWriter.new(wio) - z.write("hello") + z.write('hello') z.close entity = org.apache.http.entity.ByteArrayEntity.new(wio.string.to_java_bytes) response = client.post("http://127.0.0.1:#{port}", - :headers => { "Content-Encoding" => "gzip" }, - :entity => entity).call + :headers => { 'Content-Encoding' => 'gzip' }, + :entity => entity).call expect(response.code).to eq(200) event = logstash_queue.pop - expect(event.get("message")).to eq("hello") + expect(event.get('message')).to eq('hello') end end - context "when receiving a gzip text/plain request that cannot be decompressed" do + context 'when receiving a gzip text/plain request that cannot be decompressed' do let(:response) do client.post("http://127.0.0.1:#{port}", - :headers => { "Content-Encoding" => "gzip" }, - :body => Zlib::Deflate.deflate("hello")).call + :headers => { 'Content-Encoding' => 'gzip' }, + :body => Zlib::Deflate.deflate('hello')).call end - it "should respond with 400" do + it 'should respond with 400' do expect(response.code).to eq(400) end end - context "when receiving an application/json request" do - it "should parse the json body" do + context 'when receiving an application/json request' do + it 'should parse the json body' do client.post("http://127.0.0.1:#{port}/meh.json", - :headers => { "content-type" => "application/json" }, - :body => { "message_body" => "Hello" }.to_json).call + :headers => { 'content-type' => 'application/json' }, + :body => { 'message_body' => 'Hello' }.to_json).call event = logstash_queue.pop - expect(event.get("message_body")).to eq("Hello") + expect(event.get('message_body')).to eq('Hello') end end end - context "with json codec" do - subject { LogStash::Inputs::Http.new("port" => port, "codec" => "json") } - it "should parse the json body" do - response = client.post("http://127.0.0.1:#{port}/meh.json", :body => { "message" => "Hello" }.to_json).call + context 'with json codec' do + subject { LogStash::Inputs::Http.new('port' => port, 'codec' => 'json') } + it 'should parse the json body' do + client.post("http://127.0.0.1:#{port}/meh.json", :body => { 'message' => 'Hello' }.to_json).call event = logstash_queue.pop - expect(event.get("message")).to eq("Hello") + expect(event.get('message')).to eq('Hello') end end - context "with json_lines codec without final delimiter" do - subject { LogStash::Inputs::Http.new("port" => port, "codec" => "json_lines") } + context 'with json_lines codec without final delimiter' do + subject { LogStash::Inputs::Http.new('port' => port, 'codec' => 'json_lines') } let(:line1) { '{"foo": 1}' } let(:line2) { '{"foo": 2}' } - it "should parse all json_lines in body including last one" do + it 'should parse all json_lines in body including last one' do client.post("http://localhost:#{port}/meh.json", :body => "#{line1}\n#{line2}").call expect(logstash_queue.size).to eq(2) event = logstash_queue.pop - expect(event.get("foo")).to eq(1) + expect(event.get('foo')).to eq(1) event = logstash_queue.pop - expect(event.get("foo")).to eq(2) + expect(event.get('foo')).to eq(2) end end - context "when using a custom codec mapping" do - subject { LogStash::Inputs::Http.new("port" => port, - "additional_codecs" => { "application/json" => "plain" }) } - it "should decode the message accordingly" do - body = { "message" => "Hello" }.to_json + context 'when using a custom codec mapping' do + subject { + LogStash::Inputs::Http.new('port' => port, + 'additional_codecs' => { 'application/json' => 'plain' }) + } + it 'should decode the message accordingly' do + body = { 'message' => 'Hello' }.to_json client.post("http://127.0.0.1:#{port}/meh.json", - :headers => { "content-type" => "application/json" }, - :body => body).call + :headers => { 'content-type' => 'application/json' }, + :body => body).call event = logstash_queue.pop - expect(event.get("message")).to eq(body) + expect(event.get('message')).to eq(body) end end - - context "when receiving a content-type with a charset" do - subject { LogStash::Inputs::Http.new("port" => port, - "additional_codecs" => { "application/json" => "plain" }) } - it "should decode the message accordingly" do - body = { "message" => "Hello" }.to_json + + context 'when receiving a content-type with a charset' do + subject { + LogStash::Inputs::Http.new('port' => port, + 'additional_codecs' => { 'application/json' => 'plain' }) + } + it 'should decode the message accordingly' do + body = { 'message' => 'Hello' }.to_json client.post("http://127.0.0.1:#{port}/meh.json", - :headers => { "content-type" => "application/json; charset=utf-8" }, - :body => body).call + :headers => { 'content-type' => 'application/json; charset=utf-8' }, + :body => body).call event = logstash_queue.pop - expect(event.get("message")).to eq(body) + expect(event.get('message')).to eq(body) end end - context "when using custom headers" do + context 'when using custom headers' do let(:custom_headers) { { 'access-control-allow-origin' => '*' } } - subject { LogStash::Inputs::Http.new("port" => port, "response_headers" => custom_headers) } + subject { LogStash::Inputs::Http.new('port' => port, 'response_headers' => custom_headers) } - describe "the response" do - it "should include the custom headers" do - response = client.post("http://127.0.0.1:#{port}/meh", :body => "hello").call + describe 'the response' do + it 'should include the custom headers' do + response = client.post("http://127.0.0.1:#{port}/meh", :body => 'hello').call expect(response.headers.to_hash).to include(custom_headers) end end end - describe "basic auth" do - user = "test"; password = "pwd" - subject { LogStash::Inputs::Http.new("port" => port, "user" => user, "password" => password) } + describe 'basic auth' do + user = 'test' + password = 'pwd' + subject { LogStash::Inputs::Http.new('port' => port, 'user' => user, 'password' => password) } let(:auth_token) { Base64.strict_encode64("#{user}:#{password}") } context "when client doesn't present auth token" do - let!(:response) { client.post("http://127.0.0.1:#{port}/meh", :body => "hi").call } - it "should respond with 401" do + let!(:response) { client.post("http://127.0.0.1:#{port}/meh", :body => 'hi').call } + it 'should respond with 401' do expect(response.code).to eq(401) end - it "should not generate an event" do + it 'should not generate an event' do expect(logstash_queue).to be_empty end end - context "when client presents incorrect auth token" do + context 'when client presents incorrect auth token' do let!(:response) do client.post("http://127.0.0.1:#{port}/meh", :headers => { - "content-type" => "text/plain", - "authorization" => "Basic meh" + 'content-type' => 'text/plain', + 'authorization' => 'Basic meh' }, - :body => "hi").call + :body => 'hi').call end - it "should respond with 401" do + it 'should respond with 401' do expect(response.code).to eq(401) end - it "should not generate an event" do + it 'should not generate an event' do expect(logstash_queue).to be_empty end end - context "when client presents correct auth token" do + context 'when client presents correct auth token' do let!(:response) do client.post("http://127.0.0.1:#{port}/meh", :headers => { - "content-type" => "text/plain", - "authorization" => "Basic #{auth_token}" - }, :body => "hi").call + 'content-type' => 'text/plain', + 'authorization' => "Basic #{auth_token}" + }, :body => 'hi').call end - it "should respond with 200" do + it 'should respond with 200' do expect(response.code).to eq(200) end - it "should generate an event" do + it 'should generate an event' do expect(logstash_queue).to_not be_empty end end end - describe "HTTP Protocol Handling" do - context "when an HTTP1.1 request is made" do + describe 'HTTP Protocol Handling' do + context 'when an HTTP1.1 request is made' do let(:protocol_version) do Java::OrgApacheHttp::HttpVersion::HTTP_1_1 end - it "responds with a HTTP1.1 response" do - response = client.post("http://127.0.0.1:#{port}", :body => "hello") + it 'responds with a HTTP1.1 response' do + response = client.post("http://127.0.0.1:#{port}", :body => 'hello') response.request.set_protocol_version(protocol_version) response.call response_protocol_version = response.instance_variable_get(:@response).get_protocol_version expect(response_protocol_version).to eq(protocol_version) end end - context "when an HTTP1.0 request is made" do + context 'when an HTTP1.0 request is made' do let(:protocol_version) do Java::OrgApacheHttp::HttpVersion::HTTP_1_0 end - it "responds with a HTTP1.0 response" do - response = client.post("http://127.0.0.1:#{port}", :body => "hello") + it 'responds with a HTTP1.0 response' do + response = client.post("http://127.0.0.1:#{port}", :body => 'hello') response.request.set_protocol_version(protocol_version) response.call response_protocol_version = response.instance_variable_get(:@response).get_protocol_version @@ -343,75 +353,81 @@ end end - context "with :ssl => false" do - subject { LogStash::Inputs::Http.new("port" => port, "ssl" => false) } - it "should not raise exception" do + context 'with :ssl => false' do + subject { LogStash::Inputs::Http.new('port' => port, 'ssl' => false) } + it 'should not raise exception' do expect { subject.register }.to_not raise_exception end end - context "with :ssl => true" do - context "without :ssl_certificate" do - subject { LogStash::Inputs::Http.new("port" => port, "ssl" => true) } - it "should raise exception" do + context 'with :ssl => true' do + context 'without :ssl_certificate' do + subject { LogStash::Inputs::Http.new('port' => port, 'ssl' => true) } + it 'should raise exception' do expect { subject.register }.to raise_exception(LogStash::ConfigurationError) end end - context "with :ssl_certificate" do + context 'with :ssl_certificate' do let(:ssc) { SelfSignedCertificate.new } let(:ssl_certificate) { ssc.certificate } let(:ssl_key) { ssc.private_key } after(:each) { ssc.delete } - subject { LogStash::Inputs::Http.new("port" => port, "ssl" => true, - "ssl_certificate" => ssl_certificate.path, - "ssl_key" => ssl_key.path) } - it "should not raise exception" do + subject { + LogStash::Inputs::Http.new('port' => port, 'ssl' => true, + 'ssl_certificate' => ssl_certificate.path, + 'ssl_key' => ssl_key.path) + } + it 'should not raise exception' do expect { subject.register }.to_not raise_exception end - context "with ssl_verify_mode = none" do - subject { LogStash::Inputs::Http.new("port" => port, "ssl" => true, - "ssl_certificate" => ssl_certificate.path, - "ssl_key" => ssl_key.path, - "ssl_verify_mode" => "none" - ) } - it "should not raise exception" do + context 'with ssl_verify_mode = none' do + subject { + LogStash::Inputs::Http.new('port' => port, 'ssl' => true, + 'ssl_certificate' => ssl_certificate.path, + 'ssl_key' => ssl_key.path, + 'ssl_verify_mode' => 'none') + } + it 'should not raise exception' do expect { subject.register }.to_not raise_exception end end - ["peer", "force_peer"].each do |verify_mode| + ['peer', 'force_peer'].each do |verify_mode| context "with ssl_verify_mode = #{verify_mode}" do - subject { LogStash::Inputs::Http.new("port" => port, "ssl" => true, - "ssl_certificate" => ssl_certificate.path, - "ssl_certificate_authorities" => ssl_certificate.path, - "ssl_key" => ssl_key.path, - "ssl_verify_mode" => verify_mode - ) } - it "should not raise exception" do + subject { + LogStash::Inputs::Http.new('port' => port, 'ssl' => true, + 'ssl_certificate' => ssl_certificate.path, + 'ssl_certificate_authorities' => ssl_certificate.path, + 'ssl_key' => ssl_key.path, + 'ssl_verify_mode' => verify_mode) + } + it 'should not raise exception' do expect { subject.register }.to_not raise_exception end end end - context "with verify_mode = none" do - subject { LogStash::Inputs::Http.new("port" => port, "ssl" => true, - "ssl_certificate" => ssl_certificate.path, - "ssl_key" => ssl_key.path, - "verify_mode" => "none" - ) } - it "should not raise exception" do + context 'with verify_mode = none' do + subject { + LogStash::Inputs::Http.new('port' => port, 'ssl' => true, + 'ssl_certificate' => ssl_certificate.path, + 'ssl_key' => ssl_key.path, + 'verify_mode' => 'none') + } + it 'should not raise exception' do expect { subject.register }.to_not raise_exception end end - ["peer", "force_peer"].each do |verify_mode| + ['peer', 'force_peer'].each do |verify_mode| context "with verify_mode = #{verify_mode}" do - subject { LogStash::Inputs::Http.new("port" => port, "ssl" => true, - "ssl_certificate" => ssl_certificate.path, - "ssl_certificate_authorities" => ssl_certificate.path, - "ssl_key" => ssl_key.path, - "verify_mode" => verify_mode - ) } - it "should not raise exception" do + subject { + LogStash::Inputs::Http.new('port' => port, 'ssl' => true, + 'ssl_certificate' => ssl_certificate.path, + 'ssl_certificate_authorities' => ssl_certificate.path, + 'ssl_key' => ssl_key.path, + 'verify_mode' => verify_mode) + } + it 'should not raise exception' do expect { subject.register }.to_not raise_exception end end