MQTT API

The MQTT API provides updates of the state of the DE1, scale, and controller. An MQTT broker allows subscribing to the various notifications without putting additional load on the controller.

Versioning

The MQTT API’s JSON payloads are under semantic versioning. At this time, there is no way to discover that the list of topics has changed.

The payload version can be found with the version key. For example:

{
  "arrival_time": 1637014022.035329,
  "create_time": 1637014022.0355482,
  "state": "Idle",
  "substate": "HeatWaterTank",
  "previous_state": "Sleep",
  "previous_substate": "NoState",
  "is_error_state": false,
  "version": "1.0.0",
  "event_time": 1637014022.0356083,
  "sender": "DE1",
  "class": "StateUpdate"
}

The non-JSON payloads, such as the human-readable log messages and MQTT will, are not versioned at this time.
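A subscriber may want to guard against payloads it was not written for. The following is a minimal sketch, assuming that a change in the major version signals a breaking change (as semantic versioning prescribes). The function name and the SUPPORTED_MAJOR constant are illustrative and are not part of pyDE1.

```python
import json

# Assumption: this client was written against 1.x payloads
SUPPORTED_MAJOR = 1


def is_compatible(raw_payload: str, supported_major: int = SUPPORTED_MAJOR) -> bool:
    """Return True if the payload's major version matches the supported one."""
    payload = json.loads(raw_payload)
    version = payload.get("version")
    if version is None:
        # Unversioned payloads cannot be checked, so treat them as unsupported
        return False
    major = int(version.split(".")[0])
    return major == supported_major


example = '{"state": "Idle", "version": "1.0.0", "class": "StateUpdate"}'
print(is_compatible(example))
```

Only JSON payloads can be checked this way; the log text and will messages carry no version key.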

Will (Client Status)

The pyDE1 clients all register a will that is sent by the broker to subscribers in the event of a non-graceful disconnect. The code also tries to indicate if the client is present.

For pyDE1, the wills are set up in pyDE1/api/outbound/mqtt. The two topics are:

  • <config.mqtt.TOPIC_ROOT>/status/mqtt/logging

  • <config.mqtt.TOPIC_ROOT>/status/mqtt/notification

The values presently published are:

class MQTTStatusText (enum.Enum):
  on_connection = 'Here'
  on_graceful_disconnect = 'Gone'
  on_will = 'Died'
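As a rough illustration of how a subscriber might interpret these values, the sketch below builds the two status topics and decides whether a client is present. The topic root "pyDE1" is only a placeholder for whatever config.mqtt.TOPIC_ROOT is set to, and the helper names are hypothetical.

```python
import enum


class MQTTStatusText(enum.Enum):
    on_connection = 'Here'
    on_graceful_disconnect = 'Gone'
    on_will = 'Died'


def status_topics(topic_root: str) -> list[str]:
    """Return the two status topics below the configured topic root."""
    return [
        f"{topic_root}/status/mqtt/logging",
        f"{topic_root}/status/mqtt/notification",
    ]


def client_is_present(payload: bytes) -> bool:
    """True only for the text published on connection."""
    try:
        status = MQTTStatusText(payload.decode("utf-8"))
    except (ValueError, UnicodeDecodeError):
        # Unknown text: do not assume the client is there
        return False
    return status is MQTTStatusText.on_connection


# "pyDE1" is a placeholder value for the topic root
print(status_topics("pyDE1"))
print(client_is_present(b"Here"), client_is_present(b"Died"))
```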

Logging Feeds

As configured, there are two MQTT feeds of logging data from pyDE1, one with most fields from the LogRecord and the other already formatted. The log level is ERROR, by default. Both the level and the formatter can be set through the config file.

  • <config.mqtt.TOPIC_ROOT>/log

  • <config.mqtt.TOPIC_ROOT>/log/record

For version 1.0.0, the attributes from the LogRecord in the JSON include created, levelname, levelno, message, name, process, processName, thread, threadName.
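A consumer of the JSON feed could filter and format records along these lines. This is a sketch only: the sample record is invented for illustration, and the two helper functions are not part of pyDE1.

```python
import json
import logging


def should_alert(raw_record: str, threshold: int = logging.ERROR) -> bool:
    """True if the record's numeric level is at or above the threshold."""
    record = json.loads(raw_record)
    return record["levelno"] >= threshold


def one_line(raw_record: str) -> str:
    """Format a few of the listed attributes for display."""
    record = json.loads(raw_record)
    return f'{record["levelname"]} {record["name"]}: {record["message"]}'


# Hypothetical record using the attribute names listed above
sample = json.dumps({
    "created": 1675628682.06, "levelname": "ERROR", "levelno": 40,
    "message": "Example failure", "name": "Example.Logger",
    "process": 1234, "processName": "MainProcess",
    "thread": 1, "threadName": "MainThread",
})

if should_alert(sample):
    print(one_line(sample))
```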

Client Synchronization

Starting with pyDE1 v2.0, when changes are made to the pyDE1 controller or a DE1 connects, the resulting state of the impacted area is sent over MQTT to subscribers.

At this time the areas include the following topics:

  • update/de1/control

  • update/de1/setting

  • update/de1/calibration

  • update/de1/profile/id

The packets contain JSON similar to what a GET of the resource named after the update/ prefix would return. For example, changing the stop-at-weight level over the HTTP API results in an MQTT packet like the one below.

On update/de1/control

{"espresso": {"stop_at_time": null, "stop_at_volume": null,
              "stop_at_weight": 34, "move_on_weight": [],
              "disable_auto_tare": false,
              "profile_can_override_stop_limits": false,
              "profile_can_override_tank_temperature": true,
              "first_drops_threshold": 0.0, "last_drops_minimum_time": 3.0},
 "steam": {"stop_at_time": 200, "disable_auto_tare": false},
 "hot_water": {"stop_at_time": 0, "stop_at_volume": 0,
               "stop_at_weight": null, "disable_auto_tare": false,
               "temperature": 0},
 "hot_water_rinse": {"stop_at_time": 3.0, "disable_auto_tare": false,
                     "temperature": 92.0, "flow": 6.0},
 "tank_water_threshold": {"temperature": 0},
 "timestamp": 1675628682.0626736}

Timestamps are available in the MQTT packets as well as in the HTTP response header x-pyde1-timestamp to assist in disambiguation of the two sources.
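One way a client might use the two timestamps is to keep whichever copy of the resource is newer. The sketch below assumes that the header value parses as a float on the same time base as the packet's timestamp key; the function name and the sample values are illustrative.

```python
def pick_newer(mqtt_packet: dict, http_body: dict, http_headers: dict) -> dict:
    """Return whichever representation carries the later timestamp."""
    # HTTP header names are case-insensitive, so normalize before lookup
    headers = {key.lower(): value for key, value in http_headers.items()}
    http_time = float(headers["x-pyde1-timestamp"])
    mqtt_time = float(mqtt_packet["timestamp"])
    return mqtt_packet if mqtt_time >= http_time else http_body


# Hypothetical data: the MQTT update was generated after the HTTP response
mqtt_packet = {"espresso": {"stop_at_weight": 34}, "timestamp": 1675628682.0626736}
http_body = {"espresso": {"stop_at_weight": 30}}
http_headers = {"X-pyDE1-Timestamp": "1675628600.5"}

current = pick_newer(mqtt_packet, http_body, http_headers)
print(current["espresso"]["stop_at_weight"])
```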

Event Payloads

The majority of MQTT updates come from subclasses of the EventPayload class, defined in pyDE1/event_manager/payloads.py. These will always include:

  • arrival_time – When the “trigger” occurred

  • create_time – When the payload was created

  • sender – A string indicating the “source” of the message (typically the class name)

  • event_time – When the event was published

  • version – A string of the semantic version of the payloads

All times are those returned by time.time().

Those generated by an EventWithNotification will also include an action of either set or clear.
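Because these fields are always present, a client can parse them once, independent of the payload class. A minimal sketch follows, using a StateUpdate payload with the values shown under Versioning; the CommonFields class and publish_delay property are illustrative names, not pyDE1 classes.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class CommonFields:
    arrival_time: float
    create_time: float
    event_time: float
    sender: str
    version: str

    @property
    def publish_delay(self) -> float:
        """Seconds between the trigger and the event being published."""
        return self.event_time - self.arrival_time


def parse_common(raw_payload: str) -> CommonFields:
    """Extract the fields shared by all EventPayload-derived messages."""
    payload = json.loads(raw_payload)
    return CommonFields(
        arrival_time=payload["arrival_time"],
        create_time=payload["create_time"],
        event_time=payload["event_time"],
        sender=payload["sender"],
        version=payload["version"],
    )


raw = json.dumps({
    "arrival_time": 1637014022.035329, "create_time": 1637014022.0355482,
    "state": "Idle", "substate": "HeatWaterTank", "version": "1.0.0",
    "event_time": 1637014022.0356083, "sender": "DE1", "class": "StateUpdate",
})
common = parse_common(raw)
print(common.sender, f"{common.publish_delay * 1000:.3f} ms")
```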

Overview of Notifications

This section lists and briefly describes the intent of the main notifications provided by pyDE1.

StateUpdate

Contains the state and substate reported by the DE1.

The DE1 reports this only on changes. The current state, if needed, can be queried through the HTTP API, such as for app initialization after connecting to an already connected controller. (The state can’t be queried over Bluetooth from the DE1.)

ShotSampleWithVolumesUpdate

Contains the information reported by the DE1 in the ShotSample packet. This includes various pressures, flow rates, and temperatures.

It is augmented with estimated volumes for preinfuse, flow, and total, as well as an array of by-frame volumes.

Use of de1_time is preferred. At some time in the future, de1_time may represent a best-estimate of the DE1’s notion of reporting time, rather than the packet arrival time.

The DE1 reports this every 25 half-cycles of the AC mains while not in Sleep. This is 4 per second at 50 Hz and 4.8 per second at 60 Hz. While in Sleep, the rate appears to drop by a factor of three. (These and other DE1-generated rates were observed with firmware 1283.)
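The arithmetic behind these rates can be written out as a short sketch. There are two half-cycles per AC cycle, so the report rate is twice the mains frequency divided by 25. The factor of three for Sleep is the observed value mentioned above, not a documented constant, and the names here are illustrative.

```python
HALF_CYCLES_PER_REPORT = 25
SLEEP_SLOWDOWN = 3  # observed, not guaranteed


def shot_sample_rate(mains_hz: float, asleep: bool = False) -> float:
    """Expected ShotSample reports per second for a given mains frequency."""
    half_cycles_per_second = 2 * mains_hz
    rate = half_cycles_per_second / HALF_CYCLES_PER_REPORT
    return rate / SLEEP_SLOWDOWN if asleep else rate


for hz in (50, 60):
    print(hz, "Hz:", shot_sample_rate(hz), "per second awake,",
          round(shot_sample_rate(hz, asleep=True), 2), "per second in Sleep")
```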

WaterLevelUpdate

Contains the current water level in the tank and the refill level setting of the DE1.

Sent roughly 2.5 times a second when not in Sleep, 1/3 of that during Sleep (on 60 Hz).

WeightAndFlowUpdate

Contains the current weight along with estimates of weight and mass flow and their corresponding times.

Use of the corresponding times is preferred as they incorporate estimation delays.

current_weight
current_weight_time
average_flow
average_flow_time
median_weight
median_weight_time
median_flow
median_flow_time

Time is as would be reported by time.time(). Weight is as-set on the scale, typically in grams. Mass-flow is in weight units per second, typically grams/second.

scale_time may, in the future, represent a corrected time base for the scale, rather than just using the packet-arrival time.

Sent at the reporting rate of the scale, often 10 per second.
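Since each estimate has its own time, it can help to keep value and time together when plotting or logging. The sketch below pairs them up from a payload using the key names listed above; the sample values are invented and the helper is not part of pyDE1.

```python
ESTIMATES = ("current_weight", "average_flow", "median_weight", "median_flow")


def paired_estimates(payload: dict) -> dict[str, tuple[float, float]]:
    """Map each estimate name to a (value, time) tuple."""
    return {
        name: (payload[name], payload[f"{name}_time"])
        for name in ESTIMATES
    }


# Hypothetical payload fragment; the times differ because of estimation delay
sample = {
    "current_weight": 18.4, "current_weight_time": 1675628682.50,
    "average_flow": 2.0, "average_flow_time": 1675628682.25,
    "median_weight": 18.1, "median_weight_time": 1675628682.30,
    "median_flow": 2.1, "median_flow_time": 1675628682.20,
}

for name, (value, when) in paired_estimates(sample).items():
    print(f"{name}: {value} at {when}")
```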

ScaleButtonPress

Sent when a button press is reported by the scale.

Includes an integer to identify the button pressed. Encoding is specific to each scale.

ScaleTareSeen

Sent after a tare request when the scale reports a value “close enough” to zero, within the timeout to respond to the tare request.

SequencerGateNotification

The FlowSequencer is responsible for managing and tracking flow during any of the flow phases: espresso, steam, hot water, and flush (hot water rinse). It assigns a sequence ID at the start of a sequence, which is used to associate the various records in the database with each other. There are then several “gates” that a sequence goes through. All gates are cleared (they are implemented as Event objects and adopt that object’s notion of .set(), .clear(), and .wait()) when a DE1 state change indicates that a new sequence is beginning. As each gate is passed, it is set. Notifications are sent over MQTT for both clear and set.

class SequencerGateName (EventNotificationName):
    GATE_SEQUENCE_START = "sequence_start"
    GATE_FLOW_BEGIN = "sequence_flow_begin"
    GATE_EXPECT_DROPS = "sequence_expect_drops"
    GATE_EXIT_PREINFUSE = "sequence_exit_preinfuse"
    GATE_FLOW_END = "sequence_flow_end"
    GATE_FLOW_STATE_EXIT = "sequence_flow_state_exit"
    GATE_LAST_DROPS = "sequence_last_drops"
    GATE_SEQUENCE_COMPLETE = "sequence_complete"

The sequence_id is included in all packets, along with the action of either clear or set.
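A client could mirror the gate state per sequence with something like the following sketch. It assumes that the gate's name arrives under a "name" key, which is not confirmed by the text above; sequence_id and action are as described. The GateTracker class and the sample ID are hypothetical.

```python
from collections import defaultdict


class GateTracker:
    """Tracks which gates are currently set, keyed by sequence_id."""

    def __init__(self) -> None:
        self._set_gates: dict[str, set[str]] = defaultdict(set)

    def update(self, payload: dict) -> None:
        gates = self._set_gates[payload["sequence_id"]]
        gate_name = payload["name"]  # assumption: key holding the gate name
        if payload["action"] == "set":
            gates.add(gate_name)
        elif payload["action"] == "clear":
            gates.discard(gate_name)

    def is_set(self, sequence_id: str, gate_name: str) -> bool:
        return gate_name in self._set_gates.get(sequence_id, set())


tracker = GateTracker()
tracker.update({"sequence_id": "example-id", "name": "sequence_start", "action": "set"})
print(tracker.is_set("example-id", "sequence_start"))
print(tracker.is_set("example-id", "sequence_complete"))
```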

StopAtNotification

When the FlowSequencer is managing termination, a StopAtNotification is sent at termination that includes the stop_at type (time, volume, weight), target_value, current_value, as well as the active_state.

An action is sent to indicate if and when stop-at is active near the start of a sequence.

class StopAtNotificationAction (enum.Enum):
    ENABLED = 'enabled'
    TRIGGERED = 'triggered'
    DISABLED = 'disabled'
    DE1CONTROLLED = 'de1 controlled'

When the stop-at action is controlled by the DE1, no triggered notification is sent.
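For instance, a client might report how far past the target a shot ran when a triggered notification arrives. The sketch assumes that the JSON keys match the field names given above (stop_at, target_value, current_value, active_state) and that the action is carried under "action"; the sample values are invented.

```python
from typing import Optional


def overshoot(payload: dict) -> Optional[float]:
    """Difference between current and target value, for triggered events only."""
    if payload["action"] != "triggered":
        return None
    return payload["current_value"] - payload["target_value"]


# Hypothetical notification for a stop-at-weight termination
sample = {
    "action": "triggered", "stop_at": "weight",
    "target_value": 34.0, "current_value": 34.6,
    "active_state": "Espresso",
}

delta = overshoot(sample)
if delta is not None:
    print(f'stop-at-{sample["stop_at"]} overshoot: {delta:.1f}')
```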

AutoTareNotification

Sent to indicate when auto-tare is enabled and disabled by the FlowSequencer.

class AutoTareNotificationAction (enum.Enum):
    ENABLED = 'enabled'
    DISABLED = 'disabled'

ScannerNotification

Warning

Removed in pyDE1 v2.0; see ScanResults.

ScanResults

Starting with pyDE1 v2.0, accumulated scan results are provided during the scan, as well as an indication if the scan has completed. Scanning is done by role, such as DE1, scale, or thermometer. Only devices matching the Bluetooth advertisement filter are returned. Updates are provided as new devices are discovered, facilitating dynamic updating of a picker widget.

In response to curl -X PUT --data 'thermometer' http://localhost:1234/scan

{"arrival_time": 1675618617.1314912, "create_time": 1675618617.1314912,
    "role": "thermometer", "scanning": true,
    "devices": [],
    "version": "1.0.0", "event_time": 1675618617.1315718,
    "sender": "BluetoothScanner", "class": "ScanResults"}

{"arrival_time": 1675618617.3791888, "create_time": 1675618617.3791888,
    "role": "thermometer", "scanning": true,
    "devices": [{"address": "00:A0:50:AA:BB:CC", "name": "BlueDOT", "rssi": -72}],
    "version": "1.0.0", "event_time": 1675618617.3816643,
    "sender": "BluetoothScanner", "class": "ScanResults"}

{"arrival_time": 1675618617.8187141, "create_time": 1675618617.8187141,
    "role": "thermometer", "scanning": true,
    "devices": [{"address": "00:A0:50:AA:BB:CC", "name": "BlueDOT", "rssi": -72}],
    "version": "1.0.0", "event_time": 1675618617.8210998,
    "sender": "BluetoothScanner", "class": "ScanResults"}

{"arrival_time": 1675618622.35119, "create_time": 1675618622.35119,
    "role": "thermometer", "scanning": false,
    "devices": [{"address": "00:A0:50:AA:BB:CC", "name": "BlueDOT", "rssi": -72}],
    "version": "1.0.0", "event_time": 1675618622.3513,
    "sender": "BluetoothScanner", "class": "ScanResults"}

If multiple devices had been found, they would have been added to the array of devices. "scanning": false indicates that the scan has completed.

class DeviceRole (enum.Enum):
    DE1 = 'de1'
    SCALE = 'scale'
    THERMOMETER = 'thermometer'
    OTHER = 'other'
    UNKNOWN = 'unknown'
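Because each ScanResults packet carries the accumulated list, a picker widget can simply be rebuilt from the latest packet. The following sketch turns a packet into display labels plus a completion flag. The picker_state helper is hypothetical, and the packets are trimmed versions of the ones shown above.

```python
def picker_state(payload: dict) -> tuple[list[str], bool]:
    """Return (labels for the picker, whether the scan has completed)."""
    labels = [
        f'{device["name"]} ({device["address"]}, rssi {device["rssi"]})'
        for device in payload["devices"]
    ]
    scan_complete = not payload["scanning"]
    return labels, scan_complete


packets = [
    {"role": "thermometer", "scanning": True, "devices": []},
    {"role": "thermometer", "scanning": True,
     "devices": [{"address": "00:A0:50:AA:BB:CC", "name": "BlueDOT", "rssi": -72}]},
    {"role": "thermometer", "scanning": False,
     "devices": [{"address": "00:A0:50:AA:BB:CC", "name": "BlueDOT", "rssi": -72}]},
]

for packet in packets:
    labels, done = picker_state(packet)
    print(labels, "complete" if done else "scanning")
```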

ConnectivityChangeNotification

Deprecated since version v2.0: Use DeviceAvailability

As connectivity to a DE1 or scale progresses through various states, it is reported so that an app can take action when the device is “ready”, as well as change state if connectivity has degraded or been lost. (The pyDE1 core will try to reconnect, without intervention, on an unexpected disconnection.)

class ConnectivityState (enum.Enum):
    UNKNOWN = 'unknown'
    CONNECTING = 'connecting'
    CONNECTED = 'connected'
    READY = 'ready'  # "Ready for use"
    NOT_READY = 'not_ready'  # Was READY, but is no longer
    DISCONNECTING = 'disconnecting'
    DISCONNECTED = 'disconnected'

Not all states are passed through by all paths.

DeviceAvailability

In pyDE1 v2.0, the way that Bluetooth devices are handled was changed to permit a device to be “released” for other uses, then subsequently “captured”. Additionally, scales change class between a generic scale and a device-specific one when captured. Watching the role suggests which device is changing, especially when not associated with a physical device at that moment.

class DeviceAvailabilityState (enum.Enum):
    INITIAL = 'initial'
    UNKNOWN = 'unknown'
    CAPTURING = 'capturing'
    CAPTURED = 'captured'
    READY = 'ready'  # "Ready for use"
    NOT_READY = 'not ready'  # Was READY, but is no longer
    RELEASING = 'releasing'
    RELEASED = 'released'

Not all states are passed through by all paths.
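Since the sender and name can change while a device is being captured, a client may prefer to track availability by role. A minimal sketch, keeping only the latest state per role and reporting which roles are ready; the helper name is illustrative and the packets are reduced to the two keys it uses.

```python
from typing import Iterable


def ready_roles(packets: Iterable[dict]) -> set[str]:
    """Return the roles whose most recent reported state is 'ready'."""
    latest: dict[str, str] = {}
    for packet in packets:
        # Later packets overwrite earlier ones for the same role
        latest[packet["role"]] = packet["state"]
    return {role for role, state in latest.items() if state == "ready"}


# Hypothetical, abbreviated DeviceAvailability packets in arrival order
packets = [
    {"role": "scale", "state": "releasing"},
    {"role": "scale", "state": "released"},
    {"role": "scale", "state": "capturing"},
    {"role": "de1", "state": "ready"},
    {"role": "scale", "state": "captured"},
    {"role": "scale", "state": "ready"},
]

print(sorted(ready_roles(packets)))
```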

Here is a find/capture sequence that illustrates both the availability states and how the details of the scale change as it moves from a generic scale to one that is ready for use.

{"arrival_time": 1675619181.4547968, "create_time": 1675619181.4813273,
"state": "releasing", "role": "scale",
"id": "", "name": "GenericScale: (unknown)",
"version": "1.1.0", "event_time": 1675619181.4886014,
"sender": "GenericScale", "class": "DeviceAvailability"}

{"arrival_time": 1675619181.5112107, "create_time": 1675619181.523558,
"state": "released", "role": "scale",
"id": "", "name": "GenericScale: (unknown)",
"version": "1.1.0", "event_time": 1675619181.5262911,
"sender": "GenericScale", "class": "DeviceAvailability"}

{"arrival_time": 1675619181.5937052, "create_time": 1675619181.6589596,
"state": "initial", "role": "scale",
"id": "FF:06:AF:AA:BB:CC", "name": "AtomaxSkaleII: (unknown)",
"version": "1.1.0", "event_time": 1675619181.6753747,
"sender": "AtomaxSkaleII", "class": "DeviceAvailability"}

{"arrival_time": 1675619181.632159, "create_time": 1675619181.6696842,
"state": "capturing", "role": "scale",
"id": "FF:06:AF:AA:BB:CC", "name": "AtomaxSkaleII: (unknown)",
"version": "1.1.0", "event_time": 1675619181.6768405,
"sender": "AtomaxSkaleII", "class": "DeviceAvailability"}

{"arrival_time": 1675619183.9847703, "create_time": 1675619184.0039356,
"state": "captured", "role": "scale",
"id": "FF:06:AF:AA:BB:CC", "name": "AtomaxSkaleII: (unknown)",
"version": "1.1.0", "event_time": 1675619184.0151615,
"sender": "AtomaxSkaleII", "class": "DeviceAvailability"}

{"arrival_time": 1675619184.844756, "create_time": 1675619184.8448336,
"state": "ready", "role": "scale",
"id": "FF:06:AF:AA:BB:CC", "name": "AtomaxSkaleII: Skale",
"version": "1.1.0", "event_time": 1675619184.8514688,
"sender": "AtomaxSkaleII", "class": "DeviceAvailability"}

BlueDOTUpdate

Sent when a report is received from a BlueDOT thermometer.

self.temperature: Optional[float] = None
self.high_alarm: Optional[float] = None
self.units: str = "C"
self.alarm_byte: Optional[Union[bytearray, int]] = None
self.name: Optional[str] = None

The units are determined by user setting. The temperatures are reported in those units, either C or F.

ScaleButtonPress

Sent when a button is pressed on scales that report such events.

ScaleTareSeen

Sent when a requested tare appears to have been fulfilled. This is usually when the weight is “close enough” to zero.

ScaleChange

In pyDE1 v2.0 and later, the scale can be changed. As different scales may have different capabilities, this provides a packet similar to the DeviceAvailability packet.

FirmwareUpload

Firmware upload to the DE1 is done asynchronously by pyDE1. These updates provide feedback on the progress of the upload with the uploaded and total bytes, along with the state of the upload.

class FirmwareUploadState (enum.Enum):
    STARTING = 'starting'
    UPLOADING = 'uploading'
    COMPLETED = 'completed'
    FAILED = 'failed'
    CANCELED = 'canceled'
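A progress display could be derived from these updates roughly as follows. The sketch assumes that the byte counts arrive under the keys "uploaded" and "total" and the state under "state"; those key names are inferred from the description above rather than confirmed, and the sample values are invented.

```python
def upload_progress(payload: dict) -> str:
    """Summarize a FirmwareUpload update as a single status line."""
    state = payload["state"]
    uploaded = payload["uploaded"]  # assumption: key name for bytes sent
    total = payload["total"]        # assumption: key name for total bytes
    # Guard against a zero or missing total early in the upload
    percent = 100.0 * uploaded / total if total else 0.0
    return f"{state}: {percent:.1f}% ({uploaded}/{total} bytes)"


print(upload_progress({"state": "uploading", "uploaded": 512, "total": 2048}))
```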

Internal-Only Notifications

The following notifications are only used internally. They are not available over MQTT:

  • ScaleWeightUpdate (precursor of WeightAndFlowUpdate)

  • ShotSampleUpdate (precursor of ShotSampleWithVolumesUpdate)