X Tutup
# To gradually migrate to mypy we aren't setting these globally yet # mypy: disallow_untyped_defs=True # mypy: disallow_untyped_calls=True import argparse import collections import logging import sys import curtsies import curtsies.events import curtsies.input import curtsies.window from . import args as bpargs, translations, inspection from .config import Config from .curtsiesfrontend import events from .curtsiesfrontend.coderunner import SystemExitFromCodeRunner from .curtsiesfrontend.interpreter import Interp from .curtsiesfrontend.repl import BaseRepl from .repl import extract_exit_value from .translations import _ from typing import ( Any, Dict, List, Optional, Protocol, Tuple, Union, ) from collections.abc import Callable, Generator, Sequence logger = logging.getLogger(__name__) class SupportsEventGeneration(Protocol): def send( self, timeout: float | None ) -> str | curtsies.events.Event | None: ... def __iter__(self) -> "SupportsEventGeneration": ... def __next__(self) -> str | curtsies.events.Event | None: ... class FullCurtsiesRepl(BaseRepl): def __init__( self, config: Config, locals_: dict[str, Any] | None = None, banner: str | None = None, interp: Interp | None = None, ) -> None: self.input_generator = curtsies.input.Input( keynames="curtsies", sigint_event=True, paste_threshold=None ) window = curtsies.window.CursorAwareWindow( sys.stdout, sys.stdin, keep_last_line=True, hide_cursor=False, extra_bytes_callback=self.input_generator.unget_bytes, ) self._request_refresh_callback: Callable[[], None] = ( self.input_generator.event_trigger(events.RefreshRequestEvent) ) self._schedule_refresh_callback = ( self.input_generator.scheduled_event_trigger( events.ScheduledRefreshRequestEvent ) ) self._request_reload_callback = ( self.input_generator.threadsafe_event_trigger(events.ReloadEvent) ) self._interrupting_refresh_callback = ( self.input_generator.threadsafe_event_trigger(lambda: None) ) self._request_undo_callback = self.input_generator.event_trigger( events.UndoEvent ) with self.input_generator: pass # temp hack to get .original_stty super().__init__( config, window, locals_=locals_, banner=banner, interp=interp, orig_tcattrs=self.input_generator.original_stty, ) def _request_refresh(self) -> None: return self._request_refresh_callback() def _schedule_refresh(self, when: float) -> None: return self._schedule_refresh_callback(when) def _request_reload(self, files_modified: Sequence[str]) -> None: return self._request_reload_callback(files_modified=files_modified) def interrupting_refresh(self) -> None: return self._interrupting_refresh_callback() def request_undo(self, n: int = 1) -> None: return self._request_undo_callback(n=n) def get_term_hw(self) -> tuple[int, int]: return self.window.get_term_hw() def get_cursor_vertical_diff(self) -> int: return self.window.get_cursor_vertical_diff() def get_top_usable_line(self) -> int: return self.window.top_usable_row def on_suspend(self) -> None: self.window.__exit__(None, None, None) self.input_generator.__exit__(None, None, None) def after_suspend(self) -> None: self.input_generator.__enter__() self.window.__enter__() self.interrupting_refresh() def process_event_and_paint( self, e: str | curtsies.events.Event | None ) -> None: """If None is passed in, just paint the screen""" try: if e is not None: self.process_event(e) except (SystemExitFromCodeRunner, SystemExit) as err: array, cursor_pos = self.paint( about_to_exit=True, user_quit=isinstance(err, SystemExitFromCodeRunner), ) scrolled = self.window.render_to_terminal(array, cursor_pos) self.scroll_offset += scrolled raise else: array, cursor_pos = self.paint() scrolled = self.window.render_to_terminal(array, cursor_pos) self.scroll_offset += scrolled def mainloop( self, interactive: bool = True, paste: curtsies.events.PasteEvent | None = None, ) -> None: if interactive: # Add custom help command # TODO: add methods to run the code self.initialize_interp() # run startup file self.process_event(events.RunStartupFileEvent()) # handle paste if paste: self.process_event(paste) # do a display before waiting for first event self.process_event_and_paint(None) inputs = combined_events(self.input_generator) while self.module_gatherer.find_coroutine(): e = inputs.send(0) if e is not None: self.process_event_and_paint(e) for e in inputs: self.process_event_and_paint(e) def main( args: list[str] | None = None, locals_: dict[str, Any] | None = None, banner: str | None = None, welcome_message: str | None = None, ) -> Any: """ banner is displayed directly after the version information. welcome_message is passed on to Repl and displayed in the statusbar. """ translations.init() def curtsies_arguments(parser: argparse._ArgumentGroup) -> None: parser.add_argument( "--paste", "-p", action="store_true", help=_("start by pasting lines of a file into session"), ) config, options, exec_args = bpargs.parse( args, ( _("curtsies arguments"), _("Additional arguments specific to the curtsies-based REPL."), curtsies_arguments, ), ) interp = None paste = None exit_value: tuple[Any, ...] = () if exec_args: if not options: raise ValueError("don't pass in exec_args without options") if options.paste: paste = curtsies.events.PasteEvent() encoding = inspection.get_encoding_file(exec_args[0]) with open(exec_args[0], encoding=encoding) as f: sourcecode = f.read() paste.events.extend(sourcecode) else: try: interp = Interp(locals=locals_) bpargs.exec_code(interp, exec_args) except SystemExit as e: exit_value = e.args if not options.interactive: return extract_exit_value(exit_value) else: # expected for interactive sessions (vanilla python does it) sys.path.insert(0, "") if not options.quiet: print(bpargs.version_banner()) if banner is not None: print(banner) if welcome_message is None and not options.quiet and config.help_key: welcome_message = ( _("Welcome to bpython!") + " " + _("Press <%s> for help.") % config.help_key ) repl = FullCurtsiesRepl(config, locals_, welcome_message, interp) try: with repl.input_generator: with repl.window as win: with repl: repl.height, repl.width = win.t.height, win.t.width repl.mainloop(True, paste) except (SystemExitFromCodeRunner, SystemExit) as e: exit_value = e.args return extract_exit_value(exit_value) def _combined_events( event_provider: SupportsEventGeneration, paste_threshold: int ) -> Generator[str | curtsies.events.Event | None, float | None, None]: """Combines consecutive keypress events into paste events.""" timeout = yield "nonsense_event" # so send can be used immediately queue: collections.deque = collections.deque() while True: e = event_provider.send(timeout) if isinstance(e, curtsies.events.Event): timeout = yield e continue elif e is None: timeout = yield None continue else: queue.append(e) e = event_provider.send(0) while not (e is None or isinstance(e, curtsies.events.Event)): queue.append(e) e = event_provider.send(0) if len(queue) >= paste_threshold: paste = curtsies.events.PasteEvent() paste.events.extend(queue) queue.clear() timeout = yield paste else: while len(queue): timeout = yield queue.popleft() def combined_events( event_provider: SupportsEventGeneration, paste_threshold: int = 3 ) -> SupportsEventGeneration: g = _combined_events(event_provider, paste_threshold) next(g) return g if __name__ == "__main__": sys.exit(main())
X Tutup