Skip to content

Commit 2d6dccd

Browse files
committed
DEVX-533: Implement LoadBalancer Strategy based on annotation
- added unit tests
1 parent 9919df9 commit 2d6dccd

File tree

5 files changed

+264
-15
lines changed

5 files changed

+264
-15
lines changed

core/controller/src/main/resources/reference.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ whisk {
2121
}
2222
loadbalancer {
2323
strategy {
24-
"default" = org.apache.openwhisk.core.loadBalancer.LeanBalancer
25-
"multitenantrouting" = org.apache.openwhisk.core.loadBalancer.LeanBalancer
24+
default = ""
25+
custom = {}
2626
}
2727
managed-fraction: 90%
2828
blackbox-fraction: 10%

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/MuxBalancer.scala

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,51 @@ package org.apache.openwhisk.core.loadBalancer
33
import akka.actor.{ActorRef, ActorSystem, Props}
44
import akka.stream.ActorMaterializer
55
import org.apache.openwhisk.common.{Logging, TransactionId}
6-
import org.apache.openwhisk.core.WhiskConfig
6+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
77
import org.apache.openwhisk.core.WhiskConfig._
88
import org.apache.openwhisk.core.connector.{ActivationMessage, MessagingProvider}
99
import org.apache.openwhisk.core.entity._
1010
import org.apache.openwhisk.spi.SpiLoader
11+
import pureconfig.loadConfigOrThrow
1112
import spray.json._
13+
import pureconfig._
14+
import pureconfig.generic.auto._
1215

1316
import scala.concurrent.Future
1417

1518
class MuxBalancer(config: WhiskConfig,
1619
feedFactory: FeedFactory,
1720
controllerInstance: ControllerInstanceId,
18-
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider])(
21+
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider],
22+
override val lbConfig: ShardingContainerPoolBalancerConfig =
23+
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(
1924
implicit actorSystem: ActorSystem,
2025
logging: Logging,
2126
materializer: ActorMaterializer)
2227
extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
2328

