@@ -212,31 +212,45 @@ def run
212212 @logger . push_tags ( @producer . to_s )
213213 @logger . info "Starting async producer in the background..."
214214
215+ do_loop
216+ rescue Exception => e
217+ @logger . error "Unexpected Kafka error #{ e . class } : #{ e . message } \n #{ e . backtrace . join ( "\n " ) } "
218+ @logger . error "Async producer crashed!"
219+ ensure
220+ @producer . shutdown
221+ @logger . pop_tags
222+ end
223+
224+ private
225+
226+ def do_loop
215227 loop do
216- operation , payload = @queue . pop
217-
218- case operation
219- when :produce
220- produce ( payload [ 0 ] , **payload [ 1 ] )
221- deliver_messages if threshold_reached?
222- when :deliver_messages
223- deliver_messages
224- when :shutdown
225- begin
226- # Deliver any pending messages first.
227- @producer . deliver_messages
228- rescue Error => e
229- @logger . error ( "Failed to deliver messages during shutdown: #{ e . message } " )
230-
231- @instrumenter . instrument ( "drop_messages.async_producer" , {
232- message_count : @producer . buffer_size + @queue . size ,
233- } )
228+ begin
229+ operation , payload = @queue . pop
230+
231+ case operation
232+ when :produce
233+ produce ( payload [ 0 ] , **payload [ 1 ] )
234+ deliver_messages if threshold_reached?
235+ when :deliver_messages
236+ deliver_messages
237+ when :shutdown
238+ begin
239+ # Deliver any pending messages first.
240+ @producer . deliver_messages
241+ rescue Error => e
242+ @logger . error ( "Failed to deliver messages during shutdown: #{ e . message } " )
243+
244+ @instrumenter . instrument ( "drop_messages.async_producer" , {
245+ message_count : @producer . buffer_size + @queue . size ,
246+ } )
247+ end
248+
249+ # Stop the run loop.
250+ break
251+ else
252+ raise "Unknown operation #{ operation . inspect } "
234253 end
235-
236- # Stop the run loop.
237- break
238- else
239- raise "Unknown operation #{ operation . inspect } "
240254 end
241255 end
242256 rescue Kafka ::Error => e
@@ -245,16 +259,8 @@ def run
245259
246260 sleep 10
247261 retry
248- rescue Exception => e
249- @logger . error "Unexpected Kafka error #{ e . class } : #{ e . message } \n #{ e . backtrace . join ( "\n " ) } "
250- @logger . error "Async producer crashed!"
251- ensure
252- @producer . shutdown
253- @logger . pop_tags
254262 end
255263
256- private
257-
258264 def produce ( value , **kwargs )
259265 retries = 0
260266 begin
0 commit comments