Skip to content

Commit 6c77832

Browse files
committed
[INTERNAL][CORE] Moves Receiving logic to IReceiver
This patch moves the receiving logic to the logic where it should belong to.
1 parent 23529f7 commit 6c77832

File tree

17 files changed

+280
-250
lines changed

17 files changed

+280
-250
lines changed

core/src/saros/net/IConnectionManager.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ public interface IConnectionManager {
2121
*/
2222
public void setServices(int serviceMask);
2323

24-
public void addTransferListener(ITransferListener listener);
25-
26-
public void removeTransferListener(ITransferListener listener);
27-
2824
/** @deprecated */
2925
@Deprecated
3026
public IByteStreamConnection connect(JID peer) throws IOException;

core/src/saros/net/IReceiver.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import org.jivesoftware.smack.PacketListener;
44
import org.jivesoftware.smack.filter.PacketFilter;
55
import org.jivesoftware.smack.packet.Packet;
6-
import saros.net.internal.BinaryXMPPExtension;
76

87
public interface IReceiver {
98

@@ -52,9 +51,6 @@ public interface IReceiver {
5251
*/
5352
public PacketCollector createCollector(PacketFilter filter);
5453

55-
/** FOR INTERNAL USE */
56-
public void processBinaryXMPPExtension(BinaryXMPPExtension extension);
57-
5854
public default void addTransferListener(ITransferListener listener) {
5955
// NOP
6056
}

core/src/saros/net/internal/BinaryChannelConnection.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ public void run() {
9090

9191
LOG.debug(connection + " ReceiverThread started.");
9292
try {
93-
while (!isInterrupted()) listener.receive(readNextXMPPExtension());
93+
while (!isInterrupted()) {
94+
final BinaryXMPPExtension extension = readNextXMPPExtension();
95+
if (receiver != null) receiver.receive(extension);
96+
}
9497

9598
} catch (SocketException e) {
9699
LOG.debug(connection + " connection closed locally: " + e.getMessage());
@@ -106,6 +109,8 @@ public void run() {
106109
}
107110
}
108111

112+
private IBinaryXMPPExtensionReceiver receiver;
113+
109114
public BinaryChannelConnection(
110115
JID localAddress,
111116
JID remoteAddress,
@@ -126,6 +131,13 @@ public BinaryChannelConnection(
126131
inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream()));
127132
}
128133

134+
@Override
135+
public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver) {
136+
if (this.receiver != null || receiver == null) return;
137+
138+
this.receiver = receiver;
139+
}
140+
129141
@Override
130142
public synchronized void initialize() {
131143
if (initialized) return;

core/src/saros/net/internal/DataTransferManager.java

Lines changed: 36 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package saros.net.internal;
22

3-
import java.io.ByteArrayOutputStream;
43
import java.io.IOException;
54
import java.io.InterruptedIOException;
65
import java.util.ArrayList;
@@ -12,18 +11,13 @@
1211
import java.util.concurrent.TimeUnit;
1312
import java.util.concurrent.locks.Lock;
1413
import java.util.concurrent.locks.ReentrantLock;
15-
import java.util.zip.DataFormatException;
16-
import java.util.zip.Inflater;
1714
import org.apache.log4j.Logger;
1815
import org.jivesoftware.smack.Connection;
1916
import saros.annotations.Component;
2017
import saros.context.IContextKeyBindings.IBBStreamService;
2118
import saros.context.IContextKeyBindings.Socks5StreamService;
2219
import saros.net.ConnectionState;
2320
import saros.net.IConnectionManager;
24-
import saros.net.IPacketInterceptor;
25-
import saros.net.IReceiver;
26-
import saros.net.ITransferListener;
2721
import saros.net.stream.IStreamService;
2822
import saros.net.stream.StreamMode;
2923
import saros.net.xmpp.IConnectionListener;
@@ -44,28 +38,18 @@ public class DataTransferManager implements IConnectionListener, IConnectionMana
4438

4539
private static final Logger LOG = Logger.getLogger(DataTransferManager.class);
4640

47-
private static final int CHUNKSIZE = 16 * 1024;
48-
4941
private static final String DEFAULT_CONNECTION_ID = "default";
5042

5143
private static final String IN = "in";
5244

5345
private static final String OUT = "out";
5446

55-
private final CopyOnWriteArrayList<IPacketInterceptor> packetInterceptors =
56-
new CopyOnWriteArrayList<IPacketInterceptor>();
57-
58-
private final List<ITransferListener> transferListeners =
59-
new CopyOnWriteArrayList<ITransferListener>();
60-
6147
private volatile JID currentLocalJID;
6248

6349
private Connection xmppConnection;
6450

6551
private int serviceMask = -1;
6652

67-
private final IReceiver receiver;
68-
6953
private final IStreamService mainService;
7054

7155
private final IStreamService fallbackService;
@@ -78,57 +62,15 @@ public class DataTransferManager implements IConnectionListener, IConnectionMana
7862

7963
private final List<IStreamService> streamServices = new CopyOnWriteArrayList<IStreamService>();
8064

65+
private final CopyOnWriteArrayList<IByteStreamConnectionListener> connectionListeners =
66+
new CopyOnWriteArrayList<>();
67+
8168
private final IByteStreamConnectionListener byteStreamConnectionListener =
8269
new IByteStreamConnectionListener() {
8370

84-
@Override
85-
public void receive(final BinaryXMPPExtension extension) {
86-
87-
boolean dispatchPacket = true;
88-
89-
for (IPacketInterceptor packetInterceptor : packetInterceptors)
90-
dispatchPacket &= packetInterceptor.receivedPacket(extension);
91-
92-
if (!dispatchPacket) return;
93-
94-
if (LOG.isTraceEnabled())
95-
LOG.trace(
96-
"received binary XMPP extension: "
97-
+ extension.getTransferDescription()
98-
+ ", size: "
99-
+ extension.getCompressedSize()
100-
+ ", RX time: "
101-
+ extension.getTransferDuration()
102-
+ " ms ["
103-
+ extension.getTransferMode()
104-
+ "]");
105-
106-
if (extension.getTransferDescription().compressContent()) {
107-
byte[] payload = extension.getPayload();
108-
long compressedPayloadLength = payload.length;
109-
110-
try {
111-
payload = inflate(payload);
112-
} catch (IOException e) {
113-
LOG.error("could not decompress extension payload", e);
114-
return;
115-
}
116-
117-
extension.setPayload(compressedPayloadLength, payload);
118-
}
119-
120-
notifyDataReceived(
121-
extension.getTransferMode(),
122-
extension.getCompressedSize(),
123-
extension.getUncompressedSize(),
124-
extension.getTransferDuration());
125-
126-
receiver.processBinaryXMPPExtension(extension);
127-
}
128-
12971
@Override
13072
public void connectionChanged(
131-
final String connectionID,
73+
final String connectionId,
13274
final IByteStreamConnection connection,
13375
final boolean incomingRequest) {
13476

@@ -137,7 +79,12 @@ public void connectionChanged(
13779

13880
final String id =
13981
toConnectionIDToken(
140-
connectionID, incomingRequest ? IN : OUT, connection.getRemoteAddress());
82+
connectionId, incomingRequest ? IN : OUT, connection.getRemoteAddress());
83+
84+
/// TODO we currently have to announce not initialized connections otherwise the IReceiver
85+
// will miss updates
86+
87+
notfiyconnectionChanged(id, connection, incomingRequest);
14188

14289
LOG.debug(
14390
"bytestream connection changed "
@@ -180,35 +127,25 @@ public void connectionChanged(
180127
}
181128

182129
@Override
183-
public void connectionClosed(String connectionID, IByteStreamConnection connection) {
184-
closeConnection(connectionID, connection.getRemoteAddress());
130+
public void connectionClosed(
131+
final String connectionId, final IByteStreamConnection connection) {
132+
closeConnection(connectionId, connection.getRemoteAddress());
133+
notfiyConnectionClosed(connectionId, connection);
185134
}
186135
};
187136

188137
public DataTransferManager(
189138
XMPPConnectionService connectionService,
190-
IReceiver receiver,
191139
@Nullable @Socks5StreamService IStreamService mainService,
192140
@Nullable @IBBStreamService IStreamService fallbackService) {
193141

194-
this.receiver = receiver;
195142
this.fallbackService = fallbackService;
196143
this.mainService = mainService;
197144
this.setStreamServices();
198145

199146
connectionService.addListener(this);
200147
}
201148

202-
@Override
203-
public void addTransferListener(ITransferListener listener) {
204-
transferListeners.add(listener);
205-
}
206-
207-
@Override
208-
public void removeTransferListener(ITransferListener listener) {
209-
transferListeners.remove(listener);
210-
}
211-
212149
/** @deprecated */
213150
@Override
214151
@Deprecated
@@ -289,6 +226,14 @@ public StreamMode getTransferMode(String connectionID, JID jid) {
289226
return connection == null ? StreamMode.NONE : connection.getMode();
290227
}
291228

229+
public void addConnectionListener(final IByteStreamConnectionListener listener) {
230+
connectionListeners.addIfAbsent(listener);
231+
}
232+
233+
public void removeConnectionListener(final IByteStreamConnectionListener listener) {
234+
connectionListeners.remove(listener);
235+
}
236+
292237
private IByteStreamConnection connectInternal(String connectionID, JID peer) throws IOException {
293238

294239
IByteStreamConnection connection = null;
@@ -433,27 +378,6 @@ public void connectionStateChanged(Connection connection, ConnectionState newSta
433378
else if (this.xmppConnection != null) disposeConnection();
434379
}
435380

436-
// TODO: move to ITransmitter
437-
public void addPacketInterceptor(IPacketInterceptor interceptor) {
438-
packetInterceptors.addIfAbsent(interceptor);
439-
}
440-
441-
// TODO: move to IReceiver
442-
public void removePacketInterceptor(IPacketInterceptor interceptor) {
443-
packetInterceptors.remove(interceptor);
444-
}
445-
446-
/**
447-
* Left over and <b>MUST</b> only used by the STF
448-
*
449-
* @param extension
450-
* @deprecated
451-
*/
452-
@Deprecated
453-
public void addIncomingTransferObject(BinaryXMPPExtension extension) {
454-
byteStreamConnectionListener.receive(extension);
455-
}
456-
457381
/**
458382
* Returns the current connection for the remote side. If the local side is connected to the
459383
* remote side as well as the remote side is connected to the local side the local to remote
@@ -482,39 +406,29 @@ private static String toConnectionIDToken(String connectionIdentifier, String mo
482406
return connectionIdentifier + ":" + mode + ":" + jid.toString();
483407
}
484408

485-
private void notifyDataReceived(
486-
final StreamMode mode,
487-
final long sizeCompressed,
488-
final long sizeUncompressed,
489-
final long duration) {
409+
private void notfiyConnectionClosed(
410+
final String connectionId, final IByteStreamConnection connection) {
490411

491-
for (final ITransferListener listener : transferListeners) {
412+
for (final IByteStreamConnectionListener listener : connectionListeners) {
492413
try {
493-
listener.received(mode, sizeCompressed, sizeUncompressed, duration);
414+
listener.connectionClosed(connectionId, connection);
494415
} catch (RuntimeException e) {
495-
LOG.error("invoking received() on listener: " + listener + " failed", e);
416+
LOG.error("invoking connectionClosed() on listener: " + listener + " failed", e);
496417
}
497418
}
498419
}
499420

500-
private static byte[] inflate(byte[] input) throws IOException {
501-
502-
ByteArrayOutputStream bos;
503-
Inflater decompressor = new Inflater();
421+
private void notfiyconnectionChanged(
422+
final String connectionId,
423+
final IByteStreamConnection connection,
424+
final boolean incomingRequest) {
504425

505-
decompressor.setInput(input, 0, input.length);
506-
bos = new ByteArrayOutputStream(input.length);
507-
508-
byte[] buf = new byte[CHUNKSIZE];
509-
510-
try {
511-
while (!decompressor.finished()) {
512-
int count = decompressor.inflate(buf);
513-
bos.write(buf, 0, count);
426+
for (final IByteStreamConnectionListener listener : connectionListeners) {
427+
try {
428+
listener.connectionChanged(connectionId, connection, incomingRequest);
429+
} catch (RuntimeException e) {
430+
LOG.error("invoking connectionChanged() on listener: " + listener + " failed", e);
514431
}
515-
return bos.toByteArray();
516-
} catch (DataFormatException e) {
517-
throw new IOException("failed to inflate data", e);
518432
}
519433
}
520434
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package saros.net.internal;
2+
3+
public interface IBinaryXMPPExtensionReceiver {
4+
5+
/**
6+
* Gets called when a {@linkplain BinaryXMPPExtension} was received.
7+
*
8+
* @param extension
9+
*/
10+
public void receive(final BinaryXMPPExtension extension);
11+
}

core/src/saros/net/internal/IByteStreamConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,6 @@ public interface IByteStreamConnection {
3737
public String getConnectionID();
3838

3939
public StreamMode getMode();
40+
41+
public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver);
4042
}

core/src/saros/net/internal/IByteStreamConnectionListener.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,12 @@
33
/**
44
* Listener interface used by IStreamService and IBytestreamConnection to notify about established
55
* or changed connections and incoming XMPP extensions.
6-
*
7-
* @author jurke
86
*/
97
public interface IByteStreamConnectionListener {
108

11-
/**
12-
* Gets called when a {@linkplain BinaryXMPPExtension} was received.
13-
*
14-
* @param extension
15-
*/
16-
public void receive(final BinaryXMPPExtension extension);
17-
18-
public void connectionClosed(String connectionID, IByteStreamConnection connection);
9+
public default void connectionClosed(String connectionID, IByteStreamConnection connection) {
10+
// NOP;
11+
}
1912

2013
/**
2114
* Gets called when a connection change is detected. The {@linkplain IByteStreamConnection
@@ -27,6 +20,8 @@ public interface IByteStreamConnectionListener {
2720
* @param incomingRequest <code>true</code> if the connection was a result of a remote connect
2821
* request, <code>false</code> if the connect request was initiated on the local side
2922
*/
30-
public void connectionChanged(
31-
String connectionID, IByteStreamConnection connection, boolean incomingRequest);
23+
public default void connectionChanged(
24+
String connectionID, IByteStreamConnection connection, boolean incomingRequest) {
25+
// NOP;
26+
}
3227
}

0 commit comments

Comments
 (0)