Emque Consuming is a Ruby application framework that includes everything needed to create and run an application capable of consuming messages from a message broker in a pub/sub architecture. Messages can be produced with the emque-producing library.
We currently only support RabbitMQ. If you would like to add your own adapter, take a look at the adapters directory.
Add this line to your application's Gemfile:
    gem "emque-consuming"
    # make sure you have bunny for rabbitmq unless you're using a custom adapter
    gem "bunny", "~> 2.11.0"

And then execute:

    $ bundle

Or install it yourself as:

    $ gem install emque-consuming

Using the new generator is the easiest way to get up and running quickly.
    emque <options> new name

This command creates a directory "name" containing the barebones directories and files you'll need to get started. You can also use the following command line options to customize your application's configuration. Options are added to the generated config/application.rb file.
    emque <options> (start|stop|new|console|help) <name (new only)>
    # ...
        -S, --socket PATH                PATH to the application's unix socket
        -b, --bind IP:PORT               IP & port for the http status application to listen on.
    # ...
        -e, --error-limit N              Set the max errors before application suicide
        -s, --status                     Run the http status application
        -x, --error-expiration SECONDS   Expire errors after SECONDS
            --app-name NAME              Run the application as NAME
    # ...

In 1.2.0 the ability to automatically retry specific types of errors was introduced. This depends on the Delayed Message Exchange plugin. After installing the plugin, you'll need to configure emque consuming to utilize the plugin:
    config.enable_delayed_message = true # turn on retryable errors, defaults to false
    config.retryable_errors = ["ExampleError"] # a comma delimited array of strings matching the error, defaults to an empty array ([])
    config.retryable_error_limit = 4 # the number of retries to use before dead lettering the message, defaults to 3
    config.delayed_message_workers = 3 # the number of workers processing delayed messages, defaults to 1

This will now allow you to retry known errors with an exponential backoff, which should help with transient errors!
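To give a feel for what exponential backoff means in practice, here is a minimal sketch in plain Ruby. It is not the library's implementation: the `backoff_delays` helper, the 1-second base delay, and the doubling factor are all assumptions chosen for illustration. Only the retry count of 4 mirrors the `config.retryable_error_limit` value shown above.

```ruby
# Illustrative only: emque-consuming's actual delay schedule may differ.
# base_seconds and factor are hypothetical values, not library settings.
def backoff_delays(retry_limit, base_seconds: 1, factor: 2)
  # Attempt 0 waits base_seconds; each later attempt multiplies the wait by factor.
  (0...retry_limit).map { |attempt| base_seconds * factor**attempt }
end

# With a limit of 4 retries, the waits between attempts grow as:
delays = backoff_delays(4)
puts delays.inspect # prints [1, 2, 4, 8]
```

The point of the growing gaps is that a transient failure (a brief network blip, a dependency restarting) gets more time to clear with each retry, and a message that still fails after the last retry is dead lettered.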
Configure Emque::Consuming in your config/application.rb file. Here is an example:
    # config/application.rb
    require "emque/consuming"

    module Example
      class Application
        include Emque::Consuming::Application

        initialize_core!

        config.set_adapter(:rabbit_mq, :url => ENV["RABBITMQ_URL"], :prefetch => 10)

        # customize the error thresholds
        # config.error_limit = 10
        # config.error_expiration = 300

        # enable the status application
        # config.status = :on

        # errors will be logged, but if more is needed, that can be added
        # config.error_handlers << Proc.new {|ex, context|
        #   send an email, send to HoneyBadger, etc
        # }

        # do something when shutdown occurs
        # config.shutdown_handlers << Proc.new { |context|
        #   notify slack, pagerduty or send an email
        # }
      end
    end

You'll also want to set up a routes.rb file:
    # config/routes.rb
    Example::Application.router.map do
      topic "events" => EventsConsumer do
        map "events.new" => "new_event"
        # ...
      end

      # ...
    end

and a consumer for each topic:
    # app/consumers/events_consumer.rb
    class EventsConsumer
      include Emque::Consuming.consumer

      def new_event(message)
        # NOTE: message is an immutable Virtus (https://github.com/solnic/virtus) Value Object.
        # Check it out here: https://github.com/emque/emque-consuming/blob/main/lib/emque/consuming/message.rb

        # You don't have to use pipe (https://github.com/teamsnap/pipe-ruby), but we love it!
        pipe(message, :through => [
          :shared_action, :do_something_with_new_event
        ])
      end

      private

      def shared_action(message)
        # ...
      end

      def do_something_with_new_event(message)
        # ...
      end
    end

Emque::Consuming provides a command line interface:
    $ bundle exec emque help

    emque <options> (start|stop|new|console|help) <name (new only)>
        -P, --pidfile PATH               Store pid in PATH
        -S, --socket PATH                PATH to the application's unix socket
        -b, --bind IP:PORT               IP & port for the http status application to listen on.
        -d, --daemon                     Daemonize the application
        -e, --error-limit N              Set the max errors before application suicide
        -s, --status                     Run the http status application
        -x, --error-expiration SECONDS   Expire errors after SECONDS
            --app-name NAME              Run the application as NAME
            --env (ex. production)       Set the application environment, overrides EMQUE_ENV
            --auto-shutdown (false|true) Enable or disable auto shutdown on reaching the error limit

and a series of rake commands:
    $ bundle exec rake -T

    rake emque:configuration         # Show the current configuration of a running instance (accepts SOCKET)
    rake emque:console               # Start a pry console
    rake emque:errors:clear          # Clear all outstanding errors (accepts SOCKET)
    rake emque:errors:expire_after   # Change the number of seconds to SECONDS before future errors expire (accepts SOCKET)
    rake emque:errors:limit:down     # Decrease the error limit (accepts SOCKET)
    rake emque:errors:limit:up       # Increase the error limit (accepts SOCKET)
    rake emque:restart               # Restart the workers inside a running instance (does not reload code; accepts SOCKET)
    rake emque:routes                # Show the available routes
    rake emque:start                 # Start a new instance (accepts PIDFILE, DAEMON)
    rake emque:status                # Show the current status of a running instance (accepts SOCKET)
    rake emque:stop                  # Stop a running instance (accepts SOCKET)

To use the rake commands, add the following lines to your application's Rakefile:
    require_relative "config/application"
    require "emque/consuming/tasks"

Testing is a bit sparse at the moment, but we're working on it.
To run tests...

    bundle exec rspec

FIRST: Read our style guides at https://github.com/teamsnap/guides/tree/master/ruby
- Fork it ( http://github.com/emque/emque-consuming/fork )
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create new Pull Request