Skip to content

Commit aa7eea1

Browse files
committed
add defensive code for dealing with concurrency tracking when cluster events cause maps to reset
1 parent 17a7c32 commit aa7eea1

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/NestedSemaphore.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,19 @@ class NestedSemaphore[T](memoryPermits: Int) extends ForcibleSemaphore(memoryPer
100100
if (maxConcurrent == 1) {
101101
super.release(memoryPermits)
102102
} else {
103-
val concurrentSlots = actionConcurrentSlotsMap(actionid)
104-
val (memoryRelease, actionRelease) = concurrentSlots.release(1, true)
105-
//concurrent slots
106-
if (memoryRelease) {
107-
super.release(memoryPermits)
108-
}
109-
if (actionRelease) {
110-
actionConcurrentSlotsMap.remove(actionid)
103+
//This map may be recreated (multiple times) due to cluster membership events, so don't assume the entry exists...
104+
actionConcurrentSlotsMap.get(actionid) match {
105+
case Some(concurrentSlots) =>
106+
val (memoryRelease, actionRelease) = concurrentSlots.release(1, true)
107+
//concurrent slots
108+
if (memoryRelease) {
109+
super.release(memoryPermits)
110+
}
111+
if (actionRelease) {
112+
actionConcurrentSlotsMap.remove(actionid)
113+
}
114+
case None =>
115+
//this case can occur if one or more cluster members is restarted, resetting the state of the outstanding activations
111116
}
112117
}
113118
}

common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageConsumer.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,13 @@ class MessageFeed(description: String,
213213
outstandingMessages = outstandingMessages.tail
214214

215215
if (logHandoff) logging.debug(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
216-
handler(bytes)
216+
handler(bytes).andThen {
217+
{
218+
case Failure(e) =>
219+
logging.error(this, s"Failed to process message for topic $topic : $e (stack trace included)")
220+
e.printStackTrace()
221+
}
222+
}
217223
handlerCapacity -= 1
218224

219225
sendOutstandingMessages()

0 commit comments

Comments
 (0)