+require "active_record/relation/batches/batch_enumerator"
+
 module ActiveRecord
   module Batches
     # Looping through a collection of records from the database
@@ -122,24 +124,102 @@ def find_in_batches(begin_at: nil, end_at: nil, batch_size: 1000, start: nil)
         end
       end
 
+      in_batches(of: batch_size, begin_at: begin_at, end_at: end_at, load: true) do |batch|
+        yield batch.to_a
+      end
+    end
+
+    # Yields ActiveRecord::Relation objects to work with a batch of records.
+    #
+    #   Person.where("age > 21").in_batches do |relation|
+    #     relation.delete_all
+    #     sleep(10) # Throttle the delete queries
+    #   end
+    #
+    # If you do not provide a block to #in_batches, it will return a
+    # BatchEnumerator which is enumerable.
+    #
+    #   Person.in_batches.with_index do |relation, batch_index|
+    #     puts "Processing relation ##{batch_index}"
+    #     relation.delete_all
+    #   end
+    #
+    # Examples of calling methods on the returned BatchEnumerator object:
+    #
+    #   Person.in_batches.delete_all
+    #   Person.in_batches.update_all(awesome: true)
+    #   Person.in_batches.each_record(&:party_all_night!)
+    #
+    # ==== Options
+    # * <tt>:of</tt> - Specifies the size of the batch. Defaults to 1000.
+    # * <tt>:load</tt> - Specifies if the relation should be loaded. Defaults to false.
+    # * <tt>:begin_at</tt> - Specifies the primary key value to start from, inclusive of the value.
+    # * <tt>:end_at</tt> - Specifies the primary key value to end at, inclusive of the value.
+    #
+    # This is especially useful if you want to work with the
+    # ActiveRecord::Relation object instead of the array of records, or if
+    # you want multiple workers dealing with the same processing queue. You can
+    # make worker 1 handle all the records between id 0 and 10,000 and worker 2
+    # handle from 10,000 and beyond (by setting the +:begin_at+ and +:end_at+
+    # options on each worker).
+    #
+    #   # Let's process the next 2000 records
+    #   Person.in_batches(of: 2000, begin_at: 2000).update_all(awesome: true)
+    #
+    # An example of calling the +where+ query method on the yielded relation:
+    #
+    #   Person.in_batches.each do |relation|
+    #     relation.update_all('age = age + 1')
+    #     relation.where('age > 21').update_all(should_party: true)
+    #     relation.where('age <= 21').delete_all
+    #   end
+    #
+    # NOTE: If you are going to iterate through each record, you should call
+    # #each_record on the returned BatchEnumerator:
+    #
+    #   Person.in_batches.each_record(&:party_all_night!)
+    #
+    # NOTE: It's not possible to set the order. That is automatically set to
+    # ascending on the primary key ("id ASC") to make the batch ordering
+    # consistent. Therefore the primary key must be orderable, e.g. an integer
+    # or a string.
+    #
+    # NOTE: You can't set the limit either, as that's used to control the batch
+    # sizes.
+    def in_batches(of: 1000, begin_at: nil, end_at: nil, load: false)
+      relation = self
+      unless block_given?
+        return BatchEnumerator.new(of: of, begin_at: begin_at, end_at: end_at, relation: self)
+      end
+
       if logger && (arel.orders.present? || arel.taken.present?)
         logger.warn("Scoped order and limit are ignored, it's forced to be batch order and batch size")
       end
 
-      relation = relation.reorder(batch_order).limit(batch_size)
+      relation = relation.reorder(batch_order).limit(of)
       relation = apply_limits(relation, begin_at, end_at)
-      records = relation.to_a
+      batch_relation = relation
+
+      loop do
+        if load
+          records = batch_relation.to_a
+          ids = records.map(&:id)
+          yielded_relation = self.where(primary_key => ids)
+          yielded_relation.load_records(records)
+        else
+          ids = batch_relation.pluck(primary_key)
+          yielded_relation = self.where(primary_key => ids)
+        end
 
-      while records.any?
-        records_size = records.size
-        primary_key_offset = records.last.id
-        raise "Primary key not included in the custom select clause" unless primary_key_offset
+        break if ids.empty?
 
-        yield records
+        primary_key_offset = ids.last
+        raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset
 
-        break if records_size < batch_size
+        yield yielded_relation
 
-        records = relation.where(table[primary_key].gt(primary_key_offset)).to_a
+        break if ids.length < of
+        batch_relation = relation.where(table[primary_key].gt(primary_key_offset))
       end
     end
 
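For reference, a minimal usage sketch of the API this diff adds, reusing the Person model and the awesome column from the RDoc examples above (the batch size of 500 is an arbitrary illustration). With a block, in_batches yields ActiveRecord::Relation objects; without a block, it returns a BatchEnumerator.

    # Block form: each yielded object is a relation of at most `of` records.
    Person.where("age > 21").in_batches(of: 500) do |relation|
      relation.update_all(awesome: true)
      sleep(1) # throttle between batches
    end

    # Blockless form: operate through the returned BatchEnumerator.
    Person.in_batches(of: 500).delete_all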
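The change to find_in_batches above makes it delegate to in_batches(load: true) and convert each yielded relation to an array, so existing callers keep receiving arrays of loaded records. A sketch of the difference, under the same Person-model assumption:

    Person.find_in_batches(batch_size: 500) do |people|
      people.each(&:party_all_night!)   # people is an Array of loaded records
    end

    Person.in_batches(of: 500) do |people|
      people.update_all(awesome: true)  # people is an ActiveRecord::Relation
    end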
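The RDoc also describes splitting one table across several workers with :begin_at and :end_at. A sketch of that setup with two hypothetical workers; both bounds are inclusive, so the ranges below do not overlap:

    # Worker 1: primary keys up to and including 10_000.
    Person.in_batches(of: 1000, end_at: 10_000).update_all(awesome: true)

    # Worker 2: primary keys from 10_001 onwards.
    Person.in_batches(of: 1000, begin_at: 10_001).update_all(awesome: true)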
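Finally, the new implementation walks the table in primary-key order: it plucks one batch of ids, yields a relation scoped to those ids, and continues from the last id it saw. A standalone sketch of that keyset-style pattern, written against the public Relation API rather than the internals shown in the diff (Person and the batch size are again assumptions):

    batch_size = 1000
    scope      = Person.reorder(:id).limit(batch_size)
    last_id    = nil

    loop do
      batch = last_id ? scope.where("id > ?", last_id) : scope
      ids   = batch.pluck(:id)
      break if ids.empty?

      Person.where(id: ids).update_all(awesome: true) # work on this batch
      last_id = ids.last
      break if ids.length < batch_size                # last, short batch
    end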