# Source code for FreiCtrl_laser.socket_utils

import json
import logging
import os
import select
import socket
import ssl
import threading
import time
from enum import Enum


class MessageType(Enum):
    """Values carried in the ``'type'`` field of the JSON messages.

    Some DAQ and video commands deliberately share the same wire string.
    ``Enum`` treats a repeated value as an alias, so the video members listed
    below are the very same objects as their DAQ counterparts
    (``MessageType.start_video_rec is MessageType.start_daq``).
    """

    # --- DAQ commands ---
    start_daq = 'start_rec'
    stop_daq = 'stop'
    start_daq_pulses = 'start_pulses'
    stop_daq_pulses = 'stop_pulses'
    start_daq_viewing = 'start_viewing'
    poll_status = 'status_poll'

    # --- video commands ---
    start_video_rec = 'start_rec'  # alias of start_daq
    start_video_view = 'start_viewing'  # alias of start_daq_viewing
    stop_video = 'stop'  # alias of stop_daq
    start_video_calibrec = 'start_calibrec'

    # --- replies and connection state ---
    status = 'status'
    response = 'response'
    disconnected = 'disconnected'

    # --- file handling ---
    copy_files = 'copy_files'
    purge_files = 'purge_files'
class MessageStatus(Enum):
    """Values carried in the ``'status'`` field of status/response messages.

    Every member's value is identical to its name.
    """

    # --- current state ---
    ready = 'ready'
    error = 'error'
    viewing = 'viewing'
    recording = 'recording'

    # --- outcome of a command ---
    viewing_ok = 'viewing_ok'
    recording_ok = 'recording_ok'
    recording_fail = 'recording_fail'
    stop_ok = 'stop_ok'
    pulsing_ok = 'pulsing_ok'
    calib_ok = 'calib_ok'
    copy_ok = 'copy_ok'
    copy_fail = 'copy_fail'