24-
private val balancers: Map[String, LoadBalancer] =
25-
lbConfig.strategy.custom.foldLeft(Map("default" -> getClass(lbConfig.strategy.default))) {
26-
case (result, (name, strategyConfig)) => result + (name -> getClass[LoadBalancer](strategyConfig.className))
29+
private val defaultLoadBalancer =
30+
getClass[LoadBalancerProvider](lbConfig.strategy.default).instance(config, controllerInstance)
31+
private val customLoadBalancerMap: Map[String, LoadBalancer] =
32+
lbConfig.strategy.custom.foldLeft(Map.empty[String, LoadBalancer]) {
33+
case (result, (name, strategyConfig)) =>
34+
result + (name -> getClass[LoadBalancerProvider](strategyConfig.className).instance(config, controllerInstance))
2735
}
2836

29-
def getClass[A](name: String): A = {
30-
logging.info(this, "'" + name + "'$")
37+
/**
38+
* Instantiates an object of the given type.
39+
*
40+
* Similar to SpiLoader.get, with the difference that the constructed class does not need to be declared as Spi.
41+
* Thus there could be multiple classes implementing same interface constructed at the same time
42+
*
43+
* @param name the name of the class
44+
* @tparam A expected type to return
45+
* @return instance of the class
46+
*/
47+
private def getClass[A](name: String): A = {
3148
val clazz = Class.forName(name + "$")
32-
clazz.getField("MODULE$").get(clazz).asInstanceOf[A]
49+
val classInst = clazz.getField("MODULE$").get(clazz).asInstanceOf[A]
50+
classInst
3351
}
3452

3553
override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth])
@@ -47,15 +65,15 @@ class MuxBalancer(config: WhiskConfig,
4765
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
4866
action.annotations.get("activationStrategy") match {
4967
case None =>
50-
balancers("default").publish(action, msg)
68+
defaultLoadBalancer.publish(action, msg)
5169
case Some(JsString(value)) => {
52-
if (balancers.contains(value)) {
53-
balancers(value).publish(action, msg)
70+
if (customLoadBalancerMap.contains(value)) {
71+
customLoadBalancerMap(value).publish(action, msg)
5472
} else {
55-
balancers("default").publish(action, msg)
73+
defaultLoadBalancer.publish(action, msg)
5674
}
5775
}
58-
case Some(_) => balancers("default").publish(action, msg)
76+
case Some(_) => defaultLoadBalancer.publish(action, msg)
5977
}
6078
}
6179
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.loadBalancer.test
19+
20+
import akka.actor.ActorSystem
21+
import akka.stream.ActorMaterializer
22+
import org.apache.openwhisk.common.{Logging, TransactionId}
23+
import org.apache.openwhisk.core.WhiskConfig
24+
import org.apache.openwhisk.core.connector.ActivationMessage
25+
import org.apache.openwhisk.core.entity.{
26+
ActivationId,
27+
ControllerInstanceId,
28+
ExecManifest,
29+
ExecutableWhiskActionMetaData,
30+
UUID,
31+
WhiskActivation
32+
}
33+
import org.apache.openwhisk.core.loadBalancer.{InvokerHealth, LoadBalancer, LoadBalancerProvider}
34+
import org.apache.openwhisk.core.WhiskConfig._
35+
36+
import scala.concurrent.Future
37+
38+
class MockLoadBalancer(prefix: String) extends LoadBalancer {
39+
override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth])
40+
override def clusterSize: Int = 1
41+
override def totalActiveActivations: Future[Int] = Future.successful(1)
42+
override def activeActivationsFor(namespace: UUID): Future[Int] =
43+
Future.successful(0)
44+
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
45+
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
46+
Future.successful(Future.successful(Left(ActivationId(prefix + "-mockLoadBalancerId0"))))
47+
}
48+
}
49+
50+
object MockLoadBalancerCustom extends LoadBalancerProvider {
51+
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
52+
implicit actorSystem: ActorSystem,
53+
logging: Logging,
54+
materializer: ActorMaterializer): LoadBalancer = {
55+
56+
new MockLoadBalancer("custom")
57+
}
58+
59+
def requiredProperties =
60+
ExecManifest.requiredProperties ++
61+
wskApiHost
62+
}
63+
64+
object MockLoadBalancerDefault extends LoadBalancerProvider {
65+
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
66+
implicit actorSystem: ActorSystem,
67+
logging: Logging,
68+
materializer: ActorMaterializer): LoadBalancer = {
69+
70+
new MockLoadBalancer("default")
71+
}
72+
73+
def requiredProperties =
74+
ExecManifest.requiredProperties ++
75+
wskApiHost
76+
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.loadBalancer.test
19+
20+
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
21+
import akka.stream.ActorMaterializer
22+
import akka.testkit.TestProbe
23+
import common.StreamLogging
24+
import org.apache.kafka.clients.producer.RecordMetadata
25+
import org.apache.kafka.common.TopicPartition
26+
import org.apache.openwhisk.common.{Logging, TransactionId}
27+
import org.apache.openwhisk.core.WhiskConfig
28+
import org.apache.openwhisk.core.connector._
29+
import org.apache.openwhisk.core.entity.size._
30+
import org.apache.openwhisk.core.entity.test.ExecHelpers
31+
import org.apache.openwhisk.core.entity._
32+
import org.apache.openwhisk.core.loadBalancer._
33+
import org.junit.runner.RunWith
34+
import org.scalamock.scalatest.MockFactory
35+
import org.scalatest.{FlatSpec, Matchers}
36+
import org.scalatest.junit.JUnitRunner
37+
38+
import scala.collection.immutable.Map
39+
import scala.concurrent.{Await, Future}
40+
import scala.concurrent.duration._
41+
42+
/**
43+
* Unit tests for the MuxBalancer object.
44+
*
45+
*/
46+
@RunWith(classOf[JUnitRunner])
47+
class MuxBalancerTests extends FlatSpec with Matchers with StreamLogging with ExecHelpers with MockFactory {
48+
behavior of "Mux Balancer"
49+
50+
def lbConfig(activationStrategy: ActivationStrategy) =
51+
ShardingContainerPoolBalancerConfig(activationStrategy, 1.0 - 0.5, 0.5, 1, 1.minute)
52+
53+
implicit val transId: TransactionId = TransactionId.testing
54+
55+
val feedProbe = new FeedFactory {
56+
def createFeed(f: ActorRefFactory, m: MessagingProvider, p: (Array[Byte]) => Future[Unit]) =
57+
TestProbe().testActor
58+
59+
}
60+
val invokerPoolProbe = new InvokerPoolFactory {
61+
override def createInvokerPool(
62+
actorRefFactory: ActorRefFactory,
63+
messagingProvider: MessagingProvider,
64+
messagingProducer: MessageProducer,
65+
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
66+
monitor: Option[ActorRef]): ActorRef =
67+
TestProbe().testActor
68+
}
69+
70+
def mockMessaging(): MessagingProvider = {
71+
val messaging = stub[MessagingProvider]
72+
val producer = stub[MessageProducer]
73+
val consumer = stub[MessageConsumer]
74+
(messaging
75+
.getProducer(_: WhiskConfig, _: Option[ByteSize])(_: Logging, _: ActorSystem))
76+
.when(*, *, *, *)
77+
.returns(producer)
78+
(messaging
79+
.getConsumer(_: WhiskConfig, _: String, _: String, _: Int, _: FiniteDuration)(_: Logging, _: ActorSystem))
80+
.when(*, *, *, *, *, *, *)
81+
.returns(consumer)
82+
(producer
83+
.send(_: String, _: Message, _: Int))
84+
.when(*, *, *)
85+
.returns(Future.successful(new RecordMetadata(new TopicPartition("fake", 0), 0, 0, 0l, 0l, 0, 0)))
86+
87+
messaging
88+
}
89+
90+
it should "execute correct LoadBalancer for the default activation strategy" in {
91+
behave like asssertActivation(
92+
ActivationStrategy("org.apache.openwhisk.core.loadBalancer.test.MockLoadBalancerDefault", Map()),
93+
Parameters(),
94+
Left(ActivationId("default-mockLoadBalancerId0")))
95+
}
96+
97+
it should "execute correct LoadBalancer for the custom activation strategy" in {
98+
behave like asssertActivation(
99+
ActivationStrategy(
100+
"org.apache.openwhisk.core.loadBalancer.test.MockLoadBalancerDefault",
101+
Map(
102+
"customLBStrategy01" -> StrategyConfig(
103+
"org.apache.openwhisk.core.loadBalancer.test.MockLoadBalancerCustom"))),
104+
Parameters("activationStrategy", "customLBStrategy01"),
105+
Left(ActivationId("custom-mockLoadBalancerId0")))
106+
}
107+
108+
def asssertActivation(activationStrategy: ActivationStrategy,
109+
annotations: Parameters,
110+
expected: Either[ActivationId, WhiskActivation]) = {
111+
val slots = 10
112+
val memoryPerSlot = MemoryLimit.MIN_MEMORY
113+
val memory = memoryPerSlot * slots
114+
val config = new WhiskConfig(ExecManifest.requiredProperties)
115+
implicit val materializer: ActorMaterializer = ActorMaterializer()
116+
117+
val balancer: LoadBalancer =
118+
new MuxBalancer(config, feedProbe, ControllerInstanceId("0"), mockMessaging, lbConfig(activationStrategy))
119+
val namespace = EntityPath("testspace")
120+
val name = EntityName("testname")
121+
val invocationNamespace = EntityName("invocationSpace")
122+
val concurrency = 5
123+
val actionMem = 256.MB
124+
val uuid = UUID()
125+
val aid = ActivationId.generate()
126+
val actionMetaData =
127+
WhiskActionMetaData(
128+
namespace,
129+
name,
130+
js10MetaData(Some("jsMain"), false),
131+
limits = actionLimits(actionMem, concurrency),
132+
annotations = annotations)
133+
134+
val msg = ActivationMessage(
135+
TransactionId.testing,
136+
actionMetaData.fullyQualifiedName(true),
137+
actionMetaData.rev,
138+
Identity(Subject(), Namespace(invocationNamespace, uuid), BasicAuthenticationAuthKey(uuid, Secret())),
139+
aid,
140+
ControllerInstanceId("0"),
141+
blocking = false,
142+
content = None,
143+
initArgs = Set.empty,
144+
lockedArgs = Map.empty)
145+
146+
val activation = balancer.publish(actionMetaData.toExecutableWhiskAction.get, msg)
147+
Await.ready(activation, 10.seconds)
148+
activation.onComplete(result => {
149+
result.get.onComplete(activation => {
150+
activation.get shouldBe expected
151+
})
152+
})
153+
}
154+
}

tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ class ShardingContainerPoolBalancerTests
9797

9898
def lbConfig(blackboxFraction: Double, managedFraction: Option[Double] = None) =
9999
ShardingContainerPoolBalancerConfig(
100+
ActivationStrategy("org.apache.openwhisk.core.loadBalancer.LeanBalancer", Map()),
100101
managedFraction.getOrElse(1.0 - blackboxFraction),
101102
blackboxFraction,
102103
1,

0 commit comments

Comments
 (0)