Source code for keapi.compat._tc_prog

from typing import Any
import requests
import json
import time
from keapi._error import HttpError, TcError


[docs]def connect_tc_prog(host: str, user: str, passwd: str, robot: str): """Establishes a connection to the TcApi and returns a instance of TcProg which can be used to start and stop programs that are already definen in Teach Control :param host: IP Address or Hostname of TcApi Server :type host: str :param user: Username to login to TcApi :type user: str :param passwd: Password to login to TcApi :type passwd: str :param robot: Name of the robot :type robot: str :return: TcProg instance :rtype: TcProg """ tc = TcProg(robot) tc._connect(host, user, passwd) return tc
[docs]class TcProg: """Holds the Auth Cookie and the logic to communicate with the TcApi """ def __init__(self, robot: str) -> None: self._host = None self._cookie = None self._robot = robot self._project_name = None self._program_name = None self._http_headers = { 'accept': 'application/json', 'Content-Type': 'application/json' } self._prog_active = False
[docs] def exec(self, project_name: str, prog_name: str): """Starts a Teach Control programm and waits until it's execution is finished. This function is blocking :param project_name: Name of the Teach Control project :type project_name: str :param prog_name: Name of the Teach Control program :type prog_name: str :raises TcError: When another program is already active :raises HttpError: When http status code != 200 """ self.start(project_name, prog_name) # Not great while self.is_prog_running(): time.sleep(0.1) self.stop()
[docs] def start(self, project_name: str, prog_name: str): """Loads and starts a Teach Control program. :param project_name: Name of the Teach Control project :type project_name: str :param prog_name: Name of the Teach Control program :type prog_name: str :raises TcError: When another program is already active :raises HttpError: When http status code != 200 """ if self._prog_active: raise TcError('Another Program is already active.') self._project_name = project_name self._program_name = prog_name self._get_control_authority() # Load Project url = f'http://{self._host}/api/v3/teach-control/projects/{self._robot}/{self._project_name}.tt' body = json.dumps({'command': 'load'}) res = requests.post(url, headers=self._http_headers, data=body, cookies=self._cookie) if res.status_code != 200: raise HttpError(res.text) # Start Program url = f'http://{self._host}/api/v3/teach-control/programs/{self._robot}/{self._project_name}.tt/{self._program_name}.tip' body = json.dumps({'command': 'start'}) res = requests.post(url, headers=self._http_headers, data=body, cookies=self._cookie) if res.status_code != 200: raise HttpError(res.text) self._prog_active = True
[docs] def stop(self): """Stops the currently executing program and unloads the Teach Control project. :raises TcError: When there is no program to stop :raises HttpError: When http status code != 200 """ if not self._prog_active: raise TcError('There is no active program to stop.') # Stop Program url = f'http://{self._host}/api/v3/teach-control/programs/{self._robot}/{self._project_name}.tt/{self._program_name}.tip' body = json.dumps({'command': 'stop'}) res = requests.post(url, headers=self._http_headers, data=body, cookies=self._cookie) # Unload Project url = f'http://{self._host}/api/v3/teach-control/projects/{self._robot}/{self._project_name}.tt' body = json.dumps({'command': 'unload'}) res = requests.post(url, headers=self._http_headers, data=body, cookies=self._cookie) if res.status_code != 200: raise HttpError(res.text) self._release_control_authority() self._project_name = None self._program_name = None self._prog_active = False
[docs] def load_project(self, project_name: str) -> None: """Loads a Teach Control Project :param project_name: Name of the project :type project_name: str :raises HttpError: When http status code != 200 """ self._get_control_authority() self._project_name = project_name url = f'http://{self._host}/api/v3/teach-control/projects/{self._robot}/{self._project_name}.tt' body = json.dumps({'command': 'load'}) res = requests.post(url, headers=self._http_headers, data=body, cookies=self._cookie) if res.status_code != 200: raise HttpError(res.text) self._release_control_authority()
[docs] def unload_project(self) -> None: """Unloads the currently loaded Teach Control project :raises TcError: When there is not project loaded :raises HttpError: When http status code != 200 """ if not self._project_name: raise TcError('No Project loaded') self._get_control_authority() url = f'http://{self._host}/api/v3/teach-control/projects/{self._robot}/{self._project_name}.tt' body = json.dumps({'command': 'unload'}) res = requests.post(url, headers=self._http_headers, data=body, cookies=self._cookie) if res.status_code != 200: raise HttpError(res.text) self._project_name = None self._release_control_authority()
[docs] def is_prog_running(self) -> bool: """Returns whether a program is running or not :raises TcError: When there is no program started :raises HttpError: When http status code != 200 :return: Program running :rtype: bool """ if not self._prog_active: raise TcError('There is no program active.') url = f'http://{self._host}/api/v3/teach-control/programs/{self._robot}/{self._project_name}.tt/{self._program_name}.tip' res = requests.get(url, headers=self._http_headers, cookies=self._cookie) if res.status_code != 200: raise HttpError(res.text) ans = json.loads(res.text) if 'executionTree' not in ans or ans['executionTree']['status']['state'] != 3: return False return True
def _connect(self, host: str, user: str, passwd: str) -> None: self._host = host url = f'http://{host}/access/login/' body = json.dumps({'username': user, 'password': passwd}) res = json.loads( requests.post(url, headers=self._http_headers, data=body).text ) self._cookie = {'accSvcId': str(res['session'])} self._release_control_authority() def _get_control_authority(self) -> None: url = f'http://{self._host}/api/v3/teach-control/rc/controlauthority/?robot={self._robot}' body = json.dumps({'deviceType': 'Bonder', 'simulate': False}) res = requests.post(url, headers=self._http_headers, data=body, cookies=self._cookie) if res.status_code != 200: raise HttpError(res.text) def _release_control_authority(self) -> None: url = f'http://{self._host}/api/v3/teach-control/rc/controlauthority/?robot={self._robot}&force=true' res = requests.delete(url, cookies=self._cookie) if res.status_code != 200: raise HttpError(res.text)