1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00

pyterm: added some documentation

This commit is contained in:
Oleg Hahm 2014-07-17 00:25:54 +02:00
parent aaccb81794
commit 9a09cba325

View File

@ -7,14 +7,16 @@ except ImportError:
import ConfigParser as configparser import ConfigParser as configparser
import cmd, serial, socket, sys, threading, readline, time, logging, os, argparse, re, codecs, signal import cmd, serial, socket, sys, threading, readline, time, logging, os, \
argparse, re, codecs, signal
### import twisted if available, define dummy classes otherwise ### import twisted if available, define dummy classes otherwise
try: try:
from twisted.internet import reactor from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ReconnectingClientFactory from twisted.internet.protocol import Protocol, ReconnectingClientFactory
except ImportError: except ImportError:
logging.getLogger("").warn("Twisted not available, please install it if you want to use pyterm's JSON capabilities") logging.getLogger("").warn("Twisted not available, please install it if you"
"want to use pyterm's JSON capabilities")
class Protocol(): class Protocol():
def __init__(self): def __init__(self):
@ -27,15 +29,43 @@ except ImportError:
import platform import platform
defaulthostname = platform.node() defaulthostname = platform.node()
### default serial port
defaultport = "/dev/ttyUSB0" defaultport = "/dev/ttyUSB0"
### default baudrate for serial connection
defaultbaud = 115200 defaultbaud = 115200
### directory to store configuration and log files
defaultdir = os.environ['HOME'] + os.path.sep + '.pyterm' defaultdir = os.environ['HOME'] + os.path.sep + '.pyterm'
### configuration file name
defaultfile = "pyterm-" + defaulthostname + ".conf" defaultfile = "pyterm-" + defaulthostname + ".conf"
### logging subfolder
defaultrunname = "default-run" defaultrunname = "default-run"
class SerCmd(cmd.Cmd): class SerCmd(cmd.Cmd):
"""Main class for pyterm based on Python's Cmd class.
def __init__(self, port=None, baudrate=None, tcp_serial=None, confdir=None, conffile=None, host=None, run_name=None): Runs an interactive terminal that transfer between stdio and serial port.
"""
def __init__(self, port=None, baudrate=None, tcp_serial=None, confdir=None,
conffile=None, host=None, run_name=None):
"""Constructor.
Args:
port (str): serial port
baudrate (int): serial baudrate
tcp_serial (iht): TCP port to connect to (alternatively)
confdir (str): configuration directory
conffile (str): configuration file name
host (str): local host name
run_name (str): identifier for log filer subdirectory
"""
# initialize class members
cmd.Cmd.__init__(self) cmd.Cmd.__init__(self)
self.port = port self.port = port
self.baudrate = baudrate self.baudrate = baudrate
@ -62,6 +92,7 @@ class SerCmd(cmd.Cmd):
self.init_cmd = [] self.init_cmd = []
self.load_config() self.load_config()
# check for a history file
try: try:
readline.read_history_file() readline.read_history_file()
except IOError: except IOError:
@ -78,7 +109,8 @@ class SerCmd(cmd.Cmd):
directory = self.configdir + os.path.sep + self.host directory = self.configdir + os.path.sep + self.host
if not os.path.exists(directory): if not os.path.exists(directory):
os.makedirs(directory) os.makedirs(directory)
logging.basicConfig(filename=directory + os.path.sep + self.run_name + '.log', level=logging.DEBUG, format=fmt_str) logging.basicConfig(filename=directory + os.path.sep + self.run_name + '.log', \
level=logging.DEBUG, format=fmt_str)
ch = logging.StreamHandler() ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG) ch.setLevel(logging.DEBUG)
@ -93,12 +125,18 @@ class SerCmd(cmd.Cmd):
def preloop(self): def preloop(self):
"""Executed bat program start.
"""
# if no serial or TCP is specified use default serial port
if not self.port and not self.tcp_serial: if not self.port and not self.tcp_serial:
sys.stderr.write("No port specified, using default (%s)!\n" % (defaultport)) sys.stderr.write("No port specified, using default (%s)!\n" % (defaultport))
self.port = defaultport self.port = defaultport
# if a TCP port is specified try to connect
if self.tcp_serial: if self.tcp_serial:
self.logger.info("Connect to localhost:%s" % self.tcp_serial) self.logger.info("Connect to localhost:%s" % self.tcp_serial)
for res in socket.getaddrinfo('localhost', self.tcp_serial, socket.AF_UNSPEC, socket.SOCK_STREAM): for res in socket.getaddrinfo('localhost', self.tcp_serial, \
socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res af, socktype, proto, canonname, sa = res
try: try:
s = fdsocket(af, socktype, proto) s = fdsocket(af, socktype, proto)
@ -115,14 +153,17 @@ class SerCmd(cmd.Cmd):
if s: if s:
self.ser = s self.ser = s
else: else:
self.logger.error("Something went wrong connecting to localhost:%s" % self.tcp_serial) self.logger.error("Something went wrong connecting to localhost:%s"
% self.tcp_serial)
sys.exit(1) sys.exit(1)
# otherwise go for the serial port
elif self.port: elif self.port:
self.logger.info("Connect to serial port %s" % self.port) self.logger.info("Connect to serial port %s" % self.port)
self.ser = serial.Serial(port=self.port, baudrate=self.baudrate, dsrdtr=0, rtscts=0) self.ser = serial.Serial(port=self.port, baudrate=self.baudrate, dsrdtr=0, rtscts=0)
self.ser.setDTR(0) self.ser.setDTR(0)
self.ser.setRTS(0) self.ser.setRTS(0)
# wait until connection is established and fire startup commands to the node
time.sleep(1) time.sleep(1)
for cmd in self.init_cmd: for cmd in self.init_cmd:
self.logger.debug("WRITE ----->>>>>> '" + cmd + "'\n") self.logger.debug("WRITE ----->>>>>> '" + cmd + "'\n")
@ -134,31 +175,47 @@ class SerCmd(cmd.Cmd):
receiver_thread.start() receiver_thread.start()
def precmd(self, line): def precmd(self, line):
"""Check for command prefixes to distinguish between Pyterm interal
commands and commands that should be send to the node.
"""
self.logger.debug("processing line #%s#" % line) self.logger.debug("processing line #%s#" % line)
if (line.startswith("/")): if (line.startswith("/")):
return "PYTERM_" + line[1:] return "PYTERM_" + line[1:]
return line return line
def default(self, line): def default(self, line):
"""In case of no Pyterm specific prefix is detected, split string by
colons and send it to the node.
"""
self.logger.debug("%s is no pyterm command, sending to default out" % line) self.logger.debug("%s is no pyterm command, sending to default out" % line)
for tok in line.split(';'): for tok in line.split(';'):
tok = self.get_alias(tok) tok = self.get_alias(tok)
self.ser.write((tok.strip() + "\n").encode("utf-8")) self.ser.write((tok.strip() + "\n").encode("utf-8"))
def do_help(self, line): def do_help(self, line):
"""Do not use Cmd's internal help function, but redirect to the node.
"""
self.ser.write("help\n".encode("utf-8")) self.ser.write("help\n".encode("utf-8"))
def complete_date(self, text, line, begidx, endidm): def complete_date(self, text, line, begidx, endidm):
"""Auto completion for date string.
"""
date = time.strftime("%Y-%m-%d %H:%M:%S") date = time.strftime("%Y-%m-%d %H:%M:%S")
return ["{}".format(date)] return ["{}".format(date)]
def do_PYTERM_reset(self, line): def do_PYTERM_reset(self, line):
"""Pyterm command: Send a reset to the node.
"""
self.ser.setDTR(1) self.ser.setDTR(1)
self.ser.setDTR(0) self.ser.setDTR(0)
def do_PYTERM_exit(self, line, unused=None): def do_PYTERM_exit(self, line, unused=None):
"""Pyterm command: Exit Pyterm.
"""
self.logger.info("Exiting Pyterm") self.logger.info("Exiting Pyterm")
# save history file
readline.write_history_file() readline.write_history_file()
# shut down twisted if running
if reactor.running: if reactor.running:
reactor.callFromThread(reactor.stop) reactor.callFromThread(reactor.stop)
if self.tcp_serial: if self.tcp_serial:
@ -166,6 +223,8 @@ class SerCmd(cmd.Cmd):
return True return True
def do_PYTERM_save(self, line): def do_PYTERM_save(self, line):
"""Pyterm command: Save Pyterm configuration to file.
"""
if not self.config.has_section("general"): if not self.config.has_section("general"):
self.config.add_section("general") self.config.add_section("general")
self.config.set("general", "port", self.port) self.config.set("general", "port", self.port)
@ -211,10 +270,14 @@ class SerCmd(cmd.Cmd):
self.logger.info("Config saved") self.logger.info("Config saved")
def do_PYTERM_show_config(self, line): def do_PYTERM_show_config(self, line):
"""Pyterm command: Show current configuration.
"""
for key in self.__dict__: for key in self.__dict__:
print(str(key) + ": " + str(self.__dict__[key])) print(str(key) + ": " + str(self.__dict__[key]))
def do_PYTERM_alias(self, line): def do_PYTERM_alias(self, line):
"""Pyterm command: Register an alias or show an list of all registered aliases.
"""
if line.endswith("list"): if line.endswith("list"):
for alias in self.aliases: for alias in self.aliases:
self.logger.info("{} = {}".format(alias, self.aliases[alias])) self.logger.info("{} = {}".format(alias, self.aliases[alias]))
@ -228,16 +291,22 @@ class SerCmd(cmd.Cmd):
self.aliases[alias] = command self.aliases[alias] = command
def do_PYTERM_rmalias(self, line): def do_PYTERM_rmalias(self, line):
"""Pyterm command: Unregister an alias.
"""
if not self.aliases.pop(line, None): if not self.aliases.pop(line, None):
sys.stderr.write("Alias not found") sys.stderr.write("Alias not found")
def get_alias(self, tok): def get_alias(self, tok):
"""Internal function to check for aliases.
"""
for alias in self.aliases: for alias in self.aliases:
if tok.split()[0] == alias: if tok.split()[0] == alias:
return self.aliases[alias] + tok[len(alias):] return self.aliases[alias] + tok[len(alias):]
return tok return tok
def do_PYTERM_trigger(self, line): def do_PYTERM_trigger(self, line):
"""Pyterm command: Register an trigger Regex.
"""
if not line.count("="): if not line.count("="):
sys.stderr.write("Usage: /trigger <regex> = <CMD>\n") sys.stderr.write("Usage: /trigger <regex> = <CMD>\n")
return return
@ -247,13 +316,19 @@ class SerCmd(cmd.Cmd):
self.triggers[re.compile(trigger)] = action self.triggers[re.compile(trigger)] = action
def do_PYTERM_rmtrigger(self, line): def do_PYTERM_rmtrigger(self, line):
"""Pyterm command: Unregister an trigger.
"""
if not self.triggers.pop(line, None): if not self.triggers.pop(line, None):
sys.stderr.write("Trigger not found") sys.stderr.write("Trigger not found")
def do_PYTERM_ignore(self, line): def do_PYTERM_ignore(self, line):
"""Pyterm command: Ignore lines with these Regexes matching.
"""
self.ignores.append(re.compile(line.strip())) self.ignores.append(re.compile(line.strip()))
def do_PYTERM_unignore(self, line): def do_PYTERM_unignore(self, line):
"""Pyterm command: Remote an ignore Regex.
"""
for r in self.ignores: for r in self.ignores:
if (r.pattern == line.strip()): if (r.pattern == line.strip()):
self.logger.info("Remove ignore for %s" % r.pattern) self.logger.info("Remove ignore for %s" % r.pattern)
@ -262,9 +337,13 @@ class SerCmd(cmd.Cmd):
sys.stderr.write("Ignore for %s not found\n" % line.strip()) sys.stderr.write("Ignore for %s not found\n" % line.strip())
def do_PYTERM_filter(self, line): def do_PYTERM_filter(self, line):
"""Pyterm command: Show only lines matching this Regex.
"""
self.filters.append(re.compile(line.strip())) self.filters.append(re.compile(line.strip()))
def do_PYTERM_unfilter(self, line): def do_PYTERM_unfilter(self, line):
"""Pyterm command: Remove a filter.
"""
for r in self.filters: for r in self.filters:
if (r.pattern == line.strip()): if (r.pattern == line.strip()):
self.logger.info("Remove filter for %s" % r.pattern) self.logger.info("Remove filter for %s" % r.pattern)
@ -273,16 +352,24 @@ class SerCmd(cmd.Cmd):
sys.stderr.write("Filter for %s not found\n" % line.strip()) sys.stderr.write("Filter for %s not found\n" % line.strip())
def do_PYTERM_json(self, line): def do_PYTERM_json(self, line):
"""Pyterm command: Transfer lines matching this Regex as JSON object.
"""
self.json_regs[line.split(' ')[0].strip()] = re.compile(line.partition(' ')[2].strip()) self.json_regs[line.split(' ')[0].strip()] = re.compile(line.partition(' ')[2].strip())
def do_PYTERM_unjson(self, line): def do_PYTERM_unjson(self, line):
"""Pyterm command: Remove a JSON filter.
"""
if not self.aliases.pop(line, None): if not self.aliases.pop(line, None):
sys.stderr.write("JSON regex with ID %s not found" % line) sys.stderr.write("JSON regex with ID %s not found" % line)
def do_PYTERM_init(self, line): def do_PYTERM_init(self, line):
"""Pyterm command: Add an startup command. (Only useful in addition with /save).
"""
self.init_cmd.append(line.strip()) self.init_cmd.append(line.strip())
def load_config(self): def load_config(self):
"""Internal function to laod configuration from file.
"""
self.config = configparser.SafeConfigParser() self.config = configparser.SafeConfigParser()
self.config.read([self.configdir + os.path.sep + self.configfile]) self.config.read([self.configdir + os.path.sep + self.configfile])
@ -312,6 +399,11 @@ class SerCmd(cmd.Cmd):
self.__dict__[opt] = self.config.get(sec, opt) self.__dict__[opt] = self.config.get(sec, opt)
def process_line(self, line): def process_line(self, line):
"""Processes a valid line from node that should be printed and possibly forwarded.
Args:
line (str): input from node.
"""
self.logger.info(line) self.logger.info(line)
# check if line matches a trigger and fire the command(s) # check if line matches a trigger and fire the command(s)
for trigger in self.triggers: for trigger in self.triggers:
@ -345,6 +437,11 @@ class SerCmd(cmd.Cmd):
self.factory.myproto.sendMessage(json_obj) self.factory.myproto.sendMessage(json_obj)
def handle_line(self, line): def handle_line(self, line):
"""Handle line from node and check for further processing requirements.
Args:
line (str): input line from node.
"""
# First check if line should be ignored # First check if line should be ignored
ignored = False ignored = False
if (len(self.ignores)): if (len(self.ignores)):
@ -369,9 +466,11 @@ class SerCmd(cmd.Cmd):
""" """
output = "" output = ""
while (1): while (1):
# check if serial port can be accessed.
try: try:
sr = codecs.getreader("UTF-8")(self.ser, errors='replace') sr = codecs.getreader("UTF-8")(self.ser, errors='replace')
c = sr.read(1) c = sr.read(1)
# try to re-open it with a timeout of 1s otherwise
except (serial.SerialException, ValueError) as se: except (serial.SerialException, ValueError) as se:
self.logger.warn("Serial port disconnected, waiting to get reconnected...") self.logger.warn("Serial port disconnected, waiting to get reconnected...")
self.ser.close() self.ser.close()