Index: tools/usb_gadget/gadget.py |
diff --git a/tools/usb_gadget/gadget.py b/tools/usb_gadget/gadget.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..fb48a2cfb3a122f8fede2b203c6755ec989913ef |
--- /dev/null |
+++ b/tools/usb_gadget/gadget.py |
@@ -0,0 +1,432 @@ |
+# Copyright 2014 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+"""Generic USB gadget functionality. |
+""" |
+ |
+import struct |
+ |
+import usb_constants |
+ |
+ |
+class Gadget(object): |
+ """Basic functionality for a USB device. |
+ |
+ Implements standard control requests assuming that a subclass will handle |
+ class- or vendor-specific requests. |
+ """ |
+ |
+ def __init__(self, device_desc, fs_config_desc, hs_config_desc): |
+ """Create a USB gadget device. |
+ |
+ Args: |
+ device_desc: USB device descriptor. |
+ fs_config_desc: Low/full-speed device descriptor. |
+ hs_config_desc: High-speed device descriptor. |
+ """ |
+ self._speed = usb_constants.Speed.UNKNOWN |
+ self._chip = None |
+ self._device_desc = device_desc |
+ self._fs_config_desc = fs_config_desc |
+ self._hs_config_desc = hs_config_desc |
+ # dict mapping language codes to a dict mapping indexes to strings |
+ self._strings = {} |
+ # dict mapping interface numbers to a set of endpoint addresses |
+ self._active_endpoints = {} |
+ |
+ def GetDeviceDescriptor(self): |
+ return self._device_desc |
+ |
+ def GetFullSpeedConfigurationDescriptor(self): |
+ return self._fs_config_desc |
+ |
+ def GetHighSpeedConfigurationDescriptor(self): |
+ return self._hs_config_desc |
+ |
+ def GetConfigurationDescriptor(self): |
+ if self._speed == usb_constants.Speed.FULL: |
+ return self._fs_config_desc |
+ elif self._speed == usb_constants.Speed.HIGH: |
+ return self._hs_config_desc |
+ else: |
+ raise RuntimeError('Device is not connected.') |
+ |
+ def GetSpeed(self): |
+ return self._speed |
+ |
+ def AddStringDescriptor(self, index, value, lang=0x0409): |
+ """Add a string descriptor to this device. |
+ |
+ Args: |
+ index: String descriptor index (matches 'i' fields in descriptors). |
+ value: The string. |
+ lang: Language code (default: English). |
+ |
+ Raises: |
+ ValueError: The index or language code is invalid. |
+ """ |
+ if index < 1 or index > 255: |
+ raise ValueError('String descriptor index out of range.') |
+ if lang < 0 or lang > 0xffff: |
+ raise ValueError('String descriptor language code out of range.') |
+ |
+ lang_strings = self._strings.setdefault(lang, {}) |
+ lang_strings[index] = value |
+ |
+ def Connected(self, chip, speed): |
+ """The device has been connected to a USB host. |
+ |
+ Args: |
+ chip: USB controller. |
+ speed: Connection speed. |
+ """ |
+ self._speed = speed |
+ self._chip = chip |
+ |
+ def Disconnected(self): |
+ """The device has been disconnected from the USB host.""" |
+ self._speed = usb_constants.Speed.UNKNOWN |
+ self._chip = None |
+ self._active_endpoints.clear() |
+ |
+ def IsConnected(self): |
+ return self._chip is not None |
+ |
+ def ControlRead(self, request_type, request, value, index, length): |
+ """Handle a read on the control pipe (endpoint zero). |
+ |
+ Args: |
+ request_type: bmRequestType field of the setup packet. |
+ request: bRequest field of the setup packet. |
+ value: wValue field of the setup packet. |
+ index: wIndex field of the setup packet. |
+ length: Maximum amount of data the host expects the device to return. |
+ |
+ Returns: |
+ A buffer to return to the USB host with len <= length on success or |
+ None to stall the pipe. |
+ """ |
+ assert request_type & usb_constants.Dir.IN |
+ typ = request_type & usb_constants.Type.MASK |
+ recipient = request_type & usb_constants.Recipient.MASK |
+ if typ == usb_constants.Type.STANDARD: |
+ return self.StandardControlRead( |
+ recipient, request, value, index, length) |
+ elif typ == usb_constants.Type.CLASS: |
+ return self.ClassControlRead( |
+ recipient, request, value, index, length) |
+ elif typ == usb_constants.Type.VENDOR: |
+ return self.VendorControlRead( |
+ recipient, request, value, index, length) |
+ |
+ def ControlWrite(self, request_type, request, value, index, data): |
+ """Handle a write to the control pipe (endpoint zero). |
+ |
+ Args: |
+ request_type: bmRequestType field of the setup packet. |
+ request: bRequest field of the setup packet. |
+ value: wValue field of the setup packet. |
+ index: wIndex field of the setup packet. |
+ data: Data stage of the request. |
+ |
+ Returns: |
+ True on success, None to stall the pipe. |
+ """ |
+ assert not request_type & usb_constants.Dir.IN |
+ typ = request_type & usb_constants.Type.MASK |
+ recipient = request_type & usb_constants.Recipient.MASK |
+ if typ == usb_constants.Type.STANDARD: |
+ return self.StandardControlWrite( |
+ recipient, request, value, index, data) |
+ elif typ == usb_constants.Type.CLASS: |
+ return self.ClassControlWrite( |
+ recipient, request, value, index, data) |
+ elif typ == usb_constants.Type.VENDOR: |
+ return self.VendorControlWrite( |
+ recipient, request, value, index, data) |
+ |
+ def SendPacket(self, endpoint, data): |
+ """Send a data packet on the given endpoint. |
+ |
+ Args: |
+ endpoint: Endpoint address. |
+ data: Data buffer. |
+ |
+ Raises: |
+ ValueError: If the endpoint address is not valid. |
+ RuntimeError: If the device is not connected. |
+ """ |
+ if self._chip is None: |
+ raise RuntimeError('Device is not connected.') |
+ if not endpoint & usb_constants.Dir.IN: |
+ raise ValueError('Cannot write to non-input endpoint.') |
+ self._chip.SendPacket(endpoint, data) |
+ |
+ def ReceivePacket(self, endpoint, data): |
+ """Handle an incoming data packet on one of the device's active endpoints. |
+ |
+ This method should be overridden by a subclass implementing endpoint-based |
+ data transfers. |
+ |
+ Args: |
+ endpoint: Endpoint address. |
+ data: Data buffer. |
+ """ |
+ pass |
+ |
+ def HaltEndpoint(self, endpoint): |
+ """Signals a STALL condition to the host on the given endpoint. |
+ |
+ Args: |
+ endpoint: Endpoint address. |
+ """ |
+ self._chip.HaltEndpoint(endpoint) |
+ |
+ def StandardControlRead(self, recipient, request, value, index, length): |
+ """Handle standard control transfers. |
+ |
+ Args: |
+ recipient: Request recipient (device, interface, endpoint, etc.) |
+ request: bRequest field of the setup packet. |
+ value: wValue field of the setup packet. |
+ index: wIndex field of the setup packet. |
+ length: Maximum amount of data the host expects the device to return. |
+ |
+ Returns: |
+ A buffer to return to the USB host with len <= length on success or |
+ None to stall the pipe. |
+ """ |
+ if request == usb_constants.Request.GET_DESCRIPTOR: |
+ desc_type = value >> 8 |
+ desc_index = value & 0xff |
+ desc_lang = index |
+ |
+ print 'GetDescriptor(recipient={}, type={}, index={}, lang={})'.format( |
+ recipient, desc_type, desc_index, desc_lang) |
+ |
+ return self.GetDescriptor(recipient, desc_type, desc_index, desc_lang, |
+ length) |
+ |
+ def GetDescriptor(self, recipient, typ, index, lang, length): |
+ """Handle a standard GET_DESCRIPTOR request. |
+ |
+ See Universal Serial Bus Specification Revision 2.0 section 9.4.3. |
+ |
+ Args: |
+ recipient: Request recipient (device, interface, endpoint, etc.) |
+ typ: Descriptor type. |
+ index: Descriptor index. |
+ lang: Descriptor language code. |
+ length: Maximum amount of data the host expects the device to return. |
+ |
+ Returns: |
+ The value of the descriptor or None to stall the pipe. |
+ """ |
+ if recipient == usb_constants.Recipient.DEVICE: |
+ if typ == usb_constants.DescriptorType.STRING: |
+ return self.GetStringDescriptor(index, lang, length) |
+ |
+ def ClassControlRead(self, recipient, request, value, index, length): |
+ """Handle class-specific control transfers. |
+ |
+ This function should be overridden by a subclass implementing a particular |
+ device class. |
+ |
+ Args: |
+ recipient: Request recipient (device, interface, endpoint, etc.) |
+ request: bRequest field of the setup packet. |
+ value: wValue field of the setup packet. |
+ index: wIndex field of the setup packet. |
+ length: Maximum amount of data the host expects the device to return. |
+ |
+ Returns: |
+ A buffer to return to the USB host with len <= length on success or |
+ None to stall the pipe. |
+ """ |
+ _ = recipient, request, value, index, length |
+ return None |
+ |
+ def VendorControlRead(self, recipient, request, value, index, length): |
+ """Handle vendor-specific control transfers. |
+ |
+ This function should be overridden by a subclass if implementing a device |
+ that responds to vendor-specific requests. |
+ |
+ Args: |
+ recipient: Request recipient (device, interface, endpoint, etc.) |
+ request: bRequest field of the setup packet. |
+ value: wValue field of the setup packet. |
+ index: wIndex field of the setup packet. |
+ length: Maximum amount of data the host expects the device to return. |
+ |
+ Returns: |
+ A buffer to return to the USB host with len <= length on success or |
+ None to stall the pipe. |
+ """ |
+ _ = recipient, request, value, index, length |
+ return None |
+ |
+ def StandardControlWrite(self, recipient, request, value, index, data): |
+ """Handle standard control transfers. |
+ |
+ Args: |
+ recipient: Request recipient (device, interface, endpoint, etc.) |
+ request: bRequest field of the setup packet. |
+ value: wValue field of the setup packet. |
+ index: wIndex field of the setup packet. |
+ data: Data stage of the request. |
+ |
+ Returns: |
+ True on success, None to stall the pipe. |
+ """ |
+ _ = data |
+ |
+ if request == usb_constants.Request.SET_CONFIGURATION: |
+ if recipient == usb_constants.Recipient.DEVICE: |
+ return self.SetConfiguration(value) |
+ elif request == usb_constants.Request.SET_INTERFACE: |
+ if recipient == usb_constants.Recipient.INTERFACE: |
+ return self.SetInterface(index, value) |
+ |
+ def ClassControlWrite(self, recipient, request, value, index, data): |
+ """Handle class-specific control transfers. |
+ |
+ This function should be overridden by a subclass implementing a particular |
+ device class. |
+ |
+ Args: |
+ recipient: Request recipient (device, interface, endpoint, etc.) |
+ request: bRequest field of the setup packet. |
+ value: wValue field of the setup packet. |
+ index: wIndex field of the setup packet. |
+ data: Data stage of the request. |
+ |
+ Returns: |
+ True on success, None to stall the pipe. |
+ """ |
+ _ = recipient, request, value, index, data |
+ return None |
+ |
+ def VendorControlWrite(self, recipient, request, value, index, data): |
+ """Handle vendor-specific control transfers. |
+ |
+ This function should be overridden by a subclass if implementing a device |
+ that responds to vendor-specific requests. |
+ |
+ Args: |
+ recipient: Request recipient (device, interface, endpoint, etc.) |
+ request: bRequest field of the setup packet. |
+ value: wValue field of the setup packet. |
+ index: wIndex field of the setup packet. |
+ data: Data stage of the request. |
+ |
+ Returns: |
+ True on success, None to stall the pipe. |
+ """ |
+ _ = recipient, request, value, index, data |
+ return None |
+ |
+ def GetStringDescriptor(self, index, lang, length): |
+ """Handle a GET_DESCRIPTOR(String) request from the host. |
+ |
+ Descriptor index 0 returns the set of languages supported by the device. |
+ All other indices return the string descriptors registered with those |
+ indices. |
+ |
+ See Universal Serial Bus Specification Revision 2.0 section 9.6.7. |
+ |
+ Args: |
+ index: Descriptor index. |
+ lang: Descriptor language code. |
+ length: Maximum amount of data the host expects the device to return. |
+ |
+ Returns: |
+ The string descriptor or None to stall the pipe if the descriptor is not |
+ found. |
+ """ |
+ if index == 0: |
+ length = 2 + len(self._strings) * 2 |
+ header = struct.pack('<BB', length, usb_constants.DescriptorType.STRING) |
+ lang_codes = [struct.pack('<H', lang) |
+ for lang in self._strings.iterkeys()] |
+ buf = header + ''.join(lang_codes) |
+ assert len(buf) == length |
+ return buf[:length] |
+ elif lang not in self._strings: |
+ return None |
+ elif index not in self._strings[lang]: |
+ return None |
+ else: |
+ string = self._strings[lang][index].encode('UTF-16LE') |
+ header = struct.pack( |
+ '<BB', 2 + len(string), usb_constants.DescriptorType.STRING) |
+ buf = header + string |
+ return buf[:length] |
+ |
+ def SetConfiguration(self, index): |
+ """Handle a SET_CONFIGURATION request from the host. |
+ |
+ See Universal Serial Bus Specification Revision 2.0 section 9.4.7. |
+ |
+ Args: |
+ index: Configuration index selected. |
+ |
+ Returns: |
+ True on success, None on error to stall the pipe. |
+ """ |
+ print 'SetConfiguration({})'.format(index) |
+ |
+ for endpoint_addrs in self._active_endpoints.values(): |
+ for endpoint_addr in endpoint_addrs: |
+ self._chip.StopEndpoint(endpoint_addr) |
+ endpoint_addrs.clear() |
+ |
+ if index == 0: |
+ # SET_CONFIGRATION(0) puts the device into the Address state which |
+ # Windows does before suspending the port. |
+ return True |
+ elif index != 1: |
+ return None |
+ |
+ config_desc = self.GetConfigurationDescriptor() |
+ for interface_desc in config_desc.GetInterfaces(): |
+ if interface_desc.bAlternateSetting != 0: |
+ continue |
+ endpoint_addrs = self._active_endpoints.setdefault( |
+ interface_desc.bInterfaceNumber, set()) |
+ for endpoint_desc in interface_desc.GetEndpoints(): |
+ self._chip.StartEndpoint(endpoint_desc) |
+ endpoint_addrs.add(endpoint_desc.bEndpointAddress) |
+ return True |
+ |
+ def SetInterface(self, interface, alt_setting): |
+ """Handle a SET_INTERFACE request from the host. |
+ |
+ See Universal Serial Bus Specification Revision 2.0 section 9.4.10. |
+ |
+ Args: |
+ interface: Interface number to configure. |
+ alt_setting: Alternate setting to select. |
+ |
+ Returns: |
+ True on success, None on error to stall the pipe. |
+ """ |
+ print 'SetInterface({}, {})'.format(interface, alt_setting) |
+ |
+ config_desc = self.GetConfigurationDescriptor() |
+ interface_desc = None |
+ for interface_option in config_desc.GetInterfaces(): |
+ if (interface_option.bInterfaceNumber == interface and |
+ interface_option.bAlternateSetting == alt_setting): |
+ interface_desc = interface_option |
+ if interface_desc is None: |
+ return None |
+ |
+ endpoint_addrs = self._active_endpoints.setdefault(interface, set()) |
+ for endpoint_addr in endpoint_addrs: |
+ self._chip.StopEndpoint(endpoint_addr) |
+ for endpoint_desc in interface_desc.GetEndpoints(): |
+ self._chip.StartEndpoint(endpoint_desc) |
+ endpoint_addrs.add(endpoint_desc.bEndpointAddress) |
+ return True |