Skip to content

Commit ecb5e10

Browse files
committed
[INTERNAL][CORE] Prepares Stream access
This patch introduces an new class / interface which is just used for the ITransmitter and IReceiver. This is necessary to distinguish between plan stream connections and internal stuff.
1 parent 6c77832 commit ecb5e10

27 files changed

+752
-548
lines changed

core/src/saros/net/IConnectionManager.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package saros.net;
22

33
import java.io.IOException;
4-
import saros.net.internal.IByteStreamConnection;
4+
import saros.net.internal.IConnection;
55
import saros.net.stream.StreamMode;
66
import saros.net.xmpp.JID;
77

@@ -21,24 +21,29 @@ public interface IConnectionManager {
2121
*/
2222
public void setServices(int serviceMask);
2323

24+
public void addStreamConnectionListener(final IStreamConnectionListener listener);
25+
26+
public void removeStreamConnectionListener(final IStreamConnectionListener listener);
27+
28+
public IStreamConnection connectStream(String id, Object address) throws IOException;
2429
/** @deprecated */
2530
@Deprecated
26-
public IByteStreamConnection connect(JID peer) throws IOException;
31+
public IConnection connect(Object address) throws IOException;
2732

28-
public IByteStreamConnection connect(String connectionID, JID peer) throws IOException;
33+
public IConnection connect(String connectionID, Object address) throws IOException;
2934

3035
/**
31-
* @deprecated Disconnects {@link IByteStreamConnection} with the specified peer
32-
* @param peer {@link JID} of the peer to disconnect the {@link IByteStreamConnection}
36+
* @deprecated Disconnects with the specified address
37+
* @param address
3338
*/
3439
@Deprecated
35-
public boolean closeConnection(JID peer);
40+
public boolean closeConnection(Object address);
3641

37-
public boolean closeConnection(String connectionIdentifier, JID peer);
42+
public boolean closeConnection(String connectionIdentifier, Object address);
3843

3944
/** @deprecated */
4045
@Deprecated
4146
public StreamMode getTransferMode(JID jid);
4247

43-
public StreamMode getTransferMode(String connectionID, JID jid);
48+
public StreamMode getTransferMode(String connectionID, Object address);
4449
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package saros.net;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
import saros.net.internal.IConnection;
7+
8+
public interface IStreamConnection extends IConnection {
9+
10+
public InputStream getInputStream() throws IOException;
11+
12+
public OutputStream getOutputStream() throws IOException;
13+
14+
public int getReadTimeout() throws IOException;
15+
16+
public void setReadTimeout(int timeout) throws IOException;
17+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package saros.net;
2+
3+
public interface IStreamConnectionListener {
4+
5+
public boolean streamConnectionEstablished(String id, IStreamConnection connection);
6+
}

core/src/saros/net/internal/BinaryChannelConnection.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* @author coezbek
3131
* @author srossbach
3232
*/
33-
public class BinaryChannelConnection implements IByteStreamConnection {
33+
public class BinaryChannelConnection implements IPacketConnection {
3434

3535
private static final Logger LOG = Logger.getLogger(BinaryChannelConnection.class);
3636

@@ -49,21 +49,23 @@ private static class Opcode {
4949
/** Max size of data chunks */
5050
private static final int CHUNKSIZE = 32 * 1024 - 1;
5151

52-
private IByteStreamConnectionListener listener;
5352
private ReceiverThread receiveThread;
5453

5554
private final JID remoteAddress;
5655
private final JID localAddress;
5756

5857
private final String connectionID;
5958

59+
private final IConnectionClosedCallback callback;
60+
6061
private IDPool idPool = new IDPool();
6162

6263
private boolean connected;
6364
private boolean initialized;
6465

6566
private Map<Integer, ByteArrayOutputStream> pendingFragmentedPackets =
6667
new HashMap<Integer, ByteArrayOutputStream>();
68+
6769
private Map<Integer, BinaryXMPPExtension> pendingXMPPExtensions =
6870
new HashMap<Integer, BinaryXMPPExtension>();
6971

@@ -111,24 +113,16 @@ public void run() {
111113

112114
private IBinaryXMPPExtensionReceiver receiver;
113115

114-
public BinaryChannelConnection(
115-
JID localAddress,
116-
JID remoteAddress,
117-
String connectionID,
118-
ByteStream stream,
119-
StreamMode mode,
120-
IByteStreamConnectionListener listener)
121-
throws IOException {
122-
this.listener = listener;
123-
this.localAddress = localAddress;
124-
this.remoteAddress = remoteAddress;
125-
this.connectionID = connectionID;
126-
this.stream = stream;
127-
this.stream.setReadTimeout(0); // keep connection alive
128-
this.mode = mode;
116+
public BinaryChannelConnection(ByteStream stream, IConnectionClosedCallback callback) {
117+
this.callback = callback;
118+
// FIXME
119+
this.localAddress = (JID) stream.getLocalAddress();
120+
// FIXME
121+
this.remoteAddress = (JID) stream.getRemoteAddress();
129122

130-
outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream()));
131-
inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream()));
123+
this.connectionID = stream.getId();
124+
this.mode = stream.getMode();
125+
this.stream = stream;
132126
}
133127

134128
@Override
@@ -138,14 +132,17 @@ public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver
138132
this.receiver = receiver;
139133
}
140134

141-
@Override
142-
public synchronized void initialize() {
135+
public synchronized void initialize() throws IOException {
143136
if (initialized) return;
144137

145138
/*
146139
* it is ok to start the receiver a bit later because the data will be
147140
* already buffered by SMACK or the OS
148141
*/
142+
stream.setReadTimeout(0); // keep connection alive
143+
outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream()));
144+
inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream()));
145+
149146
receiveThread = new ReceiverThread();
150147
receiveThread.setName("BinaryChannel-" + remoteAddress.getName());
151148
receiveThread.start();
@@ -154,13 +151,13 @@ public synchronized void initialize() {
154151
}
155152

