Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class ConsoleProxy {
static int httpCmdListenPort = 8001;
static int reconnectMaxRetry = 5;
static int readTimeoutSeconds = 90;
public static int defaultBufferSize = 64 * 1024;
static int keyboardType = KEYBOARD_RAW;
static String factoryClzName;
static boolean standaloneStart = false;
Expand Down Expand Up @@ -160,6 +161,12 @@ private static void configProxy(Properties conf) {
readTimeoutSeconds = Integer.parseInt(s);
LOGGER.info("Setting readTimeoutSeconds=" + readTimeoutSeconds);
}

s = conf.getProperty("consoleproxy.defaultBufferSize");
if (s != null) {
defaultBufferSize = Integer.parseInt(s);
LOGGER.info("Setting defaultBufferSize=" + defaultBufferSize);
}
}

public static ConsoleProxyServerFactory getHttpServerFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class ConsoleProxyNoVncClient implements ConsoleProxyClient {
private ConsoleProxyClientParam clientParam;
private String sessionUuid;

private ByteBuffer readBuffer = null;
private int flushThreshold = -1;

public ConsoleProxyNoVncClient(Session session) {
this.session = session;
}
Expand Down Expand Up @@ -109,39 +112,54 @@ public void run() {
connectClientToVNCServer(tunnelUrl, tunnelSession, websocketUrl);
authenticateToVNCServer(clientSourceIp);

int readBytes;
byte[] b;
// Track consecutive iterations with no data and sleep accordingly. Only used for NIO socket connections.
int consecutiveZeroReads = 0;
int sleepTime = 1;
while (connectionAlive) {
logger.trace("Connection with client [{}] [IP: {}] is alive.", clientId, clientSourceIp);
if (client.isVncOverWebSocketConnection()) {
if (client.isVncOverWebSocketConnectionOpen()) {
updateFrontEndActivityTime();
}
connectionAlive = session.isOpen();
sleepTime = 1;
} else if (client.isVncOverNioSocket()) {
byte[] bytesArr;
int nextBytes = client.getNextBytes();
bytesArr = new byte[nextBytes];
client.readBytes(bytesArr, nextBytes);
logger.trace("Read [{}] bytes from client [{}].", nextBytes, clientId);
if (nextBytes > 0) {
session.getRemote().sendBytes(ByteBuffer.wrap(bytesArr));
ByteBuffer buffer = getOrCreateReadBuffer();
int bytesRead = client.readAvailableDataIntoBuffer(buffer, buffer.remaining());

if (bytesRead > 0) {
updateFrontEndActivityTime();
consecutiveZeroReads = 0; // Reset counter on successful read

sleepTime = 0; // Still no sleep to catch any remaining data quickly
} else {
connectionAlive = session.isOpen();
consecutiveZeroReads++;
// Use adaptive sleep time to prevent excessive busy waiting
sleepTime = Math.min(consecutiveZeroReads, 10); // Cap at 10ms max
}

final boolean bufferHasData = buffer.position() > 0;
if (bufferHasData && (bytesRead == 0 || buffer.remaining() <= flushThreshold)) {
buffer.flip();
logger.trace("Flushing buffer with [{}] bytes for client [{}]", buffer.remaining(), clientId);
session.getRemote().sendBytes(buffer);
buffer.compact();
}
} else {
b = new byte[100];
readBytes = client.read(b);
byte[] b = new byte[100];
int readBytes = client.read(b);
logger.trace("Read [{}] bytes from client [{}].", readBytes, clientId);
if (readBytes == -1 || (readBytes > 0 && !sendReadBytesToNoVNC(b, readBytes))) {
connectionAlive = false;
}
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
logger.error("Error on sleep for vnc sessions", e);
if (sleepTime > 0 && connectionAlive) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
logger.error("Error on sleep for vnc sessions", e);
}
}
}
logger.info("Connection with client [{}] [IP: {}] is dead.", clientId, clientSourceIp);
Expand Down Expand Up @@ -316,9 +334,29 @@ private void setClientParam(ConsoleProxyClientParam param) {
this.clientParam = param;
}

private ByteBuffer getOrCreateReadBuffer() {
if (readBuffer == null) {
readBuffer = ByteBuffer.allocate(ConsoleProxy.defaultBufferSize);
logger.debug("Allocated {} KB read buffer for client [{}]", ConsoleProxy.defaultBufferSize / 1024 , clientId);

// Only apply batching logic for TLS connections to work around 16KB record limitation
// For non-TLS connections, use immediate flush for better responsiveness
if (client != null && client.isTLSConnectionEstablished()) {
flushThreshold = Math.min(ConsoleProxy.defaultBufferSize / 4, 2048);
logger.debug("TLS connection detected - using batching with threshold {} for client [{}]", flushThreshold, clientId);
} else {
flushThreshold = ConsoleProxy.defaultBufferSize + 1; // Always flush immediately
logger.debug("Non-TLS connection - using immediate flush for client [{}]", clientId);
}
}
return readBuffer;
}

@Override
public void closeClient() {
this.connectionAlive = false;
// Clear buffer reference to allow GC when client disconnects
this.readBuffer = null;
ConsoleProxy.removeViewer(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,18 +502,14 @@ public byte[] readServerInit() {
return nioSocketConnection.readServerInit();
}

public int getNextBytes() {
return nioSocketConnection.readNextBytes();
public int readAvailableDataIntoBuffer(ByteBuffer buffer, int maxSize) {
return nioSocketConnection.readAvailableDataIntoBuffer(buffer, maxSize);
}

public boolean isTLSConnectionEstablished() {
return nioSocketConnection.isTLSConnection();
}

public void readBytes(byte[] arr, int len) {
nioSocketConnection.readNextByteArray(arr, len);
}

public void processHandshakeSecurityType(int secType, String vmPassword, String host, int port) {
waitForNoVNCReply();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ private void initializeSocket() {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.socket().setSoTimeout(5000);
socketChannel.socket().setKeepAlive(true);
socketChannel.socket().setTcpNoDelay(true);
writeSelector = Selector.open();
readSelector = Selector.open();
socketChannel.register(writeSelector, SelectionKey.OP_WRITE);
Expand Down Expand Up @@ -77,7 +79,6 @@ private void connectSocket(String host, int port) {
socketChannel.register(selector, SelectionKey.OP_CONNECT);

waitForSocketSelectorConnected(selector);
socketChannel.socket().setTcpNoDelay(false);
} catch (IOException e) {
logger.error(String.format("Error creating NioSocket to %s:%s: %s", host, port, e.getMessage()), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public interface NioSocketHandler {
void readBytes(ByteBuffer data, int length);
String readString();
byte[] readServerInit();
int readNextBytes();
void readNextByteArray(byte[] arr, int len);
int readAvailableDataIntoBuffer(ByteBuffer buffer, int maxSize);

// Write operations
void writeUnsignedInteger(int sizeInBits, int value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.cloud.consoleproxy.vnc.network;


import com.cloud.consoleproxy.ConsoleProxy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -28,13 +29,11 @@ public class NioSocketHandlerImpl implements NioSocketHandler {
private NioSocketOutputStream outputStream;
private boolean isTLS = false;

private static final int DEFAULT_BUF_SIZE = 16384;

protected Logger logger = LogManager.getLogger(getClass());

public NioSocketHandlerImpl(NioSocket socket) {
this.inputStream = new NioSocketInputStream(DEFAULT_BUF_SIZE, socket);
this.outputStream = new NioSocketOutputStream(DEFAULT_BUF_SIZE, socket);
this.inputStream = new NioSocketInputStream(ConsoleProxy.defaultBufferSize, socket);
this.outputStream = new NioSocketOutputStream(ConsoleProxy.defaultBufferSize, socket);
}

@Override
Expand Down Expand Up @@ -97,13 +96,8 @@ public byte[] readServerInit() {
}

@Override
public int readNextBytes() {
return inputStream.getNextBytes();
}

@Override
public void readNextByteArray(byte[] arr, int len) {
inputStream.readNextByteArrayFromReadBuffer(arr, len);
public int readAvailableDataIntoBuffer(ByteBuffer buffer, int maxSize) {
return inputStream.readAvailableDataIntoBuffer(buffer, maxSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,28 +175,38 @@ private byte[] readPixelFormat() {
return ArrayUtils.addAll(ret, (byte) 0, (byte) 0, (byte) 0);
}

protected int getNextBytes() {
int size = 200;
while (size > 0) {
if (checkForSizeWithoutWait(size)) {
break;
}
size--;
/**
* This method checks what data is immediately available and returns a reasonable amount.
*
* @param maxSize Maximum number of bytes to attempt to read
* @return Number of bytes available to read (0 if none available)
*/
protected int getAvailableBytes(int maxSize) {
// First check if we have data already in our buffer
int bufferedData = endPosition - currentPosition;
if (bufferedData > 0) {
return Math.min(bufferedData, maxSize);
}
return size;
}

protected void readNextByteArrayFromReadBuffer(byte[] arr, int len) {
copyBytesFromReadBuffer(len, arr);
// Try to read more data with non-blocking call
// This determines how much data is available
return getReadBytesAvailableToFitSize(1, maxSize, false);
}

protected void copyBytesFromReadBuffer(int length, byte[] arr) {
int ptr = 0;
while (length > 0) {
int n = getReadBytesAvailableToFitSize(1, length, true);
readBytes(ByteBuffer.wrap(arr, ptr, n), n);
ptr += n;
length -= n;
/**
* Read available data directly into a ByteBuffer.
*
* @param buffer ByteBuffer to read data into
* @param maxSize Maximum number of bytes to read
* @return Number of bytes actually read (0 if none available)
*/
protected int readAvailableDataIntoBuffer(ByteBuffer buffer, int maxSize) {
// Get the amount of data available to read
int available = getAvailableBytes(maxSize);
if (available > 0) {
// Read directly into the ByteBuffer
readBytes(buffer, available);
}
return available;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.
package com.cloud.consoleproxy.vnc.network;

import com.cloud.consoleproxy.ConsoleProxy;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
Expand Down Expand Up @@ -43,9 +45,9 @@ public NioSocketSSLEngineManager(SSLEngine sslEngine, NioSocketHandler socket) {

executor = Executors.newSingleThreadExecutor();

int pktBufSize = engine.getSession().getPacketBufferSize();
myNetData = ByteBuffer.allocate(pktBufSize);
peerNetData = ByteBuffer.allocate(pktBufSize);
int networkBufSize = Math.max(engine.getSession().getPacketBufferSize(), ConsoleProxy.defaultBufferSize);
myNetData = ByteBuffer.allocate(networkBufSize);
peerNetData = ByteBuffer.allocate(networkBufSize);
}

private void handshakeNeedUnwrap(ByteBuffer peerAppData) throws SSLException {
Expand Down Expand Up @@ -155,22 +157,25 @@ public int read(ByteBuffer data) throws IOException {
}

public int write(ByteBuffer data) throws IOException {
int n = 0;
int totalBytesConsumed = 0;
int sessionAppBufSize = engine.getSession().getApplicationBufferSize();
boolean shouldBatch = ConsoleProxy.defaultBufferSize > sessionAppBufSize;

while (data.hasRemaining()) {
SSLEngineResult result = engine.wrap(data, myNetData);
n += result.bytesConsumed();
totalBytesConsumed += result.bytesConsumed();
switch (result.getStatus()) {
case OK:
myNetData.flip();
outputStream.writeBytes(myNetData, myNetData.remaining());
outputStream.flushWriteBuffer();
myNetData.compact();
// Flush immediately if: batching is disabled, small data, or last chunk
if (!shouldBatch || result.bytesConsumed() < sessionAppBufSize || !data.hasRemaining()) {
flush();
}
// Otherwise accumulate for batching (large chunk with more data coming)
break;

case BUFFER_OVERFLOW:
myNetData.flip();
outputStream.writeBytes(myNetData, myNetData.remaining());
myNetData.compact();
// Flush when buffer is full
flush();
break;

case CLOSED:
Expand All @@ -181,7 +186,16 @@ public int write(ByteBuffer data) throws IOException {
break;
}
}
return n;
return totalBytesConsumed;
}

public void flush() {
if (myNetData.position() > 0) {
myNetData.flip();
outputStream.writeBytes(myNetData, myNetData.remaining());
outputStream.flushWriteBuffer();
myNetData.compact();
}
}

public SSLSession getSession() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.
package com.cloud.consoleproxy.vnc.network;

import com.cloud.consoleproxy.ConsoleProxy;
import com.cloud.utils.exception.CloudRuntimeException;

import java.io.IOException;
Expand All @@ -26,7 +27,7 @@ public class NioSocketTLSInputStream extends NioSocketInputStream {
private final NioSocketSSLEngineManager sslEngineManager;

public NioSocketTLSInputStream(NioSocketSSLEngineManager sslEngineManager, NioSocket socket) {
super(sslEngineManager.getSession().getApplicationBufferSize(), socket);
super(ConsoleProxy.defaultBufferSize, socket);
this.sslEngineManager = sslEngineManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.
package com.cloud.consoleproxy.vnc.network;

import com.cloud.consoleproxy.ConsoleProxy;

import java.io.IOException;
import java.nio.ByteBuffer;

Expand All @@ -24,7 +26,7 @@ public class NioSocketTLSOutputStream extends NioSocketOutputStream {
private final NioSocketSSLEngineManager sslEngineManager;

public NioSocketTLSOutputStream(NioSocketSSLEngineManager sslEngineManager, NioSocket socket) {
super(sslEngineManager.getSession().getApplicationBufferSize(), socket);
super(ConsoleProxy.defaultBufferSize, socket);
this.sslEngineManager = sslEngineManager;
}

Expand All @@ -38,6 +40,7 @@ public void flushWriteBuffer() {
}

currentPosition = start;
sslEngineManager.flush();
}

protected int writeThroughSSLEngineManager(byte[] data, int startPos, int length) {
Expand Down
1 change: 1 addition & 0 deletions systemvm/agent/conf/consoleproxy.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ consoleproxy.httpCmdListenPort=8001
consoleproxy.jarDir=./applet/
consoleproxy.viewerLinger=180
consoleproxy.reconnectMaxRetry=5
consoleproxy.defaultBufferSize=65536
Loading