diff --git a/contrib/log-courier.init b/contrib/log-courier.init new file mode 100644 index 00000000..13a6ff1c --- /dev/null +++ b/contrib/log-courier.init @@ -0,0 +1,96 @@ +#!/bin/sh +# +# log-courier Log Courier +# +# chkconfig: 2345 90 10 +# description: Controls the Log Courier daemon +# +### BEGIN INIT INFO +# Provides: log-courier +# Required-Start: $local_fs $remote_fs $syslog +# Required-Stop: $local_fs $remote_fs $syslog +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Log Courier +### END INIT INFO + +# source function library +. /etc/rc.d/init.d/functions + +DAEMON='/usr/sbin/log-courier' +DATA_DIR='/var/lib/log-courier' +CONFIG_FILE='/etc/log-courier/log-courier.conf' +PID_FILE='/var/run/log-courier.pid' + +do_start() { + echo -n "Starting Log Courier: " + status -p $PID_FILE $DAEMON &>/dev/null + RC=$? + if [ $RC -eq 0 ]; then + success + else + cd $DATA_DIR + nohup ${DAEMON} -config="${CONFIG_FILE}" &>/dev/null & + RC=$? + echo $! > ${PID_FILE} + [ $RC -eq 0 ] && success || failure + fi + echo + return $? +} + +do_reload() { + echo -n "Reloading Log Courier: " + killproc -p $PID_FILE $DAEMON -HUP + RC=$? + echo +} + +case "$1" in + start) + $0 configtest && do_start + RC=$? + ;; + stop) + echo -n "Stopping Log Courier: " + killproc -p $PID_FILE -d 30 $DAEMON + RC=$? + echo + ;; + status) + echo "Log Courier status:" + status -p $PID_FILE $DAEMON + RC=$? + ;; + reload) + $0 configtest && do_reload + ;; + restart) + $0 configtest + RC=$? + if [ $RC -eq 0 ]; then + $0 stop + do_start + RC=$? + fi + ;; + configtest) + echo -n "Configuration test: " + TESTRESULT=$( ${DAEMON} -config="${CONFIG_FILE}" -config-test ) + RC=$? 
+ if [ $RC -ne 0 ]; then + failure + echo + echo "${TESTRESULT}" + else + success + echo + fi + ;; + *) + echo "Usage: $0 start|stop|status|reload|restart|configtest" + exit 1 + ;; +esac + +exit $RC diff --git a/docs/AdministrationUtility.md b/docs/AdministrationUtility.md index 02381f43..dd546353 100644 --- a/docs/AdministrationUtility.md +++ b/docs/AdministrationUtility.md @@ -18,9 +18,11 @@ The `lc-admin` command allows you to remotely (or locally) connect to a running Log Courier instance to monitor and control log shipping. -In order to connect, the `admin enabled` general configuration entry must be set -to `true` and an `admin port` specified. See [Configuration](Configuration.md) -for more information on these options. +To enable a Log Courier instance to receive administration connections, set the +`admin enabled` general configuration entry to `true`. To specify a custom +listen address, set the `admin listen address` entry. See +[Configuration](Configuration.md) for more information on these options and the +default listen address. ## Available Commands @@ -60,8 +62,7 @@ Following is an example of the output this command provides. The `lc-admin` command accepts the following command line options. 
- -host="127.0.0.1": the Log Courier host to connect to (default 127.0.0.1) - -port=1234: the Log Courier monitor port (default 1234) + -connect="tcp:127.0.0.1:1234": the Log Courier address to connect to -quiet=false: quietly execute the command line argument and output only the result -version=false: display the Log Courier client version -watch=false: repeat the command specified on the command line every second diff --git a/docs/ChangeLog.md b/docs/ChangeLog.md index 0280f15d..c3d3124b 100644 --- a/docs/ChangeLog.md +++ b/docs/ChangeLog.md @@ -4,6 +4,7 @@ **Table of Contents** *generated with [DocToc](http://doctoc.herokuapp.com/)* +- [0.14](#014) - [0.13](#013) - [0.12](#012) - [0.11](#011) @@ -13,6 +14,34 @@ +## 0.14 + +*18th September 2014* + +**Breaking Changes** + +* The 'file' field in generated events has been renamed to 'path'. This +normalises the events with those generated by Logstash itself, and means the +Logstash `multiline` filter's default `stream_identity` setting is compatible. + +**Changes** + +* Fix connection failure and retry sometimes entering an unrecoverable state +that freezes log shipping +* Fix ProtocolError with large log packets and on idle connections (since 0.13) +* Provide more information when the gem encounters ProtocolError failures +* Fix ssl_verify usage triggering error, "Either 'ssl_verify_default_ca' or +'ssl_verify_ca' must be specified when ssl_verify is true" (#41) +* Fix (#45) +* Restore message reliability and correctly perform partial ack. 
Since 0.9 a +full event spool from log-courier could be lost (default 1024) instead of just +* Significantly improve Log Courier gem performance within JRuby by switching +JrJackson parse mode from string to raw+bigdecimal +* Add unix domain socket support to the administration connection +* Provide publisher connection status via the administration connection +* Gracefully handle lines greater than 1 MiB in size by splitting and tagging +them, and make the size configurable (#40) + ## 0.13 *30th August 2014* diff --git a/docs/Configuration.md b/docs/Configuration.md index 5a1a63d6..7dbec630 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -13,12 +13,13 @@ - [Fileglob](#fileglob) - [`"general"`](#general) - [`"admin enabled"`](#admin-enabled) - - [`"admin bind address"`](#admin-bind-address) - - [`"admin port"`](#admin-port) + - [`"admin listen address"`](#admin-listen-address) - [`"persist directory"`](#persist-directory) - [`"prospect interval"`](#prospect-interval) - [`"spool size"`](#spool-size) + - [`"spool max bytes"`](#spool-max-bytes) - [`"spool timeout"`](#spool-timeout) + - [`"max line bytes"`](#max-line-bytes) - [`"log level"`](#log-level) - [`"log stdout"`](#log-stdout) - [`"log syslog"`](#log-syslog) @@ -153,26 +154,32 @@ of new log files. ### `"admin enabled"` -*Boolean. Optional. Default: false* +*Boolean. Optional. Default: false* *Requires restart* Enables the administration listener that the `lc-admin` utility can connect to. -### `"admin bind address"` +### `"admin listen address"` -*String. Optional. Default: 127.0.0.1* +*String. Optional. Default: tcp:127.0.0.1:1234* -The TCP address the administration listener should listen on. +The address the administration listener should listen on in the format +`transport:address`. -### `"admin port"` +Allowed transports are "tcp", "tcp4", "tcp6" (Windows and *nix) and "unix" +(*nix only). For the tcp transports the address format is `host:port`. 
For the +unix transport the address should specify a filename to use when creating the +unix domain socket. If no transport name is specified, "tcp" is assumed. -*Number. Required with "admin enabled" = true* +Examples: -The TCP port the administration listener should listen on. + 127.0.0.1:1234 + tcp:127.0.0.1:1234 + unix:/var/run/log-courier/admin.socket ### `"persist directory"` -*String. Optional. Default: "."* +*String. Optional. Default: "."* *Requires restart* The directory that Log Courier should store its persistence data in. The default @@ -207,31 +214,53 @@ usage. easily cope with over 10,000 events a second on most machines and uses little memory. It is useful only in very specific circumstances.* +### `"spool max bytes"` + +*Number. Optional. Default: 10485760* + +The maximum size of an event spool, before compression. If an incomplete spool +does not have enough room for the next event, it will be flushed immediately. + +If this value is modified, the receiving end should also be configured with the +new limit. For the Logstash plugin, this is the `max_packet_size` setting. + ### `"spool timeout"` *Duration. Optional. Default: 5* The maximum amount of time to wait for a full spool. If an incomplete spool is -not filled within this time limit, the spool will be flushed regardless. +not filled within this time limit, the spool will be flushed immediately. + +### `"max line bytes"` + +*Number. Optional. Default: 1048576* + +The maximum line length to process. If a line exceeds this length, it will be +split across multiple events. Each split line will have a "tag" field added +containing the tag "splitline". The final part of the line will not have a "tag" +field added. + +If the `fields` configuration already contained a "tags" entry, and it is an +array, it will be appended to. Otherwise, the "tag" field will be left as is. ### `"log level"` -*String. Optional. Default: "info". 
-Available values: "critical", "error", "warning", "notice", "info", "debug"* +*String. Optional. Default: "info". +Available values: "critical", "error", "warning", "notice", "info", "debug"* *Requires restart* The maximum level of detail to produce in Log Courier's internal log. ### `"log stdout"` -*Boolean. Optional. Default: true* +*Boolean. Optional. Default: true* *Requires restart* Enables sending of Log Courier's internal log to the console (stdout). May be used in conjunction with `"log syslog"` and `"log file"`. ### `"log syslog"` -*Boolean. Optional. Default: false* +*Boolean. Optional. Default: false* *Requires restart* Enables sending of Log Courier's internal log to syslog. May be used in conjunction with `"log stdout"` and `"log file"`. @@ -240,7 +269,7 @@ Enables sending of Log Courier's internal log to syslog. May be used in conjunct ### `"log file"` -*Filepath. Optional* +*Filepath. Optional* *Requires restart* A log file to save Log Courier's internal log into. May be used in conjunction with `"log stdout"` and `"log syslog"`. diff --git a/docs/LogstashIntegration.md b/docs/LogstashIntegration.md index e65e7855..75ec29c2 100644 --- a/docs/LogstashIntegration.md +++ b/docs/LogstashIntegration.md @@ -81,6 +81,8 @@ The following options are available for the input plugin: * ssl_verify_ca - Path to an SSL CA certificate to use for client certificate verification (tls) * curve_secret_key - CurveZMQ secret key for the server (zmq) +* max_packet_size - The maximum packet size to accept (default 10485760, +corresponds to Log Courier's `"spool max bytes"`) The following options are available for the output plugin: diff --git a/lib/log-courier/client.rb b/lib/log-courier/client.rb index c3f7c624..409af0cc 100644 --- a/lib/log-courier/client.rb +++ b/lib/log-courier/client.rb @@ -236,7 +236,7 @@ def run_io rescue => e # Unknown error occurred @logger.warn("[LogCourierClient] Unknown error: #{e}") unless @logger.nil? 
- @logger.debug("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? || !@logger.debug? + @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? end # Disconnect and retry payloads @@ -311,8 +311,7 @@ def buffer_jdat_data_event(buffer, event) def process_pong(message) # Sanity if message.length != 0 - # TODO: log something - raise ClientProtocolError + raise ClientProtocolError, "Unexpected data attached to pong message (#{message.length})" end # No longer pending a PONG @@ -322,8 +321,7 @@ def process_pong(message) def process_ackn(message) # Sanity if message.length != 20 - # TODO: log something - raise ClientProtocolError + raise ClientProtocolError, "ACKN message size invalid (#{message.length})" end # Grab nonce diff --git a/lib/log-courier/client_tls.rb b/lib/log-courier/client_tls.rb index f51b7c6c..bb267223 100644 --- a/lib/log-courier/client_tls.rb +++ b/lib/log-courier/client_tls.rb @@ -115,7 +115,7 @@ def run_send(io_control) # Just shutdown rescue => e @logger.warn("[LogCourierClient] Unknown SSL write error: #{e}") unless @logger.nil? - @logger.debug("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? || !@logger.debug? + @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? io_control << ['F'] end @@ -151,7 +151,7 @@ def run_recv(io_control) # Just shutdown rescue => e @logger.warn("[LogCourierClient] Unknown SSL read error: #{e}") unless @logger.nil? - @logger.debug("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? || !@logger.debug? + @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? io_control << ['F'] end @@ -211,7 +211,7 @@ def tls_connect 0 rescue => e @logger.warn("[LogCourierClient] Unknown connection failure to #{@options[:addresses][0]}:#{@options[:port]}: #{e}") unless @logger.nil? 
- @logger.debug("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? || !@logger.debug? + @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? end end end diff --git a/lib/log-courier/server.rb b/lib/log-courier/server.rb index eff9f1fe..d007fcda 100644 --- a/lib/log-courier/server.rb +++ b/lib/log-courier/server.rb @@ -55,11 +55,12 @@ def initialize(options = {}) # Load the json adapter @json_adapter = MultiJson.adapter.instance + @json_options = { raw: true, use_bigdecimal: true } end def run(&block) # TODO: Make queue size configurable - event_queue = EventQueue.new 10 + event_queue = EventQueue.new 1 server_thread = nil begin @@ -81,10 +82,7 @@ def run(&block) end loop do - events = event_queue.pop - events.each do |event| - block.call event - end + block.call event_queue.pop end ensure # Signal the server thread to stop @@ -98,8 +96,7 @@ def run(&block) def process_ping(message, comm) # Size of message should be 0 if message.length != 0 - # TODO: log something - raise ProtocolError + raise ProtocolError, "unexpected data attached to ping message (#{message.length})" end # PONG! @@ -115,8 +112,7 @@ def process_jdat(message, comm, event_queue) # This allows the client to know what is being acknowledged # Nonce is 16 so check we have enough if message.length < 17 - # TODO: log something - raise ProtocolError + raise ProtocolError, "JDAT message too small (#{message.length})" end nonce = message[0...16] @@ -138,9 +134,7 @@ def process_jdat(message, comm, event_queue) # Finished! break elsif length_buf.length < 4 - @logger.warn("length extraction failed #{ret} #{length_buf.length}") - # TODO: log something - raise ProtocolError + raise ProtocolError, "JDAT length extraction failed (#{ret} #{length_buf.length})" end length = length_buf.unpack('N').first @@ -148,9 +142,8 @@ def process_jdat(message, comm, event_queue) # Extract message ret = message.read length, data_buf if ret.nil? 
or data_buf.length < length - @logger.warn("message extraction failed #{ret} #{data_buf.length}") - # TODO: log something - raise ProtocolError + @logger.warn("[LogCourierServer] JDAT message extraction failed") unless @logger.nil? + raise ProtocolError, "JDAT message extraction failed (#{ret} #{data_buf.length})" end data_buf.force_encoding('utf-8') @@ -164,31 +157,29 @@ # Decode the JSON begin - event = @json_adapter.load(data_buf) + event = @json_adapter.load(data_buf, @json_options) rescue MultiJson::ParserError => e @logger.warn("[LogCourierServer] JSON parse failure, falling back to plain-text: #{e}") unless @logger.nil? event = { 'message' => data_buf } end - events << event + # Queue the event + begin + event_queue.push event, @ack_timeout - Time.now.to_i + rescue TimeoutError + # Full pipeline, partial ack + # NOTE: comm.send can raise a Timeout::Error of its own + comm.send 'ACKN', [nonce, sequence].pack('A*N') + reset_ack_timeout + retry + end sequence += 1 end - # Queue the events - begin - event_queue.push events, @ack_timeout - Time.now.to_i - rescue TimeoutError - # Full pipeline, partial ack - # NOTE: comm.send can raise a Timeout::Error of its own - comm.send('ACKN', [nonce, sequence].pack('A*N')) - reset_ack_timeout - retry - end - # Acknowledge the full message # NOTE: comm.send can raise a Timeout::Error - comm.send('ACKN', [nonce, sequence].pack('A*N')) + comm.send 'ACKN', [nonce, sequence].pack('A*N') end def reset_ack_timeout diff --git a/lib/log-courier/server_tcp.rb b/lib/log-courier/server_tcp.rb index a4602c9d..83c50ef1 100644 --- a/lib/log-courier/server_tcp.rb +++ b/lib/log-courier/server_tcp.rb @@ -49,7 +49,8 @@ def initialize(options = {}) ssl_key_passphrase: nil, ssl_verify: false, ssl_verify_default_ca: false, - ssl_verify_ca: nil + ssl_verify_ca: nil, + max_packet_size: 10_485_760, }.merge!(options) @logger = @options[:logger] @@ -59,7 +60,7 @@ def initialize(options = {}) raise "[LogCourierServer] '#{k}' is required" if @options[k].nil? 
end - if @options[:ssl_verify] and (not @options[:ssl_verify_default_ca] && @options[:ssl_verify_ca].nil?) + if @options[:ssl_verify] and (!@options[:ssl_verify_default_ca] && @options[:ssl_verify_ca].nil?) raise '[LogCourierServer] Either \'ssl_verify_default_ca\' or \'ssl_verify_ca\' must be specified when ssl_verify is true' end end @@ -132,7 +133,7 @@ def run(&block) # Start a new connection thread client_threads[client] = Thread.new(client, peer) do |client_copy, peer_copy| - ConnectionTcp.new(@logger, client_copy, peer_copy).run(&block) + ConnectionTcp.new(@logger, client_copy, peer_copy, @options).run(&block) end end rescue ShutdownSignal @@ -154,11 +155,12 @@ def run(&block) class ConnectionTcp attr_accessor :peer - def initialize(logger, fd, peer) + def initialize(logger, fd, peer, options) @logger = logger @fd = fd @peer = peer @in_progress = false + @options = options end def run @@ -171,16 +173,22 @@ def run signature, length = recv(8).unpack('A4N') # Sanity - if length > 1_048_576 - # TODO: log something - raise ProtocolError + if length > @options[:max_packet_size] + raise ProtocolError, "packet too large (#{length} > #{@options[:max_packet_size]})" end # While we're processing, EOF is bad as it may occur during send @in_progress = true # Read the message - yield signature, recv(length), self + if length == 0 + data = '' + else + data = recv(length) + end + + # Send for processing + yield signature, data, self # If we EOF next it's a graceful close @in_progress = false @@ -206,43 +214,56 @@ def run rescue => e # Some other unknown problem @logger.warn("[LogCourierServer] Unknown error on connection from #{@peer}: #{e}") unless @logger.nil? - @logger.debug("[LogCourierServer] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? || !@logger.debug? + @logger.warn("[LogCourierServer] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? 
ensure @fd.close rescue nil end def recv(need) reset_timeout - begin - buffer = @fd.read_nonblock need - rescue IO::WaitReadable - raise TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? - retry - rescue IO::WaitWritable - raise TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? - retry - end - if buffer.nil? - raise EOFError - elsif buffer.length < need - raise ProtocolError + have = '' + loop do + begin + buffer = @fd.read_nonblock need - have.length + rescue IO::WaitReadable + raise TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? + retry + rescue IO::WaitWritable + raise TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? + retry + end + if buffer.nil? + raise EOFError + elsif buffer.length == 0 + raise ProtocolError, "read failure (#{have.length}/#{need})" + end + if have.length == 0 + have = buffer + else + have << buffer + end + break if have.length >= need end - buffer + have end def send(signature, message) reset_timeout - - written = 0 data = signature + [message.length].pack('N') + message - begin - written = @fd.write_nonblock(data[written...data.length]) - rescue IO::WaitReadable - raise TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? - retry - rescue IO::WaitWritable - raise TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? - retry + done = 0 + loop do + begin + written = @fd.write_nonblock(data[done...data.length]) + rescue IO::WaitReadable + raise TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? + retry + rescue IO::WaitWritable + raise TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? 
+ retry + end + raise ProtocolError, "write failure (#{done}/#{data.length})" if written == 0 + done += written + break if done >= data.length end end diff --git a/lib/log-courier/server_zmq.rb b/lib/log-courier/server_zmq.rb index ab7dee9c..328cf505 100644 --- a/lib/log-courier/server_zmq.rb +++ b/lib/log-courier/server_zmq.rb @@ -29,7 +29,8 @@ def initialize(options = {}) transport: 'zmq', port: 0, address: '0.0.0.0', - curve_secret_key: nil + curve_secret_key: nil, + max_packet_size: 10_485_760, }.merge!(options) @logger = @options[:logger] @@ -141,7 +142,7 @@ def run(&block) rescue => e # Some other unknown problem @logger.warn("[LogCourierServer] Unknown error: #{e}") unless @logger.nil? - @logger.debug("[LogCourierServer] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? || !@logger.debug? + @logger.warn("[LogCourierServer] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? ensure @socket.close @context.terminate @@ -160,6 +161,9 @@ def recv(data) if data.length - 8 != length @logger.warn "[LogCourierServer] Invalid message: data has invalid length (#{data.length - 8} != #{length})" unless @logger.nil? return + elsif length > @options[:max_packet_size] + @logger.warn "[LogCourierServer] Invalid message: packet too large (#{length} > #{@options[:max_packet_size]})" unless @logger.nil? 
+ return end # Yield the parts diff --git a/lib/logstash/inputs/courier.rb b/lib/logstash/inputs/courier.rb index a8d975bd..70da59e2 100644 --- a/lib/logstash/inputs/courier.rb +++ b/lib/logstash/inputs/courier.rb @@ -60,25 +60,32 @@ class Courier < LogStash::Inputs::Base # Curve secret key config :curve_secret_key, :validate => :string + # Max packet size + config :max_packet_size, :validate => :number + public def register @logger.info('Starting courier input listener', :address => "#{@host}:#{@port}") + options = { + logger: @logger, + address: @host, + port: @port, + transport: @transport, + ssl_certificate: @ssl_certificate, + ssl_key: @ssl_key, + ssl_key_passphrase: @ssl_key_passphrase, + ssl_verify: @ssl_verify, + ssl_verify_default_ca: @ssl_verify_default_ca, + ssl_verify_ca: @ssl_verify_ca, + curve_secret_key: @curve_secret_key + } + + options[:max_packet_size] = @max_packet_size unless @max_packet_size.nil? + require 'log-courier/server' - @log_courier = LogCourier::Server.new( - :logger => @logger, - :address => @host, - :port => @port, - :transport => @transport, - :ssl_certificate => @ssl_certificate, - :ssl_key => @ssl_key, - :ssl_key_passphrase => @ssl_key_passphrase, - :ssl_verify => @ssl_verify, - :ssl_verify_default_ca => @ssl_verify_default_ca, - :ssl_verify_ca => @ssl_verify_ca, - :curve_secret_key => @curve_secret_key - ) + @log_courier = LogCourier::Server.new options end public diff --git a/lib/logstash/outputs/courier.rb b/lib/logstash/outputs/courier.rb index b8e30515..b72fcb7c 100644 --- a/lib/logstash/outputs/courier.rb +++ b/lib/logstash/outputs/courier.rb @@ -54,14 +54,14 @@ def register require 'log-courier/client' @client = LogCourier::Client.new( - :addresses => @hosts, - :port => @port, - :ssl_ca => @ssl_ca, - :ssl_certificate => @ssl_certificate, - :ssl_key => @ssl_key, - :ssl_key_passphrase => @ssl_key_passphrase, - :spool_size => @spool_size, - :idle_timeout => @idle_timeout + addresses: @hosts, + port: @port, + ssl_ca: @ssl_ca, 
+ ssl_certificate: @ssl_certificate, + ssl_key: @ssl_key, + ssl_key_passphrase: @ssl_key_passphrase, + spool_size: @spool_size, + idle_timeout: @idle_timeout ) end diff --git a/log-courier.gemspec b/log-courier.gemspec index 2a7f2b4a..f3dd0602 100644 --- a/log-courier.gemspec +++ b/log-courier.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |gem| gem.name = 'log-courier' - gem.version = '0.13' + gem.version = '0.14' gem.description = 'Log Courier library' gem.summary = 'Receive events from Log Courier and transmit between LogStash instances' gem.homepage = 'https://github.com/driskell/log-courier' diff --git a/spec/courier_spec.rb b/spec/courier_spec.rb index 7a14d756..07c0a65f 100644 --- a/spec/courier_spec.rb +++ b/spec/courier_spec.rb @@ -47,7 +47,46 @@ receive_and_check(total: 5_000) do |e| expect(e['message']).to eq "stdin line test #{i}" expect(e['host']).to eq host - expect(e['file']).to eq '-' + expect(e['path']).to eq '-' + i += 1 + end + end + + it 'should split lines that are too long' do + startup stdin: true, config: <<-config + { + "network": { + "ssl ca": "#{@ssl_cert.path}", + "servers": [ "127.0.0.1:#{server_port}" ] + }, + "files": [ + { + "paths": [ "-" ] + } + ] + } + config + + # This should send over the 10 MiB packet limit but not break it + # Since we are sending 10 * 2 = 20 MiB + 10.times do |i| + # 1048575 since the line terminater adds 1 - ensures second part fits + @log_courier.puts 'X' * 1_048_575 * 2 + end + + # Receive and check + i = 0 + host = Socket.gethostname + receive_and_check(total: 20) do |e| + if i.even? 
+ expect(e['message'].length).to eq 1_048_576 + expect(e['tags']).to eq ['splitline'] + else + expect(e['message'].length).to eq 1_048_574 + expect(e.has_key?('tags')).to eq false + end + expect(e['host']).to eq host + expect(e['path']).to eq '-' i += 1 end end @@ -458,11 +497,11 @@ # Receive and check receive_and_check(total: 10_000) do |e| - if e['file'] == "#{TEMP_PATH}/logs/log-0" + if e['path'] == "#{TEMP_PATH}/logs/log-0" expect(e['first']).to eq "value" expect(e['second']).to eq "more" else - expect(e['file']).to eq "#{TEMP_PATH}/logs/log-1" + expect(e['path']).to eq "#{TEMP_PATH}/logs/log-1" expect(e['first']).to eq "different" expect(e['second']).to eq "something" end @@ -498,7 +537,7 @@ expect(e['array'][0]).to eq 1 expect(e['array'][1]).to eq 2 expect(e['host']).to eq host - expect(e['file']).to eq '-' + expect(e['path']).to eq '-' i += 1 end end @@ -532,7 +571,7 @@ expect(e['dict']['first']).to eq 'first' expect(e['dict']['second']).to eq 5 expect(e['host']).to eq host - expect(e['file']).to eq '-' + expect(e['path']).to eq '-' i += 1 end end diff --git a/spec/filter_spec.rb b/spec/filter_spec.rb index 759b7a0c..84ba0f2c 100644 --- a/spec/filter_spec.rb +++ b/spec/filter_spec.rb @@ -49,7 +49,7 @@ receive_and_check(total: 2_778) do |e| expect(e['message']).to eq "stdin line test #{i}" expect(e['host']).to eq host - expect(e['file']).to eq '-' + expect(e['path']).to eq '-' i += 1 i += 1 while /^[12]/ =~ i.to_s end diff --git a/spec/gem_spec.rb b/spec/gem_spec.rb index b7e5b1fd..ecb422bb 100644 --- a/spec/gem_spec.rb +++ b/spec/gem_spec.rb @@ -50,7 +50,7 @@ def shutdown # Allow 60 seconds Timeout.timeout(60) do 5_000.times do |i| - @client.publish 'message' => "gem line test #{i}", 'host' => @host, 'file' => 'gemfile.log' + @client.publish 'message' => "gem line test #{i}", 'host' => @host, 'path' => 'gemfile.log' end end @@ -59,7 +59,7 @@ def shutdown receive_and_check(total: 5_000) do |e| expect(e['message']).to eq "gem line test #{i}" 
expect(e['host']).to eq @host - expect(e['file']).to eq 'gemfile.log' + expect(e['path']).to eq 'gemfile.log' i += 1 end diff --git a/spec/lib/logfile.rb b/spec/lib/logfile.rb index bcc83459..ba3a08e4 100644 --- a/spec/lib/logfile.rb +++ b/spec/lib/logfile.rb @@ -87,7 +87,7 @@ def logged?(args = {}) event = args[:event] return false if event['host'] != @host - return false if args[:check_file] && event['file'] != @orig_path + return false if args[:check_file] && event['path'] != @orig_path if args[:check_order] # Regular simple test that follows the event number diff --git a/spec/lib/logfile/multiline.rb b/spec/lib/logfile/multiline.rb index 5c230426..9a6ff948 100644 --- a/spec/lib/logfile/multiline.rb +++ b/spec/lib/logfile/multiline.rb @@ -37,7 +37,7 @@ def skip_one def logged?(event: event, check_file: true, check_order: true) return false if event['host'] != @host - return false if check_file && event['file'] != @path + return false if check_file && event['path'] != @path return false if event['message'] != 'BEGIN ' + @path + " test event #{@next}" + $/ + " line 2 of test event #{@next}" + $/ + " line 3 of test event #{@next}" + $/ + ' END of test event' @count -= 1 @next += 1 diff --git a/spec/multiline_spec.rb b/spec/multiline_spec.rb index 7818df33..dccdd4d3 100644 --- a/spec/multiline_spec.rb +++ b/spec/multiline_spec.rb @@ -96,7 +96,7 @@ "files": [ { "paths": [ "#{TEMP_PATH}/logs/log-*" ], - "codec": { "name": "multiline", "what": "previous", "pattern": "^\\\\s", "previous timeout": "10s" } + "codec": { "name": "multiline", "what": "previous", "pattern": "^\\\\s", "previous timeout": "3s" } } ] } @@ -104,7 +104,13 @@ f = create_log(LogFile::Multiline) - 5_000.times do + 1_500.times do + f.log + end + + sleep 15 + + 1_500.times do f.log end diff --git a/src/lc-admin/lc-admin.go b/src/lc-admin/lc-admin.go index 216b6a04..185ead41 100644 --- a/src/lc-admin/lc-admin.go +++ b/src/lc-admin/lc-admin.go @@ -29,18 +29,16 @@ import ( ) type Admin struct { - client 
*admin.Client - connected bool - quiet bool - host string - port int + client *admin.Client + connected bool + quiet bool + admin_connect string } -func NewAdmin(quiet bool, host string, port int) *Admin { +func NewAdmin(quiet bool, admin_connect string) *Admin { return &Admin{ - quiet: quiet, - host: host, - port: port, + quiet: quiet, + admin_connect: admin_connect, } } @@ -49,10 +47,10 @@ func (a *Admin) connect() error { var err error if !a.quiet { - fmt.Printf("Attempting connection to %s:%d...\n", a.host, a.port) + fmt.Printf("Attempting connection to %s...\n", a.admin_connect) } - if a.client, err = admin.NewClient(a.host, a.port); err != nil { + if a.client, err = admin.NewClient(a.admin_connect); err != nil { fmt.Printf("Failed to connect: %s\n", err) return err } @@ -237,14 +235,12 @@ func main() { var version bool var quiet bool var watch bool - var host string - var port int + var admin_connect string flag.BoolVar(&version, "version", false, "display the Log Courier client version") flag.BoolVar(&quiet, "quiet", false, "quietly execute the command line argument and output only the result") flag.BoolVar(&watch, "watch", false, "repeat the command specified on the command line every second") - flag.StringVar(&host, "host", "127.0.0.1", "the Log Courier host to connect to (default 127.0.0.1)") - flag.IntVar(&port, "port", 1234, "the Log Courier monitor port (default 1234)") + flag.StringVar(&admin_connect, "connect", "tcp:127.0.0.1:1234", "the Log Courier instance to connect to (default tcp:127.0.0.1:1234)") flag.Parse() @@ -257,7 +253,7 @@ func main() { fmt.Printf("Log Courier version %s client\n\n", core.Log_Courier_Version) } - admin := NewAdmin(quiet, host, port) + admin := NewAdmin(quiet, admin_connect) args := flag.Args() if len(args) != 0 { diff --git a/src/lc-lib/admin/client.go b/src/lc-lib/admin/client.go index 5377ea94..428dd2e0 100644 --- a/src/lc-lib/admin/client.go +++ b/src/lc-lib/admin/client.go @@ -21,40 +21,42 @@ import ( "fmt" 
"lc-lib/core" "net" + "strings" "time" ) type Client struct { - addr net.TCPAddr - conn *net.TCPConn - decoder *gob.Decoder + admin_connect string + conn net.Conn + decoder *gob.Decoder } -func NewClient(host string, port int) (*Client, error) { - ret := &Client{} - - ret.addr.IP = net.ParseIP(host) - if ret.addr.IP == nil { - return nil, fmt.Errorf("Invalid admin connect address") - } +func NewClient(admin_connect string) (*Client, error) { + var err error - ret.addr.Port = port + ret := &Client{} - if err := ret.connect(); err != nil { + if ret.conn, err = ret.connect(admin_connect); err != nil { return nil, err } + ret.decoder = gob.NewDecoder(ret.conn) + return ret, nil } -func (c *Client) connect() (err error) { - if c.conn, err = net.DialTCP("tcp", nil, &c.addr); err != nil { - return +func (c *Client) connect(admin_connect string) (net.Conn, error) { + connect := strings.SplitN(admin_connect, ":", 2) + if len(connect) == 1 { + connect = append(connect, connect[0]) + connect[0] = "tcp" } - c.decoder = gob.NewDecoder(c.conn) + if connector, ok := registeredConnectors[connect[0]]; ok { + return connector(connect[0], connect[1]) + } - return nil + return nil, fmt.Errorf("Unknown transport specified in connection address: '%s'", connect[0]) } func (c *Client) request(command string) (*Response, error) { diff --git a/src/lc-lib/admin/listener.go b/src/lc-lib/admin/listener.go index 7d5008c0..0de9146c 100644 --- a/src/lc-lib/admin/listener.go +++ b/src/lc-lib/admin/listener.go @@ -20,6 +20,7 @@ import ( "fmt" "lc-lib/core" "net" + "strings" "time" ) @@ -30,7 +31,7 @@ type Listener struct { config *core.GeneralConfig command_chan chan string response_chan chan *Response - listener *net.TCPListener + listener NetListener client_shutdown chan interface{} client_started chan interface{} client_ended chan interface{} @@ -49,7 +50,7 @@ func NewListener(pipeline *core.Pipeline, config *core.GeneralConfig) (*Listener client_ended: make(chan interface{}, 50), } - if 
ret.listener, err = ret.startListening(config); err != nil { + if ret.listener, err = ret.listen(config); err != nil { return nil, err } @@ -58,23 +59,18 @@ func NewListener(pipeline *core.Pipeline, config *core.GeneralConfig) (*Listener return ret, nil } -func (l *Listener) startListening(config *core.GeneralConfig) (*net.TCPListener, error) { - var addr net.TCPAddr - - addr.IP = net.ParseIP(config.AdminBind) - - if addr.IP == nil { - return nil, fmt.Errorf("The admin bind address specified is not a valid IP address") +func (l *Listener) listen(config *core.GeneralConfig) (NetListener, error) { + bind := strings.SplitN(config.AdminBind, ":", 2) + if len(bind) == 1 { + bind = append(bind, bind[0]) + bind[0] = "tcp" } - addr.Port = config.AdminPort - - listener, err := net.ListenTCP("tcp", &addr) - if err != nil { - return nil, err + if listener, ok := registeredListeners[bind[0]]; ok { + return listener(bind[0], bind[1]) } - return listener, nil + return nil, fmt.Errorf("Unknown transport specified for admin bind: '%s'", bind[0]) } func (l *Listener) OnCommand() <-chan string { @@ -98,8 +94,8 @@ ListenerLoop: case config := <-l.OnConfig(): // We can't yet disable admin during a reload if config.General.AdminEnabled { - if config.General.AdminBind != l.config.AdminBind || config.General.AdminPort != l.config.AdminPort { - new_listener, err := l.startListening(&config.General) + if config.General.AdminBind != l.config.AdminBind { + new_listener, err := l.listen(&config.General) if err != nil { log.Error("The new admin configuration failed to apply: %s", err) continue @@ -115,7 +111,7 @@ ListenerLoop: l.listener.SetDeadline(time.Now().Add(time.Second)) - conn, err := l.listener.AcceptTCP() + conn, err := l.listener.Accept() if err != nil { if net_err, ok := err.(*net.OpError); ok && net_err.Timeout() { continue @@ -128,6 +124,9 @@ ListenerLoop: l.startServer(conn) } + // Shutdown listener + l.listener.Close() + // Trigger shutdowns close(l.client_shutdown) @@ -145,7 
+144,7 @@ ListenerLoop: } } -func (l *Listener) startServer(conn *net.TCPConn) { +func (l *Listener) startServer(conn net.Conn) { server := newServer(l, conn) select { diff --git a/src/lc-lib/admin/server.go b/src/lc-lib/admin/server.go index 0f67fe7b..7c7e0e14 100644 --- a/src/lc-lib/admin/server.go +++ b/src/lc-lib/admin/server.go @@ -26,12 +26,12 @@ import ( type server struct { listener *Listener - conn *net.TCPConn + conn net.Conn encoder *gob.Encoder } -func newServer(listener *Listener, conn *net.TCPConn) *server { +func newServer(listener *Listener, conn net.Conn) *server { return &server{ listener: listener, conn: conn, @@ -45,8 +45,11 @@ func (s *server) Run() { log.Debug("Admin connection from %s closed", s.conn.RemoteAddr()) } - // TODO: Make linger time configurable? - s.conn.SetLinger(5) + if conn, ok := s.conn.(*net.TCPConn); ok { + // TODO: Make linger time configurable? + conn.SetLinger(5) + } + s.conn.Close() s.listener.client_ended <- 1 diff --git a/src/lc-lib/admin/transport.go b/src/lc-lib/admin/transport.go new file mode 100644 index 00000000..483ffad7 --- /dev/null +++ b/src/lc-lib/admin/transport.go @@ -0,0 +1,42 @@ +/* +* Copyright 2014 Jason Woods. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package admin + +import ( + "net" + "time" +) + +type NetListener interface { + Accept() (net.Conn, error) + Close() error + Addr() net.Addr + SetDeadline(time.Time) error +} + +type connectorFunc func(string, string) (net.Conn, error) +type listenerFunc func(string, string) (NetListener, error) + +var ( + registeredConnectors map[string]connectorFunc = make(map[string]connectorFunc) + registeredListeners map[string]listenerFunc = make(map[string]listenerFunc) +) + +func registerTransport(name string, connector connectorFunc, listener listenerFunc) { + registeredConnectors[name] = connector + registeredListeners[name] = listener +} diff --git a/src/lc-lib/admin/transport_tcp.go b/src/lc-lib/admin/transport_tcp.go new file mode 100644 index 00000000..44ab237c --- /dev/null +++ b/src/lc-lib/admin/transport_tcp.go @@ -0,0 +1,56 @@ +/* +* Copyright 2014 Jason Woods. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package admin + +import ( + "fmt" + "net" +) + +func init() { + registerTransport("tcp", connectTCP, listenTCP) + registerTransport("tcp4", connectTCP, listenTCP) + registerTransport("tcp6", connectTCP, listenTCP) +} + +func connectTCP(transport, addr string) (net.Conn, error) { + taddr, err := net.ResolveTCPAddr(transport, addr) + if err != nil { + return nil, fmt.Errorf("The connection address specified is not valid: %s", err) + } + + conn, err := net.DialTCP(transport, nil, taddr) + if err != nil { + return nil, err + } + + return conn, nil +} + +func listenTCP(transport, addr string) (NetListener, error) { + taddr, err := net.ResolveTCPAddr(transport, addr) + if err != nil { + return nil, fmt.Errorf("The admin bind address specified is not valid: %s", err) + } + + listener, err := net.ListenTCP(transport, taddr) + if err != nil { + return nil, err + } + + return listener, nil +} diff --git a/src/lc-lib/admin/transport_unix.go b/src/lc-lib/admin/transport_unix.go new file mode 100644 index 00000000..02c6d6e9 --- /dev/null +++ b/src/lc-lib/admin/transport_unix.go @@ -0,0 +1,58 @@ +// +build !windows + +/* +* Copyright 2014 Jason Woods. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package admin + +import ( + "fmt" + "net" +) + +func init() { + registerTransport("unix", connectUnix, listenUnix) +} + +func connectUnix(transport, path string) (net.Conn, error) { + uaddr, err := net.ResolveUnixAddr("unix", path) + if err != nil { + return nil, fmt.Errorf("The connection address specified is not valid: %s", err) + } + + // TODO: Change umask to 111 so all can write (need to move to _unix) + // Permission will be controlled by folder permissions instead of file + conn, err := net.DialUnix("unix", nil, uaddr) + if err != nil { + return nil, err + } + + return conn, nil +} + +func listenUnix(transport, addr string) (NetListener, error) { + uaddr, err := net.ResolveUnixAddr("unix", addr) + if err != nil { + return nil, fmt.Errorf("The admin bind address specified is not valid: %s", err) + } + + listener, err := net.ListenUnix("unix", uaddr) + if err != nil { + return nil, err + } + + return listener, nil +} diff --git a/src/lc-lib/codecs/filter.go b/src/lc-lib/codecs/filter.go index 3ac7a7e5..47116141 100644 --- a/src/lc-lib/codecs/filter.go +++ b/src/lc-lib/codecs/filter.go @@ -73,7 +73,7 @@ func (c *CodecFilter) Teardown() int64 { return c.last_offset } -func (c *CodecFilter) Event(start_offset int64, end_offset int64, line uint64, text string) { +func (c *CodecFilter) Event(start_offset int64, end_offset int64, text string) { // Only flush the event if it matches a filter var match bool for _, matcher := range c.config.matchers { @@ -84,7 +84,7 @@ func (c *CodecFilter) Event(start_offset int64, end_offset int64, line uint64, t } if c.config.Negate != match { - c.callback_func(start_offset, end_offset, line, text) + c.callback_func(start_offset, end_offset, text) } else { c.filtered_lines++ } diff --git a/src/lc-lib/codecs/multiline.go b/src/lc-lib/codecs/multiline.go index 70ac469d..aa7f0adf 100644 --- a/src/lc-lib/codecs/multiline.go +++ b/src/lc-lib/codecs/multiline.go @@ -48,7 +48,6 @@ type CodecMultiline struct { end_offset int64 
start_offset int64 - line uint64 buffer []string buffer_lines uint64 timer_lock sync.Mutex @@ -115,7 +114,7 @@ func (c *CodecMultiline) Teardown() int64 { return c.last_offset } -func (c *CodecMultiline) Event(start_offset int64, end_offset int64, line uint64, text string) { +func (c *CodecMultiline) Event(start_offset int64, end_offset int64, text string) { // TODO(driskell): If we are using previous and we match on the very first line read, // then this is because we've started in the middle of a multiline event (the first line // should never match) - so we could potentially offer an option to discard this. @@ -135,7 +134,6 @@ func (c *CodecMultiline) Event(start_offset int64, end_offset int64, line uint64 } } if len(c.buffer) == 0 { - c.line = line c.start_offset = start_offset } c.end_offset = end_offset @@ -164,12 +162,12 @@ func (c *CodecMultiline) flush() { c.buffer = nil c.buffer_lines = 0 - c.callback_func(c.start_offset, c.end_offset, c.line, text) + c.callback_func(c.start_offset, c.end_offset, text) } func (c *CodecMultiline) Meter() { c.meter_lines = c.buffer_lines - c.meter_bytes = c.last_offset - c.end_offset + c.meter_bytes = c.end_offset - c.last_offset } func (c *CodecMultiline) Snapshot() *core.Snapshot { @@ -197,10 +195,12 @@ DeadlineLoop: if !now.After(c.timer_deadline) { // Deadline moved, update the timer timer.Reset(c.timer_deadline.Sub(now)) + c.timer_lock.Unlock() + continue } c.flush() - + timer.Reset(c.config.PreviousTimeout) c.timer_lock.Unlock() } } diff --git a/src/lc-lib/codecs/plain.go b/src/lc-lib/codecs/plain.go index ffd965de..865e70e8 100644 --- a/src/lc-lib/codecs/plain.go +++ b/src/lc-lib/codecs/plain.go @@ -46,10 +46,10 @@ func (c *CodecPlain) Teardown() int64 { return c.last_offset } -func (c *CodecPlain) Event(start_offset int64, end_offset int64, line uint64, text string) { +func (c *CodecPlain) Event(start_offset int64, end_offset int64, text string) { c.last_offset = end_offset - c.callback_func(start_offset, 
end_offset, line, text) + c.callback_func(start_offset, end_offset, text) } func (c *CodecPlain) Meter() { diff --git a/src/lc-lib/core/codec.go b/src/lc-lib/core/codec.go index 2c3fc5ec..e4d73571 100644 --- a/src/lc-lib/core/codec.go +++ b/src/lc-lib/core/codec.go @@ -18,12 +18,12 @@ package core type Codec interface { Teardown() int64 - Event(int64, int64, uint64, string) + Event(int64, int64, string) Meter() Snapshot() *Snapshot } -type CodecCallbackFunc func(int64, int64, uint64, string) +type CodecCallbackFunc func(int64, int64, string) type CodecFactory interface { NewCodec(CodecCallbackFunc, int64) Codec diff --git a/src/lc-lib/core/config.go b/src/lc-lib/core/config.go index 11f120db..900db3a7 100644 --- a/src/lc-lib/core/config.go +++ b/src/lc-lib/core/config.go @@ -32,13 +32,14 @@ import ( ) const ( - default_GeneralConfig_AdminEnabled bool = false - default_GeneralConfig_AdminBind string = "127.0.0.1" - default_GeneralConfig_AdminPort int = 0 + default_GeneralConfig_AdminEnabled bool = true + default_GeneralConfig_AdminBind string = "tcp:127.0.0.1:1234" default_GeneralConfig_PersistDir string = "." 
default_GeneralConfig_ProspectInterval time.Duration = 10 * time.Second default_GeneralConfig_SpoolSize int64 = 1024 + default_GeneralConfig_SpoolMaxBytes int64 = 10485760 default_GeneralConfig_SpoolTimeout time.Duration = 5 * time.Second + default_GeneralConfig_MaxLineBytes int64 = 1048576 default_GeneralConfig_LogLevel logging.Level = logging.INFO default_GeneralConfig_LogStdout bool = true default_GeneralConfig_LogSyslog bool = false @@ -59,12 +60,13 @@ type Config struct { type GeneralConfig struct { AdminEnabled bool `config:"admin enabled"` - AdminBind string `config:"admin bind address"` - AdminPort int `config:"admin port"` + AdminBind string `config:"admin listen address"` PersistDir string `config:"persist directory"` ProspectInterval time.Duration `config:"prospect interval"` SpoolSize int64 `config:"spool size"` + SpoolMaxBytes int64 `config:"spool max bytes"` SpoolTimeout time.Duration `config:"spool timeout"` + MaxLineBytes int64 `config:"max line bytes"` LogLevel logging.Level `config:"log level"` LogStdout bool `config:"log stdout"` LogSyslog bool `config:"log syslog"` @@ -254,21 +256,6 @@ func (c *Config) Load(path string) (err error) { } } - // Validations and defaults - if c.General.AdminEnabled { - c.General.AdminPort = default_GeneralConfig_AdminPort - - if c.General.AdminPort == 0 { - err = fmt.Errorf("An admin port must be specified when admin is enabled") - return - } - - if c.General.AdminPort <= 0 || c.General.AdminPort >= 65535 { - err = fmt.Errorf("Invalid admin port specified") - return - } - } - if c.General.AdminBind == "" { c.General.AdminBind = default_GeneralConfig_AdminBind } @@ -285,10 +272,20 @@ func (c *Config) Load(path string) (err error) { c.General.SpoolSize = default_GeneralConfig_SpoolSize } + // TODO: If max line bytes plus fields size is too big, this could be exceeded + if c.General.SpoolMaxBytes == 0 { + c.General.SpoolMaxBytes = default_GeneralConfig_SpoolMaxBytes + } + if c.General.SpoolTimeout == time.Duration(0) { 
c.General.SpoolTimeout = default_GeneralConfig_SpoolTimeout } + // TODO: Event transmit length is uint32 - if this is bigger a ridiculously large line will fail + if c.General.MaxLineBytes == 0 { + c.General.MaxLineBytes = default_GeneralConfig_MaxLineBytes + } + if c.Network.Transport == "" { c.Network.Transport = default_NetworkConfig_Transport } @@ -322,13 +319,15 @@ func (c *Config) Load(path string) (err error) { return } } else { - err = fmt.Errorf("Unrecognised codec '%s'", c.Files[k].Codec.Name) + err = fmt.Errorf("Unrecognised codec '%s' for 'files' entry %d", c.Files[k].Codec.Name, k) return } if c.Files[k].DeadTime == time.Duration(0) { c.Files[k].DeadTime = time.Duration(default_FileConfig_DeadTime) * time.Second } + + // TODO: Event transmit length is uint32, if fields length is ridiculous we will fail } return diff --git a/src/lc-lib/core/event.go b/src/lc-lib/core/event.go index a6fb3f4f..5b9c83a1 100644 --- a/src/lc-lib/core/event.go +++ b/src/lc-lib/core/event.go @@ -16,22 +16,18 @@ package core +import ( + "encoding/json" +) + type Event map[string]interface{} type EventDescriptor struct { Stream Stream Offset int64 - Event Event + Event []byte } -func NewEvent(fields map[string]interface{}, file string, offset int64, line uint64, message string) Event { - event := Event{ - "file": file, - "offset": offset, - "message": message, - } - for k := range fields { - event[k] = fields[k] - } - return event +func (e *Event) Encode() ([]byte, error) { + return json.Marshal(e) } diff --git a/src/lc-lib/core/util.go b/src/lc-lib/core/util.go new file mode 100644 index 00000000..f09b6d9f --- /dev/null +++ b/src/lc-lib/core/util.go @@ -0,0 +1,42 @@ +/* +* Copyright 2014 Jason Woods. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package core + +import ( + "math" + "time" +) + +func CalculateSpeed(duration time.Duration, speed float64, count float64, seconds_no_change *int) float64 { + if count == 0 { + *seconds_no_change++ + } else { + *seconds_no_change = 0 + } + + if speed == 0. { + return count + } + + if *seconds_no_change >= 5 { + *seconds_no_change = 0 + return 0. + } + + // Calculate a moving average over 5 seconds - use similar weight as load average + return count + math.Exp(float64(duration) / float64(time.Second) / -5.) * (speed - count) +} diff --git a/src/lc-lib/core/version.go b/src/lc-lib/core/version.go index 2dd7b7f9..fef6d314 100644 --- a/src/lc-lib/core/version.go +++ b/src/lc-lib/core/version.go @@ -16,4 +16,4 @@ package core -const Log_Courier_Version string = "0.13" +const Log_Courier_Version string = "0.14" diff --git a/src/lc-lib/harvester/harvester.go b/src/lc-lib/harvester/harvester.go index a40625f7..8b78737b 100644 --- a/src/lc-lib/harvester/harvester.go +++ b/src/lc-lib/harvester/harvester.go @@ -20,43 +20,56 @@ package harvester import ( - "bufio" - "bytes" "fmt" "io" "lc-lib/core" - "math" "os" "sync" "time" ) +var ( + event_host string = "localhost.localdomain" +) + +func init() { + ret, err := os.Hostname() + if err == nil { + event_host = ret + } else { + log.Warning("Failed to determine the FQDN; using '%s'.", event_host) + } +} + type HarvesterFinish struct { Last_Offset int64 Error error } type Harvester struct { - stop_chan chan interface{} - return_chan chan *HarvesterFinish - stream core.Stream - fileinfo os.FileInfo - path string - 
fileconfig *core.FileConfig - offset int64 - output chan<- *core.EventDescriptor - codec core.Codec - file *os.File - - mutex sync.Mutex - line_speed float64 - byte_speed float64 - line_count uint64 - byte_count uint64 - last_eof *int64 + sync.RWMutex + + stop_chan chan interface{} + return_chan chan *HarvesterFinish + stream core.Stream + fileinfo os.FileInfo + path string + config *core.Config + fileconfig *core.FileConfig + offset int64 + output chan<- *core.EventDescriptor + codec core.Codec + file *os.File + split bool + + line_speed float64 + byte_speed float64 + line_count uint64 + byte_count uint64 + last_eof *int64 } -func NewHarvester(stream core.Stream, fileconfig *core.FileConfig, offset int64) *Harvester { +func NewHarvester(stream core.Stream, config *core.Config, fileconfig *core.FileConfig, offset int64) *Harvester { var fileinfo os.FileInfo var path string @@ -69,13 +82,14 @@ func NewHarvester(stream core.Stream, fileconfig *core.FileConfig, offset int64) } ret := &Harvester{ - stop_chan: make(chan interface{}), - return_chan: make(chan *HarvesterFinish, 1), - stream: stream, - fileinfo: fileinfo, - path: path, - fileconfig: fileconfig, - offset: offset, + stop_chan: make(chan interface{}), + return_chan: make(chan *HarvesterFinish, 1), + stream: stream, + fileinfo: fileinfo, + path: path, + config: config, + fileconfig: fileconfig, + offset: offset, } ret.codec = fileconfig.CodecFactory.NewCodec(ret.eventCallback, ret.offset) @@ -104,22 +118,32 @@ func (h *Harvester) harvest(output chan<- *core.EventDescriptor) (int64, error) return h.offset, err } + defer h.file.Close() + h.output = output - defer h.file.Close() + if h.path == "-" { + log.Info("Started stdin harvester at position 0") + h.offset = 0 + } else { + // Get current offset in file + offset, err := h.file.Seek(0, os.SEEK_CUR) + if err != nil { + log.Warning("Failed to determine start offset for %s: %s", h.path, err) + return h.offset, err + } - // NOTE(driskell): How would we know line 
number if from_beginning is false and we SEEK_END? Or would we scan,count,skip? - var line uint64 = 0 // Ask registrar about the line number + if h.offset != offset { + log.Warning("Started harvester at position %d (requested %d): %s", offset, h.offset, h.path) + } else { + log.Info("Started harvester at position %d (requested %d): %s", offset, h.offset, h.path) + } - // Get current offset in file - // TODO: Check error? - offset, _ := h.file.Seek(0, os.SEEK_CUR) - log.Info("Started harvester at position %d (requested %d): %s", offset, h.offset, h.path) - h.offset = offset + h.offset = offset + } - // TODO(sissel): Make the buffer size tunable at start-time - reader := bufio.NewReaderSize(h.file, 16<<10) // 16kb buffer by default - buffer := new(bytes.Buffer) + // The buffer size limits the maximum line length we can read, including terminator + reader := NewLineReader(h.file, h.config.General.MaxLineBytes) // TODO: Make configurable? read_timeout := 10 * time.Second @@ -132,23 +156,13 @@ func (h *Harvester) harvest(output chan<- *core.EventDescriptor) (int64, error) ReadLoop: for { - text, bytesread, err := h.readline(reader, buffer) + text, bytesread, err := h.readline(reader) if duration := time.Since(last_measurement); duration >= time.Second { - h.mutex.Lock() + h.Lock() - count := float64(h.line_count - last_line_count) - - if count == 0 { - if seconds_without_events != 5 { - seconds_without_events++ - } - } else { - seconds_without_events = 0 - } - - h.line_speed = h.calculateSpeed(duration, h.line_speed, count, seconds_without_events) - h.byte_speed = h.calculateSpeed(duration, h.byte_speed, float64(h.byte_count - last_byte_count), seconds_without_events) + h.line_speed = core.CalculateSpeed(duration, h.line_speed, float64(h.line_count - last_line_count), &seconds_without_events) + h.byte_speed = core.CalculateSpeed(duration, h.byte_speed, float64(h.byte_count - last_byte_count), &seconds_without_events) last_byte_count = h.byte_count last_line_count = 
h.line_count @@ -156,7 +170,7 @@ ReadLoop: h.codec.Meter() - h.mutex.Unlock() + h.Unlock() // Check shutdown select { @@ -166,92 +180,116 @@ ReadLoop: } } - if err != nil { - if err == io.EOF { - // Check shutdown - select { - case <-h.stop_chan: - break ReadLoop - default: - } - - h.mutex.Lock() - last_eof := h.offset - h.last_eof = &last_eof - h.mutex.Unlock() - - // Timed out waiting for data, got EOF - if h.path == "-" { - // This wouldn't make sense on stdin so lets not risk anything strange happening - continue - } - - // Don't check for truncation until we hit the full read_timeout - if time.Since(last_read_time) < read_timeout { - continue - } - - info, err := h.file.Stat() - if err == nil { - if info.Size() < h.offset { - log.Warning("Unexpected file truncation, seeking to beginning: %s", h.path) - h.file.Seek(0, os.SEEK_SET) - h.offset = 0 - continue - } else if age := time.Since(last_read_time); age > h.fileconfig.DeadTime { - // if last_read_time was more than dead time, this file is probably dead. Stop watching it. - log.Info("Stopping harvest of %s; last change was %v ago", h.path, age-(age%time.Second)) - // TODO: We should return a Stat() from before we attempted to read - // In prospector we use that for comparison to resume - // This prevents a potential race condition if we stop just as the - // file is modified with extra lines... 
- return h.codec.Teardown(), nil - } - - continue - } else { - log.Error("Unexpected error checking status of %s: %s", h.path, err) - } - } else { - log.Error("Unexpected error reading from %s: %s", h.path, err) - } + if err == ErrBufferFull { + err = nil + h.split = true + } + + if err == nil { + line_offset := h.offset + h.offset += int64(bytesread) + + // Codec is last - it forwards harvester state for us such as offset for resume + h.codec.Event(line_offset, h.offset, text) + + last_read_time = time.Now() + h.line_count++ + h.byte_count += uint64(bytesread) + + continue + } + + if err != io.EOF { + log.Error("Unexpected error reading from %s: %s", h.path, err) return h.codec.Teardown(), err } - line++ - line_offset := h.offset - h.offset += int64(bytesread) + // Check shutdown + select { + case <-h.stop_chan: + break ReadLoop + default: + } + + h.Lock() + last_eof := h.offset + h.last_eof = &last_eof + h.Unlock() + + // Timed out waiting for data, got EOF + if h.path == "-" { + // This wouldn't make sense on stdin so lets not risk anything strange happening + continue + } + + // Don't check for truncation until we hit the full read_timeout + if time.Since(last_read_time) < read_timeout { + continue + } + + info, err := h.file.Stat() + if err != nil { + log.Error("Unexpected error checking status of %s: %s", h.path, err) + return h.codec.Teardown(), err + } - // Codec is last - it forwards harvester state for us such as offset for resume - h.codec.Event(line_offset, h.offset, line, text) + if info.Size() < h.offset { + log.Warning("Unexpected file truncation, seeking to beginning: %s", h.path) + h.file.Seek(0, os.SEEK_SET) + h.offset = 0 + continue + } - last_read_time = time.Now() - h.line_count++ - h.byte_count += uint64(bytesread) + if age := time.Since(last_read_time); age > h.fileconfig.DeadTime { + // if last_read_time was more than dead time, this file is probably dead. Stop watching it. 
+ log.Info("Stopping harvest of %s; last change was %v ago", h.path, age-(age%time.Second)) + // TODO: We should return a Stat() from before we attempted to read + // In prospector we use that for comparison to resume + // This prevents a potential race condition if we stop just as the + // file is modified with extra lines... + return h.codec.Teardown(), nil + } } log.Info("Harvester for %s exiting", h.path) return h.codec.Teardown(), nil } -func (h *Harvester) calculateSpeed(duration time.Duration, speed float64, count float64, seconds_without_events int) float64 { - if speed == 0. { - return count +func (h *Harvester) eventCallback(start_offset int64, end_offset int64, text string) { + event := core.Event{ + "host": event_host, + "path": h.path, + "offset": start_offset, + "message": text, + } + for k := range h.fileconfig.Fields { + event[k] = h.fileconfig.Fields[k] } - if seconds_without_events == 5 { - return 0. + // If we split any of the line data, tag it + if h.split { + if v, ok := event["tags"]; ok { + if v, ok := v.([]string); ok { + v = append(v, "splitline") + } + } else { + event["tags"] = []string{"splitline"} + } + h.split = false } - // Calculate a moving average over 5 seconds - use similiar weight as load average - return count + math.Exp(float64(duration) / float64(time.Second) / -5.) * (speed - count) -} + encoded, err := event.Encode() + if err != nil { + // This should never happen - log and skip if it does + log.Warning("Skipping line in %s at offset %d due to encoding failure: %s", h.path, start_offset, err) + return + } -func (h *Harvester) eventCallback(start_offset int64, end_offset int64, line uint64, text string) { - event := &core.EventDescriptor{ + desc := &core.EventDescriptor{ Stream: h.stream, Offset: end_offset, - Event: core.NewEvent(h.fileconfig.Fields, h.path, start_offset, line, text), + // NOTE: Make this include the fileconfig fields? 
+ Event: encoded, } EventLoop: @@ -259,7 +297,7 @@ EventLoop: select { case <-h.stop_chan: break EventLoop - case h.output <- event: + case h.output <- desc: break EventLoop } } @@ -297,66 +335,50 @@ func (h *Harvester) prepareHarvester() error { return nil } -func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer) (string, int, error) { - var is_partial bool = true - var newline_length int = 1 - - for { - segment, err := reader.ReadBytes('\n') +func (h *Harvester) readline(reader *LineReader) (string, int, error) { + var newline int - if segment != nil && len(segment) > 0 { - if segment[len(segment)-1] == '\n' { - // Found a complete line - is_partial = false + line, err := reader.ReadSlice() - // Check if also a CR present - if len(segment) > 1 && segment[len(segment)-2] == '\r' { - newline_length++ - } + if line != nil { + if err == nil { + // Line will always end in '\n' if no error, but check also for CR + if len(line) > 1 && line[len(line)-2] == '\r' { + newline = 2 + } else { + newline = 1 } - - // TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it? - buffer.Write(segment) } - if err != nil { - if err == io.EOF && is_partial { - // Backoff - select { - case <-h.stop_chan: - return "", 0, err - case <-time.After(1 * time.Second): - } - - return "", 0, err - } else { - log.Warning("%s", err) - return "", 0, err // TODO(sissel): don't do this? 
- } + // Return the line along with the length including line ending + length := len(line) + return string(line[:length-newline]), length, err + } + + if err != nil { + if err != io.EOF { + // Pass back error to tear down harvester + return "", 0, err } - // If we got a full line, return the whole line without the EOL chars (CRLF or LF) - if !is_partial { - // Get the str length with the EOL chars (LF or CRLF) - buffer_size := buffer.Len() - str := buffer.String()[:buffer_size-newline_length] - // Reset the buffer for the next line - buffer.Reset() - return str, buffer_size, nil + // Backoff + select { + case <-h.stop_chan: + case <-time.After(1 * time.Second): } - } /* forever read chunks */ + } - return "", 0, nil + return "", 0, io.EOF } func (h *Harvester) Snapshot() *core.Snapshot { - h.mutex.Lock() + h.RLock() ret := core.NewSnapshot("Harvester") ret.AddEntry("Speed (Lps)", h.line_speed) ret.AddEntry("Speed (Bps)", h.byte_speed) ret.AddEntry("Processed lines", h.line_count) - ret.AddEntry("Last offset", h.offset) + ret.AddEntry("Current offset", h.offset) if h.last_eof == nil { ret.AddEntry("Last EOF", "Never") } else { @@ -367,7 +389,7 @@ func (h *Harvester) Snapshot() *core.Snapshot { ret.AddSub(sub_snap) } - h.mutex.Unlock() + h.RUnlock() return ret } diff --git a/src/lc-lib/harvester/linereader.go b/src/lc-lib/harvester/linereader.go new file mode 100644 index 00000000..0f28d557 --- /dev/null +++ b/src/lc-lib/harvester/linereader.go @@ -0,0 +1,91 @@ +/* +* Copyright 2014 Jason Woods. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package harvester + +import ( + "bytes" + "errors" + "io" +) + +var ( + ErrBufferFull = errors.New("LineReader: buffer full") +) + +// A read interface to tail +type LineReader struct { + rd io.Reader + buf []byte + start int + end int + err error +} + +func NewLineReader(rd io.Reader, size int64) *LineReader { + lr := &LineReader{ + rd: rd, + buf: make([]byte, size), + } + + return lr +} + +func (lr *LineReader) Reset() { + lr.start = 0 +} + +func (lr *LineReader) ReadSlice() ([]byte, error) { + var err error + + if lr.end == 0 { + err = lr.fill() + } + + for { + if n := bytes.IndexByte(lr.buf[lr.start:lr.end], '\n'); n > 0 { + line := lr.buf[lr.start:lr.start+n+1] + lr.start += n + 1 + return line, nil + } + + if err != nil { + return nil, err + } + + if lr.end - lr.start >= len(lr.buf) { + break + } + + err = lr.fill() + } + + lr.start, lr.end = 0, 0 + return lr.buf, ErrBufferFull +} + +func (lr *LineReader) fill() error { + if lr.start != 0 { + copy(lr.buf, lr.buf[lr.start:lr.end]) + lr.end -= lr.start + lr.start = 0 + } + + n, err := lr.rd.Read(lr.buf[lr.end:]) + lr.end += n + + return err +} diff --git a/src/lc-lib/prospector/prospector.go b/src/lc-lib/prospector/prospector.go index bb0d01af..7e0c6a0d 100644 --- a/src/lc-lib/prospector/prospector.go +++ b/src/lc-lib/prospector/prospector.go @@ -35,8 +35,7 @@ type Prospector struct { core.PipelineConfigReceiver core.PipelineSnapshotProvider - generalconfig *core.GeneralConfig - fileconfigs []core.FileConfig + config *core.Config prospectorindex map[string]*prospectorInfo prospectors map[*prospectorInfo]*prospectorInfo from_beginning bool @@ -52,8 +51,7 @@ type Prospector struct { func NewProspector(pipeline *core.Pipeline, config *core.Config, from_beginning bool, registrar_imp *registrar.Registrar, spooler_imp *spooler.Spooler) (*Prospector, error) { ret := &Prospector{ - generalconfig: 
&config.General, - fileconfigs: config.Files, + config: config, prospectorindex: make(map[string]*prospectorInfo), prospectors: make(map[*prospectorInfo]*prospectorInfo), from_beginning: from_beginning, @@ -104,7 +102,7 @@ func (p *Prospector) Run() { // Handle any "-" (stdin) paths - but only once stdin_started := false - for config_k, config := range p.fileconfigs { + for config_k, config := range p.config.Files { for i, path := range config.Paths { if path == "-" { if !stdin_started { @@ -123,7 +121,7 @@ func (p *Prospector) Run() { p.prospectors[info] = info // Start the harvester - p.startHarvesterWithOffset(info, &p.fileconfigs[config_k], 0) + p.startHarvesterWithOffset(info, &p.config.Files[config_k], 0) stdin_started = true } @@ -139,10 +137,10 @@ ProspectLoop: newlastscan := time.Now() p.iteration++ // Overflow is allowed - for config_k, config := range p.fileconfigs { + for config_k, config := range p.config.Files { for _, path := range config.Paths { // Scan - flag false so new files always start at beginning - p.scan(path, &p.fileconfigs[config_k]) + p.scan(path, &p.config.Files[config_k]) } } @@ -176,7 +174,7 @@ ProspectLoop: // Defer next scan for a bit now := time.Now() - scan_deadline := now.Add(p.generalconfig.ProspectInterval) + scan_deadline := now.Add(p.config.General.ProspectInterval) DelayLoop: for { @@ -188,8 +186,7 @@ ProspectLoop: case <-p.snapshot_chan: p.handleSnapshot() case config := <-p.OnConfig(): - p.generalconfig = &config.General - p.fileconfigs = config.Files + p.config = config } now = time.Now() @@ -392,7 +389,7 @@ func (p *Prospector) flagDuplicateError(file string, info *prospectorInfo) { p.prospectorindex[file] = info } -func (p *Prospector) startHarvester(info *prospectorInfo, config *core.FileConfig) { +func (p *Prospector) startHarvester(info *prospectorInfo, fileconfig *core.FileConfig) { var offset int64 if p.from_beginning { @@ -404,12 +401,12 @@ func (p *Prospector) startHarvester(info *prospectorInfo, config 
*core.FileConfi // Send a new file event to allow registrar to begin persisting for this harvester p.registrar_spool.Add(registrar.NewDiscoverEvent(info, info.file, offset, info.identity.Stat())) - p.startHarvesterWithOffset(info, config, offset) + p.startHarvesterWithOffset(info, fileconfig, offset) } -func (p *Prospector) startHarvesterWithOffset(info *prospectorInfo, config *core.FileConfig, offset int64) { +func (p *Prospector) startHarvesterWithOffset(info *prospectorInfo, fileconfig *core.FileConfig, offset int64) { // TODO - hook in a shutdown channel - info.harvester = harvester.NewHarvester(info, config, offset) + info.harvester = harvester.NewHarvester(info, p.config, fileconfig, offset) info.running = true info.status = Status_Ok info.harvester.Start(p.output) diff --git a/src/lc-lib/publisher/pending_payload.go b/src/lc-lib/publisher/pending_payload.go index 59b3b114..fb2f8b3f 100644 --- a/src/lc-lib/publisher/pending_payload.go +++ b/src/lc-lib/publisher/pending_payload.go @@ -20,8 +20,6 @@ import ( "bytes" "compress/zlib" "encoding/binary" - "encoding/json" - "io" "lc-lib/core" "time" ) @@ -34,28 +32,29 @@ type pendingPayload struct { ack_events int payload_start int payload []byte - timeout *time.Time + timeout time.Time } -func newPendingPayload(events []*core.EventDescriptor, nonce string, hostname string) (*pendingPayload, error) { +func newPendingPayload(events []*core.EventDescriptor, nonce string, timeout time.Duration) (*pendingPayload, error) { payload := &pendingPayload{ events: events, nonce: nonce, num_events: len(events), + timeout: time.Now().Add(timeout), } - if err := payload.Generate(hostname); err != nil { + if err := payload.Generate(); err != nil { return nil, err } return payload, nil } -func (pp *pendingPayload) Generate(hostname string) (err error) { +func (pp *pendingPayload) Generate() (err error) { var buffer bytes.Buffer // Begin with the nonce - if _, err = buffer.Write([]byte(pp.nonce)); err != nil { + if _, err = 
buffer.Write([]byte(pp.nonce)[0:16]); err != nil { return } @@ -66,9 +65,10 @@ func (pp *pendingPayload) Generate(hostname string) (err error) { // Append all the events for _, event := range pp.events[pp.ack_events:] { - // Add host field - event.Event["host"] = hostname - if err = pp.bufferJdatDataEvent(compressor, event); err != nil { + if err = binary.Write(compressor, binary.BigEndian, uint32(len(event.Event))); err != nil { + return + } + if _, err = compressor.Write(event.Event); err != nil { return } } @@ -80,29 +80,3 @@ func (pp *pendingPayload) Generate(hostname string) (err error) { return } - -func (pp *pendingPayload) bufferJdatDataEvent(output io.Writer, event *core.EventDescriptor) (err error) { - var value []byte - value, err = json.Marshal(event.Event) - if err != nil { - log.Error("JSON event encoding error: %s", err) - - if err = binary.Write(output, binary.BigEndian, 2); err != nil { - return - } - if _, err = output.Write([]byte("{}")); err != nil { - return - } - - return - } - - if err = binary.Write(output, binary.BigEndian, uint32(len(value))); err != nil { - return - } - if _, err = output.Write(value); err != nil { - return - } - - return nil -} diff --git a/src/lc-lib/publisher/publisher.go b/src/lc-lib/publisher/publisher.go index f03e8cb2..9d55851e 100644 --- a/src/lc-lib/publisher/publisher.go +++ b/src/lc-lib/publisher/publisher.go @@ -27,22 +27,31 @@ import ( "lc-lib/core" "lc-lib/registrar" "math/rand" - "os" + "sync" "time" ) const ( - default_publisher_hostname string = "localhost.localdomain" + // TODO(driskell): Make the idle timeout configurable like the network timeout is? 
keepalive_timeout time.Duration = 900 * time.Second ) +const ( + Status_Disconnected = iota + Status_Connected + Status_Reconnecting +) + type Publisher struct { core.PipelineSegment core.PipelineConfigReceiver + core.PipelineSnapshotProvider + + sync.RWMutex config *core.NetworkConfig transport core.Transport - hostname string + status int can_send <-chan int pending_ping bool pending_payloads map[string]*pendingPayload @@ -53,6 +62,12 @@ type Publisher struct { input chan []*core.EventDescriptor registrar_spool *registrar.RegistrarEventSpool shutdown bool + line_count int64 + seconds_no_ack int + + line_speed float64 + last_line_count int64 + last_measurement time.Time } func NewPublisher(pipeline *core.Pipeline, config *core.NetworkConfig, registrar_imp *registrar.Registrar) (*Publisher, error) { @@ -74,12 +89,6 @@ func NewPublisher(pipeline *core.Pipeline, config *core.NetworkConfig, registrar func (p *Publisher) init() error { var err error - p.hostname, err = os.Hostname() - if err != nil { - log.Warning("Failed to determine the FQDN; using localhost.localdomain.") - p.hostname = default_publisher_hostname - } - p.pending_payloads = make(map[string]*pendingPayload) // Set up the selected transport @@ -115,8 +124,8 @@ func (p *Publisher) Run() { var err error var reload int - // TODO(driskell): Make the idle timeout configurable like the network timeout is? timer := time.NewTimer(keepalive_timeout) + stats_timer := time.NewTimer(time.Second) control_signal := p.OnShutdown() delay_shutdown := func() { @@ -147,36 +156,52 @@ PublishLoop: if err = p.transport.Init(); err != nil { log.Error("Transport init failed: %s", err) - // TODO: implement shutdown select - select { - case <-time.After(p.config.Reconnect): - continue - case <-control_signal: - // TODO: Persist pending payloads and resume? 
Quicker shutdown - if p.num_payloads == 0 { - break PublishLoop - } - delay_shutdown() - case config := <-p.OnConfig(): - // Apply and check for changes - reload = p.reloadConfig(&config.Network) + now := time.Now() + reconnect_due := now.Add(p.config.Reconnect) - // If a change and no pending payloads, process immediately - if reload != core.Reload_None && p.num_payloads == 0 { - continue + ReconnectTimeLoop: + for { + + select { + case <-time.After(reconnect_due.Sub(now)): + break ReconnectTimeLoop + case <-control_signal: + // TODO: Persist pending payloads and resume? Quicker shutdown + if p.num_payloads == 0 { + break PublishLoop + } + + delay_shutdown() + case config := <-p.OnConfig(): + // Apply and check for changes + reload = p.reloadConfig(&config.Network) + + // If a change and no pending payloads, process immediately + if reload != core.Reload_None && p.num_payloads == 0 { + break ReconnectTimeLoop + } + } + + now = time.Now() + if now.After(reconnect_due) { + break } } + + continue } + p.Lock() + p.status = Status_Connected + p.Unlock() + + timer.Reset(keepalive_timeout) + stats_timer.Reset(time.Second) + p.pending_ping = false input_toggle = nil - - if p.shutdown || reload != core.Reload_None || p.num_payloads >= p.config.MaxPendingPayloads { - p.can_send = nil - } else { - p.can_send = p.transport.CanSend() - } + p.can_send = p.transport.CanSend() SelectLoop: for { @@ -186,19 +211,24 @@ PublishLoop: if retry_payload != nil { // Do we need to regenerate the payload? 
if retry_payload.payload == nil { - if err = retry_payload.Generate(p.hostname); err != nil { + if err = retry_payload.Generate(); err != nil { break SelectLoop } } // Reset timeout - retry_payload.timeout = nil + retry_payload.timeout = time.Now().Add(p.config.Timeout) // Send the payload again if err = p.transport.Write("JDAT", retry_payload.payload); err != nil { break SelectLoop } + // Expect an ACK within network timeout if this is the first of the retries + if p.first_payload == retry_payload { + timer.Reset(p.config.Timeout) + } + // Move to next non-empty payload for { retry_payload = retry_payload.next @@ -207,12 +237,6 @@ PublishLoop: } } - // Expect an ACK within network timeout - if p.first_payload.timeout != nil { - timer.Reset(p.first_payload.timeout.Sub(time.Now())) - } else { - timer.Reset(p.config.Timeout) - } break } else if p.out_of_sync != 0 { var resent bool @@ -246,10 +270,9 @@ PublishLoop: p.can_send = nil } - // Expect an ACK within network timeout - if p.first_payload.timeout != nil { - timer.Reset(p.first_payload.timeout.Sub(time.Now())) - } else { + // Expect an ACK within network timeout if this is first payload after idle + // Otherwise leave the previous timer + if p.num_payloads == 1 { timer.Reset(p.config.Timeout) } case data := <-p.transport.Read(): @@ -288,24 +311,10 @@ PublishLoop: break SelectLoop } timer.Reset(keepalive_timeout) - } else if p.first_payload.timeout != nil { - timer.Reset(p.first_payload.timeout.Sub(time.Now())) } else { timer.Reset(p.config.Timeout) } case <-timer.C: - // Do we need to resend first payload? 
- if p.out_of_sync != 0 { - var resent bool - if resent, err = p.checkResend(); err != nil { - break SelectLoop - } else if resent { - // Expect an ACK within network timeout - timer.Reset(p.config.Timeout) - break - } - } - // If we have pending payloads, we should've received something by now if p.num_payloads != 0 { err = errors.New("Server did not respond within network timeout") @@ -349,6 +358,9 @@ PublishLoop: } p.can_send = nil + case <-stats_timer.C: + p.updateStatistics(Status_Connected) + stats_timer.Reset(time.Second) } } @@ -360,9 +372,15 @@ PublishLoop: break PublishLoop } + p.updateStatistics(Status_Reconnecting) + // An error occurred, reconnect after timeout log.Error("Transport error, will try again: %s", err) time.Sleep(p.config.Reconnect) + } else { + log.Info("Reconnecting transport") + + p.updateStatistics(Status_Reconnecting) } retry_payload = p.first_payload @@ -400,20 +418,37 @@ func (p *Publisher) reloadConfig(new_config *core.NetworkConfig) int { return reload } +func (p *Publisher) updateStatistics(status int) { + p.Lock() + + p.status = status + + p.line_speed = core.CalculateSpeed(time.Since(p.last_measurement), p.line_speed, float64(p.line_count - p.last_line_count), &p.seconds_no_ack) + + p.last_line_count = p.line_count + p.last_measurement = time.Now() + + p.Unlock() +} + func (p *Publisher) checkResend() (bool, error) { // We're out of sync (received ACKs for later payloads but not earlier ones) // Check timeouts of earlier payloads and resend if necessary if payload := p.first_payload; payload.timeout.Before(time.Now()) { // Do we need to regenerate the payload? 
if payload.payload == nil { - if err := payload.Generate(p.hostname); err != nil { + if err := payload.Generate(); err != nil { return false, err } } // Update timeout - timeout := time.Now().Add(p.config.Timeout) - payload.timeout = &timeout + payload.timeout = time.Now().Add(p.config.Timeout) + + // Requeue the payload + p.first_payload = payload.next + p.last_payload.next = payload + p.last_payload = payload // Send the payload again if err := p.transport.Write("JDAT", payload.payload); err != nil { @@ -447,7 +482,7 @@ func (p *Publisher) sendNewPayload(events []*core.EventDescriptor) (err error) { } var payload *pendingPayload - if payload, err = newPendingPayload(events, nonce, p.hostname); err != nil { + if payload, err = newPendingPayload(events, nonce, p.config.Timeout); err != nil { return } @@ -459,7 +494,10 @@ func (p *Publisher) sendNewPayload(events []*core.EventDescriptor) (err error) { p.last_payload.next = payload } p.last_payload = payload + + p.Lock() p.num_payloads++ + p.Unlock() return p.transport.Write("JDAT", payload.payload) } @@ -499,11 +537,15 @@ func (p *Publisher) processAck(message []byte) (err error) { // Full ACK? // TODO: Protocol error if sequence is too large? 
if int(sequence) >= payload.num_events-payload.payload_start { + p.line_count += int64(payload.num_events-payload.ack_events) + // No more events left for this payload, free the payload memory payload.ack_events = len(payload.events) payload.payload = nil delete(p.pending_payloads, nonce) } else { + p.line_count += int64(sequence)-int64(payload.ack_events-payload.payload_start) + // Only process the ACK if something was actually processed if int(sequence) > payload.num_events-payload.ack_events { payload.ack_events = int(sequence) + payload.payload_start @@ -516,6 +558,7 @@ func (p *Publisher) processAck(message []byte) (err error) { // This is where we enforce ordering again to ensure registrar receives ACK in order if payload == p.first_payload { out_of_sync := p.out_of_sync + 1 + for payload.ack_events != 0 { if payload.ack_events != len(payload.events) { p.registrar_spool.Add(registrar.NewAckEvent(payload.events[:payload.ack_events])) @@ -531,10 +574,13 @@ func (p *Publisher) processAck(message []byte) (err error) { p.registrar_spool.Send() payload = payload.next p.first_payload = payload - p.num_payloads-- out_of_sync-- p.out_of_sync = out_of_sync + p.Lock() + p.num_payloads-- + p.Unlock() + // Resume sending if we stopped due to excessive pending payload count if !p.shutdown && p.can_send == nil { p.can_send = p.transport.CanSend() @@ -549,11 +595,28 @@ func (p *Publisher) processAck(message []byte) (err error) { p.out_of_sync++ } - // Set a timeout of the first payload if out of sync as we should be expecting it any time - if p.out_of_sync != 0 && p.first_payload.timeout == nil { - timeout := time.Now().Add(p.config.Timeout) - p.first_payload.timeout = &timeout + return +} + +func (p *Publisher) Snapshot() []*core.Snapshot { + p.RLock() + + snapshot := core.NewSnapshot("Publisher") + + switch p.status { + case Status_Connected: + snapshot.AddEntry("Status", "Connected") + case Status_Reconnecting: + snapshot.AddEntry("Status", "Reconnecting...") + default: + 
snapshot.AddEntry("Status", "Disconnected") } - return + snapshot.AddEntry("Speed (Lps)", p.line_speed) + snapshot.AddEntry("Published lines", p.last_line_count) + snapshot.AddEntry("Pending Payloads", p.num_payloads) + + p.RUnlock() + + return []*core.Snapshot{snapshot} } diff --git a/src/lc-lib/spooler/spooler.go b/src/lc-lib/spooler/spooler.go index 25712f3f..e9f50ee3 100644 --- a/src/lc-lib/spooler/spooler.go +++ b/src/lc-lib/spooler/spooler.go @@ -25,12 +25,20 @@ import ( "time" ) +const ( + // Event header is just uint32 at the moment + event_header_size = 4 + // Payload header is the nonce plus the ZLIB overheads (http://www.zlib.net/zlib_tech.html) + payload_header_size = 16 + 11 +) + type Spooler struct { core.PipelineSegment core.PipelineConfigReceiver config *core.GeneralConfig spool []*core.EventDescriptor + spool_size int input chan *core.EventDescriptor output chan<- []*core.EventDescriptor timer_start time.Time @@ -66,26 +74,45 @@ SpoolerLoop: for { select { case event := <-s.input: + if len(s.spool) > 0 && int64(s.spool_size) + int64(len(event.Event)) + event_header_size >= s.config.SpoolMaxBytes - payload_header_size { + log.Debug("Spooler flushing %d events due to spool max bytes (%d/%d - next is %d)", len(s.spool), s.spool_size, s.config.SpoolMaxBytes, len(event.Event) + 4) + + // Can't fit this event in the spool - flush and then queue + if !s.sendSpool() { + break SpoolerLoop + } + + s.resetTimer() + s.spool_size += len(event.Event) + event_header_size + s.spool = append(s.spool, event) + + continue + } + + s.spool_size += len(event.Event) + event_header_size s.spool = append(s.spool, event) // Flush if full - if len(s.spool) == cap(s.spool) { + if len(s.spool) >= cap(s.spool) { + log.Debug("Spooler flushing %d events due to spool size reached", len(s.spool)) + if !s.sendSpool() { break SpoolerLoop } - s.timer_start = time.Now() - s.timer.Reset(s.config.SpoolTimeout) + + s.resetTimer() } case <-s.timer.C: // Flush what we have, if anything if 
len(s.spool) > 0 { + log.Debug("Spooler flushing %d events due to spool timeout exceeded", len(s.spool)) + if !s.sendSpool() { break SpoolerLoop } } - s.timer_start = time.Now() - s.timer.Reset(s.config.SpoolTimeout) + s.resetTimer() case <-s.OnShutdown(): break SpoolerLoop case config := <-s.OnConfig(): @@ -110,10 +137,16 @@ func (s *Spooler) sendSpool() bool { } s.spool = make([]*core.EventDescriptor, 0, s.config.SpoolSize) + s.spool_size = 0 return true } +func (s *Spooler) resetTimer() { + s.timer_start = time.Now() + s.timer.Reset(s.config.SpoolTimeout) +} + func (s *Spooler) reloadConfig(config *core.Config) bool { s.config = &config.General diff --git a/src/lc-lib/transports/tcp.go b/src/lc-lib/transports/tcp.go index 1c71ef8e..06f5f97d 100644 --- a/src/lc-lib/transports/tcp.go +++ b/src/lc-lib/transports/tcp.go @@ -140,6 +140,7 @@ func (t *TransportTcp) ReloadConfig(new_net_config *core.NetworkConfig) int { return core.Reload_Transport } + // TODO - This does not catch changes to the underlying certificate file! if new_config.SSLCertificate != t.config.SSLCertificate || new_config.SSLKey != t.config.SSLKey || new_config.SSLCA != t.config.SSLCA { return core.Reload_Transport } @@ -157,6 +158,7 @@ func (t *TransportTcp) Init() error { // Pick a random server from the list. hostport := t.net_config.Servers[rand.Int()%len(t.net_config.Servers)] + // TODO: Parse and lookup using net.ResolveTCPAddr submatch := t.config.hostport_re.FindSubmatch([]byte(hostport)) if submatch == nil { return fmt.Errorf("Invalid host:port given: %s", hostport) @@ -296,12 +298,19 @@ func (t *TransportTcp) receiver() { } // Pass back the message - t.recv_chan <- [][]byte{header[0:4], message} + select { + case <-t.shutdown: + break + case t.recv_chan <- [][]byte{header[0:4], message}: + } } /* loop until shutdown */ if err != nil { // Pass the error back and abort - t.recv_chan <- err + select { + case <-t.shutdown: + case t.recv_chan <- err: + } } t.wait.Done()