| Index: tools/usb_gadget/gadget.py
|
| diff --git a/tools/usb_gadget/gadget.py b/tools/usb_gadget/gadget.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..fb48a2cfb3a122f8fede2b203c6755ec989913ef
|
| --- /dev/null
|
| +++ b/tools/usb_gadget/gadget.py
|
| @@ -0,0 +1,432 @@
|
| +# Copyright 2014 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +"""Generic USB gadget functionality.
|
| +"""
|
| +
|
| +import struct
|
| +
|
| +import usb_constants
|
| +
|
| +
|
| +class Gadget(object):
|
| + """Basic functionality for a USB device.
|
| +
|
| + Implements standard control requests assuming that a subclass will handle
|
| + class- or vendor-specific requests.
|
| + """
|
| +
|
| + def __init__(self, device_desc, fs_config_desc, hs_config_desc):
|
| + """Create a USB gadget device.
|
| +
|
| + Args:
|
| + device_desc: USB device descriptor.
|
| + fs_config_desc: Low/full-speed device descriptor.
|
| + hs_config_desc: High-speed device descriptor.
|
| + """
|
| + self._speed = usb_constants.Speed.UNKNOWN
|
| + self._chip = None
|
| + self._device_desc = device_desc
|
| + self._fs_config_desc = fs_config_desc
|
| + self._hs_config_desc = hs_config_desc
|
| + # dict mapping language codes to a dict mapping indexes to strings
|
| + self._strings = {}
|
| + # dict mapping interface numbers to a set of endpoint addresses
|
| + self._active_endpoints = {}
|
| +
|
| + def GetDeviceDescriptor(self):
|
| + return self._device_desc
|
| +
|
| + def GetFullSpeedConfigurationDescriptor(self):
|
| + return self._fs_config_desc
|
| +
|
| + def GetHighSpeedConfigurationDescriptor(self):
|
| + return self._hs_config_desc
|
| +
|
| + def GetConfigurationDescriptor(self):
|
| + if self._speed == usb_constants.Speed.FULL:
|
| + return self._fs_config_desc
|
| + elif self._speed == usb_constants.Speed.HIGH:
|
| + return self._hs_config_desc
|
| + else:
|
| + raise RuntimeError('Device is not connected.')
|
| +
|
| + def GetSpeed(self):
|
| + return self._speed
|
| +
|
| + def AddStringDescriptor(self, index, value, lang=0x0409):
|
| + """Add a string descriptor to this device.
|
| +
|
| + Args:
|
| + index: String descriptor index (matches 'i' fields in descriptors).
|
| + value: The string.
|
| + lang: Language code (default: English).
|
| +
|
| + Raises:
|
| + ValueError: The index or language code is invalid.
|
| + """
|
| + if index < 1 or index > 255:
|
| + raise ValueError('String descriptor index out of range.')
|
| + if lang < 0 or lang > 0xffff:
|
| + raise ValueError('String descriptor language code out of range.')
|
| +
|
| + lang_strings = self._strings.setdefault(lang, {})
|
| + lang_strings[index] = value
|
| +
|
| + def Connected(self, chip, speed):
|
| + """The device has been connected to a USB host.
|
| +
|
| + Args:
|
| + chip: USB controller.
|
| + speed: Connection speed.
|
| + """
|
| + self._speed = speed
|
| + self._chip = chip
|
| +
|
| + def Disconnected(self):
|
| + """The device has been disconnected from the USB host."""
|
| + self._speed = usb_constants.Speed.UNKNOWN
|
| + self._chip = None
|
| + self._active_endpoints.clear()
|
| +
|
| + def IsConnected(self):
|
| + return self._chip is not None
|
| +
|
| + def ControlRead(self, request_type, request, value, index, length):
|
| + """Handle a read on the control pipe (endpoint zero).
|
| +
|
| + Args:
|
| + request_type: bmRequestType field of the setup packet.
|
| + request: bRequest field of the setup packet.
|
| + value: wValue field of the setup packet.
|
| + index: wIndex field of the setup packet.
|
| + length: Maximum amount of data the host expects the device to return.
|
| +
|
| + Returns:
|
| + A buffer to return to the USB host with len <= length on success or
|
| + None to stall the pipe.
|
| + """
|
| + assert request_type & usb_constants.Dir.IN
|
| + typ = request_type & usb_constants.Type.MASK
|
| + recipient = request_type & usb_constants.Recipient.MASK
|
| + if typ == usb_constants.Type.STANDARD:
|
| + return self.StandardControlRead(
|
| + recipient, request, value, index, length)
|
| + elif typ == usb_constants.Type.CLASS:
|
| + return self.ClassControlRead(
|
| + recipient, request, value, index, length)
|
| + elif typ == usb_constants.Type.VENDOR:
|
| + return self.VendorControlRead(
|
| + recipient, request, value, index, length)
|
| +
|
| + def ControlWrite(self, request_type, request, value, index, data):
|
| + """Handle a write to the control pipe (endpoint zero).
|
| +
|
| + Args:
|
| + request_type: bmRequestType field of the setup packet.
|
| + request: bRequest field of the setup packet.
|
| + value: wValue field of the setup packet.
|
| + index: wIndex field of the setup packet.
|
| + data: Data stage of the request.
|
| +
|
| + Returns:
|
| + True on success, None to stall the pipe.
|
| + """
|
| + assert not request_type & usb_constants.Dir.IN
|
| + typ = request_type & usb_constants.Type.MASK
|
| + recipient = request_type & usb_constants.Recipient.MASK
|
| + if typ == usb_constants.Type.STANDARD:
|
| + return self.StandardControlWrite(
|
| + recipient, request, value, index, data)
|
| + elif typ == usb_constants.Type.CLASS:
|
| + return self.ClassControlWrite(
|
| + recipient, request, value, index, data)
|
| + elif typ == usb_constants.Type.VENDOR:
|
| + return self.VendorControlWrite(
|
| + recipient, request, value, index, data)
|
| +
|
| + def SendPacket(self, endpoint, data):
|
| + """Send a data packet on the given endpoint.
|
| +
|
| + Args:
|
| + endpoint: Endpoint address.
|
| + data: Data buffer.
|
| +
|
| + Raises:
|
| + ValueError: If the endpoint address is not valid.
|
| + RuntimeError: If the device is not connected.
|
| + """
|
| + if self._chip is None:
|
| + raise RuntimeError('Device is not connected.')
|
| + if not endpoint & usb_constants.Dir.IN:
|
| + raise ValueError('Cannot write to non-input endpoint.')
|
| + self._chip.SendPacket(endpoint, data)
|
| +
|
| + def ReceivePacket(self, endpoint, data):
|
| + """Handle an incoming data packet on one of the device's active endpoints.
|
| +
|
| + This method should be overridden by a subclass implementing endpoint-based
|
| + data transfers.
|
| +
|
| + Args:
|
| + endpoint: Endpoint address.
|
| + data: Data buffer.
|
| + """
|
| + pass
|
| +
|
| + def HaltEndpoint(self, endpoint):
|
| + """Signals a STALL condition to the host on the given endpoint.
|
| +
|
| + Args:
|
| + endpoint: Endpoint address.
|
| + """
|
| + self._chip.HaltEndpoint(endpoint)
|
| +
|
| + def StandardControlRead(self, recipient, request, value, index, length):
|
| + """Handle standard control transfers.
|
| +
|
| + Args:
|
| + recipient: Request recipient (device, interface, endpoint, etc.)
|
| + request: bRequest field of the setup packet.
|
| + value: wValue field of the setup packet.
|
| + index: wIndex field of the setup packet.
|
| + length: Maximum amount of data the host expects the device to return.
|
| +
|
| + Returns:
|
| + A buffer to return to the USB host with len <= length on success or
|
| + None to stall the pipe.
|
| + """
|
| + if request == usb_constants.Request.GET_DESCRIPTOR:
|
| + desc_type = value >> 8
|
| + desc_index = value & 0xff
|
| + desc_lang = index
|
| +
|
| + print 'GetDescriptor(recipient={}, type={}, index={}, lang={})'.format(
|
| + recipient, desc_type, desc_index, desc_lang)
|
| +
|
| + return self.GetDescriptor(recipient, desc_type, desc_index, desc_lang,
|
| + length)
|
| +
|
| + def GetDescriptor(self, recipient, typ, index, lang, length):
|
| + """Handle a standard GET_DESCRIPTOR request.
|
| +
|
| + See Universal Serial Bus Specification Revision 2.0 section 9.4.3.
|
| +
|
| + Args:
|
| + recipient: Request recipient (device, interface, endpoint, etc.)
|
| + typ: Descriptor type.
|
| + index: Descriptor index.
|
| + lang: Descriptor language code.
|
| + length: Maximum amount of data the host expects the device to return.
|
| +
|
| + Returns:
|
| + The value of the descriptor or None to stall the pipe.
|
| + """
|
| + if recipient == usb_constants.Recipient.DEVICE:
|
| + if typ == usb_constants.DescriptorType.STRING:
|
| + return self.GetStringDescriptor(index, lang, length)
|
| +
|
| + def ClassControlRead(self, recipient, request, value, index, length):
|
| + """Handle class-specific control transfers.
|
| +
|
| + This function should be overridden by a subclass implementing a particular
|
| + device class.
|
| +
|
| + Args:
|
| + recipient: Request recipient (device, interface, endpoint, etc.)
|
| + request: bRequest field of the setup packet.
|
| + value: wValue field of the setup packet.
|
| + index: wIndex field of the setup packet.
|
| + length: Maximum amount of data the host expects the device to return.
|
| +
|
| + Returns:
|
| + A buffer to return to the USB host with len <= length on success or
|
| + None to stall the pipe.
|
| + """
|
| + _ = recipient, request, value, index, length
|
| + return None
|
| +
|
| + def VendorControlRead(self, recipient, request, value, index, length):
|
| + """Handle vendor-specific control transfers.
|
| +
|
| + This function should be overridden by a subclass if implementing a device
|
| + that responds to vendor-specific requests.
|
| +
|
| + Args:
|
| + recipient: Request recipient (device, interface, endpoint, etc.)
|
| + request: bRequest field of the setup packet.
|
| + value: wValue field of the setup packet.
|
| + index: wIndex field of the setup packet.
|
| + length: Maximum amount of data the host expects the device to return.
|
| +
|
| + Returns:
|
| + A buffer to return to the USB host with len <= length on success or
|
| + None to stall the pipe.
|
| + """
|
| + _ = recipient, request, value, index, length
|
| + return None
|
| +
|
| + def StandardControlWrite(self, recipient, request, value, index, data):
|
| + """Handle standard control transfers.
|
| +
|
| + Args:
|
| + recipient: Request recipient (device, interface, endpoint, etc.)
|
| + request: bRequest field of the setup packet.
|
| + value: wValue field of the setup packet.
|
| + index: wIndex field of the setup packet.
|
| + data: Data stage of the request.
|
| +
|
| + Returns:
|
| + True on success, None to stall the pipe.
|
| + """
|
| + _ = data
|
| +
|
| + if request == usb_constants.Request.SET_CONFIGURATION:
|
| + if recipient == usb_constants.Recipient.DEVICE:
|
| + return self.SetConfiguration(value)
|
| + elif request == usb_constants.Request.SET_INTERFACE:
|
| + if recipient == usb_constants.Recipient.INTERFACE:
|
| + return self.SetInterface(index, value)
|
| +
|
| + def ClassControlWrite(self, recipient, request, value, index, data):
|
| + """Handle class-specific control transfers.
|
| +
|
| + This function should be overridden by a subclass implementing a particular
|
| + device class.
|
| +
|
| + Args:
|
| + recipient: Request recipient (device, interface, endpoint, etc.)
|
| + request: bRequest field of the setup packet.
|
| + value: wValue field of the setup packet.
|
| + index: wIndex field of the setup packet.
|
| + data: Data stage of the request.
|
| +
|
| + Returns:
|
| + True on success, None to stall the pipe.
|
| + """
|
| + _ = recipient, request, value, index, data
|
| + return None
|
| +
|
| + def VendorControlWrite(self, recipient, request, value, index, data):
|
| + """Handle vendor-specific control transfers.
|
| +
|
| + This function should be overridden by a subclass if implementing a device
|
| + that responds to vendor-specific requests.
|
| +
|
| + Args:
|
| + recipient: Request recipient (device, interface, endpoint, etc.)
|
| + request: bRequest field of the setup packet.
|
| + value: wValue field of the setup packet.
|
| + index: wIndex field of the setup packet.
|
| + data: Data stage of the request.
|
| +
|
| + Returns:
|
| + True on success, None to stall the pipe.
|
| + """
|
| + _ = recipient, request, value, index, data
|
| + return None
|
| +
|
| + def GetStringDescriptor(self, index, lang, length):
|
| + """Handle a GET_DESCRIPTOR(String) request from the host.
|
| +
|
| + Descriptor index 0 returns the set of languages supported by the device.
|
| + All other indices return the string descriptors registered with those
|
| + indices.
|
| +
|
| + See Universal Serial Bus Specification Revision 2.0 section 9.6.7.
|
| +
|
| + Args:
|
| + index: Descriptor index.
|
| + lang: Descriptor language code.
|
| + length: Maximum amount of data the host expects the device to return.
|
| +
|
| + Returns:
|
| + The string descriptor or None to stall the pipe if the descriptor is not
|
| + found.
|
| + """
|
| + if index == 0:
|
| + length = 2 + len(self._strings) * 2
|
| + header = struct.pack('<BB', length, usb_constants.DescriptorType.STRING)
|
| + lang_codes = [struct.pack('<H', lang)
|
| + for lang in self._strings.iterkeys()]
|
| + buf = header + ''.join(lang_codes)
|
| + assert len(buf) == length
|
| + return buf[:length]
|
| + elif lang not in self._strings:
|
| + return None
|
| + elif index not in self._strings[lang]:
|
| + return None
|
| + else:
|
| + string = self._strings[lang][index].encode('UTF-16LE')
|
| + header = struct.pack(
|
| + '<BB', 2 + len(string), usb_constants.DescriptorType.STRING)
|
| + buf = header + string
|
| + return buf[:length]
|
| +
|
| + def SetConfiguration(self, index):
|
| + """Handle a SET_CONFIGURATION request from the host.
|
| +
|
| + See Universal Serial Bus Specification Revision 2.0 section 9.4.7.
|
| +
|
| + Args:
|
| + index: Configuration index selected.
|
| +
|
| + Returns:
|
| + True on success, None on error to stall the pipe.
|
| + """
|
| + print 'SetConfiguration({})'.format(index)
|
| +
|
| + for endpoint_addrs in self._active_endpoints.values():
|
| + for endpoint_addr in endpoint_addrs:
|
| + self._chip.StopEndpoint(endpoint_addr)
|
| + endpoint_addrs.clear()
|
| +
|
| + if index == 0:
|
| + # SET_CONFIGRATION(0) puts the device into the Address state which
|
| + # Windows does before suspending the port.
|
| + return True
|
| + elif index != 1:
|
| + return None
|
| +
|
| + config_desc = self.GetConfigurationDescriptor()
|
| + for interface_desc in config_desc.GetInterfaces():
|
| + if interface_desc.bAlternateSetting != 0:
|
| + continue
|
| + endpoint_addrs = self._active_endpoints.setdefault(
|
| + interface_desc.bInterfaceNumber, set())
|
| + for endpoint_desc in interface_desc.GetEndpoints():
|
| + self._chip.StartEndpoint(endpoint_desc)
|
| + endpoint_addrs.add(endpoint_desc.bEndpointAddress)
|
| + return True
|
| +
|
| + def SetInterface(self, interface, alt_setting):
|
| + """Handle a SET_INTERFACE request from the host.
|
| +
|
| + See Universal Serial Bus Specification Revision 2.0 section 9.4.10.
|
| +
|
| + Args:
|
| + interface: Interface number to configure.
|
| + alt_setting: Alternate setting to select.
|
| +
|
| + Returns:
|
| + True on success, None on error to stall the pipe.
|
| + """
|
| + print 'SetInterface({}, {})'.format(interface, alt_setting)
|
| +
|
| + config_desc = self.GetConfigurationDescriptor()
|
| + interface_desc = None
|
| + for interface_option in config_desc.GetInterfaces():
|
| + if (interface_option.bInterfaceNumber == interface and
|
| + interface_option.bAlternateSetting == alt_setting):
|
| + interface_desc = interface_option
|
| + if interface_desc is None:
|
| + return None
|
| +
|
| + endpoint_addrs = self._active_endpoints.setdefault(interface, set())
|
| + for endpoint_addr in endpoint_addrs:
|
| + self._chip.StopEndpoint(endpoint_addr)
|
| + for endpoint_desc in interface_desc.GetEndpoints():
|
| + self._chip.StartEndpoint(endpoint_desc)
|
| + endpoint_addrs.add(endpoint_desc.bEndpointAddress)
|
| + return True
|
|
|