X Tutup
""" Rope support in pymode. """ from __future__ import absolute_import, print_function import multiprocessing import os.path import re import site import sys from rope.base import project, libutils, exceptions, change, worder # noqa from rope.base.fscommands import FileSystemCommands # noqa from rope.base.taskhandle import TaskHandle # noqa from rope.contrib import autoimport as rope_autoimport, codeassist, findit, generate # noqa from rope.refactor import ModuleToPackage, ImportOrganizer, rename, extract, inline, usefunction, move, change_signature, importutils # noqa from .environment import env def look_ropeproject(path): """ Search for ropeproject in current and parent dirs. :return str|None: A finded path """ env.debug('Look project', path) p = os.path.abspath(path) while True: if '.ropeproject' in os.listdir(p): return p new_p = os.path.abspath(os.path.join(p, "..")) if new_p == p: return path p = new_p @env.catch_exceptions def completions(): """ Search completions. :return None: """ row, col = env.cursor if env.var('a:findstart', True): count = 0 for char in reversed(env.current.line[:col]): if not re.match(r'[\w\d]', char): break count += 1 env.debug('Complete find start', (col - count)) return env.stop(col - count) base = env.var('a:base') source, offset = env.get_offset_params((row, col), base) proposals = get_proporsals(source, offset, base) return env.stop(proposals) FROM_RE = re.compile(r'^\s*from\s+[\.\w\d_]+$') @env.catch_exceptions def complete(dot=False): """ Ctrl+Space completion. :return bool: success """ row, col = env.cursor source, offset = env.get_offset_params() cline = env.current.line[:col] env.debug('dot completion', cline) if FROM_RE.match(cline) or cline.endswith('..') or cline.endswith('\.'): return env.stop("") proposals = get_proporsals(source, offset, dot=dot) if not proposals: return False prefix = proposals[0]['word'] # Find common part for p in proposals: common = len([ c1 for c1, c2 in zip(prefix, p['word']) if c1 == c2 and c1 != ' ' ]) prefix = prefix[:common] s_offset = codeassist.starting_offset(source, offset) p_prefix = prefix[offset - s_offset:] line = env.lines[row - 1] cline = line[:col] + p_prefix + line[col:] if cline != line: env.curbuf[row - 1] = env.prepare_value(cline, dumps=False) env.current.window.cursor = (row, col + len(p_prefix)) env.run('complete', col - len(prefix) + len(p_prefix) + 1, proposals) return True def get_proporsals(source, offset, base='', dot=False): """ Code assist. :return str: """ with RopeContext() as ctx: try: proposals = codeassist.code_assist( ctx.project, source, offset, ctx.resource, maxfixes=3, later_locals=False) except exceptions.ModuleSyntaxError: proposals = [] proposals = sorted(proposals, key=_sort_proporsals) out = [] preview = 'preview' in ctx.options.get('completeopt') for p in proposals: out.append(dict( word=p.name, menu=p.type, kind=p.scope + ':', info=p.get_doc() or "No docs." if preview else "", )) out = _get_autoimport_proposals(out, ctx, source, offset, dot=dot) return out @env.catch_exceptions def goto(): """ Goto definition. """ with RopeContext() as ctx: source, offset = env.get_offset_params() found_resource, line = codeassist.get_definition_location( ctx.project, source, offset, ctx.resource, maxfixes=3) if not found_resource: env.error('Definition not found') return env.goto_file( found_resource.real_path, cmd=ctx.options.get('goto_definition_cmd')) env.goto_line(line) @env.catch_exceptions def show_doc(): """ Show documentation. """ with RopeContext() as ctx: source, offset = env.get_offset_params() try: doc = codeassist.get_doc( ctx.project, source, offset, ctx.resource, maxfixes=3) if not doc: raise exceptions.BadIdentifierError env.let('l:output', doc.split('\n')) except exceptions.BadIdentifierError: env.error("No documentation found.") def find_it(): """ Find occurrences. """ with RopeContext() as ctx: _, offset = env.get_offset_params() try: occurrences = findit.find_occurrences( ctx.project, ctx.resource, offset) except exceptions.BadIdentifierError: occurrences = [] lst = [] for oc in occurrences: lst.append(dict( filename=oc.resource.path, text=env.lines[oc.lineno - 1] if oc.resource.real_path == env.curbuf.name else "", # noqa lnum=oc.lineno, )) env.let('loclist._loclist', lst) def update_python_path(paths): """ Update sys.path and make sure the new items come first. """ old_sys_path_items = list(sys.path) for path in paths: # see if it is a site dir if path.find('site-packages') != -1: site.addsitedir(path) else: sys.path.insert(0, path) # Reorder sys.path so new directories at the front. new_sys_path_items = set(sys.path) - set(old_sys_path_items) sys.path = list(new_sys_path_items) + old_sys_path_items def organize_imports(): """ Organize imports in current file. """ with RopeContext() as ctx: organizer = ImportOrganizer(ctx.project) changes = organizer.organize_imports(ctx.resource) if changes is not None: progress = ProgressHandler('Organize imports') ctx.project.do(changes, task_handle=progress.handle) reload_changes(changes) @env.catch_exceptions def regenerate(): """ Clear cache. """ with RopeContext() as ctx: ctx.project.pycore._invalidate_resource_cache(ctx.resource) # noqa ctx.importer.generate_cache(resources=[ctx.resource]) ctx.project.sync() def new(): """ Create a new project. """ root = env.var('input("Enter project root: ", getcwd())') prj = project.Project(projectroot=root) prj.close() env.message("Project is opened: %s" % root) def undo(): """ Undo last changes. :return bool: """ with RopeContext() as ctx: changes = ctx.project.history.tobe_undone if changes is None: env.error('Nothing to undo!') return False if env.user_confirm('Undo [%s]?' % str(changes)): progress = ProgressHandler('Undo %s' % str(changes)) for c in ctx.project.history.undo(task_handle=progress.handle): reload_changes(c) def redo(): """ Redo last changes. :return bool: """ with RopeContext() as ctx: changes = ctx.project.history.tobe_redone if changes is None: env.error('Nothing to redo!') return False if env.user_confirm('Redo [%s]?' % str(changes)): progress = ProgressHandler('Redo %s' % str(changes)) for c in ctx.project.history.redo(task_handle=progress.handle): reload_changes(c) def cache_project(cls): """ Cache projects. :return func: """ projects = dict() resources = dict() def get_ctx(*args, **kwargs): path = env.curbuf.name if resources.get(path): return resources.get(path) project_path = env.curdir env.debug('Look ctx', project_path) if env.var('g:pymode_rope_lookup_project', True): project_path = look_ropeproject(project_path) ctx = projects.get(project_path) if not ctx: projects[project_path] = ctx = cls(path, project_path) resources[path] = ctx return ctx return get_ctx def autoimport(): """ Autoimport modules. :return bool: """ word = env.var('a:word') if not word: env.error("Should be word under cursor.") return False with RopeContext() as ctx: if not ctx.importer.names: ctx.generate_autoimport_cache() modules = ctx.importer.get_modules(word) if not modules: env.message('Global name %s not found.' % word) return False if len(modules) == 1: _insert_import(word, modules[0], ctx) else: module = env.user_input_choices('Which module to import:', *modules) _insert_import(word, module, ctx) return True @cache_project class RopeContext(object): """ A context manager to have a rope project context. """ def __init__(self, path, project_path): self.path = path self.project = project.Project( project_path, fscommands=FileSystemCommands()) self.importer = rope_autoimport.AutoImport( project=self.project, observe=False) update_python_path(self.project.prefs.get('python_path', [])) self.resource = None self.current = None self.options = dict( completeopt=env.var('&completeopt'), autoimport=env.var('g:pymode_rope_autoimport', True), autoimport_modules=env.var('g:pymode_rope_autoimport_modules'), goto_definition_cmd=env.var('g:pymode_rope_goto_definition_cmd'), ) if os.path.exists("%s/__init__.py" % project_path): sys.path.append(project_path) if self.options.get('autoimport') == '1': self.generate_autoimport_cache() env.debug('Context init', project_path) env.message('Init Rope project: %s' % project_path) def __enter__(self): env.let('g:pymode_rope_current', self.project.root.real_path) self.project.validate(self.project.root) self.resource = libutils.path_to_resource( self.project, env.curbuf.name, 'file') if not self.resource.exists() or os.path.isdir( self.resource.real_path): self.resource = None else: env.debug('Found resource', self.resource.path) return self def __exit__(self, t, value, traceback): if t is None: self.project.close() def generate_autoimport_cache(self): """ Update autoimport cache. """ env.message('Regenerate autoimport cache.') modules = self.options.get('autoimport_modules', []) def _update_cache(importer, modules=None): importer.generate_cache() if modules: importer.generate_modules_cache(modules) importer.project.sync() process = multiprocessing.Process(target=_update_cache, args=( self.importer, modules)) process.start() class ProgressHandler(object): """ Handle task progress. """ def __init__(self, msg): self.handle = TaskHandle(name="refactoring_handle") self.handle.add_observer(self) self.message = msg def __call__(self): """ Show current progress. """ percent_done = self.handle.current_jobset().get_percent_done() env.message('%s - done %s%%' % (self.message, percent_done)) _scope_weight = { 'local': 10, 'attribute': 20, 'global': 30, 'imported': 40, 'builtin': 50} def _sort_proporsals(p): return ( _scope_weight.get(p.scope, 100), int(p.name.startswith('_')), p.name) class Refactoring(object): # noqa """ Base class for refactor operations. """ def run(self): """ Run refactoring. :return bool: """ with RopeContext() as ctx: if not ctx.resource: env.error("You should save the file before refactoring.") return None try: env.message(self.__doc__) refactor = self.get_refactor(ctx) input_str = self.get_input_str(refactor, ctx) if not input_str: return False changes = self.get_changes(refactor, input_str) action = env.user_input_choices( 'Choose what to do:', 'perform', 'preview') if not action: return False if action == 'preview': print("\n ") print("-------------------------------") print("\n%s\n" % changes.get_description()) print("-------------------------------\n\n") if not env.user_confirm('Do the changes?'): return False progress = ProgressHandler('Apply changes ...') ctx.project.do(changes, task_handle=progress.handle) reload_changes(changes) except exceptions.RefactoringError as e: env.error(str(e)) except Exception as e: env.error('Unhandled exception in Pymode: %s' % e) @staticmethod def get_refactor(ctx): """ Get refactor object. """ raise NotImplementedError @staticmethod def get_input_str(refactor, ctx): """ Get user input. Skip by default. :return bool: True """ return True @staticmethod def get_changes(refactor, input_str): """ Get changes. :return Changes: """ progress = ProgressHandler('Calculate changes ...') return refactor.get_changes( input_str, task_handle=progress.handle) class RenameRefactoring(Refactoring): """ Rename var/function/method/class. """ def __init__(self, module=False): self.module = module super(RenameRefactoring, self).__init__() def get_refactor(self, ctx): """ Function description. :return Rename: """ offset = None if not self.module: _, offset = env.get_offset_params() env.debug('Prepare rename', offset) return rename.Rename(ctx.project, ctx.resource, offset) def get_input_str(self, refactor, ctx): """ Return user input. """ oldname = str(refactor.get_old_name()) msg = 'Renaming method/variable. New name:' if self.module: msg = 'Renaming module. New name:' newname = env.user_input(msg, oldname) if newname == oldname: env.message("Nothing to do.") return False return newname class ExtractMethodRefactoring(Refactoring): """ Extract method. """ @staticmethod def get_input_str(refactor, ctx): """ Return user input. """ return env.user_input('New method name:') @staticmethod def get_refactor(ctx): """ Function description. :return Rename: """ cursor1, cursor2 = env.curbuf.mark('<'), env.curbuf.mark('>') _, offset1 = env.get_offset_params(cursor1) _, offset2 = env.get_offset_params(cursor2) return extract.ExtractMethod( ctx.project, ctx.resource, offset1, offset2) @staticmethod def get_changes(refactor, input_str): """ Get changes. :return Changes: """ return refactor.get_changes(input_str) class ExtractVariableRefactoring(Refactoring): """ Extract variable. """ @staticmethod def get_input_str(refactor, ctx): """ Return user input. """ return env.user_input('New variable name:') @staticmethod def get_refactor(ctx): """ Function description. :return Rename: """ cursor1, cursor2 = env.curbuf.mark('<'), env.curbuf.mark('>') _, offset1 = env.get_offset_params(cursor1) _, offset2 = env.get_offset_params(cursor2) return extract.ExtractVariable( ctx.project, ctx.resource, offset1, offset2) @staticmethod def get_changes(refactor, input_str): """ Get changes. :return Changes: """ return refactor.get_changes(input_str) class InlineRefactoring(Refactoring): """ Inline variable/method. """ @staticmethod def get_refactor(ctx): """ Function description. :return Rename: """ _, offset = env.get_offset_params() return inline.create_inline(ctx.project, ctx.resource, offset) @staticmethod def get_changes(refactor, input_str): """ Get changes. :return Changes: """ progress = ProgressHandler('Calculate changes ...') return refactor.get_changes(task_handle=progress.handle) class UseFunctionRefactoring(Refactoring): """ Use selected function as possible. """ @staticmethod def get_refactor(ctx): """ Function description. :return Rename: """ _, offset = env.get_offset_params() return usefunction.UseFunction(ctx.project, ctx.resource, offset) @staticmethod def get_changes(refactor, input_str): """ Get changes. :return Changes: """ progress = ProgressHandler('Calculate changes ...') return refactor.get_changes( resources=[refactor.resource], task_handle=progress.handle) class ModuleToPackageRefactoring(Refactoring): """ Convert module to package. """ @staticmethod def get_refactor(ctx): """ Function description. :return Rename: """ return ModuleToPackage(ctx.project, ctx.resource) @staticmethod def get_changes(refactor, input_str): """ Get changes. :return Changes: """ return refactor.get_changes() class MoveRefactoring(Refactoring): """ Move method/module to other class/global. """ @staticmethod def get_input_str(refactor, ctx): """ Get destination. :return str: """ return env.user_input('Enter destination:') @staticmethod def get_refactor(ctx): """ Function description. :return Rename: """ _, offset = env.get_offset_params() if offset == 0: offset = None return move.create_move(ctx.project, ctx.resource, offset) class ChangeSignatureRefactoring(Refactoring): """ Change function signature (add/remove/sort arguments). """ @staticmethod def get_input_str(refactor, ctx): """ Get destination. :return str: """ args = refactor.get_args() default = ', '.join(a[0] for a in args) return env.user_input('Change the signature:', default) @staticmethod def get_refactor(ctx): """ Function description. :return Rename: """ _, offset = env.get_offset_params() return change_signature.ChangeSignature( ctx.project, ctx.resource, offset) def get_changes(self, refactor, input_string): """ Function description. :return Rope.changes: """ args = re.sub(r'[\s\(\)]+', '', input_string).split(',') olds = [arg[0] for arg in refactor.get_args()] changers = [] for arg in [a for a in olds if not a in args]: changers.append(change_signature.ArgumentRemover(olds.index(arg))) olds.remove(arg) order = [] for index, arg in enumerate(args): if arg not in olds: changers.append(change_signature.ArgumentAdder(index, arg)) olds.insert(index, arg) order.append(olds.index(arg)) changers.append(change_signature.ArgumentReorderer( order, autodef='None')) return refactor.get_changes(changers) class GenerateElementRefactoring(Refactoring): """ Class description. """ def __init__(self, kind, *args, **kwargs): """ Function description. """ self.kind = kind super(GenerateElementRefactoring, self).__init__(*args, **kwargs) def get_refactor(self, ctx): """ Function description. :return Rename: """ _, offset = env.get_offset_params() return generate.create_generate( self.kind, ctx.project, ctx.resource, offset) def get_changes(self, refactor, input_str): """ Function description. :return Rope.changes: """ return refactor.get_changes() @env.catch_exceptions def reload_changes(changes): """ Reload changed buffers. """ resources = changes.get_changed_resources() moved = _get_moved_resources(changes) # noqa current = env.curbuf.number for f in resources: bufnr = env.var('bufnr("%s")' % f.real_path) env.goto_buffer(bufnr) path = env.curbuf.name if f in moved: path = moved[f].real_path env.debug('Reload', f.real_path, path, bufnr) env.goto_file(path, 'e!', force=True) env.message("%s has been changed." % f.real_path, history=True) env.goto_buffer(current) def _get_moved_resources(changes): moved = dict() if isinstance(changes, change.ChangeSet): for c in changes.changes: moved.update(_get_moved_resources(c)) if isinstance(changes, change.MoveResource): moved[changes.resource] = changes.new_resource return moved def _get_autoimport_proposals(out, ctx, source, offset, dot=False): if not ctx.options.get('autoimport') or dot: return out if '.' in codeassist.starting_expression(source, offset): return out current_offset = offset - 1 while current_offset > 0 and ( source[current_offset].isalnum() or source[current_offset] == '_'): current_offset -= 1 starting = source[current_offset:offset] starting = starting.strip() if not starting: return out for assist in ctx.importer.import_assist(starting): out.append(dict( abbr=' : '.join(assist), word=assist[0], kind='autoimport:', )) return out @env.catch_exceptions def complete_check(): """ Function description. :return bool: """ row, column = env.cursor line = env.lines[row - 1] word_finder = worder.Worder(line, True) parent, name, _ = word_finder.get_splitted_primary_before(column - 1) if parent: return False with RopeContext() as ctx: modules = ctx.importer.get_modules(name) if not modules: return False if name in ctx.project.pycore.resource_to_pyobject(ctx.resource): return False if not env.user_confirm("Import %s?" % name, True): return False if len(modules) == 1: _insert_import(name, modules[0], ctx) else: module = env.user_input_choices('With module to import:', *modules) if module: _insert_import(name, module, ctx) def _insert_import(name, module, ctx): if not ctx.resource: source, _ = env.get_offset_params() lineno = ctx.importer.find_insertion_line(source) line = 'from %s import %s' % (module, name) env.curbuf[lineno - 1:lineno - 1] = [ env.prepare_value(line, dumps=False)] return True pyobject = ctx.project.pycore.resource_to_pyobject(ctx.resource) import_tools = importutils.ImportTools(ctx.project.pycore) module_imports = import_tools.module_imports(pyobject) new_import = importutils.FromImport(module, 0, [[name, None]]) module_imports.add_import(new_import) changes = change.ChangeContents( ctx.resource, module_imports.get_changed_source()) action = env.user_input_choices( 'Choose what to do:', 'perform', 'preview') if not action: return False if action == 'preview': print("\n ") print("-------------------------------") print("\n%s\n" % changes.get_description()) print("-------------------------------\n\n") if not env.user_confirm('Do the changes?'): return False progress = ProgressHandler('Apply changes ...') ctx.project.do(changes, task_handle=progress.handle) reload_changes(changes)
X Tutup