X Tutup
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Lib/test/test_importlib/source/test_source_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,12 @@ def test_default_encoding(self):
self.run_test(self.source_line.encode('utf-8'))

# [encoding first line]
@unittest.expectedFailure # TODO: RUSTPYTHON; UnicodeDecodeError: invalid utf-8 sequence of 1 bytes from index 17
def test_encoding_on_first_line(self):
    # PEP 263 cookie on line 1: source created with a declared Latin-1
    # encoding must round-trip through the import machinery.
    encoding = 'Latin-1'
    source = self.create_source(encoding)
    self.run_test(source)

# [encoding second line]
@unittest.expectedFailure # TODO: RUSTPYTHON; UnicodeDecodeError: invalid utf-8 sequence of 1 bytes from index 34
def test_encoding_on_second_line(self):
    # PEP 263 allows the cookie on line 2 when line 1 is a comment —
    # here a shebang-style line precedes the Latin-1 declaration.
    source = b"#/usr/bin/python\n" + self.create_source('Latin-1')
    self.run_test(source)
Expand All @@ -84,7 +82,6 @@ def test_bom_and_utf_8(self):
self.run_test(source)

# [BOM conflict]
@unittest.expectedFailure # TODO: RUSTPYTHON; UnicodeDecodeError: invalid utf-8 sequence of 1 bytes from index 20
def test_bom_conflict(self):
source = codecs.BOM_UTF8 + self.create_source('latin-1')
with self.assertRaises(SyntaxError):
Expand Down
1 change: 0 additions & 1 deletion Lib/test/test_runpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,6 @@ def test_main_recursion_error(self):
with infinite_recursion(25):
self.assertRaises(RecursionError, run_path, zip_name)

@unittest.expectedFailure # TODO: RUSTPYTHON; detect encoding comments in files
def test_encoding(self):
with temp_dir() as script_dir:
filename = os.path.join(script_dir, 'script.py')
Expand Down
8 changes: 1 addition & 7 deletions Lib/test/test_utf8source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# This file is marked as binary in the CVS, to prevent MacCVS from recoding it.

import unittest

class PEP3120Test(unittest.TestCase):
Expand All @@ -14,11 +12,9 @@ def test_pep3120(self):
b'\\\xd0\x9f'
)

# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_badsyntax(self):
try:
import test.badsyntax_pep3120
import test.tokenizedata.badsyntax_pep3120 # noqa: F401
except SyntaxError as msg:
msg = str(msg).lower()
self.assertTrue('utf-8' in msg)
Expand All @@ -28,8 +24,6 @@ def test_badsyntax(self):

class BuiltinCompileTests(unittest.TestCase):

# TODO: RUSTPYTHON
@unittest.expectedFailure
# Issue 3574.
def test_latin1(self):
# Allow compile() to read Latin-1 source.
Expand Down
144 changes: 141 additions & 3 deletions crates/vm/src/stdlib/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,145 @@ mod builtins {
_feature_version: OptionalArg<i32>,
}

/// Detect PEP 263 encoding cookie from source bytes.
/// Checks first two lines for `# coding[:=] <encoding>` pattern.
/// Returns the encoding name if found, or None for default (UTF-8).
#[cfg(feature = "parser")]
fn detect_source_encoding(source: &[u8]) -> Option<String> {
    // Scan one line for a PEP 263 cookie: a comment containing
    // `coding[:=][ \t]*<name>` where `<name>` matches `[-\w.]+`.
    fn find_encoding_in_line(line: &[u8]) -> Option<String> {
        // PEP 263: '#' must be preceded only by whitespace/formfeed.
        let hash_pos = line.iter().position(|&b| b == b'#')?;
        if !line[..hash_pos]
            .iter()
            .all(|&b| matches!(b, b' ' | b'\t' | b'\x0c' | b'\r'))
        {
            return None;
        }

        // CPython's pattern is non-greedy (`#.*?coding[:=]`), so every
        // occurrence of "coding" must be tried, not just the first:
        // `# coding is fun, coding: ascii` still declares "ascii".
        let mut rest = &line[hash_pos..];
        while let Some(pos) = rest.windows(6).position(|w| w == b"coding") {
            let after = &rest[pos + 6..];
            if matches!(after.first(), Some(&(b':' | b'='))) {
                // Skip optional blanks between the separator and the name.
                let mut name = &after[1..];
                while matches!(name.first(), Some(&(b' ' | b'\t'))) {
                    name = &name[1..];
                }
                // Encoding name: [-\w.]+ — at least one character.
                let len = name
                    .iter()
                    .take_while(|&&b| {
                        b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.')
                    })
                    .count();
                if len > 0 {
                    return Some(name[..len].iter().map(|&b| b as char).collect());
                }
            }
            // No cookie at this occurrence; resume scanning after it.
            rest = &rest[pos + 6..];
        }
        None
    }

    // Only the first two lines may carry the cookie.
    let mut lines = source.splitn(3, |&b| b == b'\n');

    if let Some(first) = lines.next() {
        // A UTF-8 BOM on line one is transparent to cookie detection.
        let first = first.strip_prefix(b"\xef\xbb\xbf").unwrap_or(first);
        if let Some(enc) = find_encoding_in_line(first) {
            return Some(enc);
        }
        // The second line counts only when the first is blank or a comment.
        let first_visible = first
            .iter()
            .find(|&&b| !matches!(b, b' ' | b'\t' | b'\x0c' | b'\r'));
        if let Some(&b) = first_visible {
            if b != b'#' {
                return None;
            }
        }
    }

    lines.next().and_then(find_encoding_in_line)
}