156153
@Override
157-
public String getConnectionID() {
158-
return connectionID;
154+
public Object getLocalAddress() {
155+
return stream.getRemoteAddress();
159156
}
160157

161158
@Override
162-
public synchronized boolean isConnected() {
163-
return connected;
159+
public String getId() {
160+
return connectionID;
164161
}
165162

166163
@Override
@@ -192,10 +189,9 @@ public void close() {
192189
}
193190
}
194191

195-
listener.connectionClosed(connectionID, this);
192+
if (callback != null) callback.connectionClosed(this);
196193
}
197194

198-
@Override
199195
public StreamMode getMode() {
200196
return mode;
201197
}
@@ -436,6 +432,10 @@ private BinaryXMPPExtension readNextXMPPExtension() throws IOException {
436432
throw new InterruptedIOException("interrupted while reading stream data");
437433
}
438434

435+
private synchronized boolean isConnected() {
436+
return connected;
437+
}
438+
439439
private synchronized void sendData(int fragmentId, byte[] data, int offset, int length)
440440
throws IOException {
441441

core/src/saros/net/internal/ConnectionPool.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ final class ConnectionPool {
1515

1616
private boolean isOpen;
1717

18-
private final Map<String, IByteStreamConnection> pool =
19-
new HashMap<String, IByteStreamConnection>();
18+
private final Map<String, IConnection> pool = new HashMap<>();
2019

2120
/**
2221
* Opens the connection pool. After the connection pool is opened connections can be added,
@@ -31,19 +30,19 @@ public synchronized void open() {
3130
*/
3231
public void close() {
3332

34-
final Map<String, IByteStreamConnection> currentPoolCopy;
33+
final Map<String, IConnection> currentPoolCopy;
3534

3635
synchronized (this) {
3736
if (!isOpen) return;
3837

3938
isOpen = false;
40-
currentPoolCopy = new HashMap<String, IByteStreamConnection>(pool);
39+
currentPoolCopy = new HashMap<>(pool);
4140
pool.clear();
4241
}
4342

44-
for (Entry<String, IByteStreamConnection> entry : currentPoolCopy.entrySet()) {
43+
for (Entry<String, IConnection> entry : currentPoolCopy.entrySet()) {
4544
final String id = entry.getKey();
46-
final IByteStreamConnection connection = entry.getValue();
45+
final IConnection connection = entry.getValue();
4746
connection.close();
4847

4948
LOG.debug("closed connection [id=" + id + "]: " + connection);
@@ -57,7 +56,7 @@ public void close() {
5756
* @return the connection associated with the id or <code>null</code> if no such connection exists
5857
* or the pool is closed
5958
*/
60-
public synchronized IByteStreamConnection get(final String id) {
59+
public synchronized IConnection get(final String id) {
6160
return pool.get(id);
6261
}
6362

@@ -70,8 +69,7 @@ public synchronized IByteStreamConnection get(final String id) {
7069
* that was already added with the given id or <code>null</code> if no connection was added
7170
* with the given id
7271
*/
73-
public synchronized IByteStreamConnection add(
74-
final String id, final IByteStreamConnection connection) {
72+
public synchronized IConnection add(final String id, final IConnection connection) {
7573

7674
if (!isOpen) return connection;
7775

@@ -85,7 +83,7 @@ public synchronized IByteStreamConnection add(
8583
* @return the connection associated with the id or <code>null</code> if no such connection exists
8684
* or the pool is closed
8785
*/
88-
public synchronized IByteStreamConnection remove(final String id) {
86+
public synchronized IConnection remove(final String id) {
8987
if (!isOpen) return null;
9088

9189
return pool.remove(id);

0 commit comments

Comments
 (0)