class SocketMessage:
    """Collection of the message dictionaries used on the socket link.

    Class attributes are fixed status / response messages. Instance attributes
    are command messages whose payload depends on the session settings
    (session id, session path, frame rate, setting files, pulse lag).
    Assigning to any of the corresponding properties calls
    :meth:`update_messages`, which rewrites the affected dictionaries in place.
    """

    # --- status reports -----------------------------------------------------
    status_error = dict(type=MessageType.status.value, status=MessageStatus.error.value)
    status_ready = dict(type=MessageType.status.value, status=MessageStatus.ready.value)
    status_recording = dict(type=MessageType.status.value, status=MessageStatus.recording.value)
    status_viewing = dict(type=MessageType.status.value, status=MessageStatus.viewing.value)

    # --- responses to commands ----------------------------------------------
    respond_recording = dict(type=MessageType.response.value, status=MessageStatus.recording_ok.value)
    respond_recording_fail = dict(type=MessageType.response.value, status=MessageStatus.recording_fail.value)
    respond_viewing = dict(type=MessageType.response.value, status=MessageStatus.viewing_ok.value)
    respond_stop = dict(type=MessageType.response.value, status=MessageStatus.stop_ok.value)
    respond_pulsing = dict(type=MessageType.response.value, status=MessageStatus.pulsing_ok.value)
    respond_calib = dict(type=MessageType.response.value, status=MessageStatus.calib_ok.value)
    respond_copy = dict(type=MessageType.response.value, status=MessageStatus.copy_ok.value)
    respond_copy_fail = dict(type=MessageType.response.value, status=MessageStatus.copy_fail.value)

    # --- connection state ---------------------------------------------------
    client_disconnected = dict(type=MessageType.disconnected.value)

    def __init__(self):
        """Create the command messages with their default settings."""
        self._session_path = None
        self._fps = 30
        self._session_id = "test"
        self._daq_setting_file = ''
        self._basler_setting_file = ''
        self._pulse_lag = 0

        # Payload fragments shared by several commands; each message receives
        # its own copy through ``**`` unpacking.
        daq_args = {'session_id': self._session_id,
                    'setting_file': self._daq_setting_file}
        video_args = {'session_id': self._session_id,
                      'setting_file': self._basler_setting_file,
                      'frame_rate': self._fps}

        self.start_daq = {'type': MessageType.start_daq.value, **daq_args}
        self.stop_daq = {'type': MessageType.stop_daq.value}
        self.start_daq_pulses = {'type': MessageType.start_daq_pulses.value,
                                 'fps': self._fps,
                                 'pulse_lag': self._pulse_lag}
        self.stop_daq_pulses = {'type': MessageType.stop_daq_pulses.value}
        self.start_daq_viewing = {'type': MessageType.start_daq_viewing.value, **daq_args}
        self.poll_status = {'type': MessageType.poll_status.value}
        self.start_video_rec = {'type': MessageType.start_video_rec.value, **video_args}
        self.start_video_view = {'type': MessageType.start_video_view.value, **video_args}
        self.stop_video = {'type': MessageType.stop_video.value}
        # Calibration recordings use a fixed session id and a fixed frame rate.
        self.start_video_calibrec = {'type': MessageType.start_video_calibrec.value,
                                     'session_id': 'calibration',
                                     'setting_file': self._basler_setting_file,
                                     'frame_rate': 5}
        self.copy_files = {'type': MessageType.copy_files.value,
                           'session_id': self._session_id,
                           'session_path': self._session_path}
        self.purge_files = {'type': MessageType.purge_files.value,
                            'session_id': self._session_id}

    @property
    def pulse_lag(self):
        """Pulse lag sent with ``start_daq_pulses``."""
        return self._pulse_lag

    @pulse_lag.setter
    def pulse_lag(self, value: int):
        self._pulse_lag = value
        self.update_messages()

    @property
    def session_id(self):
        """Session identifier sent with the session-bound commands."""
        return self._session_id

    @session_id.setter
    def session_id(self, value: str):
        self._session_id = value
        self.update_messages()

    @property
    def session_path(self):
        """Session path sent with ``copy_files``."""
        return self._session_path

    @session_path.setter
    def session_path(self, value: str):
        self._session_path = value
        self.update_messages()

    @property
    def fps(self):
        """Frame rate sent with the pulse and video start commands."""
        return self._fps

    @fps.setter
    def fps(self, value: float):
        self._fps = value
        self.update_messages()

    @property
    def daq_setting_file(self):
        """Setting file sent with the DAQ start commands."""
        return self._daq_setting_file

    @daq_setting_file.setter
    def daq_setting_file(self, value: str):
        self._daq_setting_file = value
        self.update_messages()

    @property
    def basler_setting_file(self):
        """Setting file sent with the video start commands."""
        return self._basler_setting_file

    @basler_setting_file.setter
    def basler_setting_file(self, value: str):
        self._basler_setting_file = value
        self.update_messages()

    def update_messages(self):
        """Write the current settings into every command message that uses them."""
        daq_args = {'session_id': self._session_id,
                    'setting_file': self._daq_setting_file}
        video_args = {'session_id': self._session_id,
                      'setting_file': self._basler_setting_file,
                      'frame_rate': self._fps}

        self.start_daq.update(daq_args)
        self.start_daq_viewing.update(daq_args)
        self.start_daq_pulses.update(fps=self._fps, pulse_lag=self._pulse_lag)
        self.start_video_rec.update(video_args)
        self.start_video_view.update(video_args)
        # The calibration frame rate is intentionally left at its fixed value.
        self.start_video_calibrec.update(session_id='calibration',
                                         setting_file=self._basler_setting_file)
        self.copy_files.update(session_id=self._session_id,
                               session_path=self._session_path)
        self.purge_files.update(session_id=self._session_id)
