Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/demo-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ jobs:
pip3 install -e '.[test]'
dbus-run-session -- python3 -m pytest -v --cov=scapy_cbor --cov=bp --cov=udpcl --cov=tcpcl src
- name: Transfer udpcl
timeout-minutes: 3
run: dbus-run-session -- python3 -m udpcl.test.bundlegen --log=debug fullvalid
- name: Transfer tcpcl
timeout-minutes: 3
run: dbus-run-session -- python3 -m tcpcl.test.bundlegen --log=debug fullvalid

integration-test-sand:
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ To call DBus methods in one of the nodes:
## ACME Validation Prototype

To perform an ACME validation exchange between two nodes run the script:

```
node000 dbus-send --system --print-reply --dest=org.ietf.dtn.node.bp /org/ietf/dtn/bp/app/admin org.ietf.dtn.bp.admin.start_expect_acme_request string:"dDtaviYTPUWFS3NK37YWfQ" string:"tPUZNY4ONIk6LxErRFEjVw" string:"LPJNul-wow4m6DsqxbninhsWHlwfp0JecwQzYpOLmCQ" && \
docker container exec -it node001 dbus-send --system --print-reply --dest=org.ietf.dtn.node.bp /org/ietf/dtn/bp/app/admin org.ietf.dtn.bp.admin.send_acme_request string:"dtn://node000/" string:"dDtaviYTPUWFS3NK37YWfQ" string:"tPUZNY4ONIk6LxErRFEjVw" string:"p3yRYFU4KxwQaHQjJ2RdiQ" string:"LPJNul-wow4m6DsqxbninhsWHlwfp0JecwQzYpOLmCQ" && \
Expand All @@ -130,9 +131,19 @@ docker container exec -it node000 dbus-send --system --print-reply --dest=org.ie
## SAFE Prototype

To initiate a primary SA with another SAFE endpoint run:

```
./container/run.py --config container/example-safe.yaml exec node000 -- dbus-send --system --print-reply --dest=org.ietf.dtn.node.bp /org/ietf/dtn/bp/app/safe org.ietf.dtn.bp.safe.start string:dtn://node001/safe
```

## UDPCLv2 Prototype

A demonstration of active queue management (AQM) with a variation of the Prague congestion control algorithm (CCA) can be run with the following, which will configure container interfaces to use HTB rate control down to 100kBps with a CoDEL queue for ECN marking and then transfer an 8MB ADU which takes more than a minute at that rate.

```sh
./container/run.py --config container/example-sand.yaml act pkigen build create start ready rate_ctrl && \
sleep 9 && \
docker container exec node001 dbus-send --system --print-reply --dest=org.ietf.dtn.node.bp /org/ietf/dtn/bp/Agent org.ietf.dtn.bp.Agent.ping string:"dtn://node002/sand" int32:8000000
```

# Wireshark Protocols and Dissectors
Expand Down
3 changes: 2 additions & 1 deletion container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ ENV PIP_DEFAULT_TIMEOUT=300
ENV CARGO_NET_GIT_FETCH_WITH_CLI=true

# Distro upgrade for security patches
RUN apt-get update && apt-get upgrade -y
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
apt-get update && apt-get upgrade -y

# Use systemd as top-level process
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
Expand Down
84 changes: 68 additions & 16 deletions container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def generate_end_entity(self, cafile: str, certfile: str, keyfile: str, mode: st
outfile.write(self._ca_cert.public_bytes(serialization.Encoding.PEM))


def _runcmd(parts, **kwargs):
def _runcmd(parts, **kwargs) -> subprocess.CompletedProcess:
LOGGER.info('Running command %s', ' '.join(f'"{part}"' for part in parts))
return subprocess.run(parts, **kwargs)

Expand All @@ -234,7 +234,7 @@ def __init__(self, stage_dir):
self._docker = re.split(r'\s+', os.environ.get('DOCKER', 'docker').strip())
self.composefile = os.path.join(stage_dir, 'docker-compose.yml')

def run_docker(self, args, **kwargs):
def run_docker(self, args, **kwargs) -> subprocess.CompletedProcess:
env = os.environ
env.update({
'DOCKER_BUILDKIT': '1',
Expand All @@ -243,17 +243,17 @@ def run_docker(self, args, **kwargs):
kwargs.setdefault('check', True)
return _runcmd(self._docker + args, **kwargs)

def run_docker_compose(self, args, **kwargs):
def run_docker_compose(self, args, **kwargs) -> subprocess.CompletedProcess:
env = os.environ
env.update({
'DOCKER_BUILDKIT': '1',
})
kwargs.setdefault('env', env)
kwargs.setdefault('check', True)
filepart = ['compose', '-f', self.composefile, '-p', 'demo']
filepart = ['compose', '-f', os.path.relpath(self.composefile), '-p', 'demo']
return _runcmd(self._docker + filepart + args, **kwargs)

def run_exec(self, args, **kwargs):
def run_exec(self, args, **kwargs) -> subprocess.CompletedProcess:
kwargs.setdefault('check', False)
return self.run_docker_compose(['exec'] + args, **kwargs)

Expand Down Expand Up @@ -296,7 +296,10 @@ def pkigen(self):

for (node_name, node_opts) in self._config['nodes'].items():
ipn_node = node_opts.get('ipn_node')
nodeid = f'ipn:{ipn_node}.0' if ipn_node else 'dtn://{}/'.format(node_name)
if ipn_node:
nodeid = 'ipn:{}.0'.format(ipn_node)
else:
nodeid = 'dtn://{}/'.format(node_name)

# Ubuntu common path mounted to /etc/ssl/
nodedir = os.path.join(self._stagedir, 'nodes', node_name, 'ssl')
Expand Down Expand Up @@ -352,7 +355,6 @@ def build(self):
'driver': 'bridge',
'driver_opts': {
'com.docker.network.bridge.name': f'br-{net_name}',
'com.docker.network.container_iface_prefix': net_name,
'com.docker.network.driver.mtu': net_opts.get('mtu', 1500),
},
'enable_ipv6': ('subnet6' in net_opts),
Expand Down Expand Up @@ -380,7 +382,7 @@ def build(self):
'deploy': {
'resources': {
'limits': {
'memory': '256M',
'memory': '1G',
},
},
},
Expand All @@ -403,7 +405,14 @@ def build(self):
'target': '/var/log/dtn',
},
],
'networks': [net_name for net_name in node_opts['nets']],
'networks': {
net_name: {
'driver_opts': {
'com.docker.network.endpoint.ifname': net_name,
}
}
for net_name in node_opts['nets']
},
}

node_serv['build'] = {
Expand Down Expand Up @@ -440,7 +449,7 @@ def build(self):
'multicast_member': [
{
'addr': 'FF05::114',
'iface': f'{net_name}0',
'iface': f'{net_name}',
}
for net_name in node_opts['nets']
],
Expand All @@ -460,7 +469,7 @@ def build(self):
bp_rx_routes = extconfig.get('bp_rx_routes', [])
bp_rx_routes += [
{
'eid_pattern': f'dtn://{node_name}/.*',
'eid_pattern': 'dtn://{}/.*'.format(node_name),
'action': 'deliver',
},
{
Expand All @@ -484,6 +493,7 @@ def build(self):

'polling': extconfig.get('udpcl_polling', []),
'init_listen': udpcl_listen,
'mtu_default': 1400,
},
'tcpcl': {
'log_level': 'debug',
Expand Down Expand Up @@ -535,7 +545,7 @@ def start(self):
@action
def ready(self):
''' Wait for services to be ready '''
for name, node in self._config['nodes'].items():
for name in self._config['nodes'].keys():
serv_name = 'dtn-bp-agent@node'

args = ['-T', name, 'systemctl', 'is-active', '-q', serv_name]
Expand All @@ -549,22 +559,64 @@ def ready(self):

@action
def check_sand(self):
''' Check container logs for presence of SAND verification '''
stop_at = 2
# limit number of checks
for _ix in range(10):
for _ix in range(20):
least = None
for node_name in self._config['nodes'].keys():
comp = self._docker.run_exec(['-T', node_name, 'journalctl', '--unit=dtn-bp-agent@node'], capture_output=True, text=True)
got = comp.stdout.count('Verified BIB num 3, target block num 1')
if least is None or got < least:
least = got
LOGGER.info('Least number of verified BIBs: %s', least)
if least >= 3:
if least >= stop_at:
return
time.sleep(3)
time.sleep(2)

for node_name in self._config['nodes'].keys():
self._docker.run_exec(['-T', node_name, 'journalctl', '--unit=dtn-bp-agent@node'])
raise RuntimeError('Did not see at least 3 verified BIBs')
raise RuntimeError(f'Did not see at least {stop_at} verified BIBs')

@action
def rate_ctrl(self):
''' Enable AQM and rate limits between nodes '''
net_seq = {}
for (net_name, net_opts) in self._config['nets'].items():
rate_name = '200kbps'
aqm_depth = '1000'
aqm_target = '1ms'
aqm_interval = '20ms'

net_seq[net_name] = [
[
'tc', 'qdisc', 'add', 'dev', net_name,
'root', 'handle', '1:', 'htb',
'default', '10'
],
[ # default flow limit
'tc', 'class', 'add', 'dev', net_name,
'parent', '1:', 'classid', '1:10', 'htb',
'rate', rate_name, 'ceil', rate_name
],
[ # AQM in this flow
'tc', 'qdisc', 'add', 'dev', net_name,
'parent', '1:10', 'handle', '102:', 'codel',
'limit', aqm_depth, 'target', aqm_target,
'interval', aqm_interval, 'ecn'
],
# [ # AQM in this flow
# 'tc', 'qdisc', 'add', 'dev', net_name,
# 'parent', '1:10', 'handle', '102:', 'red',
# 'limit', '20k', 'avpkt', '1400', 'ecn'
# ],
['tc', 'qdisc', 'show', 'dev', net_name]
]

for (node_name, node_opts) in self._config['nodes'].items():
for net_name in node_opts.get('nets', []):
for cmd in net_seq.get(net_name, []):
self._docker.run_exec([node_name] + cmd)

@action
def stop(self):
Expand Down
3 changes: 2 additions & 1 deletion src/bp/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def send_bundle(self, ctr: BundleContainer):
# Assume the route is a TxRouteItem
cl_obj = self._cl_agent.get(ctr.route.cl_type)
if cl_obj:
self._logger.info('send_bundle raw_config %s', ctr.route.raw_config)
self._logger.info('send_bundle on %s raw_config %s', type(cl_obj), ctr.route.raw_config)
ctr.sender = cl_obj.send_bundle_func(ctr.route.raw_config)

if ctr.sender is None:
Expand Down Expand Up @@ -481,4 +481,5 @@ def ping(self, nodeid, datalen):
btsd=bytes(scapy.volatile.RandString(datalen)),
),
]
self._logger.info('ping to %s', nodeid)
self.send_bundle(ctr)
10 changes: 5 additions & 5 deletions src/tcpcl/test/bundlegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ class Generator(object):
''' A 'bundle' data generator.
'''

BLOCK_NUM_PRIMARY = 1
BLOCK_TYPE_PRIMARY = 1
BLOCK_NUM_PAYLOAD = 1
BLOCK_TYPE_PAYLOAD = 1
BLOCK_TYPE_BIB = 11
BLOCK_TYPE_BCB = 12

Expand All @@ -193,7 +193,7 @@ class BlockType(enum.IntEnum):
def create_block_data(self, block_type, block_flags, bundle_flags):
''' Block-type-specific data gerator.
'''
if block_type == self.BLOCK_TYPE_PRIMARY and bundle_flags & 0x0002:
if block_type == self.BLOCK_TYPE_PAYLOAD and bundle_flags & 0x0002:
# Admin record
admin_type = 1
admin_data = [ # Status Report
Expand Down Expand Up @@ -329,8 +329,8 @@ def create_valid(self):
blocks.append(block)
# Last block is payload
if True:
block_type = self.BLOCK_TYPE_PRIMARY
block = self.create_block_random(block_type, bundle_flags, {self.BLOCK_NUM_PRIMARY})
block_type = self.BLOCK_TYPE_PAYLOAD
block = self.create_block_random(block_type, bundle_flags, {self.BLOCK_NUM_PAYLOAD})
blocks.append(block)

buf = io.BytesIO()
Expand Down
Loading
Loading