import collections
import logging
import sys
import curtsies
import curtsies.window
import curtsies.input
import curtsies.events
from .curtsiesfrontend.repl import BaseRepl
from .curtsiesfrontend.coderunner import SystemExitFromCodeRunner
from .curtsiesfrontend.interpreter import Interp
from . import args as bpargs
from . import translations
from .translations import _
from .curtsiesfrontend import events as bpythonevents
from . import inspection
from .repl import extract_exit_value
logger = logging.getLogger(__name__)
class FullCurtsiesRepl(BaseRepl):
def __init__(self, config, locals_, banner, interp=None):
self.input_generator = curtsies.input.Input(
keynames="curtsies", sigint_event=True, paste_threshold=None
)
self.window = curtsies.window.CursorAwareWindow(
sys.stdout,
sys.stdin,
keep_last_line=True,
hide_cursor=False,
extra_bytes_callback=self.input_generator.unget_bytes,
)
self._request_refresh = self.input_generator.event_trigger(
bpythonevents.RefreshRequestEvent
)
self._schedule_refresh = self.input_generator.scheduled_event_trigger(
bpythonevents.ScheduledRefreshRequestEvent
)
self._request_reload = self.input_generator.threadsafe_event_trigger(
bpythonevents.ReloadEvent
)
self.interrupting_refresh = (
self.input_generator.threadsafe_event_trigger(lambda: None)
)
self.request_undo = self.input_generator.event_trigger(
bpythonevents.UndoEvent
)
with self.input_generator:
pass # temp hack to get .original_stty
super().__init__(
config,
locals_=locals_,
banner=banner,
interp=interp,
orig_tcattrs=self.input_generator.original_stty,
)
def get_term_hw(self):
return self.window.get_term_hw()
def get_cursor_vertical_diff(self):
return self.window.get_cursor_vertical_diff()
def get_top_usable_line(self):
return self.window.top_usable_row
def on_suspend(self):
self.window.__exit__(None, None, None)
self.input_generator.__exit__(None, None, None)
def after_suspend(self):
self.input_generator.__enter__()
self.window.__enter__()
self.interrupting_refresh()
def process_event_and_paint(self, e):
"""If None is passed in, just paint the screen"""
try:
if e is not None:
self.process_event(e)
except (SystemExitFromCodeRunner, SystemExit) as err:
array, cursor_pos = self.paint(
about_to_exit=True,
user_quit=isinstance(err, SystemExitFromCodeRunner),
)
scrolled = self.window.render_to_terminal(array, cursor_pos)
self.scroll_offset += scrolled
raise
else:
array, cursor_pos = self.paint()
scrolled = self.window.render_to_terminal(array, cursor_pos)
self.scroll_offset += scrolled
def mainloop(self, interactive=True, paste=None):
if interactive:
# Add custom help command
# TODO: add methods to run the code
self.initialize_interp()
# run startup file
self.process_event(bpythonevents.RunStartupFileEvent())
# handle paste
if paste:
self.process_event(paste)
# do a display before waiting for first event
self.process_event_and_paint(None)
inputs = combined_events(self.input_generator)
while self.module_gatherer.find_coroutine():
e = inputs.send(0)
if e is not None:
self.process_event_and_paint(e)
for e in inputs:
self.process_event_and_paint(e)
def main(args=None, locals_=None, banner=None, welcome_message=None):
"""
banner is displayed directly after the version information.
welcome_message is passed on to Repl and displayed in the statusbar.
"""
translations.init()
def curtsies_arguments(parser):
parser.add_argument(
"--log",
"-L",
action="count",
help=_("log debug messages to bpython.log"),
)
parser.add_argument(
"--paste",
"-p",
action="store_true",
help=_("start by pasting lines of a file into session"),
)
config, options, exec_args = bpargs.parse(
args,
(
_("curtsies arguments"),
_("Additional arguments specific to the curtsies-based REPL."),
curtsies_arguments,
),
)
if options.log is None:
options.log = 0
logging_levels = (logging.ERROR, logging.INFO, logging.DEBUG)
level = logging_levels[min(len(logging_levels) - 1, options.log)]
logging.getLogger("curtsies").setLevel(level)
logging.getLogger("bpython").setLevel(level)
if options.log:
handler = logging.FileHandler(filename="bpython.log")
logging.getLogger("curtsies").addHandler(handler)
logging.getLogger("curtsies").propagate = False
logging.getLogger("bpython").addHandler(handler)
logging.getLogger("bpython").propagate = False
interp = None
paste = None
if exec_args:
if not options:
raise ValueError("don't pass in exec_args without options")
exit_value = ()
if options.paste:
paste = curtsies.events.PasteEvent()
encoding = inspection.get_encoding_file(exec_args[0])
with open(exec_args[0], encoding=encoding) as f:
sourcecode = f.read()
paste.events.extend(sourcecode)
else:
try:
interp = Interp(locals=locals_)
bpargs.exec_code(interp, exec_args)
except SystemExit as e:
exit_value = e.args
if not options.interactive:
return extract_exit_value(exit_value)
else:
# expected for interactive sessions (vanilla python does it)
sys.path.insert(0, "")
if not options.quiet:
print(bpargs.version_banner())
if banner is not None:
print(banner)
repl = FullCurtsiesRepl(config, locals_, welcome_message, interp)
try:
with repl.input_generator:
with repl.window as win:
with repl:
repl.height, repl.width = win.t.height, win.t.width
exit_value = repl.mainloop(True, paste)
except (SystemExitFromCodeRunner, SystemExit) as e:
exit_value = e.args
return extract_exit_value(exit_value)
def _combined_events(event_provider, paste_threshold):
"""Combines consecutive keypress events into paste events."""
timeout = yield "nonsense_event" # so send can be used immediately
queue = collections.deque()
while True:
e = event_provider.send(timeout)
if isinstance(e, curtsies.events.Event):
timeout = yield e
continue
elif e is None:
timeout = yield None
continue
else:
queue.append(e)
e = event_provider.send(0)
while not (e is None or isinstance(e, curtsies.events.Event)):
queue.append(e)
e = event_provider.send(0)
if len(queue) >= paste_threshold:
paste = curtsies.events.PasteEvent()
paste.events.extend(queue)
queue.clear()
timeout = yield paste
else:
while len(queue):
timeout = yield queue.popleft()
def combined_events(event_provider, paste_threshold=3):
g = _combined_events(event_provider, paste_threshold)
next(g)
return g
if __name__ == "__main__":
sys.exit(main())