Tutorial 5: BLE Protobuf Operations
This document will provide a walk-through tutorial to use the Open GoPro Interface to send and receive BLE Protobuf Data.
A list of Protobuf Operations can be found in the Protobuf ID Table.
Requirements
It is assumed that the hardware and software requirements from the connecting BLE tutorial are present and configured correctly.
Just Show me the Demo(s)!!
-
Each of the scripts for this tutorial can be found in the Tutorial 5 directory.
Python >= 3.9 and < 3.12 must be used as specified in the requirementsYou can see some basic Protobuf usage, independent of a BLE connection, in the following script:
$ python protobuf_example.py
You can test sending Set Turbo Mode to your camera through BLE using the following script:
$ python set_turbo_mode.py
See the help for parameter definitions:
$ python set_turbo_mode.py --help usage: set_turbo_mode.py [-h] [-i IDENTIFIER] Connect to a GoPro camera, send Set Turbo Mode and parse the response options: -h, --help show this help message and exit -i IDENTIFIER, --identifier IDENTIFIER Last 4 digits of GoPro serial number, which is the last 4 digits of the default camera SSID. If not used, first discovered GoPro will be connected to
TODO
-
TODO
Compiling Protobuf Files
The Protobuf files used to compile source code for the Open GoPro Interface exist in the top-level protobuf directory of the Open GoPro repository.
It is mostly out of the scope of these tutorials to describe how to compile these since this process is clearly defined
in the per-language Protobuf Tutorial. For the purposes of these tutorials
(and shared with the Python SDK), the Protobuf files are compiled
using the Docker image defined in tools/proto_build.
This build process can be performed using make protos
from the top level of this repo.
Working with Protobuf Messages
Let’s first perform some basic serialization and deserialization of a Protobuf message. For this example, we are going to use the Set Turbo Transfer operation:
Per the documentation, this operation’s request payload should be serialized using the Protobuf message which can be found either in Documentation:
or source code:
/**
* Enable/disable display of "Transferring Media" UI
*
* Response: @ref ResponseGeneric
*/
message RequestSetTurboActive {
required bool active = 1; // Enable or disable Turbo Transfer feature
}
protobuf_example.py
Protobuf Message Example
First let’s instantiate the request message by setting the active
parameter and log the serialized bytes:
-
from tutorial_modules import proto request = proto.RequestSetTurboActive(active=False) logger.info(f"Sending ==> {request}") logger.info(request.SerializeToString().hex(":"))
which will log as such:
Sending ==> active: false 08:00
-
TODO
We’re not going to analyze these bytes since it is the purpose of the Protobuf framework is to abstract this. However it is important to be able to generate the serialized bytes from the instantiated Protobuf Message object in order to send the bytes via BLE.
Similarly, let’s now create a serialized response and show how to deserialize it into a ResponseGeneric object.
-
response_bytes = proto.ResponseGeneric(result=proto.EnumResultGeneric.RESULT_SUCCESS).SerializeToString() logger.info(f"Received bytes ==> {response_bytes.hex(':')}") response = proto.ResponseGeneric.FromString(response_bytes) logger.info(f"Received ==> {response}")
which will log as such:
Received bytes ==> 08:01 Received ==> result: RESULT_SUCCESS
-
TODO
Performing a Protobuf Operation
Now let’s actually perform a Protobuf Operation via BLE. First we need to discuss additional non-Protobuf-defined header bytes that are required for Protobuf Operations in the Open GoPro Interface.
Protobuf Packet Format
Besides having a compressed payload as defined per the Protobuf Specification, Open GoPro Protobuf operations also are identified by “Feature” and “Action” IDs. The top level message format (not including the standard headers) is as follows:
Feature ID | Action ID | Serialized Protobuf Payload |
---|---|---|
1 Byte | 1 Byte | Variable Length |
This Feature / Action ID pair is used to identify the Protobuf Message that should be used to serialize / deserialize the payload. This mapping can be found in the Protobuf ID Table.
Protobuf Response Parser
Since the parsing of Protobuf messages is different than
TLV Parsing, we need to create a
ProtobufResponse
class by extending the Response
class from the
TLV Parsing Tutorial. This ProtobufResponse
parse
method will:
- Extract Feature and Action ID’s
- Parse the Protobuf payload using the specified Protobuf Message
-
This code can be found in
set_turbo_mode.py
class ProtobufResponse(Response): ... def parse(self, proto: type[ProtobufMessage]) -> None: self.feature_id = self.raw_bytes[0] self.action_id = self.raw_bytes[1] self.data = proto.FromString(bytes(self.raw_bytes[2:]))
-
TODO
The accumulation process is the same for TLV and Protobuf responses so have not overridden the base Response
class’s
accumulation
method and we are using the same notification handler as previous labs.
Set Turbo Transfer
Now let’s perform the Set Turbo Transfer operation and receive the response. First, we build the serialized byte request in the same manner as above), then prepend the Feature ID, Action ID, and length bytes:
-
turbo_mode_request = bytearray( [ 0xF1, # Feature ID 0x6B, # Action ID *proto.RequestSetTurboActive(active=False).SerializeToString(), ] ) turbo_mode_request.insert(0, len(turbo_mode_request))
-
TODO
We then send the message, wait to receive the response, and parse the response using the Protobuf Message specified from the Set Turbo Mode Documentation: ResponseGeneric.
-
await client.write_gatt_char(request_uuid.value, turbo_mode_request, response=True) response = await received_responses.get() response.parse(proto.ResponseGeneric) assert response.feature_id == 0xF1 assert response.action_id == 0xEB logger.info(response.data)
which will log as such:
Setting Turbo Mode Off Writing 04:f1:6b:08:00 to GoProUuid.COMMAND_REQ_UUID Received response at UUID GoProUuid.COMMAND_RSP_UUID: 04:f1:eb:08:01 Set Turbo Mode response complete received. Successfully set turbo mode result: RESULT_SUCCESS
-
TODO
Deciphering Response Type
This same procedure is used for all Protobuf Operations. Coupled with the information from previous tutorials, you are now capable of parsing any response received from the GoPro.
However we have not yet covered how to decipher the response type: Command, Query, Protobuf, etc. The algorithm to do so is defined in the GoPro BLE Spec and reproduced here for reference:
Response Manager
We’re now going to create a monolithic ResponseManager
class to implement this algorithm to perform (at least initial)
parsing of all response types:
-
The sample code below is taken from
decipher_response.py
The
ResponseManager
is a wrapper around aBleakClient
to manage accumulating, parsing, and retrieving responses.First, let’s create a non-initialized response manager, connect to get a
BleakClient
and initialize the manager by setting the client:manager = ResponseManager() manager.set_client(await connect_ble(manager.notification_handler, identifier))
Then, in the notification handler, we “decipher” the response before enqueueing it to the received response queue:
async def notification_handler(self, characteristic: BleakGATTCharacteristic, data: bytearray) -> None: uuid = GoProUuid(self.client.services.characteristics[characteristic.handle].uuid) logger.debug(f'Received response at {uuid}: {data.hex(":")}') response = self._responses_by_uuid[uuid] response.accumulate(data) # Enqueue if we have received the entire response if response.is_received: await self._q.put(self.decipher_response(response)) # Reset the accumulating response self._responses_by_uuid[uuid] = Response(uuid)
where “deciphering” is the implementation of the above algorithm:
def decipher_response(self, undeciphered_response: Response) -> ConcreteResponse: payload = undeciphered_response.raw_bytes # Are the first two payload bytes a real Fetaure / Action ID pair? if (index := ProtobufId(payload[0], payload[1])) in ProtobufIdToMessage: if not (proto_message := ProtobufIdToMessage.get(index)): # We've only added protobuf messages for operations used in this tutorial. raise RuntimeError( f"{index} is a valid Protobuf identifier but does not currently have a defined message." ) else: # Now use the protobuf messaged identified by the Feature / Action ID pair to parse the remaining payload response = ProtobufResponse.from_received_response(undeciphered_response) response.parse(proto_message) return response # TLV. Should it be parsed as Command or Query? if undeciphered_response.uuid is GoProUuid.QUERY_RSP_UUID: # It's a TLV query response = QueryResponse.from_received_response(undeciphered_response) else: # It's a TLV command / setting. response = TlvResponse.from_received_response(undeciphered_response) # Parse the TLV payload (query, command, or setting) response.parse() return response
Only the minimal functionality needed for these tutorials have been added. For example, many Protobuf Feature / Action ID pairs do not have corresponding Protobuf Messages defined. -
TODO
After deciphering, the parsed method is placed in the response queue as a either a TlvResponse
, QueryResponse
, or
ProtobufResponse
.
Examples of Each Response Type
Now let’s perform operations that will demonstrate each response type:
-
# TLV Command (Setting) await set_resolution(manager) # TLV Command await get_resolution(manager) # TLV Query await set_shutter_off(manager) # Protobuf await set_turbo_mode(manager)
These four methods will perform the same functionality we’ve demonstrated in previous tutorials, now using our
ResponseManager
.We’ll walk through the
get_resolution
method here. First build the request and send it:request = bytes([0x03, 0x02, 0x01, 0x09]) request_uuid = GoProUuid.SETTINGS_REQ_UUID await manager.client.write_gatt_char(request_uuid.value, request, response=True)
Then retrieve the response from the manager:
tlv_response = await manager.get_next_response_as_tlv() logger.info(f"Set resolution status: {tlv_response.status}")
This logs as such:
Getting the current resolution Writing to GoProUuid.QUERY_REQ_UUID: 02:12:02 Received response at GoProUuid.QUERY_RSP_UUID: 05:12:00:02:01:09 Received current resolution: Resolution.RES_1080
Note that each example retrieves the parsed response from the manager via one of the following methods:
get_next_response_as_tlv
get_next_response_as_query
get_next_response_as_response
These are functionally the same as they just retrieve the next received response from the manager’s queue and only exist as helpers to simplify typing. -
TODO
Troubleshooting
See the first tutorial’s troubleshooting section.
Good Job!
You can now accumulate, decipher, and parse any BLE response received from the GoPro.