Skip to content

Feat cdc python #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
75 changes: 51 additions & 24 deletions micropython/usbd/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
endpoint_descriptor,
EP_IN_FLAG
)

from .midi import RingBuf

from micropython import const
import ustruct
import time
Expand Down Expand Up @@ -44,13 +47,15 @@ class CDCControlInterface(USBInterface):
def __init__(self, interface_str):
super().__init__(_CDC_ITF_CONTROL_CLASS, _CDC_ITF_CONTROL_SUBCLASS,
_CDC_ITF_CONTROL_PROT)
self.ep_in = None


def get_itf_descriptor(self, num_eps, itf_idx, str_idx):
# CDC needs a Interface Association Descriptor (IAD)
# first interface is zero, two interfaces in total
desc = ustruct.pack("<BBBBBBBB", 8, _ITF_ASSOCIATION_DESC_TYPE, itf_idx, 2,
_CDC_ITF_CONTROL_CLASS, _CDC_ITF_CONTROL_SUBCLASS,
_CDC_ITF_CONTROL_PROT, 0) # "IAD"
_CDC_ITF_CONTROL_PROT, 4) # "IAD"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand, this is an index to a string descriptor describing the function. Do we have a string descriptor for that? Otherwise this value is 0.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly don't remember why I made it 4, just that it worked.


itf, strs = super().get_itf_descriptor(num_eps, itf_idx, str_idx)
desc += itf
Expand All @@ -59,12 +64,13 @@ def get_itf_descriptor(self, num_eps, itf_idx, str_idx):
desc += ustruct.pack("<BBBH", 5, _CS_DESC_TYPE, 0, 0x0120) # "Header"
desc += ustruct.pack("<BBBBB", 5, _CS_DESC_TYPE, 1, 0, 1) # "Call Management"
desc += ustruct.pack("<BBBB", 4, _CS_DESC_TYPE, 2, 2) # "Abstract Control"
desc += ustruct.pack("<BBBH", 5, _CS_DESC_TYPE, 6, itf_idx, 1) # "Union"
desc += ustruct.pack("<BBBBB", 5, _CS_DESC_TYPE, 6, itf_idx, itf_idx+1) # "Union"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, I corrected the itf_idx+1 already, but missed the struct pack pattern.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the big problem that fixed most things. It took me a while to realize it as well.

return desc, strs

def get_endpoint_descriptors(self, ep_addr, str_idx):
self.ep_in = endpoint_descriptor((ep_addr + 1) | EP_IN_FLAG, "interrupt", 8, 16)
return (self.ep_in, [], ((ep_addr+1) | EP_IN_FLAG,))
self.ep_in = (ep_addr) | EP_IN_FLAG
desc = endpoint_descriptor(self.ep_in, "interrupt", 8, 16)
return (desc, [], (self.ep_in,))
Comment on lines +71 to +73
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that looks good



class CDCDataInterface(USBInterface):
Expand All @@ -74,33 +80,54 @@ def __init__(self, interface_str, timeout=1):
super().__init__(_CDC_ITF_DATA_CLASS, _CDC_ITF_DATA_SUBCLASS,
_CDC_ITF_DATA_PROT)
self.rx_buf = bytearray(256)
self.mv_buf = memoryview(self.rx_buf)
self.rx_done = False
self.rx_nbytes = 0
self.timeout = timeout
# self.mv_buf = memoryview(self.rx_buf)
# self.rx_done = False
# self.rx_nbytes = 0
# self.timeout = timeout
self.rb = RingBuf(256)
self.ep_in = None
self.ep_out = None
self.read_cd_started = False

def get_endpoint_descriptors(self, ep_addr, str_idx):
self.ep_in = (ep_addr + 2) | EP_IN_FLAG
self.ep_out = (ep_addr + 2) & ~EP_IN_FLAG
# XXX OUT = 0x00 but is defined as 0x80?
self.ep_in = (ep_addr) | EP_IN_FLAG
self.ep_out = (ep_addr) & ~EP_IN_FLAG

Comment on lines +93 to +96
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this one. As I recall, the CDC spec specifically asked for two endpoints for the data interface. So in my latest version it looks like

        self.ep_in = (ep_addr + 1) | EP_IN_FLAG
        self.ep_out = (ep_addr + 2)

# one IN / OUT Endpoint
e_out = endpoint_descriptor(self.ep_out, "bulk", 64, 0)
e_in = endpoint_descriptor(self.ep_in, "bulk", 64, 0)
return (e_out + e_in, [], (self.ep_out, self.ep_in))

def write(self, data):
super().submit_xfer(self.ep_in, data)

def read(self, nbytes=0):
# XXX PoC.. When returning, it should probably
# copy it to a ringbuffer instead of leaving it here
super().submit_xfer(self.ep_out, self.rx_buf, self._cb_rx)
now = time.time()
self.rx_done = False
self.rx_nbytes = 0
while ((time.time() - now) < self.timeout) and not self.rx_done:
time.sleep_ms(10)
return bytes(self.mv_buf[:self.rx_nbytes]) if self.rx_done else None
# we are just going to ignore any sends before it's time
if self.ep_in == None:
return
self.submit_xfer(self.ep_in, data)
Comment on lines +104 to +106
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good


# read nbytes or until stop char is found
def read(self, nbytes=1, stop=None):

if self.read_cd_started == False:
if self.ep_out == None:
# ep_out hasn't been set yet, so we can't start recv'ing
return b'' # TODO: better output then just an empty message?
self._start_rx_cb()
self.read_cd_started = True
res = bytearray()
for i in range(nbytes):
nxt = self.rb.get()
if nxt == None:
break
res.append(nxt)
if nxt == ord(stop):
break
return res

def _start_rx_cb(self):
self.submit_xfer(self.ep_out, self.rx_buf, self._cb_rx)

def _cb_rx(self, ep, res, num_bytes):
self.rx_done = True
self.rx_nbytes = num_bytes
for i in range(0, num_bytes):
self.rb.put(self.rx_buf[i])
self.submit_xfer(self.ep_out, self.rx_buf, self._cb_rx)