@@ -127,17 +127,18 @@ void StagingEngine::begin_sub_transaction()
127127 if (not sub_transaction_in_progress_) {
128128 current_sub_transaction_id_++;
129129 sub_transaction_in_progress_ = true ;
130- XBT_DEBUG (" Subscribe Transaction %u started by %s" , current_sub_transaction_id_, sg4::Actor::self ()->get_cname ());
131130 }
132131
133132 num_subscribers_starting_++;
133+ XBT_DEBUG (" Subscribe Transaction %u started by %s (%d/%lu)" , current_sub_transaction_id_, sg4::Actor::self ()->get_cname (),
134+ num_subscribers_starting_,get_num_subscribers ());
135+
134136
135137 // The last subscriber to start a transaction notifies the publishers
136138 if (num_subscribers_starting_ == get_num_subscribers () &&
137139 current_pub_transaction_id_ == current_sub_transaction_id_) {
138140 XBT_DEBUG (" Notify Publishers that they can start their transaction" );
139141 sub_transaction_started_->notify_all ();
140- num_subscribers_starting_ = 0 ;
141142 }
142143
143144 std::unique_lock<sg4::Mutex> lock (*sub_mutex_);
@@ -159,13 +160,17 @@ void StagingEngine::end_sub_transaction()
159160 sub_transaction_.wait_all ();
160161 XBT_DEBUG (" All on-flight subscribe activities are completed. Proceed with the current transaction." );
161162 sub_transaction_.clear ();
162- // Mark this transaction as over
163- sub_transaction_in_progress_ = false ;
164163 }
165164
166165 // Prevent subscribers to start a new transaction before this one is really over
167- if (sub_barrier_)
168- sub_barrier_->wait ();
166+ if (sub_barrier_->wait ())
167+ // Mark this transaction as over
168+ sub_transaction_in_progress_ = false ;
169+ // Decrease counter for next iteration
170+ num_subscribers_starting_--;
171+ XBT_DEBUG (" Subscribe Transaction %u end by %s (%d/%lu)" , current_sub_transaction_id_, sg4::Actor::self ()->get_cname (),
172+ num_subscribers_starting_,get_num_subscribers ());
173+
169174}
170175
171176void StagingEngine::sub_close ()
0 commit comments