Skip to content

Commit ebf2d76

Browse files
committed
feat: FSM 4
1 parent 0639ecf commit ebf2d76

File tree

4 files changed

+104
-11
lines changed

4 files changed

+104
-11
lines changed

hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterConnectionState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public enum ProtocolAdapterConnectionState {
9494
final ProtocolAdapterConnectionState fromState = ProtocolAdapterConnectionState.Disconnecting;
9595
return switch (toState) {
9696
case Disconnecting -> ProtocolAdapterConnectionTransitionResponse.notChanged(fromState);
97-
case Connecting -> ProtocolAdapterConnectionTransitionResponse.success(fromState, toState);
97+
case Connecting, Disconnected -> ProtocolAdapterConnectionTransitionResponse.success(fromState, toState);
9898
default -> ProtocolAdapterConnectionTransitionResponse.failure(fromState, toState);
9999
};
100100
}

hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterWrapper2.java

Lines changed: 96 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public class ProtocolAdapterWrapper2 {
3131

3232
public ProtocolAdapterWrapper2(final @NotNull ProtocolAdapter adapter) {
3333
this.adapter = adapter;
34-
northboundConnectionState = ProtocolAdapterConnectionState.Closed;
35-
southboundConnectionState = ProtocolAdapterConnectionState.Closed;
34+
northboundConnectionState = ProtocolAdapterConnectionState.Disconnected;
35+
southboundConnectionState = ProtocolAdapterConnectionState.Disconnected;
3636
state = ProtocolAdapterState.Stopped;
3737
}
3838

@@ -61,7 +61,7 @@ public boolean start() {
6161
ProtocolAdapterTransitionResponse response = transitionTo(ProtocolAdapterState.Starting);
6262
if (response.status().isSuccess()) {
6363
boolean success = startNorthbound();
64-
success = success || startSouthbound();
64+
success = success && startSouthbound();
6565
if (success) {
6666
response = transitionTo(ProtocolAdapterState.Started);
6767
} else {
@@ -75,22 +75,55 @@ public boolean stop(final boolean destroy) {
7575
LOGGER.info("Stopping protocol adapter {}.", getAdapterId());
7676
ProtocolAdapterTransitionResponse response = transitionTo(ProtocolAdapterState.Stopping);
7777
if (response.status().isSuccess()) {
78-
79-
response = transitionTo(ProtocolAdapterState.Stopped);
78+
final boolean southboundSuccess = stopSouthbound();
79+
final boolean northboundSuccess = stopNorthbound();
80+
if (northboundSuccess && southboundSuccess) {
81+
response = transitionTo(ProtocolAdapterState.Stopped);
82+
} else {
83+
response = transitionTo(ProtocolAdapterState.Error);
84+
}
8085
}
8186
return response.status().isSuccess();
8287
}
8388

8489
protected boolean startNorthbound() {
8590
LOGGER.info("Starting northbound for protocol adapter {}.", getAdapterId());
86-
northboundConnectionState = ProtocolAdapterConnectionState.Connected;
87-
return northboundConnectionState.isConnected();
91+
ProtocolAdapterConnectionTransitionResponse response =
92+
transitionNorthboundConnectionTo(ProtocolAdapterConnectionState.Connecting);
93+
if (response.status().isSuccess()) {
94+
response = transitionNorthboundConnectionTo(ProtocolAdapterConnectionState.Connected);
95+
}
96+
return response.status().isSuccess();
8897
}
8998

9099
protected boolean startSouthbound() {
91100
LOGGER.info("Starting southbound for protocol adapter {}.", getAdapterId());
92-
southboundConnectionState = ProtocolAdapterConnectionState.Connected;
93-
return southboundConnectionState.isConnected();
101+
ProtocolAdapterConnectionTransitionResponse response =
102+
transitionSouthboundConnectionTo(ProtocolAdapterConnectionState.Connecting);
103+
if (response.status().isSuccess()) {
104+
response = transitionSouthboundConnectionTo(ProtocolAdapterConnectionState.Connected);
105+
}
106+
return response.status().isSuccess();
107+
}
108+
109+
protected boolean stopNorthbound() {
110+
LOGGER.info("Stopping northbound for protocol adapter {}.", getAdapterId());
111+
ProtocolAdapterConnectionTransitionResponse response =
112+
transitionNorthboundConnectionTo(ProtocolAdapterConnectionState.Disconnecting);
113+
if (response.status().isSuccess()) {
114+
response = transitionNorthboundConnectionTo(ProtocolAdapterConnectionState.Disconnected);
115+
}
116+
return response.status().isSuccess();
117+
}
118+
119+
protected boolean stopSouthbound() {
120+
LOGGER.info("Stopping southbound for protocol adapter {}.", getAdapterId());
121+
ProtocolAdapterConnectionTransitionResponse response =
122+
transitionSouthboundConnectionTo(ProtocolAdapterConnectionState.Disconnecting);
123+
if (response.status().isSuccess()) {
124+
response = transitionSouthboundConnectionTo(ProtocolAdapterConnectionState.Disconnected);
125+
}
126+
return response.status().isSuccess();
94127
}
95128

96129
public synchronized @NotNull ProtocolAdapterTransitionResponse transitionTo(final @NotNull ProtocolAdapterState newState) {
@@ -116,4 +149,58 @@ protected boolean startSouthbound() {
116149
}
117150
return response;
118151
}
152+
153+
public synchronized @NotNull ProtocolAdapterConnectionTransitionResponse transitionSouthboundConnectionTo(
154+
final @NotNull ProtocolAdapterConnectionState newState) {
155+
final ProtocolAdapterConnectionState fromState = southboundConnectionState;
156+
final ProtocolAdapterConnectionTransitionResponse response = fromState.transition(newState);
157+
southboundConnectionState = response.toState();
158+
switch (response.status()) {
159+
case Success -> {
160+
LOGGER.debug("Protocol adapter '{}' southbound connection transitioned from {} to {} successfully.",
161+
getAdapterId(),
162+
fromState,
163+
southboundConnectionState);
164+
}
165+
case Failure -> {
166+
LOGGER.error("Protocol adapter '{}' southbound connection failed to transition from {} to {}.",
167+
getAdapterId(),
168+
fromState,
169+
southboundConnectionState);
170+
}
171+
case NotChanged -> {
172+
LOGGER.warn("Protocol adapter '{}' southbound connection state {} is unchanged.",
173+
getAdapterId(),
174+
southboundConnectionState);
175+
}
176+
}
177+
return response;
178+
}
179+
180+
public synchronized @NotNull ProtocolAdapterConnectionTransitionResponse transitionNorthboundConnectionTo(
181+
final @NotNull ProtocolAdapterConnectionState newState) {
182+
final ProtocolAdapterConnectionState fromState = northboundConnectionState;
183+
final ProtocolAdapterConnectionTransitionResponse response = fromState.transition(newState);
184+
northboundConnectionState = response.toState();
185+
switch (response.status()) {
186+
case Success -> {
187+
LOGGER.debug("Protocol adapter '{}' northbound connection transitioned from {} to {} successfully.",
188+
getAdapterId(),
189+
fromState,
190+
northboundConnectionState);
191+
}
192+
case Failure -> {
193+
LOGGER.error("Protocol adapter '{}' northbound connection failed to transition from {} to {}.",
194+
getAdapterId(),
195+
fromState,
196+
northboundConnectionState);
197+
}
198+
case NotChanged -> {
199+
LOGGER.warn("Protocol adapter '{}' northbound connection state {} is unchanged.",
200+
getAdapterId(),
201+
northboundConnectionState);
202+
}
203+
}
204+
return response;
205+
}
119206
}

hivemq-edge/src/test/java/com/hivemq/protocols/fsm/ProtocolAdapterConnectionStateTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class ProtocolAdapterConnectionStateTest {
4646
ProtocolAdapterConnectionState.Disconnected,
4747
Set.of(ProtocolAdapterConnectionState.Connecting),
4848
ProtocolAdapterConnectionState.Disconnecting,
49-
Set.of(ProtocolAdapterConnectionState.Connecting),
49+
Set.of(ProtocolAdapterConnectionState.Connecting, ProtocolAdapterConnectionState.Disconnected),
5050
ProtocolAdapterConnectionState.Error,
5151
Set.of(ProtocolAdapterConnectionState.Disconnected),
5252
ProtocolAdapterConnectionState.ErrorClosing,

hivemq-edge/src/test/java/com/hivemq/protocols/fsm/ProtocolAdapterWrapperTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,15 @@ public void setUp() {
4444
public void whenAdapterIsValid_thenStartAndStopWork() {
4545
final ProtocolAdapterWrapper2 wrapper = new ProtocolAdapterWrapper2(protocolAdapter);
4646
assertThat(wrapper.getState()).isEqualTo(ProtocolAdapterState.Stopped);
47+
assertThat(wrapper.getNorthboundConnectionState()).isEqualTo(ProtocolAdapterConnectionState.Disconnected);
48+
assertThat(wrapper.getSouthboundConnectionState()).isEqualTo(ProtocolAdapterConnectionState.Disconnected);
4749
assertThat(wrapper.start()).isTrue();
4850
assertThat(wrapper.getState()).isEqualTo(ProtocolAdapterState.Started);
51+
assertThat(wrapper.getNorthboundConnectionState()).isEqualTo(ProtocolAdapterConnectionState.Connected);
52+
assertThat(wrapper.getSouthboundConnectionState()).isEqualTo(ProtocolAdapterConnectionState.Connected);
4953
assertThat(wrapper.stop(true)).isTrue();
5054
assertThat(wrapper.getState()).isEqualTo(ProtocolAdapterState.Stopped);
55+
assertThat(wrapper.getNorthboundConnectionState()).isEqualTo(ProtocolAdapterConnectionState.Disconnected);
56+
assertThat(wrapper.getSouthboundConnectionState()).isEqualTo(ProtocolAdapterConnectionState.Disconnected);
5157
}
5258
}

0 commit comments

Comments
 (0)