Usage¶
The main class for communication in the stateful API provided by this package is ICOsystem. Depending on the current state of objects of this class you will be able to use different coroutines to interact with the system.
State Diagram¶
stateDiagram-v2
disconnected: Disconnected
stu_connected: STU Connected
sensor_node_connected: Sensor Node Connected
disconnected --> stu_connected: connect_stu
stu_connected --> disconnected: disconnect_stu
stu_connected --> stu_connected: enable_ota, collect_sensor_nodes, rename, reset
stu_connected --> sensor_node_connected: connect_sensor_node_mac
sensor_node_connected --> sensor_node_connected: rename
sensor_node_connected --> stu_connected: disconnect_sensor_node
In addition to coroutines that label the edges of the state diagram above you can also use the coroutine ICOsystem.is_sensor_node_connected(), which works in any state.
STU¶
Connecting to STU¶
Before you work with the ICOtronic system you need to set up the CAN connection to the STU (Stationary Transceiver Unit), which you can do using the coroutine ICOsystem.connect_stu(). After you are done working working with the STU you also need to disconnect the connection using the coroutine ICOsystem.disconnect_stu().
>>> from asyncio import run
>>> from icostate import ICOsystem
>>> async def connect_disconnect_stu(icosystem: ICOsystem):
... await icosystem.connect_stu()
... await icosystem.disconnect_stu()
>>> run(connect_disconnect_stu(ICOsystem()))
Resetting STU¶
In case you have the ICOtronic system does not react as you expect you can reset it using the coroutine ICOsystem.reset_stu().
>>> from asyncio import run
>>> from icostate import ICOsystem
>>> async def reset_stu(icosystem: ICOsystem):
... await icosystem.connect_stu()
... await icosystem.reset_stu()
... await icosystem.disconnect_stu()
>>> run(reset_stu(ICOsystem()))
Finding Available Sensor Nodes¶
To retrieve information about available sensor nodes use the coroutine ICOsystem.collect_sensor_nodes().
>>> from asyncio import run
>>> from netaddr import EUI
>>> from icostate import ICOsystem, SensorNodeInfo
>>> async def get_sensor_nodes(icosystem: ICOsystem) -> list[SensorNodeInfo]:
... await icosystem.connect_stu()
... sensor_nodes = await icosystem.collect_sensor_nodes()
... await icosystem.disconnect_stu()
... return sensor_nodes
>>> sensor_nodes = run(get_sensor_nodes(ICOsystem()))
>>> # We assume that at least one sensor node is available
>>> len(sensor_nodes) >= 1
True
>>> node_info = sensor_nodes[0]
>>> # Each list entry contains information about name, MAC address and RSSI
>>> isinstance(node_info.name, str)
True
>>> len(node_info.name) <= 8
True
>>> isinstance(node_info.mac_address, EUI)
True
>>> -70 < node_info.rssi < 0
True
Sensor Node¶
Connecting to Sensor Node¶
Before you start a measurement you need to connect to a sensor node. To do that use the coroutine ICOsystem.connect_sensor_node_mac(). Please do not forget to disconnect from the node with the coroutine ICOsystem.disconnect_sensor_node() afterwards.
>>> from asyncio import run
>>> from icostate import ICOsystem
>>> async def connect_disconnect_sensor_node(icosystem: ICOsystem,
... mac_address: str):
... await icosystem.connect_stu()
... print(f"Connected: {await icosystem.is_sensor_node_connected()}")
... await icosystem.connect_sensor_node_mac(mac_address)
... print(f"Connected: {await icosystem.is_sensor_node_connected()}")
... await icosystem.disconnect_sensor_node()
... print(f"Connected: {await icosystem.is_sensor_node_connected()}")
... await icosystem.disconnect_stu()
>>> mac_address = (
... "08-6B-D7-01-DE-81") # Change to MAC address of your sensor node
>>> run(connect_disconnect_sensor_node(ICOsystem(), mac_address))
Connected: False
Connected: True
Connected: False
Rename a Sensor Node¶
To rename a sensor node use the coroutine ICOsystem.rename(), which requires the new name of the sensor node as parameter. If the system is currently not connected to a sensor node, then the coroutine also requires the MAC address of the sensor node. After using the coroutine successfully the system will switch back to the state it was in before renaming the sensor node (either “STU Connected” or “Sensor Node Connected”).
>>> from asyncio import run
>>> from icostate import ICOsystem
>>> async def rename_disconnected(icosystem: ICOsystem,
... mac_address: str,
... new_name: str):
... await icosystem.connect_stu()
... print(f"State Before: {icosystem.state!r}")
... await icosystem.rename(new_name, mac_address)
... print(f"State After: {icosystem.state!r}")
... await icosystem.disconnect_stu()
>>> mac_address = (
... "08-6B-D7-01-DE-81") # Change to MAC address of your sensor node
>>> name = "Test-STH"
>>> run(rename_disconnected(ICOsystem(), mac_address, name))
State Before: STU Connected
State After: STU Connected
Events¶
Objects of the ICOsystem provides an event based API (based on [pyee](https://pyee.readthedocs.io)) you can use to react to changes to the system. Currently the following events are supported:
sensor_node_name: Called when the name of the current sensor node changessensor_node_mac_address: Called when the MAC address of a sensor node changes
The example below shows how you can react to changes of the sensor node name:
>>> from asyncio import sleep, run
>>> from icostate import ICOsystem
>>> async def react_sensor_node_name(icosystem: ICOsystem, mac_address: str):
...
... @icosystem.on("sensor_node_name")
... async def name_changed(name: str):
... print(f"Name of sensor node: {name}")
...
... await icosystem.connect_stu()
... await icosystem.connect_sensor_node_mac(mac_address)
... await sleep(0) # Allow scheduler to trigger event coroutines
... await icosystem.disconnect_sensor_node()
... await icosystem.disconnect_stu()
>>> mac_address = "08-6B-D7-01-DE-81" # Change to MAC address of your
>>> # sensor node
>>> run(react_sensor_node_name(ICOsystem(), mac_address))
Name of sensor node: Test-STH