Source code for cts_chamber._cts_chamber

"""
Module for the control of the CTS environmental chamber.
"""

import logging
import re
from typing import Optional, Tuple

import pyvisa
import pyvisa.constants

from ._constants import CTSChamberModel
from ._exceptions import (
    CTSChamberCommandError,
    CTSChamberCommunicationError,
)
from ._ramp_parameters import CTSChamberRampParameters
from ._state import CTSState, CTSStateError

__author__ = "Leandro Lanzieri"
__copyright__ = "Deutsches Elektronen-Synchrotron, DESY"
__license__ = "LGPL-3.0"

_LOGGER = logging.getLogger(__name__)


[docs] class CTSChamber: """ Implements the control of a CTS environmental chamber. Args: serial_device (Optional[str]): The serial device to use for communication, in case RS-232 or USB (via an adapter) is used. Cannot be used together with resource_path. Default is ``None``. resource_path (Optional[str]): The resource path to use for communication. Cannot be used together with serial_device. Default is ``None``. resource_manager (Optional[pyvisa.ResourceManager]): The resource manager to use for communication. If ``None``, a new resource manager will be created. Default is ``None``. ascii_protocol_address (Optional[int]): The ASCII protocol address to use for communication. Default is 0x81. Must be between 0x81 and 0xA0. communication_timeout (Optional[int]): The communication timeout in milliseconds. Default is 1000. communication_retries (Optional[int]): The number of retries for communication. Default is 3. chamber_model (Optional[cts_chamber.CTSChamberModel]): The model of the CTS environmental chamber. If ``None``, the cts_chamber.CTSChamberModel.C_40 model will be used. Default is ``None``. Raises: CTSChamberCommunicationError: If the connection to the device cannot be established. """ DEFAULT_ASCII_PROTOCOL_ADDRESS = 0x81 """Default ASCII protocol address for the device.""" DEFAULT_COMMUNICATION_TIMEOUT = 1000 """Default communication timeout in milliseconds.""" _RS232_BAUDRATE = 19200 """Baud rate for the RS232 communication.""" _RS232_BITS = 8 """Number of bits for the RS232 communication.""" _RS232_STOP_BITS = pyvisa.constants.StopBits.one """Number of stop bits for the RS232 communication.""" _RS232_PARITY = pyvisa.constants.Parity.odd """Parity for the RS232 communication.""" _RS232_FLOW_CONTROL = pyvisa.constants.ControlFlow.none """Flow control for the RS232 communication.""" DEFAULT_CHAMBER_MODEL = CTSChamberModel.C_40 """Default model of the CTS environmental chamber.""" _ANALOG_CHANNEL_TEMPERATURE = 0 """Analog channel for temperature.""" _ANALOG_CHANNEL_HUMIDITY = 1 """Analog channel for humidity.""" _RAMP_CHANNEL_TEMPERATURE = 1 """Ramp channel for temperature.""" _RAMP_CHANNEL_HUMIDITY = 2 """Ramp channel for humidity.""" def __init__( self, serial_device: Optional[str] = None, ascii_protocol_address: Optional[int] = None, resource_path: Optional[str] = None, resource_manager: Optional[pyvisa.ResourceManager] = None, communication_timeout: Optional[int] = None, communication_retries: Optional[int] = 3, chamber_model: Optional[CTSChamberModel] = None, ): self._communication_timeout = communication_timeout or self.DEFAULT_COMMUNICATION_TIMEOUT self._chamber_model = chamber_model or self.DEFAULT_CHAMBER_MODEL self._resource_manager = resource_manager or pyvisa.ResourceManager("@py") assert serial_device is not None or resource_path is not None, ( "Either serial_device or resource_path must be provided, but not both." ) assert not (serial_device is not None and resource_path is not None), ( "Cannot use both serial_device and resource_path at the same time." ) if communication_retries is not None: assert communication_retries >= 0, "communication_retries must be non-negative." self._communication_retries = communication_retries if resource_path is None: resource_path = f"ASRL{serial_device}::INSTR" self._resource_path = resource_path if ascii_protocol_address is None: ascii_protocol_address = self.DEFAULT_ASCII_PROTOCOL_ADDRESS self._ascii_protocol_address = ascii_protocol_address assert 0x81 <= self._ascii_protocol_address <= 0xA0, ( "ASCII protocol address must be between 0x81 and 0xFF" ) try: self._resource: pyvisa.resources.SerialInstrument = ( self._resource_manager.open_resource(self._resource_path) ) except pyvisa.VisaIOError as e: _LOGGER.exception(f"Failed to open resource {self._resource_path}: {e}") raise CTSChamberCommunicationError( f"Could not open resource {self._resource_path}. " "Please check the connection and the resource path." ) self._resource.timeout = self._communication_timeout self._resource.baud_rate = self._RS232_BAUDRATE self._resource.data_bits = self._RS232_BITS self._resource.stop_bits = self._RS232_STOP_BITS self._resource.parity = self._RS232_PARITY self._resource.flow_control = self._RS232_FLOW_CONTROL self._resource.read_termination = chr(0x03) # ETX (End of Text) def _prepare_frame(self, data: str) -> bytes: """ Prepares a frame to be sent to the device. Args: data: The ASCII string data to be sent to the device. Returns: The prepared frame. """ data_bytes = data.encode("ascii") # Convert string to bytes # convert to bytearray data_bytes = bytearray(data_bytes) # set MSB of all data for i in range(len(data_bytes)): data_bytes[i] |= 0x80 # add the ASCII protocol address in the first byte data_bytes.insert(0, self._ascii_protocol_address) # calculate the checksum checksum = 0 for byte in data_bytes: checksum ^= byte # set MSB of the checksum checksum |= 0x80 # build the final frame frame = bytearray([0x02]) + data_bytes + bytearray([checksum, 0x03]) return frame def _send_cmd(self, data: str) -> str: """ Sends bytes to the device and waits for a response. Args: data: The ASCII string data to be sent to the device. Returns: The response from the device as a string. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ frame = self._prepare_frame(data) _LOGGER.debug(f"Sending frame: {frame.hex()}") self._resource.write_raw(frame) try: response = self._resource.read_raw() except Exception as e: _LOGGER.exception(f"Error reading from device: {e}") raise CTSChamberCommandError( f"Failed to read response from device: {self._resource_path}. " "Please check the connection and the command." ) response = bytearray(response) _LOGGER.debug(f"Received frame: {response.hex()}") if len(response) < 4: _LOGGER.error("Invalid response length") raise CTSChamberCommandError( f"Invalid response length: {len(response)}. Expected at least 4 bytes." ) if response[0] != 0x02: _LOGGER.error("Response does not start with STX (0x02)") raise CTSChamberCommandError( f"Response does not start with STX (0x02): {response.hex()}" ) if response[-1] != 0x03: _LOGGER.error("Response does not end with ETX (0x03)") raise CTSChamberCommandError( f"Response does not end with ETX (0x03): {response.hex()}" ) # Exclude MSB from all bytes for i in range(len(response)): response[i] &= 0x7F # verify checksum checksum = 0 for byte in response[1:-2]: # Exclude STX and ETX checksum ^= byte if checksum != response[-2]: # Checksum is the second last byte _LOGGER.error("Checksum mismatch in response") raise CTSChamberCommandError( f"Checksum mismatch in response: {response.hex()}" ) return response[2:-2].decode("ascii") # Exclude STX and ETX, and checksum def _send_cmd_with_retries(self, data: str) -> str: """ Sends a command to the device with retries in case of communication errors. Args: data: The ASCII string data to be sent to the device. Returns: The response from the device as a string. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ attempts = 0 while True: try: return self._send_cmd(data) except (CTSChamberCommunicationError, CTSChamberCommandError) as e: attempts += 1 if (self._communication_retries is not None and attempts > self._communication_retries): _LOGGER.exception( f"Exceeded maximum retries ({self._communication_retries})" f" for command: {data}", exc_info=e ) raise # re-raise the last exception _LOGGER.warning( f"Communication error on attempt {attempts} for command: {data}. Retrying..." ) def _get_analog_channel(self, channel: int) -> Tuple[float, float]: """ Gets the current value and the set value of an analog channel from the CTS chamber. Args: channel: The channel number to retrieve the values from [0-6]. Returns: A tuple containing the current value and the set value. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ assert 0 <= channel <= 6, "Channel must be between 0 and 6" message = f"A{channel}" response = self._send_cmd_with_retries(message) pattern = f"^A{channel}" pattern += r" (?:\d{3}|-\d{2})\.\d (?:\d{3}|-\d{2})\.\d$" if re.match(pattern, response): actual_temperature, set_temperature = response[3:].split() return float(actual_temperature), float(set_temperature) else: _LOGGER.error(f"Response does not match expected format: {response}") raise CTSChamberCommandError( f"Invalid response format: {response}. Expected: 'A{channel} xxx.xxx xxx.xxx'" ) def _set_analog_channel(self, channel: int, value: float) -> None: """ Sets the value of an analog channel on the CTS chamber. Args: channel: The channel number to set the value for [0-6]. value: The value to set for the channel. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ assert 0 <= channel <= 6, "Channel must be between 0 and 6" value_str = f"{value:05.1f}" message = f"a{channel} {value_str}" response = self._send_cmd_with_retries(message) if response != "a": _LOGGER.error(f"Response does not match expected format: {response}") raise CTSChamberCommandError( f"Invalid response format: {response}. Expected format: 'a'" ) def _set_digital_channel(self, channel: int, value: bool) -> None: """ Sets the value of a digital channel on the CTS chamber. Args: channel: The channel number to set the value for [1-11]. value: The value to set for the channel (True for ON, False for OFF). Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ if not (1 <= channel <= 11): raise CTSChamberCommandError("Channel must be between 1 and 11") if channel == 10: channel_str = ":" elif channel == 11: channel_str = ";" else: channel_str = str(channel) value_str = "1" if value else "0" message = f"s{channel_str} {value_str}" response = self._send_cmd_with_retries(message) if not response.startswith("s") or int(response[-1]) != channel: _LOGGER.error(f"Response does not match expected format: {response}") raise CTSChamberCommandError( f"Invalid response format: {response}. Expected format: 's{channel}'" )
[docs] def get_temperature(self) -> Tuple[float, float]: """ Gets the current temperature and set temperature from the CTS chamber. Returns: A tuple containing the current temperature and the set temperature. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ return self._get_analog_channel(self._ANALOG_CHANNEL_TEMPERATURE)
[docs] def get_humidity(self) -> Tuple[float, float]: """ Gets the current humidity and set humidity from the CTS chamber. Returns: A tuple containing the current humidity and the set humidity. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ return self._get_analog_channel(self._ANALOG_CHANNEL_HUMIDITY)
[docs] def set_temperature(self, value: float) -> None: """ Sets the temperature on the CTS chamber. Args: value: The temperature value to set. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ if self._chamber_model == CTSChamberModel.C_40: minimum_temperature = -40.0 elif self._chamber_model == CTSChamberModel.C_65: minimum_temperature = -65.0 elif self._chamber_model == CTSChamberModel.C_70: minimum_temperature = -70.0 else: minimum_temperature = -40.0 if value < minimum_temperature: raise CTSChamberCommandError( f"Temperature must be greater than or equal to {minimum_temperature}" ) if value > 180.0: raise CTSChamberCommandError("Temperature must be less than or equal to 180.0") self._set_analog_channel(self._ANALOG_CHANNEL_TEMPERATURE, value)
[docs] def set_humidity(self, value: float) -> None: """ Sets the humidity on the CTS chamber. Args: value: The humidity value to set. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ if value < 0.0 or value > 98.0: raise CTSChamberCommandError("Humidity must be between 0.0 and 98.0") self._set_analog_channel(self._ANALOG_CHANNEL_HUMIDITY, value)
def _set_ramp(self, channel: int, up: bool, rate: Optional[float] = None) -> None: """ Sets the ramp rate for a specific channel (temperature or humidity). Args: channel: The channel number to set the ramp rate for. Either _RAMP_CHANNEL_TEMPERATURE or _RAMP_CHANNEL_HUMIDITY. up: True if setting the ramp rate for increasing values, False for decreasing values. rate: The ramp rate in K/min. If None, the maximum ramp rate will be used. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ if channel not in (self._RAMP_CHANNEL_TEMPERATURE, self._RAMP_CHANNEL_HUMIDITY): raise CTSChamberCommandError("Invalid channel for ramp setting.") if rate is None: rate = 999.9 message = "u" if up else "d" message += f"{channel} " message += f"{rate:05.1f}" response = self._send_cmd_with_retries(message) pattern = r"u" if up else r"d" if not re.match(pattern, response): raise CTSChamberCommandError(f"Invalid response format: {response}.")
[docs] def ramp_to_temperature( self, target: float, ramp_up_rate: Optional[float] = None, ramp_down_rate: Optional[float] = None ) -> None: """ Ramps up or down to a target temperature on the CTS chamber using the specified ramp rates If a rate is None, the chamber will use the maximum ramp rate. Args: target: The target temperature to ramp to in degrees Celsius. ramp_up_rate: The rate (K/min) for ramping up (target > current). ramp_down_rate: The rate (K/min) for ramping down (target < current). Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ # first, set the ramp rates self._set_ramp(channel=self._RAMP_CHANNEL_TEMPERATURE, up=True, rate=ramp_up_rate) self._set_ramp(channel=self._RAMP_CHANNEL_TEMPERATURE, up=False, rate=ramp_down_rate) # then, set the target temperature self._set_analog_channel(self._ANALOG_CHANNEL_TEMPERATURE, target)
[docs] def ramp_to_humidity( self, target: float, ramp_up_rate: Optional[float] = None, ramp_down_rate: Optional[float] = None ) -> None: # first, set the ramp rates self._set_ramp(channel=self._RAMP_CHANNEL_HUMIDITY, up=True, rate=ramp_up_rate) self._set_ramp(channel=self._RAMP_CHANNEL_HUMIDITY, up=False, rate=ramp_down_rate) # then, set the target humidity self._set_analog_channel(self._ANALOG_CHANNEL_HUMIDITY, target)
def _get_ramp_information(self, channel: int) -> CTSChamberRampParameters: """ Gets the ramp information for the specified channel. Args: channel: The channel number to retrieve the ramp information. Either _RAMP_CHANNEL_TEMPERATURE or _RAMP_CHANNEL_HUMIDITY. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ if channel not in (self._RAMP_CHANNEL_TEMPERATURE, self._RAMP_CHANNEL_HUMIDITY): raise CTSChamberCommandError("Invalid channel for ramp information.") message = f"R{channel}" response = self._send_cmd_with_retries(message) pattern = ( rf"^R{channel} " r"(?P<ramp_active>[01])" r"(?P<ramp_running>[01]) " r"(?P<ramp_rate_up>(?:\d{4}|-\d{2})\.\d{2}) " r"(?P<ramp_rate_down>(?:\d{4}|-\d{2})\.\d{2}) " r"(?P<ramp_target>(?:\d{4}|-\d{2})\.\d{2})$" ) if match := re.match(pattern, response): return CTSChamberRampParameters( ramp_active=bool(int(match.group("ramp_active"))), ramp_running=bool(int(match.group("ramp_running"))), ramp_rate_up=float(match.group("ramp_rate_up")), ramp_rate_down=float(match.group("ramp_rate_down")), ramp_target=float(match.group("ramp_target")) ) raise CTSChamberCommandError(f"Invalid response format: {response}.")
[docs] def get_temperature_ramp_information(self) -> CTSChamberRampParameters: """ Gets information about the ongoing temperature ramp operation. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ return self._get_ramp_information(self._RAMP_CHANNEL_TEMPERATURE)
[docs] def get_humidity_ramp_information(self) -> CTSChamberRampParameters: """ Gets information about the ongoing humidity ramp operation. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ return self._get_ramp_information(self._RAMP_CHANNEL_HUMIDITY)
[docs] def get_state(self) -> CTSState: """ Gets the current state of the CTS chamber. Returns: An instance of CTSState containing the current state of the chamber. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ message = "S" response = self._send_cmd_with_retries(message) pattern = ( r"^S" r"(?P<running>[01])" r"(?P<error>[01])" r"(?P<not_paused>[01])" r"(?P<humidity_on>[01])" r"(?P<dew_gt_7>[01])" r"(?P<dew_lt_7>[01])" r"(?P<deep_humidity>[01])" r"(?P<reg_supply_air>[01])" r"(?P<error_number>.)" ) if match := re.match(pattern, response): try: error_number = match.group("error_number") error_number = CTSStateError.from_value(int(error_number)) except ValueError: _LOGGER.error(f"Invalid error number in response: {response}") raise CTSChamberCommandError( f"Invalid error number in response: {response}." ) state = CTSState( running=bool(int(match.group("running"))), error=bool(int(match.group("error"))), paused=not bool(int(match.group("not_paused"))), humidity_on=bool(int(match.group("humidity_on"))), dew_point_above_seven=bool(int(match.group("dew_gt_7"))), dew_point_below_seven=bool(int(match.group("dew_lt_7"))), deep_dehumidity_on=bool(int(match.group("deep_humidity"))), reg_suply_air=bool(int(match.group("reg_supply_air"))), error_number=error_number ) return state else: _LOGGER.error(f"Response does not match expected format: {response}") raise CTSChamberCommandError( f"Invalid response format: {response}." )
[docs] def start(self) -> None: """ Starts the CTS chamber. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ self._set_digital_channel(1, True) # Start the chamber
[docs] def stop(self) -> None: """ Stops the CTS chamber. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ self._set_digital_channel(1, False)
[docs] def pause(self) -> None: """ Pauses the CTS chamber. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ self._set_digital_channel(3, False) # Pause the chamber
[docs] def resume(self) -> None: """ Resumes the CTS chamber from a paused state. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ self._set_digital_channel(3, True)
[docs] def collect_errors(self) -> None: """ Collects errors from the CTS chamber. Errors are available in the state after calling this method. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ self._set_digital_channel(2, True)
[docs] def close(self): """ Closes the connection to the device. """ try: self._resource.close() except Exception as e: _LOGGER.exception(f"Error closing resource: {e}")
[docs] def send_command(self, command: str) -> str: """ Sends a command to the CTS chamber and returns the response. The command should be an ASCII string, which will be encoded to bytes before sending. Args: command: The command to send to the chamber. Returns: The response from the chamber. Raises: CTSChamberCommandError: If the command fails or the response is invalid. CTSChamberCommunicationError: If there is an error in communication with the device. """ return self._send_cmd_with_retries(command)
def __del__(self): self.close()