class SocketComm:
    """
    Socket communication class

    Wraps a TCP socket (optionally TLS-wrapped) and exchanges JSON messages
    terminated by a newline. An instance acts either as ``"server"`` (binds,
    listens and accepts one connection) or as ``"client"`` (connects to
    ``host:port``).

    Receive helpers use two sentinels besides real data: ``None`` means the
    read timed out, ``-1`` means the peer is gone.
    """

    def __init__(self, type: str = "server", host: str = "localhost", port: int = 8800, use_ssl: bool = False):
        """
        :param type: ``"server"`` or ``"client"`` (the name shadows the builtin
            but is part of the public signature and therefore kept)
        :param host: address to bind to (server) or connect to (client)
        :param port: TCP port
        :param use_ssl: wrap the connection with TLS
        """
        self.acception_thread = None
        self.ssl_sock = None   # TLS-wrapped connection used for traffic
        self.sock = None       # plain connection used for traffic
        self._sock = None      # underlying socket; the listener on a server
        self._ssl_sock = None  # TLS-wrapped listener (server only)
        self.addr = None       # peer address, set once a connection is accepted
        self.type = type
        self.host = host
        self.port = port
        if self.type == "server":
            self.context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        else:
            self.context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        # this doesnt work yet get some weird error from ssl module
        # NOTE(review): no certificate is ever loaded into the server context
        # in this class - presumably the cause of that error; TODO confirm.
        self.use_ssl = use_ssl
        self.connected = False
        self.stop_event = threading.Event()
        self.log = logging.getLogger(f"SocketComm_{self.type}")
        self.log.setLevel(logging.DEBUG)
        self.message_time = time.monotonic()

    def _active_socket(self):
        """Return the socket that carries traffic for the configured mode."""
        return self.ssl_sock if self.use_ssl else self.sock

    def create_socket(self):
        """
        Create the underlying TCP socket; on a server also bind and listen.

        A failed ``bind`` is only logged, matching the original best-effort
        behaviour.
        """
        if self.type == 'server':
            # accept_connection() calls this method again, so close a previous
            # listener explicitly instead of relying on garbage collection to
            # free the port before we bind to it again.
            for stale in (self._ssl_sock, self._sock):
                if stale is not None:
                    stale.close()
            self._ssl_sock = None

        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if self.type != 'server':
            return

        if os.name == 'posix':
            # Allows rebinding while old connections linger in TIME_WAIT.
            # Skipped elsewhere because the option has different semantics
            # on Windows.
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self._sock.bind((self.host, self.port))
        except OSError:
            self.log.warning('Adress alrady in use.. need to delete somehow ?')
        self._sock.listen()
        if self.use_ssl:
            self._ssl_sock = self.context.wrap_socket(self._sock, server_side=True,
                                                      do_handshake_on_connect=False)

    def accept_connection(self):
        """
        Block until a client connects or the stop event is set.

        Polls the listener every 0.1 s so the stop event is honoured quickly.
        On success the connection is stored in ``sock`` / ``ssl_sock``, its
        timeout is set to 0.1 s and ``connected`` becomes ``True``.
        """
        self.create_socket()
        listener = self._ssl_sock if self.use_ssl else self._sock
        while not self.stop_event.is_set():
            if time.monotonic() - self.message_time > 5:
                # throttle the "still waiting" message to once per 5 seconds
                self.message_time = time.monotonic()
                self.log.debug('waiting for connection...')
            ready, _, _ = select.select([listener], [], [], 0.1)
            if not ready:
                continue
            connection, self.addr = listener.accept()
            connection.settimeout(0.1)
            if self.use_ssl:
                self.ssl_sock = connection
            else:
                self.sock = connection
            self.connected = True
            self.log.info("Connected to %s", self.addr)
            break
        else:
            # loop ended because the stop event was set, not because of break
            self.log.debug("Stop event set. Stopping thread...")
            return

    def threaded_accept_connection(self):
        """
        Accepts connection in a separate thread, to not block the main thread
        """
        self.stop_event.clear()
        self.acception_thread = threading.Thread(target=self.accept_connection)
        self.acception_thread.start()

    def stop_waiting_for_connection(self):
        """
        sets the stop event, so the thread will stop waiting for a connection
        """
        self.stop_event.set()

    def connect(self) -> bool:
        """
        Connects to the server

        :return: ``True`` once connected; ``False`` when called on a
            non-client instance (nothing is attempted in that case)
        """
        if self.type != 'client':
            return False
        if self._sock is None:
            # be forgiving when create_socket() was not called beforehand
            self.create_socket()
        if self.use_ssl:
            self.ssl_sock = self.context.wrap_socket(self._sock, server_hostname=self.host,
                                                     do_handshake_on_connect=False)
        else:
            self.sock = self._sock
            self.sock.settimeout(0.1)  # otherwise we get issues if nothing is coming
        self._connect(self.host, self.port)
        self.connected = True
        return True

    def close_socket(self):
        """Close every socket this instance holds and mark it disconnected."""
        for handle in (self.ssl_sock, self._ssl_sock, self.sock, self._sock):
            # any of these may still be None, e.g. a client has no listener
            if handle is not None:
                handle.close()
        self.connected = False

    def _decode_json(self, raw: bytes):
        """Parse *raw* as JSON; log and return ``None`` when it is malformed."""
        try:
            return json.loads(raw.decode())
        except (json.JSONDecodeError, UnicodeDecodeError):
            self.log.warning('message decoding failed')
            return None

    def read_json_message(self) -> "dict | None":
        """
        Read one newline-terminated JSON message.

        :return: the decoded message, ``None`` on timeout or malformed data,
            or ``SocketMessage.client_disconnected`` when the peer is gone
        """
        raw = self._recv_until(b'\n')
        if raw is None:
            return None
        if raw == -1:
            # previously fell through to ``(-1).decode()`` and crashed
            return SocketMessage.client_disconnected
        return self._decode_json(raw)

    def read_json_message_fast(self) -> "dict | None":
        """
        Read up to 1024 bytes in one call and decode them as one JSON message.

        :return: the decoded message, ``None`` on timeout or malformed data,
            or ``SocketMessage.client_disconnected`` when the peer is gone
        """
        raw = self._recv(1024)
        if raw is None:
            return None
        if raw == -1:
            return SocketMessage.client_disconnected
        return self._decode_json(raw)

    def read_json_message_fast_linebreak(self) -> "dict | None":
        """
        Like :meth:`read_json_message`, but additionally tolerates an
        ``OSError`` raised by the socket (returns ``None`` in that case).
        """
        try:
            raw = self._recv_until(b'\n')
        except OSError:
            self.log.warning('socket disconnected and deleted')
            return None
        if raw is None:
            return None
        if raw == -1:
            return SocketMessage.client_disconnected
        return self._decode_json(raw)

    def send_json_message(self, message: dict):
        """Serialise *message* to JSON and send it followed by a newline."""
        payload = json.dumps(message).encode() + b'\n'
        self._send(payload)

    def _connect(self, host, port):
        """Connect the active socket to ``(host, port)``."""
        self._active_socket().connect((host, port))

    def _send(self, data):
        """Send all of *data*; a connection reset is logged, not raised."""
        try:
            self._active_socket().sendall(data)
        except ConnectionResetError:
            self.log.error("Connection reset by peer")

    def _recv(self, size) -> "bytes | int | None":
        """
        Receive at most *size* bytes.

        :return: the data, ``None`` on timeout, ``-1`` if the peer disconnected
        """
        try:
            chunk = self._active_socket().recv(size)
        except socket.timeout:
            return None
        except ConnectionResetError:
            self.log.warning("Client disconnected")
            return -1
        if not chunk and size > 0:
            # recv() returning b'' means the peer closed the connection
            self.log.warning("Client disconnected")
            return -1
        return chunk

    def _recv_until(self, delimiter) -> "bytes | int | None":
        """
        Receive byte by byte until the data ends with *delimiter*.

        Reading single bytes keeps whatever follows the delimiter in the
        socket for the next call.

        NOTE: bytes received before a timeout are discarded.

        :return: the data including the delimiter, ``None`` on timeout,
            ``-1`` if the peer disconnected
        """
        received = bytearray()
        try:
            connection = self._active_socket()
            while not received.endswith(delimiter):
                chunk = connection.recv(1)
                if not chunk:
                    # peer closed the connection; without this check the loop
                    # would spin forever on empty reads
                    self.log.warning("Client disconnected")
                    return -1
                received += chunk
        except socket.timeout:
            return None
        except ConnectionResetError:
            self.log.warning("Client disconnected")
            return -1
        return bytes(received)

    def _recv_all(self):
        """Receive until a read times out or the peer closes; return the bytes."""
        connection = self._active_socket()
        chunks = []
        while True:
            try:
                chunk = connection.recv(1024)
            except socket.timeout:
                break
            if not chunk:
                # peer closed the connection - no more data will ever arrive
                break
            chunks.append(chunk)
        return b''.join(chunks)
