Source code for FreiCtrl_laser.host_utils

# this code is a starting point for the host-application communicating with the CircuitPython
import time
from uuid import UUID

import numpy as np
from pathlib import Path
import json
import serial

import logging

logging.basicConfig(level=logging.INFO)

# from Task_classes import Trial
# get inspiration https://courses.ideate.cmu.edu/16-223/f2021/text/code/pico-remote.html

# json_stream = json.dumps(vars(trial))
# new_dict = json.loads(json_stream)

# ser = serial.Serial('/dev/ttyACM2')  # open serial port

"""
params = TaskParameters().dictionize()
params.update(**{"message_type":"TaskParameters"})

params = json.dumps(params) + "\n"
ser.write(params.encode())
"""


class PythonBoardCommander:
    def __init__(self, ser: serial.Serial):

        self.log = logging.getLogger('PythonBoardComm')
        self.serial = ser
        self.message_type = None
        self.command = None
        self.params = None
        self.gate = None
        self.received = ""
        self.waiting_forecho = False
        self.waiting_forpong = False
        self.params_names = None
        self.mess_in = []
        # self.serial.reset_output_buffer()  # make sure buffers are empty
        # self.serial.reset_input_buffer()

    def clear_message_queu(self):
        self.mess_in = []

    def send_laser_params(self, dictionary: dict):
        dictionary['message_type'] = "LaserParams"
        self.serial.write(f'{json.dumps(dictionary)}\n'.encode('utf-8'))

    def send_task_params(self, dictionary: dict):
        self.message_type = "TaskParameters"
        self.params_names = dictionary.keys()
        self.__dict__.update(**dictionary)
        self.log.info('Sending Task parameters')
        self.send_command()

    def send_StartTask(self):
        self.message_type = "StartTask"
        self.log.info('Sending Task Start Command')
        self.send_command()
        # if not self.send_command():
        #    logging.warning('Start Command was not echoed! Check if Task started')

    def send_EndTask(self):
        self.message_type = "EndTask"
        self.log.info('Sending End Task Command')
        self.send_command()

    def ask_dummy_trial(self):
        self.message_type = "AskTrial"
        self.send_command()
        self.log.info("Asking for dummy trial data")

    def ask_params(self):
        self.message_type = "AskTask"
        self.send_command()
        self.log.info("Asking for task params data")

    def reset_board(self):
        self.message_type = "ResetBoard"
        self.send_command()
        self.log.info("resetting board")

    def initialize_box(self):
        self.message_type = "ResetBox"
        self.send_command()
        self.log.info("Initializing Box")

    def exit_debug(self):
        self.message_type = "ExitDebug"
        self.send_command()
        self.log.info("Exiting Debug")

    def test_startSignal(self):
        self.message_type = "SendStartPul"
        self.send_command()
        self.log.info("Asked to send trial start pulses")

    def PingCircuitPython(self):  # not really needed if i echo commands...
        self.message_type = "Ping"
        self.send_command()
        self.log.debug("send ping")
        self.waiting_forpong = True

    def ToggleLED(self, gate: str, value: bool):
        self.message_type = "SwitchLED"
        if value:
            self.command = 'turn_on'
        else:
            self.command = 'turn_off'
        self.gate = gate
        self.send_command()

    def STOPMove(self):
        self.message_type = "MoveArm"
        self.command = 'stopMotors'
        self.send_command()

    def MoveArmR(self, value: float):
        self.message_type = "MoveArm"
        self.command = 'moveArmR'
        self.params = value
        self.send_command()

    def MoveArmL(self, value: float):
        self.message_type = "MoveArm"
        self.command = 'moveArmL'
        self.params = value
        self.send_command()

    def AskAngles(self):
        self.message_type = "MoveArm"
        self.command = 'askAngles'
        self.send_command()

    def MoveGate(self, gate: str, state: bool):
        self.message_type = "MoveGate"
        if state:
            self.command = 'open_gate'
        else:
            self.command = 'close_gate'
        self.gate = gate
        self.send_command()

    def MoveServo(self, gate: str, value: int):
        self.message_type = "MoveGate"
        self.command = 'move_gate_fast'
        self.gate = gate
        self.params = value
        self.send_command()

    def PlayRewardSound(self):
        self.log.info("playing reward sound")
        self.message_type = "PlaySound"
        self.command = 'play'
        self.gate = 'reward'
        self.send_command()

    def PlayErrorSound(self):
        self.log.info("playing error sound")
        self.message_type = "PlaySound"
        self.command = 'play'
        self.gate = 'noise'
        self.send_command()

    def GiveReward(self, value: int = None):
        self.log.info("Giving reward")
        self.message_type = "GiveReward"
        self.command = 'give_reward'
        self.params = value
        self.send_command()

    def RewardPumpToggle(self, state: bool):
        self.log.info("Toggling Reward Pump")
        self.message_type = "GiveReward"
        if state:
            self.command = 'open_valve'
        else:
            self.command = 'close_valve'
        self.send_command()

    def ToggleCameraTriggers(self, fps: int, state: bool):
        self.log.info("Toggling camera Triggers")
        self.message_type = "CameraTrigger"
        if state:
            self.command = 'startPulsing'
            self.params = fps
        else:
            self.command = 'stopPulsing'
        self.send_command()

    def PingArduino(self):
        self.log.info("Pinging Arduino")
        self.message_type = "PingArduino"
        self.command = 'pingSlave'
        self.send_command()

    def PollBeamBlockes(self):
        self.log.info("PollingBeamBlocks")
        self.message_type = "PollBeamBlockers"
        self.command = 'pollBeamblockers'
        self.send_command()

    def RoomLights(self, value: int):
        self.log.info(f"Turning roomlights {'On' if value == 100 else str(value) if value != 0 else 'Off'}")
        self.message_type = "RoomLights"
        self.command = 'dimm_roomlight'
        self.params = value * 255 // 100
        self.send_command()

    def send_ctrlc(self):
        """ writes ctrl c character to serial """
        self.serial.write(b"\x03")
        # tODO write this to the other serial not data

    def send_ctrld(self):
        self.serial.write(b"\x04")

    def get_bytes(self) -> bytes:
        dict2send = vars(self).copy()
        dict2send.pop('serial', None)
        dict2send.pop('log', None)
        dict2send.pop('received', None)
        dict2send.pop('waiting_forecho', None)
        dict2send.pop('waiting_forpong', None)
        dict2send.pop('mess_in', None)
        dict2send.pop('params_names', None)
        return f'{json.dumps(dict2send)}\n'.encode('utf-8')

    def send_command(self):
        message2send = self.get_bytes()
        self.mess_in.append(message2send)
        # self.serial.write(mess_in)
        self.serial.write(message2send)
        # replace with send
        self.waiting_forecho = True
        # mess_out = self.serial.readline()
        # self.serial.read_input()
        # replace with self.serial.read_input()

    def pico_data_received(self, payload):
        """Process a message from the Pico."""
        self.log.debug("Received: %s", payload.decode())
        self.received = payload.decode()
        # TODO Problem if we send more then one message before aknowledging it, it contains previos params.
        # strip params direktly after sending and not after aknowledgement ?
        if self.waiting_forecho:
            for m_id, message in enumerate(self.mess_in):  # look through messages we await echo from.
                if message.rstrip() == payload:
                    self.log.debug(f"Successfull transmission {m_id + 1} out of {len(self.mess_in)}")
                    self.message_type = None
                    self.command = None
                    self.params = None
                    self.gate = None
                    if self.params_names is not None:
                        for name in self.params_names:
                            delattr(self, name)  # purge params from self to not be resend
                        print(self.__dict__)
                        self.params_names = None
                    self.mess_in.pop(m_id)  # remove message from list
                    if len(self.mess_in) == 0:  # waiting list is empty
                        self.waiting_forecho = False
                    break
            else:  # went throu all but not found
                self.log.info("Echo not equal input !")
        elif self.waiting_forpong:
            self.waiting_forpong = False
            if self.received == 'Pong':
                self.log.info(f'Board on {self.serial._port.portName()} responded to ping')


