API¶
System¶
- class icostate.ICOsystem(*arguments, **keyword_arguments)¶
Stateful access to ICOtronic system
- Parameters:
*arguments – Positional arguments (handled by pyee)
**keyword_arguments – Keyword arguments (handled by pyee)
- async collect_sensor_nodes()¶
Get available sensor nodes
This coroutine collects sensor node information until either
no new sensor node was found or
until the given timeout, if no sensor node was found.
- Return type:
list[SensorNodeInfo]- Returns:
A list containing information about the available sensor nodes
- Raises:
NoResponseError – If there was no response to an request made by this coroutine
Examples
Import necessary code
>>> from asyncio import run
Collect sensor nodes
>>> async def collect_sensor_nodes(icosystem: ICOsystem ... ) -> list[SensorNodeInfo]: ... await icosystem.connect_stu() ... nodes = await icosystem.collect_sensor_nodes() ... await icosystem.disconnect_stu() ... return nodes >>> sensor_nodes = run(collect_sensor_nodes(ICOsystem())) >>> # We assume that at least one sensor node is available >>> len(sensor_nodes) >= 1 True
- async connect_sensor_node_mac(mac_address)¶
Connect to the node with the specified MAC address
- Parameters:
mac_address (
str) – The MAC address of the sensor node- Raises:
ValueError – If the specified MAC address is not valid
NoResponseError – If there was no response to an request made by this coroutine
- Return type:
None
Examples
Import necessary code
>>> from asyncio import run
Connect to and disconnect from sensor node
>>> async def connect_sensor_node(icosystem: ICOsystem, ... mac_address: str): ... await icosystem.connect_stu() ... await icosystem.connect_sensor_node_mac(mac_address) ... print(icosystem.state) ... await icosystem.disconnect_sensor_node() ... print(icosystem.state) ... await icosystem.disconnect_stu() >>> mac_address = ( ... "08-6B-D7-01-DE-81") # Change to MAC address of your node >>> run(connect_sensor_node(ICOsystem(), mac_address)) Sensor Node Connected STU Connected
Try to connect using an invalid MAC address
>>> async def try_connect_incorrect_mac(icosystem: ICOsystem): ... try: ... await icosystem.connect_stu() ... await icosystem.connect_sensor_node_mac( ... "Not a MAC address") ... finally: ... await icosystem.disconnect_stu()
>>> run(try_connect_incorrect_mac(ICOsystem())) Traceback (most recent call last): ... ValueError: “Not a MAC address” is not a valid MAC address:...
- async connect_stu()¶
Connect to STU
- Raises:
NoResponseError – If there was no response to an request made by this coroutine
- Return type:
None
Examples
Import necessary code
>>> from asyncio import run
Connect and disconnect from STU
>>> async def connect_disconnect_stu(icosystem: ICOsystem): ... print(f"Before connection to STU: {icosystem.state}") ... await icosystem.connect_stu() ... print(f"After connection to STU: {icosystem.state}") ... await icosystem.disconnect_stu() ... print(f"After disconnection fro STU: {icosystem.state}") >>> run(connect_disconnect_stu(ICOsystem())) Before connection to STU: Disconnected After connection to STU: STU Connected After disconnection fro STU: Disconnected
- async disconnect_sensor_node()¶
Disconnect from current sensor node
- Raises:
NoResponseError – If there was no response to an request made by this coroutine
- Return type:
None
- async disconnect_stu()¶
Disconnect from STU
- Raises:
NoResponseError – If there was no response to an request made by this coroutine
- Return type:
None
- async get_adc_configuration(mac_address=None)¶
Read the ADC configuration of a sensor node
Depending on the state the system is in this coroutine will either:
connect to the sensor device with the given MAC address, if there is no connection yet and disconnect afterwards or
just use the current connection and get the ADC configuration of the current sensor device. In this case the given MAC address will be ignored!
- Parameters:
mac_address (
str|None) – The MAC address of the sensor device for which we want to retrieve the ADC configuration- Raises:
NoResponseError – If there was no response to an request made by this coroutine
ValueError – If you call this method without specifying the MAC address while the system is not connected to a sensor node
- Return type:
ADCConfiguration- Returns:
The ADC configuration of the sensor node
Examples
Import necessary code
>>> from asyncio import run
Read the ADC configuration of a disconnected sensor node
>>> async def get_adc_configuration(icosystem: ICOsystem, ... mac_address: str ... ) -> ADCConfiguration: ... await icosystem.connect_stu() ... print(f"Before reading ADC config: {icosystem.state}") ... adc_config = (await ... icosystem.get_adc_configuration(mac_address)) ... print(f"After reading ADC config: {icosystem.state}") ... await icosystem.disconnect_stu() ... return adc_config >>> mac_address = ( ... "08-6B-D7-01-DE-81") # Change to MAC address of your node >>> config = run(get_adc_configuration(ICOsystem(), mac_address)) Before reading ADC config: STU Connected After reading ADC config: STU Connected >>> isinstance(config.prescaler, int) True >>> isinstance(config.acquisition_time, int) True >>> isinstance(config.oversampling_rate, int) True >>> isinstance(config.reference_voltage, float) True >>> 1 <= config.prescaler <= 127 True
Read the ADC configuration of a connected sensor node
>>> async def get_adc_configuration(icosystem: ICOsystem, ... mac_address: str ... ) -> ADCConfiguration: ... await icosystem.connect_stu() ... await icosystem.connect_sensor_node_mac(mac_address) ... adc_config = await icosystem.get_adc_configuration() ... await icosystem.disconnect_sensor_node() ... await icosystem.disconnect_stu() ... return adc_config >>> mac_address = ( ... "08-6B-D7-01-DE-81") # Change to MAC address of your node >>> config = run(get_adc_configuration(ICOsystem(), mac_address)) >>> isinstance(config, ADCConfiguration) True
- async is_sensor_node_connected()¶
Check if the STU is connected to a sensor node
- Return type:
bool- Returns:
True, if the STU is connected to a sensor nodeFalse, otherwise
Examples
Import necessary code
>>> from asyncio import run
Check if a sensor node is connected
>>> async def connect_sensor_node(icosystem: ICOsystem, ... mac_address: str): ... print("Before connection to STU:", ... await icosystem.is_sensor_node_connected()) ... await icosystem.connect_stu() ... print("After connection to STU:", ... await icosystem.is_sensor_node_connected()) ... await icosystem.connect_sensor_node_mac(mac_address) ... print("After connection to sensor node:", ... await icosystem.is_sensor_node_connected()) ... await icosystem.disconnect_sensor_node() ... print("After disconnection from sensor node:", ... await icosystem.is_sensor_node_connected()) ... await icosystem.disconnect_stu() >>> mac_address = ( ... "08-6B-D7-01-DE-81") # Change to MAC address of your node >>> run(connect_sensor_node(ICOsystem(), mac_address)) Before connection to STU: False After connection to STU: False After connection to sensor node: True After disconnection from sensor node: False
- async rename(new_name, mac_address=None)¶
Set the name of the sensor node with the specified MAC address
Depending on the state the system is in this coroutine will either:
connect to the sensor device with the given MAC address, if there is no connection yet and disconnect afterwards or
just use the current connection and rename the current sensor device. In this case the given MAC address will be ignored!
- Parameters:
new_name (
str) – The new name of the sensor devicemac_address (
str|None) – The MAC address of the sensor device that should be renamed
- Raises:
NoResponseError – If there was no response to an request made by this coroutine
ValueError – If you call this method without specifying the MAC address while the system is not connected to a sensor node
Examples
Import necessary code :rtype:
None>>> from asyncio import run
Rename a disconnected sensor node
>>> async def rename_disconnected(icosystem: ICOsystem, ... mac_address: str, ... name: str): ... await icosystem.connect_stu() ... print(f"Before renaming: {icosystem.state}") ... await icosystem.rename(name, mac_address) ... print(f"After renaming: {icosystem.state}") ... await icosystem.disconnect_stu() >>> mac_address = ( ... "08-6B-D7-01-DE-81") # Change to MAC address of your node >>> name = "Test-STH" >>> run(rename_disconnected(ICOsystem(), mac_address, name)) Before renaming: STU Connected After renaming: STU Connected
Rename a connected sensor node
>>> async def rename_connected(icosystem: ICOsystem, ... mac_address: str, ... name: str): ... await icosystem.connect_stu() ... await icosystem.connect_sensor_node_mac(mac_address) ... print(f"Before renaming: {icosystem.state}") ... await icosystem.rename(name, None) ... print(f"After renaming: {icosystem.state}") ... await icosystem.disconnect_sensor_node() ... await icosystem.disconnect_stu() >>> mac_address = ( ... "08-6B-D7-01-DE-81") # Change to MAC address of your node >>> name = "Test-STH" >>> run(rename_connected(ICOsystem(), mac_address, name)) Before renaming: Sensor Node Connected After renaming: Sensor Node Connected
- async reset_stu()¶
Reset STU
- Raises:
NoResponseError – If there was no response to an request made by this coroutine
- Return type:
None
Examples
Import necessary code
>>> from asyncio import run
Reset a connected STU
>>> async def reset_stu(icosystem: ICOsystem): ... await icosystem.connect_stu() ... await icosystem.reset_stu() ... await icosystem.disconnect_stu() >>> run(reset_stu(ICOsystem()))
Resetting the STU will not work if the STU is not connected
>>> async def reset_stu_without_connection(icosystem: ICOsystem): ... await icosystem.reset_stu() >>> run(reset_stu_without_connection( ... ICOsystem())) Traceback (most recent call last): ... icostate.error.IncorrectStateError: Resetting STU only allowed in the state: STU Connected
- sensor_node_attributes: SensorNodeAttributes | None¶
Information about currently connected sensor node
- async set_adc_configuration(adc_configuration, mac_address=None)¶
Change the ADC configuration of a sensor node
Depending on the state the system is in this coroutine will either:
connect to the sensor device with the given MAC address, if there is no connection yet and disconnect afterwards or
just use the current connection and change the ADC configuration of the current sensor device. In this case the given MAC address will be ignored!
- Parameters:
mac_address (
str|None) – The MAC address of the sensor device for which we want to change the ADC configuration- Raises:
NoResponseError – If there was no response to an request made by this coroutine
ValueError – If you call this method without specifying the MAC address while the system is not connected to a sensor node
Examples
Import necessary code :rtype:
None>>> from asyncio import run
Set the ADC configuration of a disconnected sensor node
>>> async def set_adc_configuration(icosystem: ICOsystem, ... adc_config: ADCConfiguration, ... mac_address: str): ... await icosystem.connect_stu() ... print(f"Before setting ADC config: {icosystem.state}") ... adc_config = (await ... icosystem.set_adc_configuration(adc_config, ... mac_address)) ... print(f"After setting ADC config: {icosystem.state}") ... await icosystem.disconnect_stu() >>> mac_address = ( ... "08-6B-D7-01-DE-81") # Change to MAC address of your node >>> config = ADCConfiguration(prescaler=2, ... acquisition_time=8, ... oversampling_rate=64) >>> config = run(set_adc_configuration(ICOsystem(), config, ... mac_address)) Before setting ADC config: STU Connected After setting ADC config: STU Connected
Set the ADC configuration of a connected sensor node
>>> async def set_adc_configuration(icosystem: ICOsystem, ... adc_config: ADCConfiguration, ... mac_address: str): ... await icosystem.connect_stu() ... await icosystem.connect_sensor_node_mac(mac_address) ... print(f"Before setting ADC config: {icosystem.state}") ... adc_config = (await ... icosystem.set_adc_configuration(adc_config)) ... print(f"After setting ADC config: {icosystem.state}") ... await icosystem.disconnect_sensor_node() ... await icosystem.disconnect_stu() >>> mac_address = ( ... "08-6B-D7-01-DE-81") # Change to MAC address of your node >>> config = ADCConfiguration(prescaler=2, ... acquisition_time=8, ... oversampling_rate=64) >>> config = run(set_adc_configuration(ICOsystem(), config, ... mac_address)) Before setting ADC config: Sensor Node Connected After setting ADC config: Sensor Node Connected
- async start_measurement(configuration, update_rate=60)¶
Start Measurement
- Parameters:
configuration (
StreamingConfiguration) – The streaming configuration that should be used for the measurementupdate_rate (
float) – The measurement update rate in Hz, i.e. how many times in a secondicosystememits thesensor_node_measurement_dataevent.
- Return type:
None
- async stop_measurement()¶
Stop measurement
- Return type:
None
Measurement¶
- class icostate.StreamingConfiguration(first=True, second=False, third=False)¶
Streaming configuration
- Parameters:
first (
bool) – Specifies if the first channel is enabled or notsecond (
bool) – Specifies if the second channel is enabled or notthird (
bool) – Specifies if the third channel is enabled or not
- Raises:
ValueError – if none of the channels is active
Examples
Create some example streaming configurations
>>> config = StreamingConfiguration() >>> config = StreamingConfiguration( ... first=False, second=True, third=True)
Creating streaming configurations without active channels will fail
>>> config = StreamingConfiguration(first=False) Traceback (most recent call last): ... ValueError: At least one channel needs to be active
>>> config = StreamingConfiguration( ... first=False, second=False, third=False) Traceback (most recent call last): ... ValueError: At least one channel needs to be active
- axes()¶
Get the activated axes returned by this streaming configuration
- Return type:
list[str]- Returns:
A list containing all activated axes in alphabetical order
Examples
Get the activated axes for example streaming configurations
>>> StreamingConfiguration( ... first=False, second=True, third=True).axes() ['y', 'z'] >>> StreamingConfiguration( ... first=True, second=True, third=False).axes() ['x', 'y']
- data_length()¶
Returns the streaming data length
This will be either:
2 (when 2 channels are active), or
3 (when 1 or 3 channels are active)
For more information, please take a look here.
- Return type:
int- Returns:
The length of the streaming data resulting from this channel configuration
Examples
Get the data length of example streaming configurations
>>> StreamingConfiguration().data_length() 3
>>> StreamingConfiguration( ... first=False, second=True, third=False).data_length() 3
>>> StreamingConfiguration( ... first=True, second=True, third=True).data_length() 3
>>> StreamingConfiguration( ... first=False, second=True, third=True).data_length() 2
- enabled_channels()¶
Get the number of activated channels
- Return type:
int- Returns:
The number of enabled channels
Examples
Get the number of enabled channels for example streaming configs
>>> StreamingConfiguration(first=True).enabled_channels() 1
>>> StreamingConfiguration(first=False, second=True, third=False ... ).enabled_channels() 1
>>> StreamingConfiguration(first=True, second=True, third=True ... ).enabled_channels() 3
- property first: bool¶
Check the activation state of the first channel
Returns:
True, if the first channel is enabled orFalseotherwiseExamples
Check channel one activation status for example configs
>>> StreamingConfiguration(first=True, second=False, ... third=False).first True >>> StreamingConfiguration(first=False, second=False, ... third=True).first False
- property second: bool¶
Check the activation state of the second channel
- Returns:
True, if the second channel is enabled orFalseotherwise
Examples
Check channel two activation status for example configs
>>> StreamingConfiguration( ... first=True, second=False, third=False).second False >>> StreamingConfiguration( ... first=False, second=True, third=True).second True
- property third: bool¶
Check the activation state of the third channel
Returns:
True, if the third channel is enabled orFalseotherwiseExamples
Check channel three activation status for example configs
>>> StreamingConfiguration( ... first=True, second=False, third=False).third False >>> StreamingConfiguration( ... first=False, second=False, third=True).third True
- class icostate.MeasurementData(configuration)¶
Measurement data
- Parameters:
configuration (
StreamingConfiguration) – The streaming configuration that was is used to collect the measurement data
- append(data)¶
Append some streaming data to the measurement
- Parameters:
data (
StreamingData) – The streaming data that should be added to the measurement- Return type:
None
Examples
Append some streaming data to a measurement
>>> config = StreamingConfiguration(first=True, second=True, ... third=True) >>> data = MeasurementData(config) >>> data Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
>>> s1 = StreamingData(values=[4, 5, 3], counter=15, ... timestamp=1756197008.776551) >>> data.append(s1) >>> data Channel 1 enabled, Channel 2 enabled, Channel 3 enabled [4, 5, 3]@1756197008.776551 #15
- dataloss()¶
Get measurement dataloss based on message counters
- Return type:
float- Returns:
The overall amount of dataloss as number between 0 (no data loss) and 1 (all data lost).
- extend(data)¶
Extend this measurement data with some other measurement data
- Parameters:
data (
MeasurementData) – The measurement data that should be added to this measurement- Return type:
None
Examples
Extend measurement data with other measurement data
>>> config = StreamingConfiguration(first=True, second=False, ... third=True) >>> data1 = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data1.append(s1) >>> data1.append(s2) >>> data1 Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 2]@1756125747.528234 #255 [3, 4]@1756125747.528237 #0
>>> data2 = MeasurementData(config) >>> s3 = StreamingData(values=[10, 20], counter=1, ... timestamp=1756125747.678912) >>> data2.append(s3) >>> data2 Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [10, 20]@1756125747.678912 #1
>>> data1.extend(data2) >>> data1 Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 2]@1756125747.528234 #255 [3, 4]@1756125747.528237 #0 [10, 20]@1756125747.678912 #1
- first()¶
Get all data of the first measurement channel
- Return type:
ChannelData- Returns:
Data values for the first measurement channel
Examples
Get first channel data of measurement with two enabled channels
>>> config = StreamingConfiguration(first=True, second=True, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.first() 1@1756125747.528234 #255 3@1756125747.528237 #0 >>> data.second() 2@1756125747.528234 #255 4@1756125747.528237 #0 >>> data.third()
Get first channel data of measurement with one enabled channel
>>> config = StreamingConfiguration(first=True, second=False, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2, 3], counter=10, ... timestamp=1756126628.820695) >>> s2 = StreamingData(values=[4, 5, 6], counter=20, ... timestamp=1756126628.8207) >>> data.append(s1) >>> data.append(s2) >>> data.first() 1@1756126628.820695 #10 2@1756126628.820695 #10 3@1756126628.820695 #10 4@1756126628.8207 #20 5@1756126628.8207 #20 6@1756126628.8207 #20 >>> data.second() >>> data.third()
- second()¶
Get all data of the second measurement channel
- Return type:
ChannelData- Returns:
Data values for the second measurement channel
Examples
Get second channel data of measurement with two enabled channels
>>> config = StreamingConfiguration(first=False, second=True, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.first() >>> data.second() 1@1756125747.528234 #255 3@1756125747.528237 #0 >>> data.third() 2@1756125747.528234 #255 4@1756125747.528237 #0
Get second channel data of measurement with one enabled channel
>>> config = StreamingConfiguration(first=False, second=True, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2, 3], counter=10, ... timestamp=1756126628.820695) >>> s2 = StreamingData(values=[4, 5, 6], counter=20, ... timestamp=1756126628.8207) >>> data.append(s1) >>> data.append(s2) >>> data.first() >>> data.second() 1@1756126628.820695 #10 2@1756126628.820695 #10 3@1756126628.820695 #10 4@1756126628.8207 #20 5@1756126628.8207 #20 6@1756126628.8207 #20 >>> data.third()
- third()¶
Get all data of the third measurement channel
- Return type:
ChannelData- Returns:
Data values for the third measurement channel
Examples
Get third channel data of measurement with two enabled channels
>>> config = StreamingConfiguration(first=True, second=False, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.first() 1@1756125747.528234 #255 3@1756125747.528237 #0 >>> data.second() >>> data.third() 2@1756125747.528234 #255 4@1756125747.528237 #0
Get third channel data of measurement with one enabled channel
>>> config = StreamingConfiguration(first=False, second=False, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2, 3], counter=10, ... timestamp=1756126628.820695) >>> s2 = StreamingData(values=[4, 5, 6], counter=20, ... timestamp=1756126628.8207) >>> data.append(s1) >>> data.append(s2) >>> data.first() >>> data.second() >>> data.third() 1@1756126628.820695 #10 2@1756126628.820695 #10 3@1756126628.820695 #10 4@1756126628.8207 #20 5@1756126628.8207 #20 6@1756126628.8207 #20