Skip to content

Commit 76d49fc

Browse files
committed
[INTERNAL][CORE] Adds stream access
This patch introduces new classes and interface. This is necessary to distinguish between plain stream connections and internal stuff. In general this patch is a whole rewrite of the DTM. It now manages two connection pools. One for packet connections and one for stream connections. In addition this commits adds Javadoc, TODOs and FIXMEs.
1 parent 6c77832 commit 76d49fc

28 files changed

+999
-687
lines changed
Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,26 @@
11
package saros.net;
22

33
import java.io.IOException;
4-
import saros.net.internal.IByteStreamConnection;
54
import saros.net.stream.StreamMode;
6-
import saros.net.xmpp.JID;
7-
8-
// TODO Javadoc
95

6+
/**
7+
* The connection manager is responsible for establishing connections to remote addresses.
8+
*
9+
* <p>It offers support for two connection types.
10+
*
11+
* <ol>
12+
* <li>Establishing a connection using {@link #connect(String, Object)} to establish a connection
13+
* which is needed in conjunction with the {@link ITransmitter transmitter} and {@link
14+
* IReceiver receiver} in order to send and receive related packages.
15+
* <li>Establishing a connection using {@link #connectStream(String, Object)} to establish a
16+
* {@link IStreamConnection connection} that can be used for custom purposes. In order to get
17+
* notified about such a connection on the remote side you have to install a {@link
18+
* IStreamConnectionListener}.
19+
* </ol>
20+
*
21+
* <p><b>Note</b>: Stream connections must be closed by calling {@link IStreamConnection#close()
22+
* close} on the given connection.
23+
*/
1024
public interface IConnectionManager {
1125

1226
public static final int IBB_SERVICE = 1;
@@ -21,24 +35,40 @@ public interface IConnectionManager {
2135
*/
2236
public void setServices(int serviceMask);
2337

24-
/** @deprecated */
25-
@Deprecated
26-
public IByteStreamConnection connect(JID peer) throws IOException;
38+
public void addStreamConnectionListener(final IStreamConnectionListener listener);
2739

28-
public IByteStreamConnection connect(String connectionID, JID peer) throws IOException;
40+
public void removeStreamConnectionListener(final IStreamConnectionListener listener);
2941

3042
/**
31-
* @deprecated Disconnects {@link IByteStreamConnection} with the specified peer
32-
* @param peer {@link JID} of the peer to disconnect the {@link IByteStreamConnection}
43+
* Connects to the given address using the given stream ID.
44+
*
45+
* @param id the ID of the stream
46+
* @param address the remote address to connect to
47+
* @return the stream connection
48+
* @throws IOException if an I/O error occurs or such a stream already exists
3349
*/
34-
@Deprecated
35-
public boolean closeConnection(JID peer);
50+
public IStreamConnection connectStream(String id, Object address) throws IOException;
3651

37-
public boolean closeConnection(String connectionIdentifier, JID peer);
52+
/**
53+
* Connects to the given address using the given ID.
54+
*
55+
* @param id
56+
* @param address
57+
* @throws IOException if an I/O error occurs
58+
*/
59+
public void connect(String id, Object address) throws IOException;
3860

39-
/** @deprecated */
40-
@Deprecated
41-
public StreamMode getTransferMode(JID jid);
61+
/**
62+
* Closes the given connection.
63+
*
64+
* @param id the ID of the connection
65+
* @param address the remote address
66+
* @return <code>true</code> if the connection was closed, <code>false</code> if no such
67+
* connection exists
68+
*/
69+
// TODO rename to close
70+
public boolean closeConnection(String id, Object address);
4271

43-
public StreamMode getTransferMode(String connectionID, JID jid);
72+
// TODO RENAME
73+
public StreamMode getTransferMode(String connectionID, Object address);
4474
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package saros.net;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
import saros.net.internal.IConnection;
7+
8+
/**
9+
* A stream connection consists of an input and out stream. It up to the caller to gracefully
10+
* shutdown the connections.
11+
*/
12+
public interface IStreamConnection extends IConnection {
13+
14+
/**
15+
* Gets the input stream to receive data.
16+
*
17+
* @return the input stream to receive data
18+
* @throws IOException if an I/O error occurs
19+
*/
20+
public InputStream getInputStream() throws IOException;
21+
22+
/**
23+
* Gets the output stream to send data.
24+
*
25+
* @return the output stream to send data
26+
* @throws IOException if an I/O error occurs
27+
*/
28+
public OutputStream getOutputStream() throws IOException;
29+
30+
/**
31+
* Returns the read timeout in milliseconds.
32+
*
33+
* @return the read timeout
34+
* @throws IOException if an I/O error occurs
35+
*/
36+
public int getReadTimeout() throws IOException;
37+
38+
/**
39+
* Sets the read timeout in milliseconds.
40+
*
41+
* @throws IOException if an I/O error occurs
42+
*/
43+
public void setReadTimeout(int timeout) throws IOException;
44+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package saros.net;
2+
3+
/**
4+
* Listener interface for accepting remote stream connections.
5+
*
6+
* <p>The listener must either accept or reject the connection. If there are multiple listeners
7+
* installed via {@link IConnectionManager#addStreamConnectionListener(IStreamConnectionListener)}
8+
* the connection will only be rejected if all listeners reject.
9+
*
10+
* <p>Furthermore if a listener accepts the request every outstanding listener will <b>not</b> get
11+
* notified about this connection establishment.
12+
*/
13+
public interface IStreamConnectionListener {
14+
15+
/**
16+
* Gets called when a connection was established.
17+
*
18+
* @param connection the established connection
19+
* @return <code>true</code> to accept the connection or <code>false</code> to reject the
20+
* connection.
21+
*/
22+
public boolean connectionEstablished(IStreamConnection connection);
23+
}

core/src/saros/net/ITransmitter.java

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,35 @@
1-
/*
2-
* DPP - Serious Distributed Pair Programming (c) Freie Universität Berlin -
3-
* Fachbereich Mathematik und Informatik - 2006 (c) Riad Djemili - 2006
4-
*
5-
* This program is free software; you can redistribute it and/or modify it under
6-
* the terms of the GNU General Public License as published by the Free Software
7-
* Foundation; either version 1, or (at your option) any later version.
8-
*
9-
* This program is distributed in the hope that it will be useful, but WITHOUT
10-
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11-
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12-
* details.
13-
*
14-
* You should have received a copy of the GNU General Public License along with
15-
* this program; if not, write to the Free Software Foundation, Inc., 675 Mass
16-
* Ave, Cambridge, MA 02139, USA.
17-
*/
18-
191
package saros.net;
202

213
import java.io.IOException;
224
import org.jivesoftware.smack.packet.Packet;
235
import org.jivesoftware.smack.packet.PacketExtension;
24-
import saros.annotations.Component;
256
import saros.net.xmpp.JID;
267

278
/**
28-
* A humble interface that is responsible for network functionality. The idea behind this interface
29-
* is to only encapsulates the least possible amount of functionality - the one that can't be easily
30-
* tested.
9+
* Interface for sending packets to remote addresses. In general this interface offers two
10+
* possibilities for sending packets.
3111
*
32-
* @author rdjemili
12+
* <ol>
13+
* <li>Either sending packets using the default network environment by calling {@link
14+
* #sendPacket(Packet)} or {@link #sendPacketExtension(JID, PacketExtension)}.
15+
* <li>Using {@link #send(String, JID, PacketExtension)} using a specific connection that must
16+
* first be established by calling {@link IConnectionManager#connect(String, Object)}.
17+
* </ol>
18+
*
19+
* The second option should always be used as default option when sending packets frequently and
20+
* over a longer time span to an already known address.
21+
*
22+
* <p><b>Implementation notes</b>: Implementation should consider to support connection ID's through
23+
* the {@link IConnectionManager}. If this is not possible the implementation <b>must</b> ensure
24+
* that connection lost is properly detected, i.e sending packets to a server which may route the
25+
* packets at a later time without getting an acknowledgement if the packet has been received is a
26+
* <b>violation</b> of the contract.
27+
*
28+
* @see IConnectionManager
29+
*/
30+
/*
31+
* TODO ensure we use IQ packets so the server must return an error. Afterwards we can change the contract of this interface.
3332
*/
34-
@Component(module = "net")
3533
public interface ITransmitter {
3634

3735
/**
@@ -63,13 +61,14 @@ public interface ITransmitter {
6361
public void send(JID recipient, PacketExtension extension) throws IOException;
6462

6563
/**
66-
* Sends the given {@link PacketExtension} to the given {@link JID} using a direct stream
67-
* connection. The connection must be already established to the recipient with the given id.
64+
* Sends the given {@link PacketExtension} to the given {@link JID} using the given connection ID.
65+
* A connection with the given connection id must already been established.
6866
*
69-
* @param connectionID the id of the connection
67+
* @param connectionID the ID of the connection
7068
* @param recipient the recipient of the extension
7169
* @param extension the extension to send
7270
* @throws IOException if an I/O error occurs
71+
* @see IConnectionManager#connect(String, Object)
7372
*/
7473
public void send(String connectionID, JID recipient, PacketExtension extension)
7574
throws IOException;

core/src/saros/net/internal/BinaryChannelConnection.java

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* @author coezbek
3131
* @author srossbach
3232
*/
33-
public class BinaryChannelConnection implements IByteStreamConnection {
33+
public class BinaryChannelConnection implements IPacketConnection {
3434

3535
private static final Logger LOG = Logger.getLogger(BinaryChannelConnection.class);
3636

@@ -49,21 +49,23 @@ private static class Opcode {
4949
/** Max size of data chunks */
5050
private static final int CHUNKSIZE = 32 * 1024 - 1;
5151

52-
private IByteStreamConnectionListener listener;
5352
private ReceiverThread receiveThread;
5453

5554
private final JID remoteAddress;
5655
private final JID localAddress;
5756

5857
private final String connectionID;
5958

59+
private final IConnectionClosedCallback callback;
60+
6061
private IDPool idPool = new IDPool();
6162

6263
private boolean connected;
6364
private boolean initialized;
6465

6566
private Map<Integer, ByteArrayOutputStream> pendingFragmentedPackets =
6667
new HashMap<Integer, ByteArrayOutputStream>();
68+
6769
private Map<Integer, BinaryXMPPExtension> pendingXMPPExtensions =
6870
new HashMap<Integer, BinaryXMPPExtension>();
6971

@@ -111,24 +113,16 @@ public void run() {
111113

112114
private IBinaryXMPPExtensionReceiver receiver;
113115

114-
public BinaryChannelConnection(
115-
JID localAddress,
116-
JID remoteAddress,
117-
String connectionID,
118-
ByteStream stream,
119-
StreamMode mode,
120-
IByteStreamConnectionListener listener)
121-
throws IOException {
122-
this.listener = listener;
123-
this.localAddress = localAddress;
124-
this.remoteAddress = remoteAddress;
125-
this.connectionID = connectionID;
126-
this.stream = stream;
127-
this.stream.setReadTimeout(0); // keep connection alive
128-
this.mode = mode;
116+
public BinaryChannelConnection(ByteStream stream, IConnectionClosedCallback callback) {
117+
this.callback = callback;
118+
// FIXME
119+
this.localAddress = (JID) stream.getLocalAddress();
120+
// FIXME
121+
this.remoteAddress = (JID) stream.getRemoteAddress();
129122

130-
outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream()));
131-
inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream()));
123+
this.connectionID = stream.getId();
124+
this.mode = stream.getMode();
125+
this.stream = stream;
132126
}
133127

134128
@Override
@@ -138,14 +132,17 @@ public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver
138132
this.receiver = receiver;
139133
}
140134

141-
@Override
142-
public synchronized void initialize() {
135+
public synchronized void initialize() throws IOException {
143136
if (initialized) return;
144137

145138
/*
146139
* it is ok to start the receiver a bit later because the data will be
147140
* already buffered by SMACK or the OS
148141
*/
142+
stream.setReadTimeout(0); // keep connection alive
143+
outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream()));
144+
inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream()));
145+
149146
receiveThread = new ReceiverThread();
150147
receiveThread.setName("BinaryChannel-" + remoteAddress.getName());
151148
receiveThread.start();
@@ -154,13 +151,23 @@ public synchronized void initialize() {
154151
}
155152

156153
@Override
157-
public String getConnectionID() {
158-
return connectionID;
154+
public Object getLocalAddress() {
155+
return localAddress;
159156
}
160157

161158
@Override
162-
public synchronized boolean isConnected() {
163-
return connected;
159+
public JID getRemoteAddress() {
160+
return remoteAddress;
161+
}
162+
163+
@Override
164+
public StreamMode getMode() {
165+
return mode;
166+
}
167+
168+
@Override
169+
public String getId() {
170+
return connectionID;
164171
}
165172

166173
@Override
@@ -192,17 +199,7 @@ public void close() {
192199
}
193200
}
194201

195-
listener.connectionClosed(connectionID, this);
196-
}
197-
198-
@Override
199-
public StreamMode getMode() {
200-
return mode;
201-
}
202-
203-
@Override
204-
public JID getRemoteAddress() {
205-
return remoteAddress;
202+
if (callback != null) callback.connectionClosed(this);
206203
}
207204

208205
@Override
@@ -436,6 +433,10 @@ private BinaryXMPPExtension readNextXMPPExtension() throws IOException {
436433
throw new InterruptedIOException("interrupted while reading stream data");
437434
}
438435

436+
private synchronized boolean isConnected() {
437+
return connected;
438+
}
439+
439440
private synchronized void sendData(int fragmentId, byte[] data, int offset, int length)
440441
throws IOException {
441442

@@ -485,7 +486,15 @@ private void splitAndSend(byte[] data, int chunks, int fragmentId) throws IOExce
485486

486487
@Override
487488
public String toString() {
488-
return "[mode=" + getMode() + ", id=" + connectionID + "]" + " " + remoteAddress;
489+
return "PacketConnection [id="
490+
+ getId()
491+
+ ", mode="
492+
+ getMode()
493+
+ ", localAddress="
494+
+ getLocalAddress()
495+
+ ", remoteAddress="
496+
+ getRemoteAddress()
497+
+ "]";
489498
}
490499

491500
static class IDPool {

0 commit comments

Comments
 (0)