Skip to content

Commit 1fbe7f8

Browse files
committed
XCOMMONS-3451: Realtime editing doesn't support clustering
1 parent 55304d5 commit 1fbe7f8

20 files changed

+1213
-367
lines changed

xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Bot.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ default boolean onJoinChannel(Channel channel)
5757
* @param sender the user who sent the message
5858
* @param message the message that was sent
5959
*/
60-
default void onUserMessage(User sender, List<Object> message)
60+
default void onUserMessage(LocalUser sender, List<Object> message)
6161
{
6262
// Do nothing by default
6363
}

xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/Channel.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Deque;
2323
import java.util.LinkedHashMap;
2424
import java.util.LinkedList;
25-
import java.util.List;
2625
import java.util.Map;
2726

2827
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -88,14 +87,6 @@ public Map<String, Bot> getBots()
8887
return this.bots;
8988
}
9089

91-
/**
92-
* @return the list of users that are currently connected to this channel
93-
*/
94-
public List<User> getConnectedUsers()
95-
{
96-
return this.users.values().stream().filter(user -> user.getSession() != null && user.isConnected()).toList();
97-
}
98-
9990
/**
10091
* @return the channel messages
10192
*/

xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/ChannelStore.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,22 @@ public class ChannelStore
5454
*/
5555
public Channel create()
5656
{
57-
Channel channel = new Channel(this.idGenerator.generateChannelId());
57+
return create(this.idGenerator.generateChannelId());
58+
}
59+
60+
/**
61+
* Creates a new channel with a passed key.
62+
*
63+
* @param channelKey the identifier of the new channel
64+
* @return the new channel
65+
* @since 17.10.0RC1
66+
*/
67+
public Channel create(String channelKey)
68+
{
69+
Channel channel = new Channel(channelKey);
5870
askBotsToJoin(channel);
5971
this.channelByKey.put(channel.getKey(), channel);
72+
6073
return channel;
6174
}
6275

@@ -85,6 +98,25 @@ public Channel get(String key)
8598
return this.channelByKey.get(key);
8699
}
87100

101+
/**
102+
* Access an existing channel by its key.
103+
*
104+
* @param key the channel key
105+
* @param create if true, create the channel when it does not exist
106+
* @return the corresponding channel
107+
* @since 17.10.0RC1
108+
*/
109+
public Channel get(String key, boolean create)
110+
{
111+
Channel channel = get(key);
112+
113+
if (channel == null) {
114+
channel = create(key);
115+
}
116+
117+
return channel;
118+
}
119+
88120
/**
89121
* Remove a channel from memory.
90122
*
@@ -106,8 +138,7 @@ public void prune()
106138
try {
107139
long currentTime = System.currentTimeMillis();
108140
for (Channel channel : this.channelByKey.values()) {
109-
if (channel.getConnectedUsers().isEmpty()
110-
&& (currentTime - channel.getCreationDate()) > (1000 * 60 * 60 * 2)) {
141+
if (channel.getUsers().isEmpty() && (currentTime - channel.getCreationDate()) > (1000 * 60 * 60 * 2)) {
111142
remove(channel);
112143
}
113144
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import jakarta.inject.Singleton;
23+
import jakarta.websocket.Session;
24+
25+
import org.xwiki.component.annotation.Component;
26+
27+
/**
28+
* Default implementation of {@link LocalUserFactory}.
29+
* <p>
30+
* Use the session id as user id.
31+
*
32+
* @version $Id$
33+
* @since 17.10.0RC1
34+
*/
35+
@Component
36+
@Singleton
37+
public class DefaultLocalUserFactory implements LocalUserFactory
38+
{
39+
@Override
40+
public LocalUser createLocalUser(Session session)
41+
{
42+
return new LocalUser(session, session.getId());
43+
}
44+
45+
protected String getId(Session session)
46+
{
47+
return session.getId();
48+
}
49+
}

xwiki-commons-core/xwiki-commons-netflux/src/main/java/org/xwiki/netflux/internal/HistoryKeeper.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
import org.slf4j.Logger;
3232
import org.xwiki.component.annotation.Component;
33+
import org.xwiki.netflux.internal.event.NetfluxMessageUserEvent;
34+
import org.xwiki.observation.ObservationManager;
3335

3436
/**
3537
* Holds the key of the history keeper fake user that is added to all Netflux channels.
@@ -51,6 +53,9 @@ public class HistoryKeeper extends AbstractBot
5153
@Inject
5254
private MessageBuilder messageBuilder;
5355

56+
@Inject
57+
private ObservationManager observation;
58+
5459
@Override
5560
public String getId()
5661
{
@@ -60,7 +65,7 @@ public String getId()
6065
}
6166

6267
@Override
63-
public void onUserMessage(User sender, List<Object> message)
68+
public void onUserMessage(LocalUser sender, List<Object> message)
6469
{
6570
// The history keeper responds only to GET_HISTORY messages.
6671

@@ -99,7 +104,7 @@ private void sendChannelHistory(User user, String channelKey)
99104

100105
try {
101106
for (String msg : (Iterable<String>) messages::iterator) {
102-
user.getSession().getBasicRemote().sendText(msg);
107+
this.observation.notify(new NetfluxMessageUserEvent(user.getName(), msg), null);
103108
}
104109
} catch (Exception e) {
105110
this.logger.debug("Failed to send channel history.", e);
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import jakarta.websocket.Session;
23+
24+
/**
25+
* A user accessing the current instance.
26+
*
27+
* @version $Id$
28+
* @since 17.10.0RC1
29+
*/
30+
public class LocalUser extends User
31+
{
32+
private final Session session;
33+
34+
/**
35+
* Creates a new user with the specified name, using the given WebSocket session.
36+
*
37+
* @param session the WebSocket session used to communicate with the user
38+
* @param name the identifier of the user
39+
*/
40+
public LocalUser(Session session, String name)
41+
{
42+
super(name);
43+
44+
this.session = session;
45+
}
46+
47+
/**
48+
* @return the WebSocket session
49+
*/
50+
public Session getSession()
51+
{
52+
return this.session;
53+
}
54+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* See the NOTICE file distributed with this work for additional
3+
* information regarding copyright ownership.
4+
*
5+
* This is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU Lesser General Public License as
7+
* published by the Free Software Foundation; either version 2.1 of
8+
* the License, or (at your option) any later version.
9+
*
10+
* This software is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13+
* Lesser General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Lesser General Public
16+
* License along with this software; if not, write to the Free
17+
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18+
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19+
*/
20+
package org.xwiki.netflux.internal;
21+
22+
import jakarta.websocket.Session;
23+
24+
/**
25+
* Component in charge of generating a new local user for a given session.
26+
*
27+
* @version $Id$
28+
* @since 17.10.0RC1
29+
*/
30+
public interface LocalUserFactory
31+
{
32+
/**
33+
* @param session the WebSocket session
34+
* @return the new instance of {@link LocalUser}
35+
*/
36+
LocalUser createLocalUser(Session session);
37+
}

0 commit comments

Comments
 (0)