if __name__ == "__main__":
    # Manual smoke test: connect as a client, print whatever the server sends
    # first, then send a start message and, 25 s later, a stop message.
    #
    # Server-side counterpart, kept for reference (disabled):
    #
    #     import argparse
    #     parser = argparse.ArgumentParser(description='Socket communication test')
    #     parser.add_argument('--type', type=str, default='server', help='Socket type: client or server')
    #     parser.add_argument('--host', type=str, default='localhost', help='Host IP address')
    #     parser.add_argument('--port', type=int, default=8800, help='Port number')
    #     parser.add_argument('--use_ssl', type=bool, default=False, help='Use SSL')
    #     args = parser.parse_args()
    #     sock = SocketComm('server')
    #     sock.create_socket()
    #     sock.threaded_accept_connection()
    #     while not sock.connected:
    #         print('no connection established,waiting...')
    #         time.sleep(1)
    #     try:
    #         data = sock.read_json_message()
    #         print(data)
    #     except Exception as e:
    #         print(e)
    #         pass
    #     sock.close_socket()

    sock = SocketComm('client')
    try:
        sock.create_socket()
        sock.connect()
        time.sleep(0.5)
        response = sock.read_json_message_fast()
        print(response)

        # NOTE(review): these payloads go out through _send() without the
        # trailing newline that send_json_message() appends, so a peer reading
        # up to b'\n' would not see a complete message - confirm this is
        # intended for the server under test.
        message = {'type': 'start_run', 'session_id': 'test', 'setting_file': ''}
        time.sleep(5)
        sock._send(json.dumps(message).encode())
        time.sleep(25)
        message = {'type': 'stop'}
        sock._send(json.dumps(message).encode())
    finally:
        # release the socket even if connecting or sending raised
        sock.close_socket()