import csv
import json
import logging
from pathlib import Path
from datetime import datetime


[docs] class DataWriter: """Class for writing data received from serial to csv and JSON file data arrives per trial and is written as individual row""" def __init__(self, session_id: str, path2save: (str, Path) = None, write_csv: bool = False): self.log = logging.getLogger('DataWriter') self.log.setLevel('DEBUG') self.session_id = session_id if path2save is None: prepath = Path('data') else: prepath = Path(path2save) prepath.mkdir(parents=True, exist_ok=True) self.file_name = prepath / f'{self.session_id}_behav.csv' self.json_file = self.file_name.parent / f"{self.file_name.name.split('.')[0]}.json" self.header_written = False # flag to check if header was written # here one can define the sorting in the csv! self.fieldnames = ['trial_nr', "trial_start_absolute", "trial_end_absolute", "duration", "outcome", "choice", "hit", "omission", "error", "nose_poke_timingC", "NP_exitC", 'nose_poke_durationC', "nosepoke_SETdur_C", "nose_poke_timingR", "NP_exitR", 'nose_poke_durationR', "nosepoke_SETdur_R", "nose_poke_timingL", "NP_exitL", 'nose_poke_durationL', "nosepoke_SETdur_L", "time_to_NPd", "time_to_NPc", "gate_openedC", "gate_closedC", "gate_openedR", "gate_closedR", "gate_openedL", "gate_closedL", "angle_R", "angle_L", "reward_prob_C", "reward_prob_R", "reward_prob_L", "reward_pump_on", "reward_pump_off", "lick_start", "time_to_lick", "tone_on", "tone_off", "tone_dur", "tone_type", "LED_C_on", "LED_C_off", "LED_R_on", "LED_R_off", "LED_L_on", "LED_L_off", "trial_sync_pulse"] self.task = None self.trial_counter = 0 self.error_counter = 0 self.hit_counter = 0 self.omission_counter = 0 self.miss_counter = 0 self.right_counter = 0 self.left_counter = 0 self.lr_balance = RunningAverage(5) self.np_duration = RunningAverage(5) self.motivation_counter = RunningAverage(10) self.laser_counter = 0 self.write_csv = write_csv # whether or not to have a csv copy of the trial data
[docs] def injest_task(self, task: dict): """get a dictionary from GUI writes the params to Json""" self.task = task # self.task.update(**{'session_id': self.session_id}) self.task['session_id'] = self.session_id self.task['trials'] = list() self.task['motortimes'] = list() self.writeJSON()
[docs] def injest_trial(self, trial: dict, do_additional_math: bool = True): """ gets a trial dictionary, writes to csv and to json """ if self.task is None: self.log.error('No task was passed!, saving only trial info') self.task = {'session_id': self.session_id, 'trials': list()} self.trial_counter += 1 if trial['trial_nr'] != self.trial_counter: self.log.warning('Trial counter host/remote do not correspond !') try: trial = self.make_trial_math(trial) # add some additional info to the trial except TypeError as e: self.log.error(e) if self.write_csv: self.writeTrial(trial) # append trial to csv self.task['trials'].append(trial) self.writeJSON() if trial['outcome'] == 'omission': self.omission_counter += 1 elif trial['outcome'] == 'error': self.error_counter += 1 elif trial['outcome'] == 'hit' or trial['reward_pump_on']: self.hit_counter += 1 elif trial['outcome'] == 'miss': self.miss_counter += 1 if trial['outcome'] == 'omission': self.motivation_counter.add_value(1) else: self.motivation_counter.add_value(0) # self.log.info(f'Current mot counter{ self.motivation_counter.moving_sum}') if trial['nose_poke_timingR'] is not None: self.right_counter += 1 self.np_duration.add_value(trial['nose_poke_durationR']) self.lr_balance.add_value(1) side = "right" elif trial['nose_poke_timingL'] is not None: self.left_counter += 1 self.np_duration.add_value(trial['nose_poke_durationL']) self.lr_balance.add_value(-1) side = "left" else: side = "None" self.np_duration.add_value(np.nan) self.lr_balance.add_value(np.nan) if trial.get("laser_trigger_time") is not None: if not trial.get("laser_params", True).get("trigger1", True).get('mock', True): self.laser_counter += 1 self.log.info(f"Received trial {trial['trial_nr']} with {trial['outcome']} at {side}")
def injest_motortimes(self, motor_time: list): self.task['motortimes'] = motor_time self.writeJSON()
[docs] def make_trial_math(self, dict_trial: dict) -> dict: """modify the trial dictionary with some more calculated values""" mod_dict_trial = dict_trial.copy() # calculate the duration of the trial mod_dict_trial["duration"] = mod_dict_trial["trial_end_absolute"] - mod_dict_trial["trial_start_absolute"] # NP timing from the gate being open if mod_dict_trial['nose_poke_timingR'] is not None: mod_dict_trial['time_to_NPd'] = mod_dict_trial['nose_poke_timingR'] - mod_dict_trial[ 'gate_openedR'] elif mod_dict_trial['nose_poke_timingL'] is not None: mod_dict_trial['time_to_NPd'] = mod_dict_trial['nose_poke_timingL'] - mod_dict_trial[ 'gate_openedL'] else: mod_dict_trial['time_to_NPd'] = None # NP timing for central_port if mod_dict_trial['nose_poke_timingC'] is not None: mod_dict_trial['time_to_NPc'] = mod_dict_trial['nose_poke_timingC'] - mod_dict_trial[ 'gate_openedC'] else: mod_dict_trial['time_to_NPc'] = None # timing to reward # use hit time instead ? if mod_dict_trial['lick_start'] is not None and mod_dict_trial['hit'] is not None: mod_dict_trial['time_to_lick'] = mod_dict_trial['lick_start'] - mod_dict_trial['hit'] else: mod_dict_trial['time_to_lick'] = None # Choise if mod_dict_trial['nose_poke_timingR'] is not None: mod_dict_trial['choice'] = "right" elif mod_dict_trial['nose_poke_timingL'] is not None: mod_dict_trial['choice'] = "left" else: mod_dict_trial['choice'] = "None" # tone duration if mod_dict_trial['tone_on'] is not None and mod_dict_trial['tone_off'] is not None: mod_dict_trial['tone_dur'] = mod_dict_trial['tone_off'] - mod_dict_trial['tone_on'] else: mod_dict_trial['tone_dur'] = None # Nosepoke exit if mod_dict_trial['nose_poke_timingR'] is not None: mod_dict_trial['NP_exitR'] = mod_dict_trial['nose_poke_timingR'] + mod_dict_trial['nose_poke_durationR'] else: mod_dict_trial['NP_exitR'] = None if mod_dict_trial['nose_poke_timingL'] is not None: mod_dict_trial['NP_exitL'] = mod_dict_trial['nose_poke_timingL'] + mod_dict_trial['nose_poke_durationL'] else: mod_dict_trial['NP_exitL'] = None # NosepokeC exit if mod_dict_trial['nose_poke_timingC'] is not None: mod_dict_trial['NP_exitC'] = mod_dict_trial['nose_poke_timingC'] + mod_dict_trial['nose_poke_durationC'] else: mod_dict_trial['NP_exitC'] = None return mod_dict_trial
[docs] def writeJSON(self): """Write global parameters as json input is Task.strip_parameters(return_global = True) """ # header.update(**{'datetime': datetime.now().strftime('%Y%m%d_%H%M')}) with open(self.json_file, 'w') as fi: json.dump(self.task, fi, indent=4, sort_keys=True)
[docs] def writeTrial(self, trial: dict): """write a dict representing Information about a single Trial to file""" with open(self.file_name, 'a', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=self.fieldnames, extrasaction='ignore') self.log.debug(f"Opened {self.file_name} for writing") if not self.header_written: writer.writeheader() self.header_written = True writer.writerow(trial) self.log.debug(f"Written trial {trial['trial_nr']} to file")
[docs] def copyCSV(self, target_path: Path): """ copy files to specified path, To be called from GUI with DB knowledge """ # TODO trow an error if files not exist ! import shutil try: shutil.copy2(self.file_name, target_path) except FileNotFoundError: pass shutil.copy2(self.json_file, target_path) self.log.info(f"Copied {self.file_name.name} to {target_path.as_posix()}")
[docs] def purgeFiles(self): """delete created files, in case of early abort""" self.json_file.unlink() self.file_name.unlink() self.log.info(f"Deleted {self.file_name.name}.")
import hashlib import uuid
[docs] def dict_to_uuid(key: dict): """Given a dictionary `key`, returns a hash string as UUID Args: key (dict): Any python dictionary""" hashed = hashlib.md5() for k, v in sorted(key.items()): hashed.update(str(k).encode()) hashed.update(str(v).encode()) return uuid.UUID(hex=hashed.hexdigest())
from collections import namedtuple Stage_params = namedtuple('stage_tuple', "stage_name,params,stage_description") Version_params = namedtuple('version_tuple', "version,stages_list") """ from Task_parameters import TaskParameters params = TaskParameters().dictionize() NP_init = Stage_params("NP_init",params,'Initial stage, whereby the animal learns to interact with the nose poke.') ver0 = Version_params(0,[NP_init]) """ class HostsideParamsOld: def __init__(self, path2json: (str, Path) = "params.json"): self.all_params = None self.path2json = path2json self.log = logging.getLogger('ParameterCommander') self.log.setLevel(logging.DEBUG) self.import_taskParams() # self.all_versions() def init_from_list(self, all_params: list): for v in range(len(all_params)): all_params[v] = Version_params._make(all_params[v]) for st in range(len(all_params[v].stages_list)): all_params[v].stages_list[st] = Stage_params._make(all_params[v].stages_list[st]) all_params[v].stages_list[st].params.update( **{"version": all_params[v].version, "training_stage": all_params[v].stages_list[st].stage_name}) self.all_params = all_params def write_json(self, path2json: (str, Path) = "params.json"): with open(path2json, 'w') as fi: json.dump(self.all_params, fi, indent=4) def modify_params(self, version: int, stage: str, params: dict): self.log.debug(f"modifying parameters vor v{version} and stage: {stage}") try: for v_id in range(len(self.all_params)): if self.all_params[v_id].version != version: continue for st in range(len(self.all_params[v_id].stages_list)): if self.all_params[v_id].stages_list[st].stage_name != stage: continue self.all_params[v_id].stages_list[st].params.update(**params) self.write_json() except Exception as e: self.log.warning(e) def import_taskParams(self): """reads the parameters json""" from collections import namedtuple Stage_params = namedtuple('stage_tuple', "stage_name,params") Version_params = namedtuple('version_tuple', "version,stages_list") try: with open(self.path2json, "r") as fi: all_params = json.load(fi) self.init_from_list(all_params) except FileNotFoundError: print('No parameter File exists!') # for v in range(len(all_params)): # all_params[v] = Version_params._make(all_params[v]) # for st in range(len(all_params[v].stages_list)): # all_params[v].stages_list[v] = Stage_params._make(all_params[v].stages_list[v]) # all_params[v].stages_list[v].params.update( # **{"version": all_params[v].version, "training_stage": all_params[v].stages_list[v].stage_name}) # self.all_params = all_params def hash_params(self): """create a hash of the parameters to check if those already exist""" # maybe remove the version number from the hash pass def all_versions(self) -> list: self.versions = [ver.version for ver in self.all_params] return self.versions def all_stages(self, version: int) -> list: """returns all stages in a certain version of parameters_set""" stages = list() for ver in self.all_params: if ver.version != version: continue for stage in ver.stages_list: stages.append(Stage_params._make(stage).stage_name) return stages def get_params(self, version: int, stage2find: str) -> (dict, None): if version not in self.all_versions(): print(f'Ver.{version} doesnt exist') return None if stage2find not in self.all_stages(version): print(f'Stage {stage2find} doesnt exist in ver.{version}') return None for ver in self.all_params: if ver.version != version: continue for stage in ver.stages_list: stage = Stage_params._make(stage) if stage.stage_name != stage2find: continue params = stage.params return params def add_new_stage(self): pass def add_new_version(self, version, stage, params): pass class HostsideParams: def __init__(self, path2json: (str, Path) = "params.json"): self.stages = None self.versions = None self.all_params = {} self.path2json = path2json self.log = logging.getLogger('ParameterCommander') self.log.setLevel(logging.DEBUG) self.read_json() # self.all_versions() def reload_file(self): self.read_json() def write_json(self, path2json: (str, Path) = "params.json"): with open(path2json, 'w') as fi: json.dump(self.all_params, fi, indent=4) def read_json(self): """reads the parameters json""" try: with open(self.path2json, "r") as fi: self.all_params = json.load(fi) except FileNotFoundError: self.log.warning('No parameter File exists!') self.all_params = {} def add_new_stage(self, stage_name: str, params: dict, state_discription: str = ""): # json.load(state_discription) self.all_params[stage_name] = {"version_0": params} self.write_json() def add_new_version(self, stage: str, params: dict, note: str = "") -> int: """adds a new version to the parameters""" if stage not in self.all_params.keys(): self.log.warning(f"stage {stage} doesnt exist") return None version = len(self.all_params[stage].keys()) params["note"] = note self.all_params[stage][f"version_{version}"] = params self.write_json() return version def modify_params(self, version: int, stage: str, params: dict): self.log.debug(f"modifying parameters vor v{version} and stage: {stage}") if stage not in self.all_params.keys(): self.log.warning(f"stage {stage} doesnt exist") return if f"version_{version}" not in self.all_params[stage].keys(): self.log.warning(f"version {version} doesnt exist") return self.all_params[stage][f"version_{version}"].update(**params) self.write_json() def hash_params(self, params: dict = None, version: int = 0, stage2find: str = "") -> UUID: """create a hash of the parameters to check if those already exist""" # maybe remove the version number from the hash if params is None: params = self.get_params(version, stage2find) return dict_to_uuid(params) def all_versions(self, stage: str) -> list: try: self.versions = [int(ver.split("_")[-1]) for ver in self.all_params[stage].keys()] return self.versions except KeyError: return [] def all_stages(self) -> list: """returns all stages of parameters_set""" stages = list() for stage in self.all_params.keys(): stages.append(stage) self.stages = stages return stages def get_params(self, version: int, stage2find: str) -> (dict, None): if stage2find not in self.all_stages(): self.log.warning(f'Stage {stage2find} doesnt exist in ver.{version}') return None if version not in self.all_versions(stage2find): self.log.warning(f'Ver.{version} doesnt exist') return None params = self.all_params[stage2find][f"version_{version}"] return params class Session_backupCreator: def __init__(self): from datastructure_tools.DataBaseAccess import DataBaseAccess self.path2datafiles = Path("./data/") self.typical_exptype = 'HillYmaze_Training' self.DB = DataBaseAccess() self.sessions_indb = self.DB.Session.fetch('session_id') self.sessions2add = [] self.go_through_files() def fill_session_notinDB(self): self.add_session_to_DB() def change_animal_nr(self, session_id: str, new_animal: str): from datastructure_tools.utils import SessionClass import shutil assert session_id in self.sessions_indb, f"Session {session_id} not in DB!" assert new_animal in self.DB.Animal.fetch('animal_id'), f"Animal {new_animal} not in DB!" print(f'Trying to patch {session_id} to be from animal {new_animal}') # assert session_id in self.sessions2add, f"Session {session_id} not in files!" session = ( self.DB.Session.Experiment * self.DB.Session.Root * self.DB.Session.Project * self.DB.Session.User * self.DB.Session & { "session_id": session_id}).fetch1() session_path = self.DB.server_path / session["session_dir"] # self.DB.Session.update1({"session_id": session_id, "animal_id": new_animal}) new_sessname = "_".join([session_id.split("_")[0], new_animal, session_id.split("_")[-1]]) # create new session session_class = SessionClass(self.DB, animal_id=new_animal, session_datetime=session["session_datetime"], project=session["project"], user=session['user'], expName=session["experiment_name"], experiment_template=session['experiment_template'], session_id=new_sessname, test=True if new_animal == 'MusterMaus' else False) pathcreationSuccess = session_class.createSession_path() # create Paths on server assert pathcreationSuccess, "paths could not be created" new_path = self.DB.server_path / session_class.session_dir print(f'Created new fodler structure at {new_path}') # rename folder print(f'renaming{session_path} to {new_path}') shutil.move(session_path, new_path) # rename files for file in new_path.rglob("*"): if file.is_file(): old_name = file.name new_name = [old_name.split("_")[0], new_animal] new_name.extend(old_name.split("_")[3:]) new_name = '_'.join(new_name) print(f'renaming {old_name} to {new_name}') shutil.move(file, file.parent.parent / new_name) PushSuccess = session_class.checkInputs() # checks inputs and pushes to DB print(f'Created new Session{new_sessname}') if PushSuccess: try: old_weight = (self.DB.Animal.Weight & {'weight_date': session_class.session_datetime}).fetch1() session_class.weight = old_weight['weight'] session_class.weight_note = old_weight['weight_note'] session_class.pushWeights() print(f'Pushed to DB') except Exception: print('Could not pull weight from DB! enter manually') # delete entry print('Deleting wrong entry') (self.DB.Session & session).delete() # TODO Delete preprocessed and processed files # rename session in in the behavior file. in daq data is still wrongly named! try: for file in (new_path / 'behav').rglob('*.json'): try: with open(file, 'r') as fi: sess_data = json.load(fi) except json.decoder.JSONDecodeError: # not a json sess_data = None if sess_data is not None: try: assert sess_data['session_id'] == session_id sess_data['session_id'] = new_sessname with open(file, 'w') as fi: json.dump(sess_data,fi) break except AssertionError: print("Session id in beh is not as expected") except Exception as e: pass def go_through_files(self): for file in self.path2datafiles.rglob('*_behav.json'): sess_id = '_'.join(file.name.split('_')[:-1]) if sess_id not in self.sessions_indb and 'MusterMaus' not in sess_id: self.sessions2add.append(sess_id) print(f"Found {len(self.sessions2add)} session not in DB") def add_session_to_DB(self): from datastructure_tools.utils import SessionClass from GUI_full import PROJECT_NAME, CURRENT_EXPERIMENT, BEHAVIOUR_FOLDER from datetime import datetime import shutil for sess_id in self.sessions2add: animal_id = "_".join(sess_id.split('_')[1:-1]) session_datetime = datetime.strptime("_".join(sess_id.split('_')[0::3]), '%Y%m%d_%H%M') session_class = SessionClass(self.DB, animal_id=animal_id, session_datetime=session_datetime, project=PROJECT_NAME, user='as153', expName=CURRENT_EXPERIMENT, experiment_template=self.typical_exptype, session_id=sess_id, test=True if animal_id == 'MusterMaus' else False) pathcreationSuccess = session_class.createSession_path() # create Paths on server if pathcreationSuccess: print(f'Created session{sess_id} on the server') #shutil.copyfile(self.path2datafiles / (sess_id + '_behav.csv'), # self.DB.server_path / session_class.session_dir # / BEHAVIOUR_FOLDER / (sess_id + '_behav.csv')) shutil.copyfile(self.path2datafiles / (sess_id + '_behav.json'), self.DB.server_path / session_class.session_dir / BEHAVIOUR_FOLDER / (sess_id + '_behav.json')) print(f'copied files') PushSuccess = session_class.checkInputs() # checks inputs and pushes to DB if PushSuccess: weight = None while weight is None: weight = input(f'Enter animal weight for {animal_id} on {session_datetime.date()}') try: weight.replace(',', '.') weight = float(weight) except ValueError: print('Wrong weight format, try again') weight = None session_class.weight = weight session_class.weight_note = 'Training' session_class.pushWeights() print(f'Pushed to DB') class RunningAverage: def __init__(self, window=5): self.window = window self.data = np.zeros((self.window,)) * np.nan self.previous_value = np.nan @property def moving_average(self): return np.nanmean(self.data) @property def moving_sum(self): return np.nansum(self.data) def add_value(self, value): self.previous_value = self.moving_average self.data[:-1] = self.data[1:] self.data[-1] = value def reset(self): self.data = np.zeros((self.window,)) * np.nan self.previous_value = np.nan if __name__ == "__main__": sess_cleanup = Session_backupCreator() sess_cleanup.sessions2add = [sess_cleanup.sessions2add[0]] sess_cleanup.fill_session_notinDB() #sess_cleanup.change_animal_nr("20231211_r0093_wt_1607", "r0092_wt") # DONT RUN while a session is running !