MQTT API
The MQTT API provides updates of the state of the DE1, scale, and controller. An MQTT broker allows subscribing to the various notifications without putting additional load on the controller.
Versioning
The MQTT API’s JSON payloads are under semantic versioning. At this time, there is no way to discover whether the list of topics has changed.
The payload version can be found under the version key. For example:
{
"arrival_time": 1637014022.035329,
"create_time": 1637014022.0355482,
"state": "Idle",
"substate": "HeatWaterTank",
"previous_state": "Sleep",
"previous_substate": "NoState",
"is_error_state": false,
"version": "1.0.0",
"event_time": 1637014022.0356083,
"sender": "DE1",
"class": "StateUpdate"
}
The non-JSON payloads, such as the human-readable log messages and MQTT will, are not versioned at this time.
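A client can use the version key to guard against breaking payload changes. The following is a minimal sketch, assuming the client was written against major version 1; the function names and the SUPPORTED_MAJOR constant are hypothetical and not part of pyDE1.

```python
import json

SUPPORTED_MAJOR = 1  # assumption: the major version this client was written against


def payload_major_version(raw: str) -> int:
    """Return the major component of the payload's "version" key."""
    payload = json.loads(raw)
    return int(payload["version"].split(".")[0])


def is_supported(raw: str, supported_major: int = SUPPORTED_MAJOR) -> bool:
    """Under semantic versioning, a change of major version signals a breaking change."""
    return payload_major_version(raw) == supported_major


# Abbreviated form of the StateUpdate example above
example = '{"state": "Idle", "version": "1.0.0", "class": "StateUpdate"}'
print(is_supported(example))
```

Minor and patch increments are treated as compatible here, which follows from semantic versioning rather than from anything pyDE1-specific.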
Will (Client Status)
The pyDE1 clients all register a will that is sent by the broker to subscribers in the event of a non-graceful disconnect. The code also tries to indicate if the client is present.
For pyDE1, the wills are set up in pyDE1/api/outbound/mqtt.
The two topics are:
<config.mqtt.TOPIC_ROOT>/status/mqtt/logging
<config.mqtt.TOPIC_ROOT>/status/mqtt/notification
The values presently published are:
class MQTTStatusText (enum.Enum):
    on_connection = 'Here'
    on_graceful_disconnect = 'Gone'
    on_will = 'Died'
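A subscriber can turn these status texts into a simple presence flag. This sketch assumes the payload on the status topics is exactly one of the texts above, encoded as UTF-8; the client_present helper is hypothetical, and the enum is repeated locally so the example is self-contained.

```python
import enum
from typing import Optional


class MQTTStatusText(enum.Enum):
    # Values copied from the listing above
    on_connection = 'Here'
    on_graceful_disconnect = 'Gone'
    on_will = 'Died'


def client_present(payload: bytes) -> Optional[bool]:
    """True if present, False if gone or died, None if the text is unrecognized."""
    try:
        status = MQTTStatusText(payload.decode("utf-8").strip())
    except ValueError:
        # Covers both undecodable bytes and unknown status texts
        return None
    return status is MQTTStatusText.on_connection
```

Distinguishing 'Gone' from 'Died' may still be useful to an app, for example to warn the user only on a non-graceful disconnect.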
Logging Feeds
As configured, there are two MQTT feeds of logging data from pyDE1, one with most fields from the LogRecord and the other already formatted. The log level is ERROR by default. Both the level and the formatter can be set through the config file.
<config.mqtt.TOPIC_ROOT>/log
<config.mqtt.TOPIC_ROOT>/log/record
For version 1.0.0, the attributes from the LogRecord in the JSON include created, levelname, levelno, message, name, process, processName, thread, and threadName.
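Because the record feed carries LogRecord attributes, a subscriber can rebuild a LogRecord and apply its own formatter. This is a sketch under the assumption that the payload is a flat JSON object keyed by the attribute names listed above; the sample values are made up for illustration and record_from_json is a hypothetical helper.

```python
import json
import logging


def record_from_json(raw: str) -> logging.LogRecord:
    """Rebuild a LogRecord from the JSON attributes."""
    attrs = json.loads(raw)
    # LogRecord derives "message" from "msg" when formatting, so copy it across
    attrs["msg"] = attrs.pop("message", "")
    return logging.makeLogRecord(attrs)


# Hypothetical payload with made-up values, using the version 1.0.0 attribute names
sample = json.dumps({
    "created": 1637014022.0, "levelname": "ERROR", "levelno": 40,
    "message": "example text", "name": "example.logger",
    "process": 1234, "processName": "MainProcess",
    "thread": 5678, "threadName": "MainThread",
})
record = record_from_json(sample)
formatted = logging.Formatter("%(levelname)s %(name)s: %(message)s").format(record)
print(formatted)
```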
Client Synchronization
Starting with pyDE1 v2.0, when changes are made to the pyDE1 controller or a DE1 connects, the resulting state of the impacted area is sent over MQTT to subscribers.
At this time the areas include the following topics:
update/de1/control
update/de1/setting
update/de1/calibration
update/de1/profile/id
The packets contain JSON similar to what a GET of the resource following the update/ prefix would provide. For example, changing the stop-at-weight level over the HTTP API results in an MQTT packet like the following on update/de1/control:
{"espresso": {"stop_at_time": null, "stop_at_volume": null,
"stop_at_weight": 34, "move_on_weight": [],
"disable_auto_tare": false,
"profile_can_override_stop_limits": false,
"profile_can_override_tank_temperature": true,
"first_drops_threshold": 0.0, "last_drops_minimum_time": 3.0},
"steam": {"stop_at_time": 200, "disable_auto_tare": false},
"hot_water": {"stop_at_time": 0, "stop_at_volume": 0,
"stop_at_weight": null, "disable_auto_tare": false,
"temperature": 0},
"hot_water_rinse": {"stop_at_time": 3.0, "disable_auto_tare": false,
"temperature": 92.0, "flow": 6.0},
"tank_water_threshold": {"temperature": 0},
"timestamp": 1675628682.0626736}
Timestamps are available in the MQTT packets as well as in the HTTP response header x-pyde1-timestamp to assist in disambiguation of the two sources.
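One way a client might use these timestamps is to keep whichever representation is newer. The sketch below assumes that the x-pyde1-timestamp header value parses as a float comparable to the packet's timestamp key, and that the topic may carry a topic-root prefix (the "pyDE1/" root in the usage lines is hypothetical). Both helper functions are illustrative, not part of pyDE1.

```python
from typing import Optional


def resource_for_topic(topic: str) -> Optional[str]:
    """Map '<root>/update/de1/control' to 'de1/control'; None if no update/ segment."""
    marker = "update/"
    index = topic.find(marker)
    if index == -1:
        return None
    return topic[index + len(marker):]


def newest(mqtt_packet: dict, http_body: dict, http_timestamp_header: str) -> dict:
    """Return whichever representation carries the later timestamp."""
    # Assumption: the header value is a decimal number of seconds
    http_time = float(http_timestamp_header)
    return mqtt_packet if mqtt_packet["timestamp"] >= http_time else http_body


print(resource_for_topic("pyDE1/update/de1/control"))
```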
Event Payloads
The majority of MQTT updates come from subclasses of the EventPayload class, defined in pyDE1/event_manager/payloads.py. These will always include:
arrival_time – When the “trigger” occurred
create_time – When the payload was created
sender – A string indicating the “source” of the message (typically the class name)
event_time – When the event was published
version – A string of the semantic version of the payloads
All times are those returned by time.time(). Those generated by an EventWithNotification will also include an action of either set or clear.
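The common fields can be pulled into a small typed structure on the client side. This is a minimal sketch; the CommonFields class and its processing_delay property are hypothetical, and the sample is the StateUpdate payload from the Versioning section, abbreviated to the common fields.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class CommonFields:
    arrival_time: float
    create_time: float
    event_time: float
    sender: str
    version: str

    @property
    def processing_delay(self) -> float:
        """Seconds between the trigger arriving and the event being published."""
        return self.event_time - self.arrival_time


def common_fields(raw: str) -> CommonFields:
    """Extract only the fields every EventPayload is documented to include."""
    payload = json.loads(raw)
    names = ("arrival_time", "create_time", "event_time", "sender", "version")
    return CommonFields(**{name: payload[name] for name in names})


sample = ('{"arrival_time": 1637014022.035329, "create_time": 1637014022.0355482, '
          '"version": "1.0.0", "event_time": 1637014022.0356083, "sender": "DE1"}')
fields = common_fields(sample)
print(fields.sender, fields.version)
```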
Overview of Notifications
This section lists and briefly describes the intent of the main notifications provided by pyDE1.
StateUpdate
Contains the state and substate reported by the DE1.
The DE1 reports this only on changes. The current state, if needed, can be queried through the HTTP API, such as for app initialization after connecting to an already connected controller. (The state can’t be queried over Bluetooth from the DE1.)
ShotSampleWithVolumesUpdate
Contains the information reported by the DE1 in the ShotSample packet. This includes various pressures, flow rates, and temperatures.
It is augmented with estimated volumes for preinfuse, flow, and total, as well as an array of by-frame volumes.
Use of de1_time is preferred. At some time in the future, de1_time may represent a best estimate of the DE1’s notion of reporting time, rather than the packet arrival time.
The DE1 reports this every 25 half-cycles of the AC mains while not in Sleep. This is 4 per second at 50 Hz and 4.8 per second at 60 Hz. While in Sleep, the rate appears to drop by a factor of three. (These and other DE1-generated rates were observed with firmware 1283.)
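The reporting rates follow from simple arithmetic: there are two half-cycles per AC cycle, and one report every 25 half-cycles. The function below is only a worked example of that calculation; its name is hypothetical, and the factor of three during Sleep is the observation stated above, not a specification.

```python
def shot_sample_rate(mains_hz: float, asleep: bool = False) -> float:
    """Reports per second, at one report every 25 half-cycles of the AC."""
    half_cycles_per_second = 2 * mains_hz
    rate = half_cycles_per_second / 25
    # The Sleep rate "appears to drop by a factor of three" (observed, not specified)
    return rate / 3 if asleep else rate


print(shot_sample_rate(50))  # 4.0
print(shot_sample_rate(60))  # 4.8
```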
WaterLevelUpdate
Contains the current water level in the tank and the refill level setting of the DE1.
Sent roughly 2.5 times a second when not in Sleep, 1/3 of that during Sleep (on 60 Hz).
WeightAndFlowUpdate
Contains the current weight along with estimates of weight and mass flow and their corresponding times.
Use of the corresponding times is preferred as they incorporate estimation delays.
current_weight
current_weight_time
average_flow
average_flow_time
median_weight
median_weight_time
median_flow
median_flow_time
Time is as would be reported by time.time(). Weight is as set on the scale, typically in grams. Mass flow is in weight units per second, typically grams/second.
scale_time may, in the future, represent a corrected time base for the scale, rather than just using the packet-arrival time.
Sent at the reporting rate of the scale, often 10 per second.
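Since each estimate has its own corresponding time, a client that plots or logs these values may want to keep them paired. The sketch below assumes the packet is a flat JSON object with the key names listed above; the helper name and the sample values are made up for illustration.

```python
from typing import Dict, Tuple

ESTIMATES = ("current_weight", "average_flow", "median_weight", "median_flow")


def timed_estimates(payload: dict) -> Dict[str, Tuple[float, float]]:
    """Pair each estimate with its own time, as (time, value)."""
    return {name: (payload[name + "_time"], payload[name]) for name in ESTIMATES}


# Hypothetical payload with made-up values
sample = {
    "current_weight": 18.2, "current_weight_time": 1675628682.50,
    "average_flow": 1.9, "average_flow_time": 1675628682.25,
    "median_weight": 18.0, "median_weight_time": 1675628682.30,
    "median_flow": 2.0, "median_flow_time": 1675628682.05,
}
pairs = timed_estimates(sample)
print(pairs["median_flow"])
```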
ScaleTareSeen
Sent after a tare request when the scale reports a value “close enough” to zero, within the timeout to respond to the tare request.
SequencerGateNotification
The FlowSequencer
is responsible for managing and tracking flow during
any of the flow phases, espresso, steam, hot water, and flush (hot water rinse).
It assigns a sequence ID at the start of a sequence, that is used to associate
the various records in the database with each other. There are then several
“gates” that a sequence goes through. All gates are cleared (they are
implemented as Event
objects and adopt that object’s notion of .set()
,
.``.clear()``, and .wait()
) when a DE1 state change indicates a new
sequence beginning. As each gate is passed, it is set. Notifications are
sent over MQTT for both clear and set.
class SequencerGateName (EventNotificationName):
    GATE_SEQUENCE_START = "sequence_start"
    GATE_FLOW_BEGIN = "sequence_flow_begin"
    GATE_EXPECT_DROPS = "sequence_expect_drops"
    GATE_EXIT_PREINFUSE = "sequence_exit_preinfuse"
    GATE_FLOW_END = "sequence_flow_end"
    GATE_FLOW_STATE_EXIT = "sequence_flow_state_exit"
    GATE_LAST_DROPS = "sequence_last_drops"
    GATE_SEQUENCE_COMPLETE = "sequence_complete"
The sequence_id is included in all packets, along with the action of either clear or set.
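A client can mirror the gate states by applying each set and clear as it arrives. In this sketch the key that carries the gate name is an assumption (the default "name" is hypothetical and should be checked against a real packet); sequence_id and action are the keys described above.

```python
from typing import Optional, Set


class GateTracker:
    """Tracks which gates have been passed for the current sequence."""

    def __init__(self, name_key: str = "name") -> None:
        # Assumption: the gate name arrives under this key
        self.name_key = name_key
        self.sequence_id: Optional[str] = None
        self.passed: Set[str] = set()

    def handle(self, packet: dict) -> None:
        # A new sequence ID starts with no gates passed
        if packet["sequence_id"] != self.sequence_id:
            self.sequence_id = packet["sequence_id"]
            self.passed.clear()
        gate = packet[self.name_key]
        if packet["action"] == "set":
            self.passed.add(gate)
        elif packet["action"] == "clear":
            self.passed.discard(gate)

    def is_complete(self) -> bool:
        return "sequence_complete" in self.passed


tracker = GateTracker()
tracker.handle({"sequence_id": "abc", "name": "sequence_start", "action": "clear"})
tracker.handle({"sequence_id": "abc", "name": "sequence_start", "action": "set"})
print(sorted(tracker.passed))
```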
StopAtNotification
When the FlowSequencer is managing termination, a StopAtNotification is sent at termination that includes the stop_at type (time, volume, weight), target_value, current_value, as well as the active_state.
An action is sent to indicate if and when stop-at is active near the start of a sequence.
class StopAtNotificationAction (enum.Enum):
    ENABLED = 'enabled'
    TRIGGERED = 'triggered'
    DISABLED = 'disabled'
    DE1CONTROLLED = 'de1 controlled'
When the stop-at action is controlled by the DE1, no triggered notification is sent.
AutoTareNotification
Sent to indicate when auto-tare is enabled and disabled by the FlowSequencer.
class AutoTareNotificationAction (enum.Enum):
    ENABLED = 'enabled'
    DISABLED = 'disabled'
ScannerNotification
Warning
Removed in pyDE1 v2.0. See ScanResults.
ScanResults
Starting with pyDE1 v2.0, accumulated scan results are provided during the scan, as well as an indication if the scan has completed. Scanning is done by role, such as DE1, scale, or thermometer. Only devices matching the Bluetooth advertisement filter are returned. Updates are provided as new devices are discovered, facilitating dynamic updating of a picker widget.
In response to curl -X PUT --data 'thermometer' http://localhost:1234/scan
{"arrival_time": 1675618617.1314912, "create_time": 1675618617.1314912,
"role": "thermometer", "scanning": true,
"devices": [],
"version": "1.0.0", "event_time": 1675618617.1315718,
"sender": "BluetoothScanner", "class": "ScanResults"}
{"arrival_time": 1675618617.3791888, "create_time": 1675618617.3791888,
"role": "thermometer", "scanning": true,
"devices": [{"address": "00:A0:50:AA:BB:CC", "name": "BlueDOT", "rssi": -72}],
"version": "1.0.0", "event_time": 1675618617.3816643,
"sender": "BluetoothScanner", "class": "ScanResults"}
{"arrival_time": 1675618617.8187141, "create_time": 1675618617.8187141,
"role": "thermometer", "scanning": true,
"devices": [{"address": "00:A0:50:AA:BB:CC", "name": "BlueDOT", "rssi": -72}],
"version": "1.0.0", "event_time": 1675618617.8210998,
"sender": "BluetoothScanner", "class": "ScanResults"}
{"arrival_time": 1675618622.35119, "create_time": 1675618622.35119,
"role": "thermometer", "scanning": false,
"devices": [{"address": "00:A0:50:AA:BB:CC", "name": "BlueDOT", "rssi": -72}],
"version": "1.0.0", "event_time": 1675618622.3513,
"sender": "BluetoothScanner", "class": "ScanResults"}
If multiple devices had been found, they would have been added to the array of devices. "scanning": false indicates that the scan has completed.
class DeviceRole (enum.Enum):
    DE1 = 'de1'
    SCALE = 'scale'
    THERMOMETER = 'thermometer'
    OTHER = 'other'
    UNKNOWN = 'unknown'
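Because each ScanResults packet carries the accumulated device list, a picker widget only needs the most recent packet for a given role. A minimal sketch, with a hypothetical picker_entries helper and a sample abbreviated from the final packet above:

```python
import json
from typing import List, Tuple


def picker_entries(raw: str) -> Tuple[bool, List[str]]:
    """Return (scan_finished, labels), with the strongest signal first."""
    result = json.loads(raw)
    devices = sorted(result["devices"], key=lambda d: d["rssi"], reverse=True)
    labels = [f'{d["name"]} [{d["address"]}] rssi {d["rssi"]}' for d in devices]
    return (not result["scanning"], labels)


sample = ('{"role": "thermometer", "scanning": false, '
          '"devices": [{"address": "00:A0:50:AA:BB:CC", "name": "BlueDOT", "rssi": -72}], '
          '"version": "1.0.0", "sender": "BluetoothScanner", "class": "ScanResults"}')
finished, labels = picker_entries(sample)
print(finished, labels)
```

Sorting by rssi is a presentation choice made for this example; the order of devices within the packet is not described here.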
ConnectivityChangeNotification
Deprecated since version v2.0: Use DeviceAvailability
As connectivity to a DE1 or scale progresses through various states, it is reported so that an app can take action when the device is “ready”, as well as change state if connectivity has degraded or been lost. (The pyDE1 core will try to reconnect, without intervention, on an unexpected disconnection.)
class ConnectivityState (enum.Enum):
    UNKNOWN = 'unknown'
    CONNECTING = 'connecting'
    CONNECTED = 'connected'
    READY = 'ready' # "Ready for use"
    NOT_READY = 'not_ready' # Was READY, but is no longer
    DISCONNECTING = 'disconnecting'
    DISCONNECTED = 'disconnected'
Not all states are passed through by all paths.
DeviceAvailability
In pyDE1 v2.0, the way that Bluetooth devices are handled was changed to permit a device to be “released” for other uses, then subsequently “captured”. Additionally, scales change class between a generic scale and a device-specific one when captured. Watching the role suggests which device is changing, especially when not associated with a physical device at that moment.
class DeviceAvailabilityState (enum.Enum):
    INITIAL = 'initial'
    UNKNOWN = 'unknown'
    CAPTURING = 'capturing'
    CAPTURED = 'captured'
    READY = 'ready' # "Ready for use"
    NOT_READY = 'not ready' # Was READY, but is no longer
    RELEASING = 'releasing'
    RELEASED = 'released'
Not all states are passed through by all paths.
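Since the sender and name can change while the role stays the same, one approach is to track the latest DeviceAvailability packet per role. The class below is a hypothetical client-side sketch, not part of pyDE1; it relies only on the class, role, state, and name keys.

```python
from typing import Dict, Optional


class AvailabilityTracker:
    """Keeps the most recent DeviceAvailability packet for each role."""

    def __init__(self) -> None:
        self.by_role: Dict[str, dict] = {}

    def handle(self, packet: dict) -> None:
        if packet.get("class") == "DeviceAvailability":
            self.by_role[packet["role"]] = packet

    def is_ready(self, role: str) -> bool:
        return self.by_role.get(role, {}).get("state") == "ready"

    def name(self, role: str) -> Optional[str]:
        return self.by_role.get(role, {}).get("name")


tracker = AvailabilityTracker()
tracker.handle({"state": "released", "role": "scale", "id": "",
                "name": "GenericScale: (unknown)", "class": "DeviceAvailability"})
tracker.handle({"state": "ready", "role": "scale", "id": "FF:06:AF:AA:BB:CC",
                "name": "AtomaxSkaleII: Skale", "class": "DeviceAvailability"})
print(tracker.is_ready("scale"), tracker.name("scale"))
```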
Here is a find/capture sequence that illustrates both the availability states and how the details of the scale change as it moves from a generic scale to a device-specific one that is ready for use.
{"arrival_time": 1675619181.4547968, "create_time": 1675619181.4813273,
"state": "releasing", "role": "scale",
"id": "", "name": "GenericScale: (unknown)",
"version": "1.1.0", "event_time": 1675619181.4886014,
"sender": "GenericScale", "class": "DeviceAvailability"}
{"arrival_time": 1675619181.5112107, "create_time": 1675619181.523558,
"state": "released", "role": "scale",
"id": "", "name": "GenericScale: (unknown)",
"version": "1.1.0", "event_time": 1675619181.5262911,
"sender": "GenericScale", "class": "DeviceAvailability"}
{"arrival_time": 1675619181.5937052, "create_time": 1675619181.6589596,
"state": "initial", "role": "scale",
"id": "FF:06:AF:AA:BB:CC", "name": "AtomaxSkaleII: (unknown)",
"version": "1.1.0", "event_time": 1675619181.6753747,
"sender": "AtomaxSkaleII", "class": "DeviceAvailability"}
{"arrival_time": 1675619181.632159, "create_time": 1675619181.6696842,
"state": "capturing", "role": "scale",
"id": "FF:06:AF:AA:BB:CC", "name": "AtomaxSkaleII: (unknown)",
"version": "1.1.0", "event_time": 1675619181.6768405,
"sender": "AtomaxSkaleII", "class": "DeviceAvailability"}
{"arrival_time": 1675619183.9847703, "create_time": 1675619184.0039356,
"state": "captured", "role": "scale",
"id": "FF:06:AF:AA:BB:CC", "name": "AtomaxSkaleII: (unknown)",
"version": "1.1.0", "event_time": 1675619184.0151615,
"sender": "AtomaxSkaleII", "class": "DeviceAvailability"}
{"arrival_time": 1675619184.844756, "create_time": 1675619184.8448336,
"state": "ready", "role": "scale",
"id": "FF:06:AF:AA:BB:CC", "name": "AtomaxSkaleII: Skale",
"version": "1.1.0", "event_time": 1675619184.8514688,
"sender": "AtomaxSkaleII", "class": "DeviceAvailability"}
BlueDOTUpdate
Sent when a report is received from a BlueDOT thermometer.
self.temperature: Optional[float] = None
self.high_alarm: Optional[float] = None
self.units: str = "C"
self.alarm_byte: Optional[Union[bytearray, int]] = None
self.name: Optional[str] = None
The units are determined by user setting. The temperatures are reported in those units, either C or F.
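A client that wants a single unit can normalize on receipt. This sketch assumes the JSON keys match the attribute names shown above (temperature and units); the helper name is hypothetical.

```python
from typing import Optional


def temperature_celsius(update: dict) -> Optional[float]:
    """Return the reported temperature in Celsius, or None if not yet reported."""
    value = update.get("temperature")
    if value is None:
        return None
    if update.get("units", "C") == "F":
        return (value - 32.0) * 5.0 / 9.0
    return value


print(temperature_celsius({"temperature": 212.0, "units": "F"}))
```

The same conversion would apply to high_alarm, which is reported in the same units.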
ScaleButtonPress
Sent when a button is pressed on scales that report such events.
ScaleTareSeen
Sent when a tare request appears to have been fulfilled. This is usually when the weight is “close enough” to zero.
ScaleChange
In pyDE1 v2.0 and later, the scale can be changed. As different scales may have different capabilities, this provides a packet similar to the DeviceAvailability packet.
FirmwareUpload
Firmware upload to the DE1 is done asynchronously by pyDE1.
These updates provide feedback on the progress of the upload with the uploaded and total bytes, along with the state of the upload.
class FirmwareUploadState (enum.Enum):
    STARTING = 'starting'
    UPLOADING = 'uploading'
    COMPLETED = 'completed'
    FAILED = 'failed'
    CANCELED = 'canceled'
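These fields are enough to drive a progress display. A minimal sketch, assuming the packet carries uploaded, total, and state as top-level keys; the upload_progress helper and the sample values are hypothetical.

```python
def upload_progress(update: dict) -> str:
    """Summarize a FirmwareUpload update as a short status line."""
    state = update["state"]
    uploaded, total = update["uploaded"], update["total"]
    # Only show a percentage while bytes are moving and the total is known
    if state != "uploading" or not total:
        return f"Upload {state}"
    return f"Upload {state}: {100 * uploaded / total:.0f}%"


print(upload_progress({"state": "uploading", "uploaded": 512, "total": 2048}))
```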
Internal-Only Notifications
The following notifications are only used internally. They are not available over MQTT:
ScaleWeightUpdate (precursor of WeightAndFlowUpdate)
ShotSampleUpdate (precursor of ShotSampleWithVolumesUpdate)