@@ -5,6 +5,7 @@ class JobBatch < Record
5
5
belongs_to :job, foreign_key: :job_id, optional: true
6
6
belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch", optional: true
7
7
has_many :jobs, foreign_key: :batch_id
8
+ has_many :children, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch"
8
9
9
10
serialize :on_finish_active_job, coder: JSON
10
11
serialize :on_success_active_job, coder: JSON
@@ -21,28 +22,33 @@ def current_batch_id
21
22
end
22
23
23
24
def enqueue(attributes = {})
24
- previous_batch_id = current_batch_id.presence || nil
25
-
26
25
job_batch = nil
27
26
transaction do
28
27
job_batch = create!(batch_attributes(attributes))
29
- ActiveSupport::IsolatedExecutionState[:current_batch_id] = job_batch.id
30
- yield job_batch
28
+ wrap_in_batch_context(job_batch.id) do
29
+ yield job_batch
30
+ end
31
31
end
32
32
33
33
job_batch
34
- ensure
35
- ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id
36
34
end
37
35
38
36
def dispatch_finished_batches
39
37
incomplete.order(:id).pluck(:id).each do |id|
40
38
transaction do
41
- where(id: id).non_blocking_lock.each(&:finish)
39
+ where(id: id).includes(:children, :jobs). non_blocking_lock.each(&:finish)
42
40
end
43
41
end
44
42
end
45
43
44
+ def wrap_in_batch_context(batch_id)
45
+ previous_batch_id = current_batch_id.presence || nil
46
+ ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id
47
+ yield
48
+ ensure
49
+ ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id
50
+ end
51
+
46
52
private
47
53
48
54
def batch_attributes(attributes)
@@ -62,6 +68,8 @@ def batch_attributes(attributes)
62
68
attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize
63
69
end
64
70
71
+ attributes[:parent_job_batch_id] = current_batch_id if current_batch_id.present?
72
+
65
73
attributes
66
74
end
67
75
@@ -74,16 +82,13 @@ def as_active_job(active_job_klass)
74
82
def enqueue(attributes = {})
75
83
raise "You cannot enqueue a batch that is already finished" if finished?
76
84
77
- previous_batch_id = self.class.current_batch_id.presence || nil
78
-
79
85
transaction do
80
- ActiveSupport::IsolatedExecutionState[:current_batch_id] = id
81
- yield self
86
+ self.class.wrap_in_batch_context(id) do
87
+ yield self
88
+ end
82
89
end
83
90
84
91
self
85
- ensure
86
- ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id
87
92
end
88
93
89
94
def finished?
@@ -110,6 +115,10 @@ def finish
110
115
return unless status.in?([ :finished, :failed ])
111
116
end
112
117
118
+ children.find_each do |child|
119
+ return unless child.finished?
120
+ end
121
+
113
122
if on_finish_active_job.present?
114
123
perform_completion_job(:on_finish_active_job, attrs)
115
124
end
@@ -118,7 +127,10 @@ def finish
118
127
perform_completion_job(:on_success_active_job, attrs)
119
128
end
120
129
121
- update!({ finished_at: Time.zone.now }.merge(attrs))
130
+ transaction do
131
+ parent_job_batch.touch(:changed_at, :last_changed_at) if parent_job_batch_id.present?
132
+ update!({ finished_at: Time.zone.now }.merge(attrs))
133
+ end
122
134
end
123
135
124
136
private
@@ -133,7 +145,9 @@ def perform_completion_job(job_field, attrs)
133
145
active_job = ActiveJob::Base.deserialize(send(job_field))
134
146
active_job.send(:deserialize_arguments_if_needed)
135
147
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
136
- ActiveJob.perform_all_later([ active_job ])
148
+ self.class.wrap_in_batch_context(id) do
149
+ ActiveJob.perform_all_later([ active_job ])
150
+ end
137
151
active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id
138
152
attrs[job_field] = active_job.serialize
139
153
end
0 commit comments