|
1 | 1 | package com.github.fridujo.rabbitmq.mock.exchange; |
2 | 2 |
|
3 | | -import static com.github.fridujo.rabbitmq.mock.AmqArguments.empty; |
4 | | -import static com.github.fridujo.rabbitmq.mock.exchange.MockExchangeCreator.creatorWithExchangeType; |
5 | | -import static java.util.Collections.emptyMap; |
6 | | -import static org.assertj.core.api.Assertions.assertThat; |
7 | | -import static org.assertj.core.api.Assertions.within; |
8 | | -import static org.mockito.Mockito.mock; |
| 3 | +import com.github.fridujo.rabbitmq.mock.ReceiverPointer; |
| 4 | +import com.github.fridujo.rabbitmq.mock.ReceiverRegistry; |
| 5 | +import com.github.fridujo.rabbitmq.mock.configuration.Configuration; |
| 6 | +import org.junit.jupiter.api.Test; |
| 7 | +import org.junit.jupiter.params.ParameterizedTest; |
| 8 | +import org.junit.jupiter.params.provider.Arguments; |
| 9 | +import org.junit.jupiter.params.provider.MethodSource; |
| 10 | +import org.junit.jupiter.params.provider.ValueSource; |
9 | 11 |
|
| 12 | +import java.util.ArrayList; |
| 13 | +import java.util.List; |
10 | 14 | import java.util.Map; |
11 | 15 | import java.util.Optional; |
12 | 16 | import java.util.UUID; |
| 17 | +import java.util.concurrent.ThreadLocalRandom; |
13 | 18 | import java.util.function.Function; |
14 | 19 | import java.util.stream.Collectors; |
15 | 20 | import java.util.stream.IntStream; |
| 21 | +import java.util.stream.Stream; |
16 | 22 |
|
17 | | -import org.junit.jupiter.api.Test; |
18 | | - |
19 | | -import com.github.fridujo.rabbitmq.mock.ReceiverPointer; |
20 | | -import com.github.fridujo.rabbitmq.mock.ReceiverRegistry; |
21 | | -import com.github.fridujo.rabbitmq.mock.configuration.Configuration; |
| 23 | +import static com.github.fridujo.rabbitmq.mock.AmqArguments.empty; |
| 24 | +import static com.github.fridujo.rabbitmq.mock.exchange.MockExchangeCreator.creatorWithExchangeType; |
| 25 | +import static java.util.Collections.emptyMap; |
| 26 | +import static java.util.stream.Collectors.toList; |
| 27 | +import static org.assertj.core.api.Assertions.assertThat; |
| 28 | +import static org.assertj.core.api.Assertions.assertThatThrownBy; |
| 29 | +import static org.assertj.core.api.AssertionsForClassTypes.within; |
| 30 | +import static org.mockito.Mockito.mock; |
22 | 31 |
|
23 | 32 | class ConsistentHashExchangeTests { |
24 | 33 |
|
@@ -47,30 +56,68 @@ void same_routing_key_dispatch_to_same_queue() { |
47 | 56 | assertThat(consistentHashEx.selectReceiver(firstRoutingKey, null)).contains(firstReceiverPointerSelected); |
48 | 57 | } |
49 | 58 |
|
50 | | - @Test |
51 | | - void dispatch_respects_queue_weight() { |
| 59 | + @ParameterizedTest |
| 60 | + @MethodSource("buildWeightArguments") |
| 61 | + void dispatch_respects_queue_weight(List<Integer> weights, List<Double> distributions) { |
52 | 62 | SingleReceiverExchange consistentHashEx = (SingleReceiverExchange) mockExchangeFactory.build("test", "x-consistent-hash", empty(), mock(ReceiverRegistry.class)); |
53 | 63 |
|
54 | | - ReceiverPointer q1 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q1"); |
55 | | - consistentHashEx.bind(q1, "32", emptyMap()); |
56 | | - ReceiverPointer q2 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q2"); |
57 | | - consistentHashEx.bind(q2, "64", emptyMap()); |
58 | | - ReceiverPointer q3 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q3"); |
59 | | - consistentHashEx.bind(q3, " ", emptyMap()); |
60 | | - ReceiverPointer q4 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q4"); |
61 | | - consistentHashEx.bind(q4, "AA", emptyMap()); |
62 | | - consistentHashEx.unbind(q4, "AA"); |
63 | | - |
64 | | - int messagesCount = 1_000_000; |
65 | | - Map<ReceiverPointer, Long> deliveriesByReceiver = IntStream.range(0, messagesCount) |
| 64 | + List<ReceiverPointer> receiverPointers = new ArrayList<>(); |
| 65 | + for(int i = 0; i < weights.size(); i++) { |
| 66 | + ReceiverPointer receiverPointer = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q" + i); |
| 67 | + receiverPointers.add(receiverPointer); |
| 68 | + consistentHashEx.bind(receiverPointer, weights.get(i).toString(), emptyMap()); |
| 69 | + } |
| 70 | + |
| 71 | + final int minMessagesCount = 1_000; |
| 72 | + final int maxMessagesCount = 1_000_000; |
| 73 | + final int messageCount = ThreadLocalRandom.current().nextInt(minMessagesCount, maxMessagesCount); |
| 74 | + |
| 75 | + Map<ReceiverPointer, Long> deliveriesByReceiver = IntStream.range(0, messageCount) |
66 | 76 | .mapToObj(i -> consistentHashEx.selectReceiver(UUID.randomUUID().toString(), null)) |
67 | 77 | .map(Optional::get) |
68 | 78 | .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); |
69 | 79 |
|
70 | | - assertThat(deliveriesByReceiver).containsOnlyKeys(q1, q2, q3); |
| 80 | + assertThat(deliveriesByReceiver).containsOnlyKeys(receiverPointers); |
| 81 | + |
| 82 | + for(int i = 0; i < receiverPointers.size(); i++) { |
| 83 | + assertThat(Long.valueOf(deliveriesByReceiver.get(receiverPointers.get(i))).doubleValue() / messageCount).isCloseTo(distributions.get(i), within(0.02)); |
| 84 | + } |
| 85 | + } |
| 86 | + |
| 87 | + private static Stream<Arguments> buildWeightArguments() { |
| 88 | + final int maxArguments = 25; |
| 89 | + final int maxReceivers = 10; |
| 90 | + final int maxWeight = 100; |
| 91 | + |
| 92 | + Function<Integer, List<Integer>> getWeightsForReceiver = r -> IntStream.range(0, r) |
| 93 | + .mapToObj(i -> ThreadLocalRandom.current().nextInt(1, maxWeight)) |
| 94 | + .collect(toList()); |
| 95 | + |
| 96 | + Function<List<Integer>, List<Double>> calculateDistribution = w -> { |
| 97 | + Integer sum = w.stream().mapToInt(Integer::intValue).sum(); |
| 98 | + return w.stream() |
| 99 | + .map(x -> x.doubleValue() / sum) |
| 100 | + .collect(toList()); |
| 101 | + }; |
| 102 | + |
| 103 | + Function<List<Integer>, Arguments> getDispatchDistributionForReceiver = w -> Arguments.of(w, calculateDistribution.apply(w)); |
| 104 | + |
| 105 | + return IntStream.range(1, maxArguments) |
| 106 | + .mapToObj(i -> ThreadLocalRandom.current().nextInt(1, maxReceivers)) |
| 107 | + .map(getWeightsForReceiver) |
| 108 | + .map(getDispatchDistributionForReceiver); |
| 109 | + } |
| 110 | + |
| 111 | + |
| 112 | + @ParameterizedTest(name = "Binding Consistent hash exchange with binding key \"{0}\" throws IllegalArgumentException") |
| 113 | + @ValueSource(strings = {"", "string", "#"}) |
| 114 | + void binding_with_non_integer_key_throws_exception(String bindingKey) { |
| 115 | + SingleReceiverExchange consistentHashEx = (SingleReceiverExchange) mockExchangeFactory.build("test", "x-consistent-hash", empty(), mock(ReceiverRegistry.class)); |
| 116 | + |
| 117 | + ReceiverPointer q1 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q1"); |
71 | 118 |
|
72 | | - assertThat(Long.valueOf(deliveriesByReceiver.get(q1)).doubleValue() / messagesCount).isCloseTo(0.25D, within(0.01)); |
73 | | - assertThat(Long.valueOf(deliveriesByReceiver.get(q2)).doubleValue() / messagesCount).isCloseTo(0.5D, within(0.01)); |
74 | | - assertThat(Long.valueOf(deliveriesByReceiver.get(q3)).doubleValue() / messagesCount).isCloseTo(0.25D, within(0.01)); |
| 119 | + assertThatThrownBy(() -> consistentHashEx.bind(q1, bindingKey, emptyMap())) |
| 120 | + .isInstanceOf(IllegalArgumentException.class) |
| 121 | + .hasMessage("The binding key must be an integer"); |
75 | 122 | } |
76 | 123 | } |
0 commit comments