66require "tempfile"
77require "time"
88
9- import "javax.crypto.Mac"
10-
11- class ProtocolError < LogStash ::Error ; end
12- class HeaderError < LogStash ::Error ; end
13- class EncryptionError < LogStash ::Error ; end
14- class NaNError < LogStash ::Error ; end
9+ require 'logstash/plugin_mixins/event_support/event_factory_adapter'
10+ require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
1511
1612# Read events from the collectd binary protocol over the network via udp.
1713# See https://collectd.org/wiki/index.php/Binary_protocol
@@ -38,15 +34,24 @@ class NaNError < LogStash::Error; end
3834# IgnoreSelected false
3935# </Plugin>
4036# <Plugin network>
41- # <Server "10.0.0.1" "25826">
42- # </Server>
37+ # Server "10.0.0.1" "25826"
4338# </Plugin>
4439#
4540# Be sure to replace `10.0.0.1` with the IP of your Logstash instance.
4641#
4742class LogStash ::Codecs ::Collectd < LogStash ::Codecs ::Base
43+
44+ extend LogStash ::PluginMixins ::ValidatorSupport ::FieldReferenceValidationAdapter
45+
46+ include LogStash ::PluginMixins ::EventSupport ::EventFactoryAdapter
47+
4848 config_name "collectd"
4949
50+ class ProtocolError < LogStash ::Error ; end
51+ class HeaderError < LogStash ::Error ; end
52+ class EncryptionError < LogStash ::Error ; end
53+ class NaNError < LogStash ::Error ; end
54+
5055 @@openssl_mutex = Mutex . new
5156
5257 AUTHFILEREGEX = /([^:]+): (.+)/
@@ -108,8 +113,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
108113
109114 # Security Level. Default is `None`. This setting mirrors the setting from the
110115 # collectd https://collectd.org/wiki/index.php/Plugin:Network[Network plugin]
111- config :security_level , :validate => [ SECURITY_NONE , SECURITY_SIGN , SECURITY_ENCR ] ,
112- :default => "None"
116+ config :security_level , :validate => [ SECURITY_NONE , SECURITY_SIGN , SECURITY_ENCR ] , :default => "None"
113117
114118 # What to do when a value in the event is `NaN` (Not a Number)
115119 #
@@ -132,6 +136,12 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
132136 # `Sign` or `Encrypt`
133137 config :authfile , :validate => :string
134138
139+ # Defines a target field for placing decoded fields.
140+ # If this setting is omitted, data gets stored at the root (top level) of the event.
141+ #
142+ # NOTE: the target is only relevant while decoding data into a new event.
143+ config :target , :validate => :field_reference
144+
135145 public
136146 def register
137147 @logger . trace ( "Starting Collectd codec..." )
@@ -467,14 +477,10 @@ def decode(payload)
467477 # This is better than looping over all keys every time.
468478 collectd . delete ( 'type_instance' ) if collectd [ 'type_instance' ] == ""
469479 collectd . delete ( 'plugin_instance' ) if collectd [ 'plugin_instance' ] == ""
470- if add_nan_tag
471- collectd [ 'tags' ] ||= [ ]
472- collectd [ 'tags' ] << @nan_tag
473- end
474480 # This ugly little shallow-copy hack keeps the new event from getting munged by the cleanup
475481 # With pass-by-reference we get hosed (if we pass collectd, then clean it up rapidly, values can disappear)
476482 if !drop # Drop the event if it's flagged true
477- yield LogStash :: Event . new ( collectd . dup )
483+ yield generate_event ( collectd . dup , add_nan_tag )
478484 else
479485 raise ( NaNError )
480486 end
@@ -485,8 +491,15 @@ def decode(payload)
485491 end
486492 end
487493 end # while payload.length > 0 do
488- rescue EncryptionError , ProtocolError , HeaderError , NaNError
494+ rescue EncryptionError , ProtocolError , HeaderError , NaNError => e
489495 # basically do nothing, we just want out
496+ @logger . debug ( "Decode failure" , payload : payload , message : e . message )
490497 end # def decode
491498
499+ def generate_event ( payload , add_nan_tag )
500+ event = targeted_event_factory . new_event ( payload )
501+ event . tag @nan_tag if add_nan_tag
502+ event
503+ end
504+
492505end # class LogStash::Codecs::Collectd
0 commit comments