Add option to send indexed fields along with events #25
base: master
Conversation
| I am testing this out as I would really like this feature. Though I am having trouble getting a correct "fields" JSON object with the record_transformer plugin. Similar to your example, I have:
```
fields '{"logfile": "${record["logfile"]}"}'
```
but I get an error:
```
2018-03-06 01:31:49 +0000 [warn]: #0 failed to parse {"logfile": "${record["logfile"]}"} as json. Assuming {"logfile": "${record["logfile"]}"} is a string error_class=JSON::ParserError error="765: unexpected token at '{\"logfile\": \"${record[\"logfile\"]}\"}'"
```
I am using fluentd-1.1.0 pid=7 ruby="2.4.3". This may be a general Fluentd question, but I was hoping you can help so I can test this PR myself. |
| Hi Brian, unfortunately I have not been using the more recent versions of fluentd. I have been using "gem install fluentd -v 0.12.35" for my deployments in Kubernetes. Perhaps something has changed in the more recent version that I didn't anticipate; I know that isn't very helpful. I had also done the same testing with td-agent version 2, but that is also apparently fluentd version 0.12. Perhaps I can get td-agent version 3 installed somewhere and do a quick test to see what happens with it. I think that td-agent version 3 is very close to, if not exactly, fluentd version 1.0.0. |
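A plausible explanation for the warning (an inference from record_transformer's behavior, not something stated in this thread): the plugin tries to parse the fields value as JSON before the `${record["..."]}` placeholders have been expanded, and the unexpanded template is not valid JSON, so it logs the warning and falls back to treating the value as a string that gets expanded later. A minimal Ruby sketch of that failure mode:
```ruby
require 'json'

template = '{"logfile": "${record["logfile"]}"}'

# Before placeholder expansion the template is not valid JSON: the quotes
# inside ${record["logfile"]} terminate the JSON string early, so parsing
# raises -- the same JSON::ParserError seen in the warning above.
begin
  JSON.parse(template)
rescue JSON::ParserError => e
  puts "parse fails before expansion: #{e.class}"
end

# Once the placeholder is substituted with an actual record value,
# the result parses cleanly.
expanded = template.sub('${record["logfile"]}', '/var/log/app.log')
p JSON.parse(expanded)  # => {"logfile"=>"/var/log/app.log"}
```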
| Thanks for the quick response. I was not able to get this working on v0.12.42 either. |
| Can you post a more comprehensive configuration so I can verify my settings? |
| Sure thing, here are the relevant parts of my fluentd config for this.

First, I am receiving what are essentially custom syslog messages on port 5140 UDP:
```
<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag syslog
  format /^(?<time>[^ ]*\s*[^ ]* [^ ]*) (?<host>\S+) (?<app-name>\S+) (?<procid>\S+) (?<msgid>\S+) \[(?<sd-id>\S+) index="(?<index>[^"]*)" cluster="(?<cluster>[^"]*)"\] (?<message>.*)$/
  message_format auto
  message_length_limit 10240
</source>
```
Note that in the syslog message, they are sending a key-value pair for "cluster" (it doesn't have to be a key-value pair; it could be anything you capture as a named group), which I capture as "cluster". I believe it then lands in the record as "cluster": "blah".

Next, I use a filter with record_transformer to inject the "fields" JSON object into the record:
```
<filter *.syslog.local0.**>
  @type record_transformer
  <record>
    fields '{"cluster": "${record["cluster"]}" }'
  </record>
</filter>
```
Then I send them out to the HTTP event collector:
```
<match *.syslog.local0.**>
  @type splunk-http-eventcollector
  server servername.corp.theplatform.com:8088
  verify false
  send_fields true
  token D17501D5-5DA1-4096-BE8E-B0CD05C318CB
  host '${record["host"]}'
  index '${record["index"]}'
  check_index false
  source "#{Socket.gethostname}.${tag_parts[1]}.${tag_parts[2]}.${tag_parts[3]}"
  sourcetype '${record["app-name"]}-${record["msgid"]}'
  fields '${record["fields"]}'
  buffer_type memory
  buffer_queue_limit 16
  buffer_chunk_limit 8m
  flush_interval 5s
</match>
```
Hopefully that helps! |
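For orientation, here is a sketch of the kind of JSON event this pipeline would post to the collector once the placeholders are expanded. All values are hypothetical, and the payload shape is an assumption based on the standard Splunk HEC event format rather than something taken from this thread:
```ruby
require 'json'

# Hypothetical expanded payload for one syslog record (illustrative values).
# "fields" is the index-time fields object this PR adds support for.
event = {
  "time"       => 1520300400,
  "host"       => "web01",
  "index"      => "app_logs",
  "source"     => "fluentd01.syslog.local0.myapp",
  "sourcetype" => "myapp-msg42",
  "fields"     => { "cluster" => "blah" },
  "event"      => "the raw syslog message body"
}
puts JSON.generate(event)
```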
| Thanks for this. It turns out that the error I am seeing does not affect the functionality. Your changes work perfectly. Thank you for this. |
| can we get this change in soon? |
| +1 for this change |
| "host" => @placeholder_expander.expand(@host.to_s, placeholders), | ||
| "index" => @placeholder_expander.expand(@index, placeholders) | ||
| ] | ||
| if @send_fields |
Wouldn't this be better as:
```ruby
splunk_object = Hash[
  "time"       => time.to_i,
  "source"     => if @source.nil? then tag.to_s else @placeholder_expander.expand(@source, placeholders) end,
  "sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders),
  "host"       => @placeholder_expander.expand(@host.to_s, placeholders),
  "index"      => @placeholder_expander.expand(@index, placeholders)
]
if @send_fields
  splunk_object = splunk_object.merge(
    Hash["fields" => JSON.parse(@placeholder_expander.expand(@fields.to_s, placeholders))]
  )
end
```
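A quick illustration of the shape this produces, with hypothetical inline values standing in for the expanded options; with send_fields off, the "fields" key never appears in the payload:
```ruby
require 'json'

# Hypothetical stand-ins for the expanded plugin options.
base        = { "time" => 1520300400, "index" => "main" }
send_fields = true
fields_json = '{"cluster": "blah"}'

payload = send_fields ? base.merge("fields" => JSON.parse(fields_json)) : base
puts JSON.generate(payload)
# send_fields true:  {"time":1520300400,"index":"main","fields":{"cluster":"blah"}}
# send_fields false: {"time":1520300400,"index":"main"}
```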
We collect events with fluentd via syslog input that match the following regex format:
```
^(?<time>[^ ]*\s*[^ ]* [^ ]*) (?<host>\S+) sourcetype=(?<sourcetype>\S+) cluster=(?<cluster>\S+)::(?<message>.*)$
```
We want to send in cluster=clustername as an indexed field with these events.
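To make the capture concrete, a small Ruby sketch of that regex against a made-up log line (the line and its values are hypothetical):
```ruby
line = 'Mar  6 01:31:49 host1 sourcetype=custom cluster=clustername::Something happened'
re = /^(?<time>[^ ]*\s*[^ ]* [^ ]*) (?<host>\S+) sourcetype=(?<sourcetype>\S+) cluster=(?<cluster>\S+)::(?<message>.*)$/

m = re.match(line)
p m[:cluster]  # => "clustername"
p m[:message]  # => "Something happened"
```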
We modify the records to have a "fields" JSON object with the record_transformer plugin:
```
fields '{"cluster": "${record["cluster"]}" }'
```
Then we configure the HTTP event collector plugin to include the indexed field by setting the following options:
```
send_fields true
fields '${record["fields"]}'
```
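Putting the two steps together, a Ruby sketch of how a record flows through this configuration; the record values are hypothetical, and the placeholder expansion step is simplified to plain string interpolation:
```ruby
require 'json'

# Hypothetical record as parsed by the syslog input.
record = { "cluster" => "clustername", "message" => "Something happened" }

# Step 1: record_transformer expands the template against the record and
# stores the result as a plain string under record["fields"].
record["fields"] = %({"cluster": "#{record["cluster"]}" })

# Step 2: with send_fields enabled, the output plugin expands
# '${record["fields"]}' and parses it into the HEC "fields" object.
hec_fields = JSON.parse(record["fields"])
p hec_fields  # => {"cluster"=>"clustername"}
```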