# The MIT License
#
# Copyright (c) 2009 the bpython authors.
# Copyright (c) 2012-2021 Sebastian Ramacher
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
from pathlib import Path
import stat
from itertools import islice, chain
from typing import TextIO
from collections.abc import Iterable
from .translations import _
from .filelock import FileLock
class History:
"""Stores readline-style history and current place in it"""
def __init__(
self,
entries: Iterable[str] | None = None,
duplicates: bool = True,
hist_size: int = 100,
) -> None:
if entries is None:
self.entries = [""]
else:
self.entries = list(entries)
# how many lines back in history is currently selected where 0 is the
# saved typed line, 1 the prev entered line
self.index = 0
# what was on the prompt before using history
self.saved_line = ""
self.duplicates = duplicates
self.hist_size = hist_size
def append(self, line: str) -> None:
self.append_to(self.entries, line)
def append_to(self, entries: list[str], line: str) -> None:
line = line.rstrip("\n")
if line:
if not self.duplicates:
# remove duplicates
try:
while True:
entries.remove(line)
except ValueError:
pass
entries.append(line)
def first(self) -> str:
"""Move back to the beginning of the history."""
if not self.is_at_end:
self.index = len(self.entries)
return self.entries[-self.index]
def back(
self,
start: bool = True,
search: bool = False,
target: str | None = None,
include_current: bool = False,
) -> str:
"""Move one step back in the history."""
if target is None:
target = self.saved_line
if not self.is_at_end:
if search:
self.index += self.find_partial_match_backward(
target, include_current
)
elif start:
self.index += self.find_match_backward(target, include_current)
else:
self.index += 1
return self.entry
@property
def entry(self) -> str:
"""The current entry, which may be the saved line"""
return self.entries[-self.index] if self.index else self.saved_line
@property
def entries_by_index(self) -> list[str]:
return list(chain((self.saved_line,), reversed(self.entries)))
def find_match_backward(
self, search_term: str, include_current: bool = False
) -> int:
add = 0 if include_current else 1
start = self.index + add
for idx, val in enumerate(islice(self.entries_by_index, start, None)):
if val.startswith(search_term):
return idx + add
return 0
def find_partial_match_backward(
self, search_term: str, include_current: bool = False
) -> int:
add = 0 if include_current else 1
start = self.index + add
for idx, val in enumerate(islice(self.entries_by_index, start, None)):
if search_term in val:
return idx + add
return 0
def forward(
self,
start: bool = True,
search: bool = False,
target: str | None = None,
include_current: bool = False,
) -> str:
"""Move one step forward in the history."""
if target is None:
target = self.saved_line
if self.index > 1:
if search:
self.index -= self.find_partial_match_forward(
target, include_current
)
elif start:
self.index -= self.find_match_forward(target, include_current)
else:
self.index -= 1
return self.entry
else:
self.index = 0
return self.saved_line
def find_match_forward(
self, search_term: str, include_current: bool = False
) -> int:
add = 0 if include_current else 1
end = max(0, self.index - (1 - add))
for idx in range(end):
val = self.entries_by_index[end - 1 - idx]
if val.startswith(search_term):
return idx + (0 if include_current else 1)
return self.index
def find_partial_match_forward(
self, search_term: str, include_current: bool = False
) -> int:
add = 0 if include_current else 1
end = max(0, self.index - (1 - add))
for idx in range(end):
val = self.entries_by_index[end - 1 - idx]
if search_term in val:
return idx + add
return self.index
def last(self) -> str:
"""Move forward to the end of the history."""
if not self.is_at_start:
self.index = 0
return self.entries[0]
@property
def is_at_end(self) -> bool:
return self.index >= len(self.entries) or self.index == -1
@property
def is_at_start(self) -> bool:
return self.index == 0
def enter(self, line: str) -> None:
if self.index == 0:
self.saved_line = line
def reset(self) -> None:
self.index = 0
self.saved_line = ""
def load(self, filename: Path, encoding: str) -> None:
with open(filename, encoding=encoding, errors="ignore") as hfile:
with FileLock(hfile, filename=str(filename)):
self.entries = self.load_from(hfile)
def load_from(self, fd: TextIO) -> list[str]:
entries: list[str] = []
for line in fd:
self.append_to(entries, line)
return entries if len(entries) else [""]
def save(self, filename: Path, encoding: str, lines: int = 0) -> None:
fd = os.open(
filename,
os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
stat.S_IRUSR | stat.S_IWUSR,
)
with open(fd, "w", encoding=encoding, errors="ignore") as hfile:
with FileLock(hfile, filename=str(filename)):
self.save_to(hfile, self.entries, lines)
def save_to(
self, fd: TextIO, entries: list[str] | None = None, lines: int = 0
) -> None:
if entries is None:
entries = self.entries
for line in entries[-lines:]:
fd.write(line)
fd.write("\n")
def append_reload_and_write(
self, s: str, filename: Path, encoding: str
) -> None:
if not self.hist_size:
return self.append(s)
try:
fd = os.open(
filename,
os.O_APPEND | os.O_RDWR | os.O_CREAT,
stat.S_IRUSR | stat.S_IWUSR,
)
with open(fd, "a+", encoding=encoding, errors="ignore") as hfile:
with FileLock(hfile, filename=str(filename)):
# read entries
hfile.seek(0, os.SEEK_SET)
entries = self.load_from(hfile)
self.append_to(entries, s)
# write new entries
hfile.seek(0, os.SEEK_SET)
hfile.truncate()
self.save_to(hfile, entries, self.hist_size)
self.entries = entries
except OSError as err:
raise RuntimeError(
_("Error occurred while writing to file %s (%s)")
% (filename, err.strerror)
)
else:
if len(self.entries) == 0:
# Make sure that entries contains at least one element. If the
# file and s are empty, this can occur.
self.entries = [""]