
This document provides a walk-through tutorial on using the Open GoPro Interface to send and receive BLE Protobuf data.

Open GoPro uses Protocol Buffers Version 2.

A list of Protobuf Operations can be found in the Protobuf ID Table.

This tutorial only considers sending these as one-off operations. That is, it does not consider state management / synchronization when sending multiple operations. This will be discussed in a future lab.

Requirements

It is assumed that the hardware and software requirements from the connecting BLE tutorial are present and configured correctly.

It is suggested that you have first completed the connect, sending commands, and parsing responses tutorials before going through this tutorial.

Just Show me the Demo(s)!!

  • Each of the scripts for this tutorial can be found in the Tutorial 5 directory.

    Python >= 3.9 and < 3.12 must be used as specified in the requirements

    You can see some basic Protobuf usage, independent of a BLE connection, in the following script:

    $ python protobuf_example.py
    

    You can test sending Set Turbo Mode to your camera through BLE using the following script:

    $ python set_turbo_mode.py
    

    See the help for parameter definitions:

    $ python set_turbo_mode.py --help
    usage: set_turbo_mode.py [-h] [-i IDENTIFIER]
    
    Connect to a GoPro camera, send Set Turbo Mode and parse the response
    
    options:
      -h, --help            show this help message and exit
      -i IDENTIFIER, --identifier IDENTIFIER
                            Last 4 digits of GoPro serial number, which is the last 4 digits of the default
                            camera SSID. If not used, first discovered GoPro will be connected to
    


Compiling Protobuf Files

The Protobuf files used to compile source code for the Open GoPro Interface exist in the top-level protobuf directory of the Open GoPro repository.

It is mostly out of the scope of these tutorials to describe how to compile these since this process is clearly defined in the per-language Protobuf Tutorial. For the purposes of these tutorials (and shared with the Python SDK), the Protobuf files are compiled using the Docker image defined in tools/proto_build. This build process can be performed using make protos from the top level of this repo.

This information is strictly explanatory. It is in no way necessary to (re)build the Protobuf files for these tutorials as the pre-compiled Protobuf source code already resides in the same directory as this tutorial’s example code.

Working with Protobuf Messages

Let’s first perform some basic serialization and deserialization of a Protobuf message. For this example, we are going to use the Set Turbo Transfer operation:

[Figure: Set Turbo Mode Documentation]

Per the documentation, this operation’s request payload should be serialized using the RequestSetTurboActive Protobuf message, which can be found either in the documentation:

[Figure: RequestSetTurboActive documentation]

or source code:

/**
 * Enable/disable display of "Transferring Media" UI
 *
 * Response: @ref ResponseGeneric
 */
message RequestSetTurboActive {
    required bool active = 1; // Enable or disable Turbo Transfer feature
}
This code can be found in protobuf_example.py

Protobuf Message Example

First let’s instantiate the request message by setting the active parameter and log the serialized bytes:

Your IDE should show the Protobuf Message’s API signature since type stubs were generated when compiling the Protobuf files.
  • from tutorial_modules import proto
    
    request = proto.RequestSetTurboActive(active=False)
    logger.info(f"Sending ==> {request}")
    logger.info(request.SerializeToString().hex(":"))
    

    which will log as such:

    Sending ==> active: false
    08:00
    

We’re not going to analyze these bytes since the purpose of the Protobuf framework is to abstract this. However, it is important to be able to generate the serialized bytes from the instantiated Protobuf Message object in order to send the bytes via BLE.

Similarly, let’s now create a serialized response and show how to deserialize it into a ResponseGeneric object.

  • response_bytes = proto.ResponseGeneric(result=proto.EnumResultGeneric.RESULT_SUCCESS).SerializeToString()
    logger.info(f"Received bytes ==> {response_bytes.hex(':')}")
    response = proto.ResponseGeneric.FromString(response_bytes)
    logger.info(f"Received ==> {response}")
    

    which will log as such:

    Received bytes ==> 08:01
    Received ==> result: RESULT_SUCCESS
    

We’re not hard-coding serialized bytes here since they may not be constant across Protobuf versions.

Performing a Protobuf Operation

Now let’s actually perform a Protobuf Operation via BLE. First we need to discuss additional non-Protobuf-defined header bytes that are required for Protobuf Operations in the Open GoPro Interface.

Protobuf Packet Format

Besides having a serialized payload as defined by the Protobuf specification, Open GoPro Protobuf operations are also identified by “Feature” and “Action” IDs. The top-level message format (not including the standard headers) is as follows:

Feature ID | Action ID | Serialized Protobuf Payload
1 Byte     | 1 Byte    | Variable Length

This Feature / Action ID pair is used to identify the Protobuf Message that should be used to serialize / deserialize the payload. This mapping can be found in the Protobuf ID Table.
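
The layout above can be sketched in a few lines of plain Python. This is a minimal illustration rather than part of the tutorial scripts: the helper names frame_protobuf_message and split_protobuf_message are hypothetical, and the standard length header is deliberately left out.

```python
def frame_protobuf_message(feature_id: int, action_id: int, payload: bytes) -> bytes:
    """Prepend the 1-byte Feature ID and 1-byte Action ID to a serialized payload."""
    if not (0 <= feature_id <= 0xFF and 0 <= action_id <= 0xFF):
        raise ValueError("Feature and Action IDs must each fit in one byte")
    return bytes([feature_id, action_id]) + payload


def split_protobuf_message(message: bytes) -> tuple[int, int, bytes]:
    """Split a message (without its length header) into Feature ID, Action ID, and payload."""
    if len(message) < 2:
        raise ValueError("Message is too short to contain Feature and Action IDs")
    return message[0], message[1], message[2:]


# 08:00 is the serialized RequestSetTurboActive(active=False) logged earlier in this tutorial
framed = frame_protobuf_message(0xF1, 0x6B, bytes([0x08, 0x00]))
print(framed.hex(":"))  # f1:6b:08:00
```

Keeping the framing separate from the Protobuf serialization means the same two helpers would work for any Feature / Action ID pair, whichever message type produced the payload.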

Protobuf Response Parser

Since the parsing of Protobuf messages is different from TLV parsing, we need to create a ProtobufResponse class by extending the Response class from the TLV Parsing Tutorial. The ProtobufResponse parse method will:

  1. Extract Feature and Action IDs
  2. Parse the Protobuf payload using the specified Protobuf Message
  • This code can be found in set_turbo_mode.py
    class ProtobufResponse(Response):
        ...
    
        def parse(self, proto: type[ProtobufMessage]) -> None:
            self.feature_id = self.raw_bytes[0]
            self.action_id = self.raw_bytes[1]
            self.data = proto.FromString(bytes(self.raw_bytes[2:]))
    

The accumulation process is the same for TLV and Protobuf responses, so we have not overridden the base Response class’s accumulation method and we are using the same notification handler as in previous labs.

Set Turbo Transfer

Now let’s perform the Set Turbo Transfer operation and receive the response. First, we build the serialized request bytes in the same manner as above, then prepend the Feature ID, Action ID, and length bytes:

  • turbo_mode_request = bytearray(
        [
            0xF1,  # Feature ID
            0x6B,  # Action ID
            *proto.RequestSetTurboActive(active=False).SerializeToString(),
        ]
    )
    turbo_mode_request.insert(0, len(turbo_mode_request))
    

We then send the message, wait to receive the response, and parse the response using the Protobuf Message specified from the Set Turbo Mode Documentation: ResponseGeneric.

  • await client.write_gatt_char(request_uuid.value, turbo_mode_request, response=True)
    response = await received_responses.get()
    response.parse(proto.ResponseGeneric)
    assert response.feature_id == 0xF1
    assert response.action_id == 0xEB
    logger.info(response.data)
    

    which will log as such:

    Setting Turbo Mode Off
    Writing 04:f1:6b:08:00 to GoProUuid.COMMAND_REQ_UUID
    Received response at UUID GoProUuid.COMMAND_RSP_UUID: 04:f1:eb:08:01
    Set Turbo Mode response complete received.
    Successfully set turbo mode
    result: RESULT_SUCCESS
    
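
As a rough cross-check of the log above, the received bytes can be taken apart by hand. This sketch assumes a single-packet response whose first byte is the message length, as in this log; the function name is hypothetical. It also checks something observed in this log only: the response Action ID (0xEB) is the request Action ID (0x6B) with the most significant bit set. Confirm against the Protobuf ID Table before relying on that for other operations.

```python
def split_single_packet_response(packet: bytes) -> tuple[int, int, bytes]:
    """Split a short, single-packet Protobuf response into its parts.

    Assumes byte 0 is the message length and that no continuation packets follow.
    """
    length = packet[0]
    message = packet[1:]
    if length != len(message):
        raise ValueError(f"Length header says {length} bytes but {len(message)} were received")
    return message[0], message[1], message[2:]


# Bytes copied from the log above
feature_id, action_id, payload = split_single_packet_response(bytes.fromhex("04f1eb0801"))
print(hex(feature_id), hex(action_id), payload.hex(":"))  # 0xf1 0xeb 08:01

# Observation from this log: response Action ID == request Action ID with the top bit set
assert action_id == 0x6B | 0x80
```

The remaining payload (08:01) is what gets handed to ResponseGeneric.FromString in the tutorial code.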

Deciphering Response Type

This same procedure is used for all Protobuf Operations. Coupled with the information from previous tutorials, you are now capable of parsing any response received from the GoPro.

However, we have not yet covered how to decipher the response type: Command, Query, Protobuf, etc. The algorithm to do so is defined in the GoPro BLE Spec and reproduced here for reference:

[Figure: Message Deciphering Algorithm]

Response Manager

We’re now going to create a monolithic ResponseManager class to implement this algorithm to perform (at least initial) parsing of all response types:

  • The sample code below is taken from decipher_response.py

    The ResponseManager is a wrapper around a BleakClient to manage accumulating, parsing, and retrieving responses.

    First, let’s create a non-initialized response manager, connect to get a BleakClient and initialize the manager by setting the client:

    manager = ResponseManager()
    manager.set_client(await connect_ble(manager.notification_handler, identifier))
    

    Then, in the notification handler, we “decipher” the response before enqueueing it to the received response queue:

    async def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray) -> None:
        uuid = GoProUuid(self.client.services.characteristics[characteristic.handle].uuid)
        logger.debug(f'Received response at {uuid}: {data.hex(":")}')
    
        response = self._responses_by_uuid[uuid]
        response.accumulate(data)
    
        # Enqueue if we have received the entire response
        if response.is_received:
            await self._q.put(self.decipher_response(response))
            # Reset the accumulating response
            self._responses_by_uuid[uuid] = Response(uuid)
    

    where “deciphering” is the implementation of the above algorithm:

    def decipher_response(self, undeciphered_response: Response) -> ConcreteResponse:
        payload = undeciphered_response.raw_bytes
        # Are the first two payload bytes a real Feature / Action ID pair?
        if (index := ProtobufId(payload[0], payload[1])) in ProtobufIdToMessage:
            if not (proto_message := ProtobufIdToMessage.get(index)):
                # We've only added protobuf messages for operations used in this tutorial.
                raise RuntimeError(
                    f"{index} is a valid Protobuf identifier but does not currently have a defined message."
                )
            else:
                # Now use the protobuf message identified by the Feature / Action ID pair to parse the remaining payload
                response = ProtobufResponse.from_received_response(undeciphered_response)
                response.parse(proto_message)
                return response
        # TLV. Should it be parsed as Command or Query?
        if undeciphered_response.uuid is GoProUuid.QUERY_RSP_UUID:
            # It's a TLV query
            response = QueryResponse.from_received_response(undeciphered_response)
        else:
            # It's a TLV command / setting.
            response = TlvResponse.from_received_response(undeciphered_response)
        # Parse the TLV payload (query, command, or setting)
        response.parse()
        return response
    
    Only the minimal functionality needed for these tutorials has been added. For example, many Protobuf Feature / Action ID pairs do not have corresponding Protobuf Messages defined.

After deciphering, the parsed response is placed in the response queue as either a TlvResponse, QueryResponse, or ProtobufResponse.
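
The decision made while deciphering can be illustrated without a camera. The following is a simplified sketch, not the tutorial's implementation: the UUID is reduced to a plain string, KNOWN_PROTOBUF_IDS is a hypothetical one-entry stand-in for the Protobuf ID Table, and the result is just a label rather than a parsed response object.

```python
# Hypothetical, deliberately tiny stand-in for the Protobuf ID Table:
# only the Set Turbo Mode response pair seen in this tutorial's log.
KNOWN_PROTOBUF_IDS = {(0xF1, 0xEB)}


def classify_response(uuid_name: str, payload: bytes) -> str:
    """Label an accumulated payload (length header already removed)."""
    # Protobuf: the first two bytes form a known Feature / Action ID pair
    if len(payload) >= 2 and (payload[0], payload[1]) in KNOWN_PROTOBUF_IDS:
        return "protobuf"
    # Otherwise it is TLV; the characteristic it arrived on decides the flavor
    if uuid_name == "QUERY_RSP_UUID":
        return "query"
    return "tlv"


# Payloads taken from the logs in this tutorial, without their length byte
print(classify_response("COMMAND_RSP_UUID", bytes.fromhex("f1eb0801")))  # protobuf
print(classify_response("QUERY_RSP_UUID", bytes.fromhex("1200020109")))  # query
```

Checking for a Protobuf ID pair first matters because Protobuf and TLV responses can arrive on the same characteristic, so the UUID alone cannot separate them.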

Examples of Each Response Type

Now let’s perform operations that will demonstrate each response type:

  • # TLV Command (Setting)
    await set_resolution(manager)
    # TLV Query
    await get_resolution(manager)
    # TLV Command
    await set_shutter_off(manager)
    # Protobuf
    await set_turbo_mode(manager)
    

    These four methods will perform the same functionality we’ve demonstrated in previous tutorials, now using our ResponseManager.

    We’ll walk through the set_resolution method here. First build the request and send it:

    request = bytes([0x03, 0x02, 0x01, 0x09])
    request_uuid = GoProUuid.SETTINGS_REQ_UUID
    await manager.client.write_gatt_char(request_uuid.value, request, response=True)
    

    Then retrieve the response from the manager:

    tlv_response = await manager.get_next_response_as_tlv()
    logger.info(f"Set resolution status: {tlv_response.status}")
    

    The get_resolution method follows the same pattern on the query characteristic and logs as such:

    Getting the current resolution
    Writing to GoProUuid.QUERY_REQ_UUID: 02:12:02
    Received response at GoProUuid.QUERY_RSP_UUID: 05:12:00:02:01:09
    Received current resolution: Resolution.RES_1080
    

    Note that each example retrieves the parsed response from the manager via one of the following methods:

    • get_next_response_as_tlv
    • get_next_response_as_query
    • get_next_response_as_protobuf
    These are functionally the same as they just retrieve the next received response from the manager’s queue and only exist as helpers to simplify typing.
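
To see why such helpers are interchangeable at runtime, here is a minimal sketch using only the standard library. It shows one way they could be written, not necessarily how the tutorial's ResponseManager does it: the class names mirror those in this tutorial but the bodies are hypothetical stand-ins, and the Protobuf helper is omitted for brevity. typing.cast returns its argument unchanged at runtime; it only tells the type checker which concrete type to expect.

```python
import asyncio
from dataclasses import dataclass
from typing import Union, cast


@dataclass
class TlvResponse:  # stand-in, not the tutorial's class
    status: int


@dataclass
class QueryResponse:  # stand-in, not the tutorial's class
    data: dict


ConcreteResponse = Union[TlvResponse, QueryResponse]


class MiniResponseManager:
    """Holds already-deciphered responses in a queue."""

    def __init__(self) -> None:
        self._q: asyncio.Queue = asyncio.Queue()

    async def put(self, response: ConcreteResponse) -> None:
        await self._q.put(response)

    async def get_next_response(self) -> ConcreteResponse:
        return await self._q.get()

    async def get_next_response_as_tlv(self) -> TlvResponse:
        # cast() is a no-op at runtime; it only narrows the static type
        return cast(TlvResponse, await self.get_next_response())

    async def get_next_response_as_query(self) -> QueryResponse:
        return cast(QueryResponse, await self.get_next_response())


async def demo() -> int:
    manager = MiniResponseManager()
    await manager.put(TlvResponse(status=0))
    response = await manager.get_next_response_as_tlv()
    return response.status


print(asyncio.run(demo()))  # 0
```

Because the cast is not checked at runtime, calling the wrong helper would not raise by itself; the caller is expected to know which response type the preceding operation produces.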

Troubleshooting

See the first tutorial’s troubleshooting section.

Good Job!

Congratulations 🤙

You can now accumulate, decipher, and parse any BLE response received from the GoPro.