// NOTE(review): this paragraph documents `decode_source_bytes` below; made a
// plain comment so rustdoc does not attach it to `is_utf8_encoding`:
// Decode source bytes to a string, handling PEP 263 encoding declarations
// and BOM. Raises SyntaxError for invalid UTF-8 without an encoding
// declaration (matching CPython behavior).
/// Check if an encoding name is a UTF-8 variant after normalization.
/// Matches: utf-8, utf_8, utf8, UTF-8, etc.
#[cfg(feature = "parser")]
fn is_utf8_encoding(name: &str) -> bool {
    // Normalize by dropping '-'/'_' separators and lowercasing ASCII, then
    // compare against the canonical "utf8" — accepts utf-8, utf_8, UTF8, ….
    name.chars()
        .filter(|c| !matches!(c, '-' | '_'))
        .map(|c| c.to_ascii_lowercase())
        .eq("utf8".chars())
}

/// Decode source bytes to a string, handling PEP 263 encoding declarations
/// and a UTF-8 BOM. Raises SyntaxError for invalid UTF-8 without an encoding
/// declaration, and for a BOM combined with a conflicting declared encoding
/// (matching CPython behavior). `filename` is interpolated into every error
/// message (the previous version accepted it but hard-coded "(unknown)").
#[cfg(feature = "parser")]
fn decode_source_bytes(source: &[u8], filename: &str, vm: &VirtualMachine) -> PyResult<String> {
    // A UTF-8 BOM pins the encoding; any non-UTF-8 declaration conflicts.
    let has_bom = source.starts_with(b"\xef\xbb\xbf");
    let encoding = detect_source_encoding(source);

    // No declaration defaults to UTF-8 (PEP 3120).
    let is_utf8 = encoding.as_deref().is_none_or(is_utf8_encoding);

    // Validate BOM + encoding combination.
    if has_bom && !is_utf8 {
        return Err(vm.new_exception_msg(
            vm.ctx.exceptions.syntax_error.to_owned(),
            format!("encoding problem for '{filename}': utf-8").into(),
        ));
    }

    if is_utf8 {
        let src = if has_bom { &source[3..] } else { source };
        match core::str::from_utf8(src) {
            Ok(s) => Ok(s.to_owned()),
            Err(e) => {
                // Report the first undecodable byte and its 1-based line,
                // mirroring CPython's tokenizer error text.
                let bad_byte = src[e.valid_up_to()];
                let line = src[..e.valid_up_to()]
                    .iter()
                    .filter(|&&b| b == b'\n')
                    .count()
                    + 1;
                Err(vm.new_exception_msg(
                    vm.ctx.exceptions.syntax_error.to_owned(),
                    format!(
                        "Non-UTF-8 code starting with '\\x{bad_byte:02x}' \
                        on line {line}, but no encoding declared; \
                        see https://peps.python.org/pep-0263/ for details \
                        ({filename}, line {line})"
                    )
                    .into(),
                ))
            }
        }
    } else {
        // Use the codec registry for declared non-UTF-8 encodings; a missing
        // codec (LookupError) becomes SyntaxError, as in CPython.
        let enc = encoding.as_deref().unwrap();
        let bytes_obj = vm.ctx.new_bytes(source.to_vec());
        let decoded = vm
            .state
            .codec_registry
            .decode_text(bytes_obj.into(), enc, None, vm)
            .map_err(|exc| {
                if exc.fast_isinstance(vm.ctx.exceptions.lookup_error) {
                    vm.new_exception_msg(
                        vm.ctx.exceptions.syntax_error.to_owned(),
                        format!("unknown encoding for '{filename}': {enc}").into(),
                    )
                } else {
                    exc
                }
            })?;
        Ok(decoded.to_string_lossy().into_owned())
    }
}

#[cfg(any(feature = "parser", feature = "compiler"))]
#[pyfunction]
fn compile(args: CompileArgs, vm: &VirtualMachine) -> PyResult {
Expand Down Expand Up @@ -203,9 +342,8 @@ mod builtins {
let source = ArgStrOrBytesLike::try_from_object(vm, args.source)?;
let source = source.borrow_bytes();

// TODO: compiler::compile should probably get bytes
let source = core::str::from_utf8(&source)
.map_err(|e| vm.new_unicode_decode_error(e.to_string()))?;
let source = decode_source_bytes(&source, &args.filename.to_string_lossy(), vm)?;
let source = source.as_str();

let flags = args.flags.map_or(Ok(0), |v| v.try_to_primitive(vm))?;

Expand Down
Loading
X Tutup