Skip to content

Commit ed036e1

Browse files
committed
Switch to messaging system for server switching
1 parent 8ee4b08 commit ed036e1

File tree

7 files changed

+192
-32
lines changed

7 files changed

+192
-32
lines changed

build.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ dependencies {
2323
implementation('net.kyori:adventure-text-minimessage:4.12.0')
2424

2525
implementation("com.rabbitmq:amqp-client:5.16.0")
26-
implementation("dev.emortal.api:grpc-sdk:383c276")
26+
implementation("dev.emortal.api:grpc-sdk:393f9a8")
2727
implementation("dev.emortal.api:agones-sdk:1.0.5")
28-
implementation("dev.emortal.api:kurushimi-sdk:d805526")
28+
implementation("dev.emortal.api:kurushimi-sdk:4d626e4")
2929

30-
testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.0")
31-
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.9.0")
30+
testImplementation('org.junit.jupiter:junit-jupiter-api:5.9.2')
31+
testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.9.2')
3232
}
3333

3434
shadowJar {

kubernetes.yaml

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
apiVersion: agones.dev/v1
2+
kind: Fleet
3+
metadata:
4+
name: velocity
5+
namespace: emortalmc
6+
7+
spec:
8+
replicas: 1
9+
scheduling: Packed
10+
11+
strategy:
12+
type: RollingUpdate
13+
rollingUpdate:
14+
maxSurge: 25%
15+
maxUnavailable: 25%
16+
17+
template:
18+
spec:
19+
ports:
20+
- name: default
21+
portPolicy: Static
22+
hostPort: 25565
23+
containerPort: 25565
24+
protocol: TCP
25+
26+
health:
27+
initialDelaySeconds: 10
28+
periodSeconds: 6
29+
failureThreshold: 2
30+
31+
template:
32+
spec:
33+
containers:
34+
- name: velocity
35+
image: emortalmc/velocity:dev
36+
imagePullPolicy: IfNotPresent
37+
38+
ports:
39+
- containerPort: 9090
40+
name: grpc
41+
protocol: TCP
42+
43+
lifecycle:
44+
postStart:
45+
exec:
46+
command: [ "/bin/sh", "-c", "echo $VELOCITY_FORWARDING_SECRET > /app/forwarding.secret" ]
47+
48+
env:
49+
- name: JAVA_OPTS
50+
value: "-Xms512M -Xmx512M"
51+
- name: VELOCITY_FORWARDING_SECRET
52+
valueFrom:
53+
secretKeyRef:
54+
name: velocity-forwarding-token
55+
key: forwarding.secret
56+
optional: false
57+
- name: RABBITMQ_HOST
58+
valueFrom:
59+
secretKeyRef:
60+
name: rabbitmq-credentials
61+
key: host
62+
optional: false
63+
- name: RABBITMQ_USERNAME
64+
valueFrom:
65+
secretKeyRef:
66+
name: rabbitmq-credentials
67+
key: username
68+
optional: false
69+
- name: RABBITMQ_PASSWORD
70+
valueFrom:
71+
secretKeyRef:
72+
name: rabbitmq-credentials
73+
key: password
74+
optional: false
75+
76+
resources:
77+
limits:
78+
memory: "512Mi"
79+
cpu: "1"
80+
requests:
81+
memory: "512Mi"
82+
cpu: "250m"
83+
84+
# serviceAccountName: velocity
85+
# automountServiceAccountToken: true

src/main/java/dev/emortal/velocity/CorePlugin.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
import dev.emortal.velocity.listener.AgonesListener;
1515
import dev.emortal.velocity.listener.LobbySelectorListener;
1616
import dev.emortal.velocity.listener.McPlayerListener;
17+
import dev.emortal.velocity.listener.ServerChangeNotificationListener;
1718
import dev.emortal.velocity.permissions.PermissionCache;
1819
import dev.emortal.velocity.permissions.commands.PermissionCommand;
1920
import dev.emortal.velocity.permissions.listener.PermissionCheckListener;
2021
import dev.emortal.velocity.privatemessages.LastMessageCache;
2122
import dev.emortal.velocity.privatemessages.PrivateMessageListener;
2223
import dev.emortal.velocity.privatemessages.commands.MessageCommand;
23-
import dev.emortal.velocity.rabbitmq.RabbitMqEventListener;
24+
import dev.emortal.velocity.rabbitmq.RabbitMqCore;
2425
import dev.emortal.velocity.serverlist.ServerPingListener;
2526
import dev.emortal.velocity.tablist.TabList;
2627
import dev.emortal.velocity.utils.ReflectionUtils;
@@ -68,7 +69,7 @@ public class CorePlugin {
6869

6970
private final UsernameSuggestions usernameSuggestions = new UsernameSuggestions();
7071

71-
private final RabbitMqEventListener rabbitMqEventListener = new RabbitMqEventListener();
72+
private final RabbitMqCore rabbitMqCore = new RabbitMqCore();
7273

7374
private final FriendCache friendCache = new FriendCache();
7475
private final SessionCache sessionCache = new SessionCache();
@@ -85,7 +86,6 @@ public CorePlugin(ProxyServer server) {
8586

8687
@Subscribe
8788
public void onProxyInitialize(ProxyInitializeEvent event) {
88-
8989
if (this.stubManager.getAgonesService() != null) {
9090
this.proxy.getEventManager().register(this, new AgonesListener(this.stubManager.getAgonesService(),
9191
this.stubManager.getStandardAgonesService(), this.stubManager.getAlphaAgonesService())
@@ -95,12 +95,11 @@ public void onProxyInitialize(ProxyInitializeEvent event) {
9595
}
9696

9797
ServerManager serverManager = new ServerManager(this, this.proxy);
98-
// OTP status affects a lot of functionality, so we need it to be loaded first
99-
// OtpEventListener otpEventListener = new OtpEventListener(serverManager);
98+
new ServerChangeNotificationListener(this.proxy, this.rabbitMqCore); // Listens for RabbitMQ ProxyServerChangeMessage messages
10099
this.permissionCache = new PermissionCache(this.stubManager);
101100

102101
// rabbitmq
103-
this.proxy.getEventManager().register(this, this.rabbitMqEventListener);
102+
this.proxy.getEventManager().register(this, this.rabbitMqCore);
104103

105104
// friends
106105
this.proxy.getEventManager().register(this, this.friendCache);
@@ -219,7 +218,7 @@ public int readVarInt(ByteBuf buf) {
219218
@Subscribe
220219
public void onProxyShutdown(ProxyShutdownEvent event) {
221220
this.grpcServerContainer.stop();
222-
this.rabbitMqEventListener.shutdown();
221+
this.rabbitMqCore.shutdown();
223222
AgonesUtils.shutdownHealthTask();
224223
}
225224

src/main/java/dev/emortal/velocity/listener/LobbySelectorListener.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dev.emortal.api.kurushimi.Assignment;
1212
import dev.emortal.api.kurushimi.CreateTicketRequest;
1313
import dev.emortal.api.kurushimi.FrontendGrpc;
14+
import dev.emortal.api.kurushimi.KurushimiStubCollection;
1415
import dev.emortal.api.kurushimi.SearchFields;
1516
import dev.emortal.api.kurushimi.Ticket;
1617
import dev.emortal.api.kurushimi.WatchAssignmentRequest;
@@ -19,8 +20,6 @@
1920
import dev.emortal.api.utils.GrpcStubCollection;
2021
import dev.emortal.api.utils.callback.FunctionalFutureCallback;
2122
import dev.emortal.api.utils.callback.FunctionalStreamObserver;
22-
import io.grpc.ManagedChannel;
23-
import io.grpc.ManagedChannelBuilder;
2423
import io.grpc.Status;
2524
import net.kyori.adventure.text.Component;
2625
import net.kyori.adventure.text.minimessage.MiniMessage;
@@ -35,22 +34,12 @@ public class LobbySelectorListener {
3534

3635
private static final Component ERROR_MESSAGE = MiniMessage.miniMessage().deserialize("<red>Failed to connect to lobby");
3736

38-
private final McPlayerGrpc.McPlayerFutureStub mcPlayerService;
39-
private final FrontendGrpc.FrontendFutureStub matchmakingService;
40-
private final FrontendGrpc.FrontendStub matchmakingServiceBlocking;
37+
private final McPlayerGrpc.McPlayerFutureStub mcPlayerService = GrpcStubCollection.getPlayerService().orElse(null);
38+
private final FrontendGrpc.FrontendFutureStub matchmakingService = KurushimiStubCollection.getFutureStub().orElse(null);
39+
private final FrontendGrpc.FrontendStub matchmakingServiceBlocking = KurushimiStubCollection.getStub().orElse(null);
4140
private final ProxyServer proxy;
4241

4342
public LobbySelectorListener(ProxyServer proxy) {
44-
this.mcPlayerService = GrpcStubCollection.getPlayerService().orElse(null);
45-
46-
ManagedChannel channel = ManagedChannelBuilder.forAddress("matchmaker", 9090)
47-
.defaultLoadBalancingPolicy("round_robin")
48-
.usePlaintext()
49-
.build();
50-
51-
this.matchmakingService = FrontendGrpc.newFutureStub(channel);
52-
this.matchmakingServiceBlocking = FrontendGrpc.newStub(channel);
53-
5443
this.proxy = proxy;
5544
}
5645

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package dev.emortal.velocity.listener;
2+
3+
import com.velocitypowered.api.proxy.Player;
4+
import com.velocitypowered.api.proxy.ProxyServer;
5+
import com.velocitypowered.api.proxy.server.RegisteredServer;
6+
import com.velocitypowered.api.proxy.server.ServerInfo;
7+
import dev.emortal.api.messaging.general.ProxyServerSwitchMessage;
8+
import dev.emortal.api.service.ServerDiscoveryProto;
9+
import dev.emortal.velocity.rabbitmq.RabbitMqCore;
10+
import net.kyori.adventure.text.minimessage.MiniMessage;
11+
import net.kyori.adventure.text.minimessage.tag.resolver.Placeholder;
12+
13+
import java.net.InetSocketAddress;
14+
import java.util.HashSet;
15+
import java.util.Set;
16+
import java.util.UUID;
17+
18+
public class ServerChangeNotificationListener {
19+
private static final MiniMessage MINI_MESSAGE = MiniMessage.miniMessage();
20+
21+
private static final String TELEPORT_MESSAGE = "<green>Sending you to <gold><server_id><green>...</green>";
22+
23+
public ServerChangeNotificationListener(ProxyServer proxy, RabbitMqCore rabbitMq) {
24+
rabbitMq.addListener(ProxyServerSwitchMessage.class, message -> {
25+
ProxyServerSwitchMessage switchMessage = (ProxyServerSwitchMessage) message;
26+
27+
Set<Player> presentPlayers = new HashSet<>();
28+
for (String playerIdStr : switchMessage.getPlayerIdsList()) {
29+
UUID playerId = UUID.fromString(playerIdStr);
30+
proxy.getPlayer(playerId).ifPresent(presentPlayers::add);
31+
}
32+
33+
if (presentPlayers.isEmpty()) return;
34+
35+
ServerDiscoveryProto.ConnectableServer connectableServer = switchMessage.getServer();
36+
RegisteredServer server = proxy.getServer(connectableServer.getId()).orElse(null);
37+
if (server == null) {
38+
InetSocketAddress address = InetSocketAddress.createUnresolved(connectableServer.getAddress(), connectableServer.getPort());
39+
server = proxy.registerServer(new ServerInfo(connectableServer.getId(), address));
40+
}
41+
42+
for (Player player : presentPlayers) {
43+
player.sendMessage(MINI_MESSAGE.deserialize(TELEPORT_MESSAGE, Placeholder.unparsed("server_id", connectableServer.getId())));
44+
player.createConnectionRequest(server).fireAndForget();
45+
}
46+
});
47+
}
48+
}

src/main/java/dev/emortal/velocity/rabbitmq/RabbitMqEventListener.java renamed to src/main/java/dev/emortal/velocity/rabbitmq/RabbitMqCore.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,46 @@
11
package dev.emortal.velocity.rabbitmq;
22

3-
import dev.emortal.velocity.Environment;
4-
import dev.emortal.velocity.rabbitmq.types.DisconnectEventDataPackage;
5-
import dev.emortal.velocity.rabbitmq.types.ConnectEventDataPackage;
63
import com.google.gson.Gson;
4+
import com.google.protobuf.AbstractMessage;
75
import com.rabbitmq.client.AMQP;
86
import com.rabbitmq.client.Channel;
97
import com.rabbitmq.client.Connection;
108
import com.rabbitmq.client.ConnectionFactory;
119
import com.velocitypowered.api.event.Subscribe;
1210
import com.velocitypowered.api.event.connection.DisconnectEvent;
1311
import com.velocitypowered.api.event.connection.PostLoginEvent;
12+
import dev.emortal.api.utils.parser.ProtoParserRegistry;
13+
import dev.emortal.velocity.Environment;
14+
import dev.emortal.velocity.rabbitmq.types.ConnectEventDataPackage;
15+
import dev.emortal.velocity.rabbitmq.types.DisconnectEventDataPackage;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
1418

1519
import java.io.IOException;
1620
import java.nio.charset.StandardCharsets;
1721
import java.util.Date;
22+
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.function.Consumer;
25+
26+
public class RabbitMqCore {
27+
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqCore.class);
1828

19-
public class RabbitMqEventListener {
2029
private static final String CONNECTIONS_EXCHANGE = "mc:connections";
30+
private static final String PROXY_ALL_EXCHANGE = "mc:proxy:all";
2131

2232
private static final String HOST = System.getenv("RABBITMQ_HOST");
2333
private static final String USERNAME = System.getenv("RABBITMQ_USERNAME");
2434
private static final String PASSWORD = System.getenv("RABBITMQ_PASSWORD");
2535

2636
private static final Gson GSON = new Gson();
2737

38+
private final Map<Class<?>, Consumer<AbstractMessage>> protoListeners = new ConcurrentHashMap<>();
2839
private final Connection connection;
2940
private final Channel channel;
41+
private final String selfQueueName;
3042

31-
public RabbitMqEventListener() {
43+
public RabbitMqCore() {
3244
ConnectionFactory connectionFactory = new ConnectionFactory();
3345
connectionFactory.setHost(HOST);
3446
connectionFactory.setUsername(USERNAME);
@@ -40,6 +52,30 @@ public RabbitMqEventListener() {
4052
} catch (Exception e) {
4153
throw new RuntimeException(e);
4254
}
55+
56+
String selfQueueName = null;
57+
58+
try {
59+
selfQueueName = this.channel.queueDeclare().getQueue();
60+
this.channel.queueBind(selfQueueName, PROXY_ALL_EXCHANGE, "");
61+
62+
LOGGER.info("Listening for messages on queue {}", selfQueueName);
63+
this.channel.basicConsume(selfQueueName, true, (consumerTag, delivery) -> {
64+
String type = delivery.getProperties().getType();
65+
66+
AbstractMessage message = ProtoParserRegistry.parse(type, delivery.getBody());
67+
68+
Consumer<AbstractMessage> listener = this.protoListeners.get(message.getClass());
69+
if (listener == null) LOGGER.warn("No listener registered for message of type {}", type);
70+
else listener.accept(message);
71+
72+
}, consumerTag -> LOGGER.warn("Consumer cancelled"));
73+
74+
} catch (IOException ex) {
75+
LOGGER.error("Failed to bind to proxy all exchange", ex);
76+
}
77+
78+
this.selfQueueName = selfQueueName;
4379
}
4480

4581
@Subscribe
@@ -74,6 +110,10 @@ public void onPlayerDisconnect(DisconnectEvent event) {
74110
}
75111
}
76112

113+
public <T extends AbstractMessage> void addListener(Class<T> message, Consumer<AbstractMessage> listener) {
114+
this.protoListeners.put(message, listener);
115+
}
116+
77117
public void shutdown() {
78118
try {
79119
this.channel.close();

src/main/java/dev/emortal/velocity/tablist/TabList.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public TabList(CorePlugin plugin, ProxyServer proxy) {
4444
if (this.playerTracker == null) {
4545
this.currentFooter = MINI_MESSAGE.deserialize(TAB_LIST_FOOTER, Placeholder.parsed("online_players", "Not Connected"));
4646
} else {
47-
4847
this.proxy.getScheduler().buildTask(plugin, this::updateFooter)
4948
.repeat(5, TimeUnit.SECONDS).schedule();
5049
}

0 commit comments

Comments
 (0)