Module: Elasticsearch::Model::Importing::ClassMethods

Included in:
Proxy::ClassMethodsProxy
Defined in:
lib/elasticsearch/model/importing.rb

Instance Method Summary collapse

Instance Method Details

#__batch_to_bulk(batch, transform) ⇒ Object

    # File 'lib/elasticsearch/model/importing.rb', line 178

    def __batch_to_bulk(batch, transform)
      batch.map { |model| transform.call(model) }
    end
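The method simply maps each record in the batch through the transform to produce the body for the Bulk API. A minimal sketch of that behaviour in plain Ruby, using a stand-in `Record` Struct instead of real model instances (an assumption for illustration):

```ruby
# Stand-in record; the real method receives model instances (illustrative assumption).
Record = Struct.new(:id, :title)

# A transform in the default style: one bulk "index" action per record.
transform = lambda do |model|
  { index: { _id: model.id, data: { title: model.title } } }
end

batch = [Record.new(1, 'Hello'), Record.new(2, 'World')]

# Equivalent to __batch_to_bulk(batch, transform):
bulk_body = batch.map { |model| transform.call(model) }
# bulk_body => [{ index: { _id: 1, data: { title: 'Hello' } } },
#               { index: { _id: 2, data: { title: 'World' } } }]
```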

#import(options = {}, &block) {|Hash| ... } ⇒ Fixnum, Array<Hash>

Import all model records into the index

The method will pick the correct strategy based on the `Importing` module defined in the corresponding adapter.

Examples:

Import all records into the index

 Article.import 

Set the batch size to 100

 Article.import batch_size: 100 

Process the response from Elasticsearch

    Article.import do |response|
      puts "Got " + response['items'].select { |i| i['index']['error'] }.size.to_s + " errors"
    end

Delete and create the index with appropriate settings and mappings

 Article.import force: true 

Refresh the index after importing all batches

 Article.import refresh: true 

Import the records into a different index/type than the default one

 Article.import index: 'my-new-index', type: 'my-other-type' 

Pass an ActiveRecord scope to limit the imported records

 Article.import scope: 'published' 

Pass an ActiveRecord query to limit the imported records

 Article.import query: -> { where(author_id: author_id) } 

Transform records during the import with a lambda

    transform = lambda do |a|
      { index: { _id: a.id, _parent: a.author_id, data: a.__elasticsearch__.as_indexed_json } }
    end

    Article.import transform: transform

Update the batch before yielding it

    class Article
      # ...
      def self.enrich(batch)
        batch.each do |item|
          item.metadata = MyAPI.get_metadata(item.id)
        end
        batch
      end
    end

    Article.import preprocess: :enrich

Return an array of error elements instead of the number of errors, e.g. to try importing these records again

 Article.import return: 'errors' 

Parameters:

  • options (Hash) (defaults to: {})

    Options passed to the underlying `__find_in_batches` method

  • block (Proc)

    Optional block to evaluate for each batch

Yields:

  • (Hash)

    Gives the Hash with the Elasticsearch response to the block

Returns:

  • (Fixnum)

    by default, the number of errors encountered during importing

  • (Array<Hash>)

    if the :return option is set to 'errors', contains only the failed items from the response's items key, e.g.:

    [
      {
        "index" => {
          "error"    => 'FAILED',
          "_index"   => "test",
          "_id"      => '1',
          "_version" => 1,
          "result"   => "foo",
          "_shards"  => { "total" => 1, "successful" => 0, "failed" => 1 },
          "status"   => 400
        }
      }
    ]
    # File 'lib/elasticsearch/model/importing.rb', line 136

    def import(options={}, &block)
      errors       = []
      refresh      = options.delete(:refresh)   || false
      target_index = options.delete(:index)     || index_name
      transform    = options.delete(:transform) || __transform
      pipeline     = options.delete(:pipeline)
      return_value = options.delete(:return)    || 'count'

      unless transform.respond_to?(:call)
        raise ArgumentError, "Pass an object responding to `call` as the :transform option, #{transform.class} given"
      end

      if options.delete(:force)
        self.create_index! force: true, index: target_index
      elsif !self.index_exists? index: target_index
        raise ArgumentError, "#{target_index} does not exist to be imported into. Use create_index! or the :force option to create it."
      end

      __find_in_batches(options) do |batch|
        params = {
          index: target_index,
          body:  __batch_to_bulk(batch, transform)
        }

        params[:pipeline] = pipeline if pipeline

        response = client.bulk params
        yield response if block_given?
        errors += response['items'].select { |k, v| k.values.first['error'] }
      end

      self.refresh_index! index: target_index if refresh

      case return_value
      when 'errors'
        errors
      else
        errors.size
      end
    end
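Since `return: 'errors'` hands back the failed bulk items directly, one common follow-up is to extract their ids and re-import only those records. A hedged sketch assuming the error-array shape shown above (the re-import call itself needs a live index, so it is left as a comment):

```ruby
# Sample failure array in the shape import returns with return: 'errors'.
errors = [
  { 'index' => { 'error' => 'FAILED', '_index' => 'test', '_id' => '1', 'status' => 400 } }
]

# Each element is keyed by the bulk action name ('index'), so pull out the _id:
failed_ids = errors.map { |item| item.values.first['_id'] }
# failed_ids => ["1"]

# With a live index you could then retry only those records, e.g.:
# Article.import query: -> { where(id: failed_ids) }, return: 'errors'
```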