// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
#pragma warning disable SYSLIB0001 // UTF-7 code paths are obsolete in .NET 5
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Versioning;
using System.Text;
using IronPython.Runtime;
using IronPython.Runtime.Exceptions;
using IronPython.Runtime.Operations;
using Microsoft.Scripting.Runtime;
using DisallowNullAttribute = System.Diagnostics.CodeAnalysis.DisallowNullAttribute;
[assembly: PythonModule("_codecs", typeof(IronPython.Modules.PythonCodecs))]
namespace IronPython.Modules {
public static class PythonCodecs {
// Module description string.
public const string __doc__ = "Provides access to various codecs (ASCII, UTF7, UTF8, etc...)";
// Positions within the tuple returned by lookup(); encode() and decode() in this class
// use EncoderIndex and DecoderIndex to pick the callable they invoke.
internal const int EncoderIndex = 0;
internal const int DecoderIndex = 1;
// Not referenced by the code visible in this class; presumably the remaining slots of the same tuple (to be confirmed).
internal const int StreamReaderIndex = 2;
internal const int StreamWriterIndex = 3;
/// <summary>
/// Returns whatever <c>PythonOps.LookupEncoding</c> yields for <paramref name="encoding"/>.
/// </summary>
public static PythonTuple lookup(CodeContext/*!*/ context, [NotNone] string encoding) {
    return PythonOps.LookupEncoding(context, encoding);
}
/// <summary>
/// Returns whatever <c>PythonOps.LookupEncodingError</c> yields for the handler called <paramref name="name"/>.
/// </summary>
[LightThrowing]
public static object lookup_error(CodeContext/*!*/ context, [NotNone] string name) {
    return PythonOps.LookupEncodingError(context, name);
}
/// <summary>
/// Forwards <paramref name="search_function"/> to <c>PythonOps.RegisterEncoding</c>.
/// </summary>
public static void register(CodeContext/*!*/ context, object? search_function) {
    PythonOps.RegisterEncoding(context, search_function);
}
/// <summary>
/// Forwards <paramref name="name"/> and <paramref name="handler"/> to <c>PythonOps.RegisterEncodingError</c>.
/// </summary>
public static void register_error(CodeContext/*!*/ context, [NotNone] string name, object? handler) {
    PythonOps.RegisterEncodingError(context, name, handler);
}
#region ASCII Encoding
/// <summary>
/// Decodes the buffer obtained from <paramref name="input"/> with <see cref="Encoding.ASCII"/>.
/// </summary>
/// <returns>The result of <c>DoDecode</c> converted to a Python tuple.</returns>
public static PythonTuple ascii_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null) {
    using (var buffer = input.GetBuffer()) {
        var decoded = DoDecode(context, "ascii", Encoding.ASCII, buffer, errors);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with <see cref="Encoding.ASCII"/>.
/// </summary>
/// <returns>The result of <c>DoEncode</c> converted to a Python tuple.</returns>
public static PythonTuple ascii_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "ascii", Encoding.ASCII, input, errors);
    return encoded.ToPythonTuple();
}
#endregion
#region Charmap Encoding
/// <summary>
/// Creates an optimized encoding mapping that can be consumed by an optimized version of charmap_encode/charmap_decode.
/// </summary>
/// <remarks>
/// The map is compiled for encoding only. An empty <paramref name="decoding_table"/> is rejected with a TypeError.
/// </remarks>
public static EncodingMap charmap_build([NotNone] string decoding_table)
    => decoding_table.Length != 0
        ? new EncodingMap(decoding_table, compileForDecoding: false, compileForEncoding: true)
        : throw PythonOps.TypeError("charmap_build expected non-empty string");
/// <summary>
/// Encodes the input string with the specified optimized encoding map.
/// </summary>
public static PythonTuple charmap_encode(CodeContext context, [NotNone] string input, string? errors, [NotNone] EncodingMap map) {
    var encoding = new EncodingMapEncoding(map);
    var encoded = DoEncode(context, "charmap", encoding, input, errors);
    return encoded.ToPythonTuple();
}
/// <summary>
/// Encodes the input string using a dictionary mapping; when no mapping is supplied the
/// call is delegated to <c>latin_1_encode</c>.
/// </summary>
/// <param name="map">
/// Mapping handed to <c>CharmapEncoding</c>, which looks keys up as boxed code points and
/// reads the values as objects; hence the <c>object</c>/<c>object</c> type arguments.
/// </param>
public static PythonTuple charmap_encode(CodeContext context, [NotNone] string input, string? errors = null, IDictionary<object, object>? map = null) {
    if (map == null) {
        return latin_1_encode(context, input, errors);
    }
    return DoEncode(context, "charmap", new CharmapEncoding(map), input, errors).ToPythonTuple();
}
/// <summary>
/// Decodes the input string using the provided string mapping.
/// </summary>
/// <remarks>
/// A fresh <c>EncodingMap</c>, compiled for decoding only, is built from <paramref name="map"/> on every call.
/// </remarks>
public static PythonTuple charmap_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors, [NotNone] string map) {
    var decodingMap = new EncodingMap(map, compileForDecoding: true, compileForEncoding: false);
    using (IPythonBuffer buffer = input.GetBuffer()) {
        var decoded = DoDecode(context, "charmap", new EncodingMapEncoding(decodingMap), buffer, errors);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Decodes the input buffer using a dictionary mapping; when no mapping is supplied the
/// call is delegated to <c>latin_1_decode</c>.
/// </summary>
/// <param name="map">
/// Mapping handed to <c>CharmapEncoding</c>, which looks keys up as boxed byte values and
/// reads the values as objects; hence the <c>object</c>/<c>object</c> type arguments.
/// </param>
public static PythonTuple charmap_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, IDictionary<object, object>? map = null) {
    if (map == null) {
        return latin_1_decode(context, input, errors);
    }
    using IPythonBuffer buffer = input.GetBuffer();
    return DoDecode(context, "charmap", new CharmapEncoding(map), buffer, errors).ToPythonTuple();
}
#endregion
#region Generic Encoding
/// <summary>
/// Decodes <paramref name="obj"/>. With an explicit <paramref name="encoding"/> the decoder is taken
/// from <c>lookup</c> and the first item of its result is returned; without one, <paramref name="obj"/>
/// must implement <c>IBufferProtocol</c> and is decoded with the language context's default encoding.
/// </summary>
public static object decode(CodeContext/*!*/ context, object? obj, [NotNone, DisallowNull] string? encoding = null!, [NotNone] string errors = "strict") {
    if (encoding != null) {
        object? decoder = lookup(context, encoding)[DecoderIndex];
        if (PythonOps.IsCallable(context, decoder)) {
            var callResult = PythonCalls.Call(context, decoder, obj, errors);
            return PythonOps.GetIndex(context, callResult, 0);
        }
        throw PythonOps.TypeError("decoding with '{0}' codec failed; decoder must be callable ('{1}' object is not callable)", encoding, PythonOps.GetPythonTypeName(decoder));
    }

    // No codec name given: fall back to the default encoding of the language context.
    PythonContext lc = context.LanguageContext;
    if (!(obj is IBufferProtocol bp)) {
        throw PythonOps.TypeError("expected bytes-like object, got {0}", PythonOps.GetPythonTypeName(obj));
    }
    using (IPythonBuffer buffer = bp.GetBuffer()) {
        return StringOps.DoDecode(context, buffer, errors, lc.GetDefaultEncodingName(), lc.DefaultEncoding);
    }
}
/// <summary>
/// Encodes <paramref name="obj"/>. With an explicit <paramref name="encoding"/> the encoder is taken
/// from <c>lookup</c> and the first item of its result is returned; without one, <paramref name="obj"/>
/// must be a string and is encoded with the language context's default encoding (preamble included).
/// </summary>
public static object encode(CodeContext/*!*/ context, object? obj, [NotNone, DisallowNull] string? encoding = null!, [NotNone] string errors = "strict") {
    if (encoding != null) {
        object? encoder = lookup(context, encoding)[EncoderIndex];
        if (PythonOps.IsCallable(context, encoder)) {
            var callResult = PythonCalls.Call(context, encoder, obj, errors);
            return PythonOps.GetIndex(context, callResult, 0);
        }
        throw PythonOps.TypeError("encoding with '{0}' codec failed; encoder must be callable ('{1}' object is not callable)", encoding, PythonOps.GetPythonTypeName(encoder));
    }

    // No codec name given: only strings can be encoded with the default encoding.
    if (!(obj is string str)) {
        throw PythonOps.TypeError("expected str, got {0}", PythonOps.GetPythonTypeName(obj));
    }
    PythonContext lc = context.LanguageContext;
    return StringOps.DoEncode(context, str, errors, lc.GetDefaultEncodingName(), lc.DefaultEncoding, includePreamble: true);
}
#endregion
#region Escape Encoding
/// <summary>
/// String overload: the text is first encoded through <c>StringOps.DoEncodeUtf8</c> and the
/// result is passed to the buffer overload of <c>escape_decode</c>.
/// </summary>
public static PythonTuple escape_decode(CodeContext/*!*/ context, [NotNone] string data, string? errors = null) {
    var utf8Bytes = StringOps.DoEncodeUtf8(context, data);
    return escape_decode(utf8Bytes, errors);
}
// Parses the bytes of `data` with LiteralParser.ParseBytes (isRaw, isAscii and normalizeLineEndings all false)
// and returns a tuple of (parsed bytes, number of input bytes in the buffer).
// `errors` selects the handler passed to the parser; null means no handler at all.
// NOTE(review): several generic type arguments in this block look incomplete (e.g. the `Func` local and the
// `ReadOnlySpan` delegate parameter) — confirm against the upstream file.
public static PythonTuple escape_decode([NotNone] IBufferProtocol data, string? errors = null) {
using IPythonBuffer buffer = data.GetBuffer();
var span = buffer.AsReadOnlySpan();
var res = LiteralParser.ParseBytes(span, isRaw: false, isAscii: false, normalizeLineEndings: false, getErrorHandler(errors));
return PythonTuple.MakeTuple(Bytes.Make(res.ToArray()), span.Length);
// Builds the parser error callback for the given error-handling name.
static LiteralParser.ParseBytesErrorHandler? getErrorHandler(string? errors) {
if (errors == null) return default;
// The concrete handler is chosen lazily, the first time the parser reports an error.
Func?>? eh = null;
return delegate (in ReadOnlySpan data, int start, int end, string message) {
eh ??= errors switch
{
// "strict": raise ValueError carrying the offending position.
"strict" => idx => throw PythonOps.ValueError(@"invalid \x escape at position {0}", idx),
// "replace": substitute a single '?' byte (array cached in _replacementMarker).
"replace" => idx => _replacementMarker ??= new[] { (byte)'?' },
// "ignore": yield null as the replacement.
"ignore" => idx => null,
// Any other name is only rejected once an error is actually encountered.
_ => idx => throw PythonOps.ValueError("decoding error; unknown error handling code: " + errors),
};
return eh(start);
};
}
}
// Cached one-byte replacement ('?') used by the "replace" handler above; created on first use.
[DisallowNull]
private static byte[]? _replacementMarker;
/// <summary>
/// Produces an escaped copy of <paramref name="data"/>: newline, carriage return, tab, backslash and
/// single quote become two-byte backslash escapes; any other byte below 0x20 or at/above 0x7f becomes
/// <c>\xNN</c> with two lowercase hex digits; everything else is copied unchanged.
/// </summary>
/// <param name="data">The bytes to escape.</param>
/// <param name="errors">Accepted but not used by this implementation.</param>
/// <returns>A tuple of (escaped bytes, number of input bytes).</returns>
public static PythonTuple/*!*/ escape_encode([NotNone] Bytes data, string? errors = null) {
    const string hexDigits = "0123456789abcdef";
    using IPythonBuffer buffer = ((IBufferProtocol)data).GetBuffer();
    var span = buffer.AsReadOnlySpan();
    // The element type is byte: every value added below is a byte and the list is turned into Bytes.
    var result = new List<byte>(span.Length);
    for (int i = 0; i < span.Length; i++) {
        byte b = span[i];
        switch (b) {
            case (byte)'\n': result.Add((byte)'\\'); result.Add((byte)'n'); break;
            case (byte)'\r': result.Add((byte)'\\'); result.Add((byte)'r'); break;
            case (byte)'\t': result.Add((byte)'\\'); result.Add((byte)'t'); break;
            case (byte)'\\': result.Add((byte)'\\'); result.Add((byte)'\\'); break;
            case (byte)'\'': result.Add((byte)'\\'); result.Add((byte)'\''); break;
            default:
                if (b < 0x20 || b >= 0x7f) {
                    // \xNN, written directly instead of formatting a string per byte.
                    result.Add((byte)'\\');
                    result.Add((byte)'x');
                    result.Add((byte)hexDigits[b >> 4]);
                    result.Add((byte)hexDigits[b & 0x0F]);
                } else {
                    result.Add(b);
                }
                break;
        }
    }
    return PythonTuple.MakeTuple(Bytes.Make(result.ToArray()), span.Length);
}
#endregion
#region Latin-1 Functions
/// <summary>
/// Decodes the buffer obtained from <paramref name="input"/> with <c>StringOps.Latin1Encoding</c>.
/// </summary>
public static PythonTuple latin_1_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        var decoded = DoDecode(context, "latin-1", StringOps.Latin1Encoding, buffer, errors);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with <c>StringOps.Latin1Encoding</c>.
/// </summary>
public static PythonTuple latin_1_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "latin-1", StringOps.Latin1Encoding, input, errors);
    return encoded.ToPythonTuple();
}
#endregion
#region MBCS Functions
/// <summary>
/// Decodes the buffer obtained from <paramref name="input"/> with <c>StringOps.CodecsInfo.MbcsEncoding</c>.
/// The <paramref name="final"/> flag is accepted but not used here.
/// </summary>
[SupportedOSPlatform("windows")]
[PythonHidden(PlatformsAttribute.PlatformFamily.Unix)]
public static PythonTuple mbcs_decode(CodeContext/*!*/ context, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        var decoded = DoDecode(context, "mbcs", StringOps.CodecsInfo.MbcsEncoding, buffer, errors);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with <c>StringOps.CodecsInfo.MbcsEncoding</c>.
/// </summary>
[SupportedOSPlatform("windows")]
[PythonHidden(PlatformsAttribute.PlatformFamily.Unix)]
public static PythonTuple mbcs_encode(CodeContext/*!*/ context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "mbcs", StringOps.CodecsInfo.MbcsEncoding, input, errors);
    return encoded.ToPythonTuple();
}
#endregion
#region Code Page Functions
/// <summary>
/// Decodes the buffer obtained from <paramref name="input"/> with the encoding that
/// <see cref="Encoding.GetEncoding(int)"/> returns for <paramref name="codepage"/>; the codec is
/// reported under the name <c>cp{codepage}</c>. The <paramref name="final"/> flag is not used here.
/// </summary>
[SupportedOSPlatform("windows")]
[PythonHidden(PlatformsAttribute.PlatformFamily.Unix)]
public static PythonTuple code_page_decode(CodeContext context, int codepage, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    // TODO: Use Win32 API MultiByteToWideChar https://docs.microsoft.com/en-us/windows/win32/api/stringapiset/nf-stringapiset-multibytetowidechar
    string encodingName = $"cp{codepage}";
    Encoding encoding = Encoding.GetEncoding(codepage);
    using (IPythonBuffer buffer = input.GetBuffer()) {
        var decoded = DoDecode(context, encodingName, encoding, buffer, errors);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with the encoding that <see cref="Encoding.GetEncoding(int)"/>
/// returns for <paramref name="codepage"/>, asking <c>DoEncode</c> to include the preamble.
/// </summary>
[SupportedOSPlatform("windows")]
[PythonHidden(PlatformsAttribute.PlatformFamily.Unix)]
public static PythonTuple code_page_encode(CodeContext context, int codepage, [NotNone] string input, string? errors = null) {
    // TODO: Use Win32 API WideCharToMultiByte https://docs.microsoft.com/en-us/windows/win32/api/stringapiset/nf-stringapiset-widechartomultibyte
    string encodingName = $"cp{codepage}";
    Encoding encoding = Encoding.GetEncoding(codepage);
    var encoded = DoEncode(context, encodingName, encoding, input, errors, includePreamble: true);
    return encoded.ToPythonTuple();
}
#endregion
#region Raw Unicode Escape Encoding Functions
/// <summary>
/// String overload: encodes <paramref name="input"/> through <c>StringOps.DoEncodeUtf8</c> and
/// forwards the result to the buffer overload.
/// </summary>
public static PythonTuple raw_unicode_escape_decode(CodeContext/*!*/ context, [NotNone] string input, string? errors = null) {
    // Going through UTF-8 mangles non-ASCII characters and is probably a mistake, but it mirrors CPython;
    // encoding with "raw-unicode-escape" would arguably be the more sensible choice.
    var utf8Bytes = StringOps.DoEncodeUtf8(context, input);
    return raw_unicode_escape_decode(context, utf8Bytes, errors);
}
/// <summary>
/// Decodes the buffer with <c>StringOps.CodecsInfo.RawUnicodeEscapeEncoding</c>.
/// </summary>
/// <returns>A tuple of (decoded value, <c>buffer.NumBytes()</c>).</returns>
public static PythonTuple raw_unicode_escape_decode(CodeContext/*!*/ context, [NotNone] IBufferProtocol input, string? errors = null) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        var decoded = StringOps.DoDecode(context, buffer, errors, "raw-unicode-escape", StringOps.CodecsInfo.RawUnicodeEscapeEncoding);
        var consumed = buffer.NumBytes();
        return PythonTuple.MakeTuple(decoded, consumed);
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with <c>StringOps.CodecsInfo.RawUnicodeEscapeEncoding</c>, without a preamble.
/// </summary>
/// <returns>A tuple of (encoded value, <c>input.Length</c>).</returns>
public static PythonTuple raw_unicode_escape_encode(CodeContext/*!*/ context, [NotNone] string input, string? errors = null) {
    var encoded = StringOps.DoEncode(context, input, errors, "raw-unicode-escape", StringOps.CodecsInfo.RawUnicodeEscapeEncoding, includePreamble: false);
    return PythonTuple.MakeTuple(encoded, input.Length);
}
#endregion
#region Unicode Escape Encoding Functions
/// <summary>
/// String overload: encodes <paramref name="input"/> through <c>StringOps.DoEncodeUtf8</c> and
/// forwards the result to the buffer overload.
/// </summary>
public static PythonTuple unicode_escape_decode(CodeContext/*!*/ context, [NotNone] string input, string? errors = null) {
    // Going through UTF-8 mangles non-ASCII characters and is probably a mistake, but it mirrors CPython;
    // encoding with "unicode-escape" would arguably be the more sensible choice.
    var utf8Bytes = StringOps.DoEncodeUtf8(context, input);
    return unicode_escape_decode(context, utf8Bytes, errors);
}
/// <summary>
/// Decodes the buffer with <c>StringOps.CodecsInfo.UnicodeEscapeEncoding</c>.
/// </summary>
/// <returns>A tuple of (decoded value, <c>buffer.NumBytes()</c>).</returns>
public static PythonTuple unicode_escape_decode(CodeContext/*!*/ context, [NotNone] IBufferProtocol input, string? errors = null) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        var decoded = StringOps.DoDecode(context, buffer, errors, "unicode-escape", StringOps.CodecsInfo.UnicodeEscapeEncoding);
        var consumed = buffer.NumBytes();
        return PythonTuple.MakeTuple(decoded, consumed);
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with <c>StringOps.CodecsInfo.UnicodeEscapeEncoding</c>, without a preamble.
/// </summary>
/// <returns>A tuple of (encoded value, <c>input.Length</c>).</returns>
public static PythonTuple unicode_escape_encode(CodeContext/*!*/ context, [NotNone] string input, string? errors = null) {
    var encoded = StringOps.DoEncode(context, input, errors, "unicode-escape", StringOps.CodecsInfo.UnicodeEscapeEncoding, includePreamble: false);
    return PythonTuple.MakeTuple(encoded, input.Length);
}
#endregion
#region Readbuffer Functions
/// <summary>
/// String overload: encodes <paramref name="input"/> through <c>StringOps.DoEncodeUtf8</c> and
/// forwards the result to the buffer overload.
/// </summary>
public static PythonTuple readbuffer_encode(CodeContext/*!*/ context, [NotNone] string input, string? errors = null) {
    var utf8Bytes = StringOps.DoEncodeUtf8(context, input);
    return readbuffer_encode(utf8Bytes, errors);
}
/// <summary>
/// Copies the contents of the buffer into a new <c>Bytes</c> object. <paramref name="errors"/> is not used.
/// </summary>
/// <returns>A tuple of (copied bytes, their count).</returns>
public static PythonTuple readbuffer_encode([NotNone] IBufferProtocol input, string? errors = null) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        var copy = buffer.AsReadOnlySpan().ToArray();
        var bytes = Bytes.Make(copy);
        return PythonTuple.MakeTuple(bytes, bytes.Count);
    }
}
#endregion
#region Unicode Internal Encoding Functions
/// <summary>
/// String overload: emits a DeprecationWarning and returns the input unchanged together with its length.
/// </summary>
public static PythonTuple unicode_internal_decode(CodeContext context, [NotNone] string input, string? errors = null) {
    PythonOps.Warn(context, PythonExceptions.DeprecationWarning, "unicode_internal codec has been deprecated");
    int length = input.Length;
    return PythonTuple.MakeTuple(input, length);
}
/// <summary>
/// Buffer overload: emits a DeprecationWarning, then decodes the buffer with <see cref="Encoding.Unicode"/>.
/// </summary>
public static PythonTuple unicode_internal_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null) {
    PythonOps.Warn(context, PythonExceptions.DeprecationWarning, "unicode_internal codec has been deprecated");
    using (IPythonBuffer buffer = input.GetBuffer()) {
        var decoded = DoDecode(context, "unicode-internal", Encoding.Unicode, buffer, errors);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// String overload: emits a DeprecationWarning, then encodes the text with <see cref="Encoding.Unicode"/>
/// without a preamble.
/// </summary>
public static PythonTuple unicode_internal_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    PythonOps.Warn(context, PythonExceptions.DeprecationWarning, "unicode_internal codec has been deprecated");
    var encoded = DoEncode(context, "unicode-internal", Encoding.Unicode, input, errors, includePreamble: false);
    return encoded.ToPythonTuple();
}
/// <summary>
/// Buffer overload: emits a DeprecationWarning, then copies the buffer contents into a new <c>Bytes</c> object.
/// </summary>
/// <returns>A tuple of (copied bytes, their count).</returns>
public static PythonTuple unicode_internal_encode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null) {
    PythonOps.Warn(context, PythonExceptions.DeprecationWarning, "unicode_internal codec has been deprecated");
    using (IPythonBuffer buffer = input.GetBuffer()) {
        var copy = buffer.AsReadOnlySpan().ToArray();
        var bytes = Bytes.Make(copy);
        return PythonTuple.MakeTuple(bytes, bytes.Count);
    }
}
#endregion
#region Utf-16 Functions
/// <summary>
/// Runs <c>utf_16_ex_decode</c> with byte order 0 and returns only the first two items of its result.
/// </summary>
public static PythonTuple utf_16_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    PythonTuple extended = utf_16_ex_decode(context, input, errors, byteorder: 0, final: final);
    return PythonTuple.MakeTuple(extended[0], extended[1]);
}
/// <summary>
/// Encodes <paramref name="input"/> with the BOM-emitting little-endian UTF-16 encoding, preamble included.
/// </summary>
public static PythonTuple utf_16_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "utf-16", Utf16LeBomEncoding, input, errors, includePreamble: true);
    return encoded.ToPythonTuple();
}
/// <summary>
/// Decodes UTF-16 data with an explicit or auto-detected byte order.
/// </summary>
/// <param name="byteorder">
/// Positive selects big-endian, negative little-endian (both without BOM handling by the encoding).
/// Zero asks for detection from a leading BOM; when no BOM is found little-endian is used and 0 is reported.
/// </param>
/// <param name="final">When false, trailing bytes that cannot yet form a complete unit are left undecoded.</param>
/// <returns>A tuple of (decoded value, bytes consumed, byte order used/detected).</returns>
public static PythonTuple utf_16_ex_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, int byteorder = 0, bool final = false) {
    using IPythonBuffer buffer = input.GetBuffer();
    var span = buffer.AsReadOnlySpan();

    bool explicitOrder = byteorder != 0;
    if (!explicitOrder) {
        byteorder = Utf16DetectByteorder(span);
    }
    bool bigEndian = byteorder > 0;

    // Explicit byte order uses the BOM-less encodings; auto-detection uses the BOM-aware ones.
    Encoding encoding = explicitOrder
        ? (bigEndian ? Utf16BeEncoding : Utf16LeEncoding)
        : (bigEndian ? Utf16BeBomEncoding : Utf16LeBomEncoding);
    string encodingName = bigEndian ? "utf-16-be" : "utf-16-le";

    // The result type is inferred so that this method does not have to spell out DoDecode's tuple type.
    var res = DoDecode(context, encodingName, encoding, buffer, errors, NumEligibleUtf16Bytes(span, final, !bigEndian));
    return PythonTuple.MakeTuple(res.Item1, res.Item2, byteorder);
}
/// <summary>
/// Inspects the start of <paramref name="input"/> for a UTF-16 byte order mark.
/// </summary>
/// <returns>-1 for a little-endian BOM, 1 for a big-endian BOM, 0 when neither is present.</returns>
private static int Utf16DetectByteorder(ReadOnlySpan<byte> input) {
    // The span is compared against byte[] preambles, so its element type is byte.
    if (input.StartsWith(BOM_UTF16_LE)) return -1;
    if (input.StartsWith(BOM_UTF16_BE)) return 1;
    return 0;
}
/// <summary>
/// Computes how many leading bytes of <paramref name="input"/> may be decoded now.
/// </summary>
/// <param name="input">The raw UTF-16 bytes.</param>
/// <param name="final">When true, every byte is eligible.</param>
/// <param name="isLE">True when the data is little-endian (high byte of a unit comes last).</param>
/// <returns>
/// The full length when <paramref name="final"/> is set; otherwise the length rounded down to whole
/// 16-bit units, minus one more unit if the last whole unit is a high surrogate.
/// </returns>
private static int NumEligibleUtf16Bytes(ReadOnlySpan<byte> input, bool final, bool isLE) {
    int numBytes = input.Length;
    if (!final) {
        numBytes -= numBytes % 2;
        // The high byte of the last unit sits at offset -1 (LE) or -2 (BE); 0xD8..0xDB marks a high surrogate.
        if (numBytes >= 2 && (input[numBytes - (isLE ? 1 : 2)] & 0xFC) == 0xD8) { // high surrogate
            numBytes -= 2;
        }
    }
    return numBytes;
}
#endregion
#region Utf-16-LE Functions
// UTF-16 little-endian encoding constructed with byteOrderMark: false; created on first use.
private static Encoding Utf16LeEncoding {
    get { return _utf16LeEncoding ??= new UnicodeEncoding(bigEndian: false, byteOrderMark: false); }
}
[DisallowNull]
private static Encoding? _utf16LeEncoding;
// UTF-16 little-endian encoding with a byte order mark.
private static Encoding Utf16LeBomEncoding {
    get { return Encoding.Unicode; } // same as new UnicodeEncoding(bigEndian: false, byteOrderMark: true);
}
// Preamble bytes of Utf16LeBomEncoding; fetched on first use.
private static byte[] BOM_UTF16_LE {
    get { return _bom_utf16_le ??= Utf16LeBomEncoding.GetPreamble(); }
}
[DisallowNull]
private static byte[]? _bom_utf16_le;
/// <summary>
/// Decodes little-endian UTF-16 data; the number of bytes to decode comes from <c>NumEligibleUtf16Bytes</c>.
/// </summary>
public static PythonTuple utf_16_le_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        int eligible = NumEligibleUtf16Bytes(buffer.AsReadOnlySpan(), final, isLE: true);
        var decoded = DoDecode(context, "utf-16-le", Utf16LeEncoding, buffer, errors, eligible);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with the BOM-less little-endian UTF-16 encoding.
/// </summary>
public static PythonTuple utf_16_le_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "utf-16-le", Utf16LeEncoding, input, errors);
    return encoded.ToPythonTuple();
}
#endregion
#region Utf-16-BE Functions
// UTF-16 big-endian encoding constructed with byteOrderMark: false; created on first use.
private static Encoding Utf16BeEncoding {
    get { return _utf16BeEncoding ??= new UnicodeEncoding(bigEndian: true, byteOrderMark: false); }
}
[DisallowNull]
private static Encoding? _utf16BeEncoding;
// UTF-16 big-endian encoding with a byte order mark.
private static Encoding Utf16BeBomEncoding {
    get { return Encoding.BigEndianUnicode; } // same as new UnicodeEncoding(bigEndian: true, byteOrderMark: true);
}
// Preamble bytes of Utf16BeBomEncoding; fetched on first use.
private static byte[] BOM_UTF16_BE {
    get { return _bom_utf16_be ??= Utf16BeBomEncoding.GetPreamble(); }
}
[DisallowNull]
private static byte[]? _bom_utf16_be;
/// <summary>
/// Decodes big-endian UTF-16 data; the number of bytes to decode comes from <c>NumEligibleUtf16Bytes</c>.
/// </summary>
public static PythonTuple utf_16_be_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        int eligible = NumEligibleUtf16Bytes(buffer.AsReadOnlySpan(), final, isLE: false);
        var decoded = DoDecode(context, "utf-16-be", Utf16BeEncoding, buffer, errors, eligible);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with the BOM-less big-endian UTF-16 encoding.
/// </summary>
public static PythonTuple utf_16_be_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "utf-16-be", Utf16BeEncoding, input, errors);
    return encoded.ToPythonTuple();
}
#endregion
#region Utf-7 Functions
/// <summary>
/// Decodes UTF-7 data with <see cref="Encoding.UTF7"/>; the number of bytes to decode comes from
/// <c>NumEligibleUtf7Bytes</c>.
/// </summary>
public static PythonTuple utf_7_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        int eligible = NumEligibleUtf7Bytes(buffer.AsReadOnlySpan(), final);
        var decoded = DoDecode(context, "utf-7", Encoding.UTF7, buffer, errors, eligible);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with <see cref="Encoding.UTF7"/>.
/// </summary>
public static PythonTuple utf_7_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "utf-7", Encoding.UTF7, input, errors);
    return encoded.ToPythonTuple();
}
/// <summary>
/// Computes how many leading bytes of <paramref name="input"/> may be decoded now.
/// </summary>
/// <param name="input">The raw UTF-7 bytes.</param>
/// <param name="final">When true, every byte is eligible.</param>
/// <returns>
/// The full length when <paramref name="final"/> is set or no '+' block is still open at the end of
/// the data; otherwise the offset of the '+' that opened the still-open block.
/// </returns>
private static int NumEligibleUtf7Bytes(ReadOnlySpan<byte> input, bool final) {
    int numBytes = input.Length;
    if (!final) {
        int blockStart = -1; // offset of the '+' of the block currently open, or -1 when outside a block
        for (int i = 0; i < numBytes; i++) {
            byte b = input[i];
            if (blockStart < 0 && b == '+') {
                blockStart = i;
            } else if (blockStart >= 0 && !b.IsLetter() && !b.IsDigit() && b != '+' && b != '/' && !b.IsWhiteSpace()) {
                // Any byte other than a letter, digit, '+', '/' or whitespace closes the block.
                blockStart = -1;
            }
        }
        if (blockStart >= 0) numBytes = blockStart;
    }
    return numBytes;
}
#endregion
#region Utf-8 Functions
// UTF-8 encoding constructed with encoderShouldEmitUTF8Identifier: false; created on first use.
private static Encoding Utf8Encoding {
    get { return _utf8Encoding ??= new UTF8Encoding(encoderShouldEmitUTF8Identifier: false); }
}
[DisallowNull]
private static Encoding? _utf8Encoding;
/// <summary>
/// Decodes UTF-8 data; the number of bytes to decode comes from <c>NumEligibleUtf8Bytes</c>.
/// </summary>
public static PythonTuple utf_8_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        int eligible = NumEligibleUtf8Bytes(buffer.AsReadOnlySpan(), final);
        var decoded = DoDecode(context, "utf-8", Utf8Encoding, buffer, errors, eligible);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with <see cref="Encoding.UTF8"/>.
/// </summary>
public static PythonTuple utf_8_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "utf-8", Encoding.UTF8, input, errors);
    return encoded.ToPythonTuple();
}
/// <summary>
/// Computes how many leading bytes of <paramref name="input"/> may be decoded now.
/// </summary>
/// <param name="input">The raw UTF-8 bytes.</param>
/// <param name="final">When true, every byte is eligible.</param>
/// <returns>
/// The full length when <paramref name="final"/> is set or the data does not end in a sequence that
/// is incomplete but could still become valid; otherwise the offset of that sequence's start byte.
/// </returns>
private static int NumEligibleUtf8Bytes(ReadOnlySpan<byte> input, bool final) {
    int numBytes = input.Length;
    if (!final) {
        // scan for incomplete but valid sequence at the end
        for (int i = 1; i < 4; i++) { // 4 is the max length of a valid sequence
            int pos = numBytes - i;
            if (pos < 0) break;
            byte b = input[pos];
            if ((b & 0b10000000) == 0) return numBytes; // ASCII
            if ((b & 0b11000000) == 0b11000000) { // start byte
                if ((b | 0b00011111) == 0b11011111 && i < 2) return pos; // 2-byte seq start
                if ((b | 0b00001111) == 0b11101111 && i < 3) return pos; // 3-byte seq start
                if ((b | 0b00000111) == 0b11110111) { // 4-byte seq start
                    if (b < 0b11110100) return pos; // chars up to U+FFFFF
                    // For lead byte 0xF4 the next byte must stay below 0x90 to remain within U+10FFFF.
                    if ((b == 0b11110100) && (i == 1 || input[numBytes - i + 1] < 0x90)) return pos; // U+100000 to U+10FFFF
                }
                return numBytes; // invalid sequence or valid but complete
            }
            // else continuation byte (0b10xxxxxx) hence continue scanning
        }
    }
    return numBytes;
}
#endregion
#region Utf-32 Functions
/// <summary>
/// Runs <c>utf_32_ex_decode</c> with byte order 0 and returns only the first two items of its result.
/// </summary>
public static PythonTuple utf_32_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    PythonTuple extended = utf_32_ex_decode(context, input, errors, byteorder: 0, final: final);
    return PythonTuple.MakeTuple(extended[0], extended[1]);
}
/// <summary>
/// Encodes <paramref name="input"/> with the BOM-emitting little-endian UTF-32 encoding, preamble included.
/// </summary>
public static PythonTuple utf_32_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "utf-32", Utf32LeBomEncoding, input, errors, includePreamble: true);
    return encoded.ToPythonTuple();
}
/// <summary>
/// Decodes UTF-32 data with an explicit or auto-detected byte order.
/// </summary>
/// <param name="byteorder">
/// Positive selects big-endian, negative little-endian (both without BOM handling by the encoding).
/// Zero asks for detection from a leading BOM; when no BOM is found little-endian is used and 0 is reported.
/// </param>
/// <param name="final">When false, trailing bytes that do not fill a whole 4-byte unit are left undecoded.</param>
/// <returns>A tuple of (decoded value, bytes consumed, byte order used/detected).</returns>
public static PythonTuple utf_32_ex_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, int byteorder = 0, bool final = false) {
    using IPythonBuffer buffer = input.GetBuffer();
    var span = buffer.AsReadOnlySpan();
    int numBytes = NumEligibleUtf32Bytes(span, final);

    bool explicitOrder = byteorder != 0;
    if (!explicitOrder) {
        byteorder = Utf32DetectByteorder(span);
    }
    bool bigEndian = byteorder > 0;

    // Explicit byte order uses the BOM-less encodings; auto-detection uses the BOM-aware ones.
    Encoding encoding = explicitOrder
        ? (bigEndian ? Utf32BeEncoding : Utf32LeEncoding)
        : (bigEndian ? Utf32BeBomEncoding : Utf32LeBomEncoding);
    string encodingName = bigEndian ? "utf-32-be" : "utf-32-le";

    // The result type is inferred so that this method does not have to spell out DoDecode's tuple type.
    var res = DoDecode(context, encodingName, encoding, buffer, errors, numBytes);
    return PythonTuple.MakeTuple(res.Item1, res.Item2, byteorder);
}
/// <summary>
/// Inspects the start of <paramref name="input"/> for a UTF-32 byte order mark.
/// </summary>
/// <returns>-1 for a little-endian BOM, 1 for a big-endian BOM, 0 when neither is present.</returns>
private static int Utf32DetectByteorder(ReadOnlySpan<byte> input) {
    // The span is compared against byte[] preambles, so its element type is byte.
    if (input.StartsWith(BOM_UTF32_LE)) return -1;
    if (input.StartsWith(BOM_UTF32_BE)) return 1;
    return 0;
}
/// <summary>
/// Computes how many leading bytes of <paramref name="input"/> may be decoded now.
/// </summary>
/// <param name="input">The raw UTF-32 bytes.</param>
/// <param name="final">When true, every byte is eligible.</param>
/// <returns>The full length when <paramref name="final"/> is set; otherwise the length rounded down to a multiple of 4.</returns>
private static int NumEligibleUtf32Bytes(ReadOnlySpan<byte> input, bool final) {
    int numBytes = input.Length;
    if (!final) numBytes -= numBytes % 4;
    return numBytes;
}
#endregion
#region Utf-32-LE Functions
// UTF-32 little-endian encoding constructed with byteOrderMark: false; created on first use.
private static Encoding Utf32LeEncoding {
    get { return _utf32LeEncoding ??= new UTF32Encoding(bigEndian: false, byteOrderMark: false); }
}
[DisallowNull]
private static Encoding? _utf32LeEncoding;
// UTF-32 little-endian encoding with a byte order mark.
private static Encoding Utf32LeBomEncoding {
    get { return Encoding.UTF32; } // same as new UTF32Encoding(bigEndian: false, byteOrderMark: true);
}
// Preamble bytes of Utf32LeBomEncoding; fetched on first use.
private static byte[] BOM_UTF32_LE {
    get { return _bom_utf32_le ??= Utf32LeBomEncoding.GetPreamble(); }
}
[DisallowNull]
private static byte[]? _bom_utf32_le;
/// <summary>
/// Decodes little-endian UTF-32 data; the number of bytes to decode comes from <c>NumEligibleUtf32Bytes</c>.
/// </summary>
public static PythonTuple utf_32_le_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        int eligible = NumEligibleUtf32Bytes(buffer.AsReadOnlySpan(), final);
        var decoded = DoDecode(context, "utf-32-le", Utf32LeEncoding, buffer, errors, eligible);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with the BOM-less little-endian UTF-32 encoding.
/// </summary>
public static PythonTuple utf_32_le_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "utf-32-le", Utf32LeEncoding, input, errors);
    return encoded.ToPythonTuple();
}
#endregion
#region Utf-32-BE Functions
// UTF-32 big-endian encoding constructed with byteOrderMark: false; created on first use.
private static Encoding Utf32BeEncoding {
    get { return _utf32BeEncoding ??= new UTF32Encoding(bigEndian: true, byteOrderMark: false); }
}
[DisallowNull]
private static Encoding? _utf32BeEncoding;
// UTF-32 big-endian encoding constructed with byteOrderMark: true; created on first use.
private static Encoding Utf32BeBomEncoding {
    get { return _utf32BeBomEncoding ??= new UTF32Encoding(bigEndian: true, byteOrderMark: true); }
}
[DisallowNull]
private static Encoding? _utf32BeBomEncoding;
// Preamble bytes of Utf32BeBomEncoding; fetched on first use.
private static byte[] BOM_UTF32_BE {
    get { return _bom_utf32_be ??= Utf32BeBomEncoding.GetPreamble(); }
}
[DisallowNull]
private static byte[]? _bom_utf32_be;
/// <summary>
/// Decodes big-endian UTF-32 data; the number of bytes to decode comes from <c>NumEligibleUtf32Bytes</c>.
/// </summary>
public static PythonTuple utf_32_be_decode(CodeContext context, [NotNone] IBufferProtocol input, string? errors = null, bool final = false) {
    using (IPythonBuffer buffer = input.GetBuffer()) {
        int eligible = NumEligibleUtf32Bytes(buffer.AsReadOnlySpan(), final);
        var decoded = DoDecode(context, "utf-32-be", Utf32BeEncoding, buffer, errors, eligible);
        return decoded.ToPythonTuple();
    }
}
/// <summary>
/// Encodes <paramref name="input"/> with the BOM-less big-endian UTF-32 encoding.
/// </summary>
public static PythonTuple utf_32_be_encode(CodeContext context, [NotNone] string input, string? errors = null) {
    var encoded = DoEncode(context, "utf-32-be", Utf32BeEncoding, input, errors);
    return encoded.ToPythonTuple();
}
#endregion
#region Private implementation
// Shared decode helper: runs StringOps.DoDecode and pairs its result with the number of bytes consumed.
// When numBytes is negative (the default) the consumed count is input.NumBytes(); otherwise it is numBytes itself.
// NOTE(review): the return type reads as a bare `Tuple` here; its generic arguments appear to be missing — confirm upstream.
private static Tuple DoDecode(CodeContext context, string encodingName, Encoding encoding, IPythonBuffer input, string? errors, int numBytes = -1) {
var decoded = StringOps.DoDecode(context, input, errors, encodingName, encoding, numBytes);
return Tuple.Create(decoded, numBytes >= 0 ? numBytes : input.NumBytes());
}
// Shared encode helper: runs StringOps.DoEncode and pairs its result with input.Length
// (the number of UTF-16 chars in the input string). includePreamble is passed through unchanged.
// NOTE(review): the return type reads as a bare `Tuple` here; its generic arguments appear to be missing — confirm upstream.
private static Tuple DoEncode(CodeContext context, string encodingName, Encoding encoding, string input, string? errors, bool includePreamble = false) {
var res = StringOps.DoEncode(context, input, errors, encodingName, encoding, includePreamble);
return Tuple.Create(res, input.Length);
}
#endregion
}
/// <summary>
/// Optimized encoding mapping that can be consumed by charmap_encode/EncodingMapEncoding.
/// </summary>
[PythonType, PythonHidden]
public class EncodingMap {
    // Decoding table. When it holds no surrogate pairs, the char at index b is the decoded value of byte b.
    private readonly string _smap;
    // byte -> code point; only built when the table contains a surrogate pair (see CompileDecodingMap).
    [DisallowNull] private Dictionary<byte, int>? _dmap;
    // code point -> byte; built by CompileEncodingMap.
    [DisallowNull] private Dictionary<int, byte>? _emap;

    internal EncodingMap(string stringMap, bool compileForDecoding, bool compileForEncoding) {
        _smap = stringMap;
        if (compileForDecoding) CompileDecodingMap();
        if (compileForEncoding) CompileEncodingMap();
    }

    /// <summary>
    /// Builds the code point -> byte dictionary if it does not exist yet. At most 256 entries of the
    /// table are consumed; a surrogate pair counts as one entry, and U+FFFE entries are skipped.
    /// </summary>
    private void CompileEncodingMap() {
        if (_emap == null) {
            _emap = new Dictionary<int, byte>(Math.Min(_smap.Length, 256));
            // i walks the UTF-16 chars of the table, cp counts table entries (byte values).
            for (int i = 0, cp = 0; i < _smap.Length && cp < 256; i++, cp++) {
                if (char.IsHighSurrogate(_smap[i]) && i < _smap.Length - 1 && char.IsLowSurrogate(_smap[i + 1])) {
                    _emap[char.ConvertToUtf32(_smap[i], _smap[i + 1])] = unchecked((byte)cp);
                    i++;
                } else if (_smap[i] != '\uFFFE') {
                    _emap[_smap[i]] = unchecked((byte)cp);
                }
            }
        }
    }

    /// <summary>
    /// Builds the byte -> code point dictionary, but only when the table contains a surrogate pair;
    /// otherwise the table string itself is indexed directly by <see cref="TryGetCharValue"/>.
    /// </summary>
    private void CompileDecodingMap() {
        // scan for a surrogate pair
        bool spFound = false;
        for (int i = 0; i < _smap.Length && !spFound; i++) {
            spFound = char.IsHighSurrogate(_smap[i]) && i < _smap.Length - 1 && char.IsLowSurrogate(_smap[i + 1]);
        }
        if (spFound) {
            _dmap = new Dictionary<byte, int>(Math.Min(_smap.Length, 256));
            for (int i = 0, cp = 0; i < _smap.Length && cp < 256; i++, cp++) {
                if (char.IsHighSurrogate(_smap[i]) && i < _smap.Length - 1 && char.IsLowSurrogate(_smap[i + 1])) {
                    _dmap[unchecked((byte)cp)] = char.ConvertToUtf32(_smap[i], _smap[i + 1]);
                    i++;
                } else if (_smap[i] != '\uFFFE') {
                    _dmap[unchecked((byte)cp)] = _smap[i];
                }
            }
        }
    }

    /// <summary>
    /// Looks up the code point for byte <paramref name="b"/>.
    /// </summary>
    /// <returns>False when the byte has no entry, lies beyond the table, or maps to U+FFFE.</returns>
    public bool TryGetCharValue(byte b, out int val) {
        if (_dmap != null) {
            return _dmap.TryGetValue(b, out val);
        } else if (b < _smap.Length) {
            val = _smap[b];
            return val != '\uFFFE';
        } else {
            val = '\0';
            return false;
        }
    }

    /// <summary>
    /// Looks up the byte for code point <paramref name="c"/>, building the encoding dictionary on first use.
    /// </summary>
    /// <returns>False when the code point has no entry.</returns>
    public bool TryGetByteValue(int c, out byte val) {
        CompileEncodingMap();
        if (_emap != null) {
            return _emap.TryGetValue(c, out val);
        } else {
            val = 0;
            return false;
        }
    }
}
/// <summary>
/// This implementation is not suitable for incremental encoding.
/// </summary>
internal class EncodingMapEncoding : Encoding {
// The mapping consulted for every byte/code point lookup.
private readonly EncodingMap _map;

public EncodingMapEncoding(EncodingMap map) => _map = map;

public override string EncodingName {
    get { return "charmap"; }
}
// Counts bytes by running GetBytes without an output array.
public override int GetByteCount(char[] chars, int index, int count) {
    return GetBytes(chars, index, count, bytes: null, byteIndex: 0);
}
/// <summary>
/// Encodes <paramref name="charCount"/> chars starting at <paramref name="charIndex"/> through the map.
/// When <paramref name="bytes"/> is null nothing is written and only the byte count is computed.
/// </summary>
/// <returns>The number of bytes produced.</returns>
/// <remarks>
/// Unmapped characters go through the encoder fallback; every fallback character must itself be
/// mapped (no recursive fallback), otherwise a UnicodeEncodeError is raised.
/// </remarks>
public override int GetBytes(char[] chars, int charIndex, int charCount, byte[]? bytes, int byteIndex) {
    if (chars == null) throw new ArgumentNullException(nameof(chars));
    int charEnd = charIndex + charCount;
    int byteStart = byteIndex;
    EncoderFallbackBuffer? efb = null;
    while (charIndex < charEnd) {
        int codepoint;
        char c = chars[charIndex];
        int nextIndex = charIndex + 1;
        // Combine a valid surrogate pair into a single code point.
        if (char.IsHighSurrogate(c) && nextIndex < charEnd && char.IsLowSurrogate(chars[nextIndex])) {
            codepoint = char.ConvertToUtf32(c, chars[nextIndex++]);
        } else {
            codepoint = c;
        }
        if (!_map.TryGetByteValue(codepoint, out byte val)) {
            efb ??= EncoderFallback.CreateFallbackBuffer();
            try {
                if (efb.Fallback(c, charIndex)) {
                    while (efb.Remaining != 0) {
                        c = efb.GetNextChar();
                        int fbCodepoint = c;
                        if (char.IsHighSurrogate(c) && efb.Remaining != 0) {
                            char d = efb.GetNextChar();
                            if (char.IsLowSurrogate(d)) {
                                fbCodepoint = char.ConvertToUtf32(c, d);
                            } else {
                                // Not a pair after all: push the char back for the next iteration.
                                efb.MovePrevious();
                            }
                        }
                        if (!_map.TryGetByteValue(fbCodepoint, out val)) {
                            throw new EncoderFallbackException(); // no recursive fallback
                        }
                        if (bytes != null) bytes[byteIndex] = val;
                        byteIndex++;
                    }
                }
            } catch (EncoderFallbackException) {
                // Message matches the wording CPython uses for unmapped charmap characters.
                throw PythonOps.UnicodeEncodeError(EncodingName, new string(chars), charIndex, charIndex + 1, "character maps to <undefined>");
            }
        } else {
            if (bytes != null) bytes[byteIndex] = val;
            byteIndex++;
        }
        charIndex = nextIndex;
    }
    return byteIndex - byteStart;
}
// Counts chars by running GetChars without an output array.
public override int GetCharCount(byte[] bytes, int index, int count) {
    return GetChars(bytes, index, count, chars: null, charIndex: 0);
}
/// <summary>
/// Decodes <paramref name="byteCount"/> bytes starting at <paramref name="byteIndex"/> through the map.
/// When <paramref name="chars"/> is null nothing is written and only the char count is computed.
/// </summary>
/// <returns>The number of UTF-16 chars produced (code points of 0x10000 and above take two).</returns>
/// <remarks>
/// Unmapped bytes go through the decoder fallback; a DecoderFallbackException is translated into a
/// UnicodeDecodeError.
/// </remarks>
public override int GetChars(byte[] bytes, int byteIndex, int byteCount, char[]? chars, int charIndex) {
    if (bytes == null) throw new ArgumentNullException(nameof(bytes));
    int byteEnd = byteIndex + byteCount;
    int charStart = charIndex;
    DecoderFallbackBuffer? dfb = null;
    while (byteIndex < byteEnd) {
        byte b = bytes[byteIndex];
        if (!_map.TryGetCharValue(b, out int val)) {
            dfb ??= DecoderFallback.CreateFallbackBuffer();
            byte[] bytesUnknown = new[] { b };
            try {
                if (dfb.Fallback(bytesUnknown, byteIndex)) {
                    while (dfb.Remaining != 0) {
                        char c = dfb.GetNextChar();
                        if (chars != null) {
                            chars[charIndex] = c;
                        }
                        charIndex++;
                    }
                }
            } catch (DecoderFallbackException) {
                // Message matches the wording CPython uses for unmapped charmap bytes.
                throw PythonOps.UnicodeDecodeError("character maps to <undefined>", bytesUnknown, byteIndex);
            }
        } else {
            if (val >= 0x10000) {
                // Supplementary code point: emit it as a surrogate pair.
                string s32 = char.ConvertFromUtf32(val);
                if (chars != null) {
                    chars[charIndex] = s32[0];
                    chars[charIndex + 1] = s32[1];
                }
                charIndex += 2;
            } else {
                if (chars != null) chars[charIndex] = unchecked((char)val);
                charIndex++;
            }
        }
        byteIndex++;
    }
    return charIndex - charStart;
}
// Reports one byte per input char as the upper bound.
public override int GetMaxByteCount(int charCount) => charCount;
// Reports two chars per input byte as the upper bound (account for surrogate pairs).
public override int GetMaxCharCount(int byteCount) => byteCount * 2;
}
/// <summary>
/// This implementation is not suitable for incremental encoding.
/// </summary>
internal class CharmapEncoding : Encoding {
// User-supplied mapping. Keys are looked up as boxed ints and values are read as objects
// (see the TryGetValue calls in GetBytes/GetChars), hence object/object.
private readonly IDictionary<object, object> _map;
// Cached per-unit upper bounds used by GetMaxByteCount/GetMaxCharCount; 0 means "not computed yet".
private int _maxEncodingReplacementLength;
private int _maxDecodingReplacementLength;

public CharmapEncoding(IDictionary<object, object> map) {
    _map = map;
}

public override string EncodingName => "charmap";
// Counts bytes by running GetBytes without an output array.
public override int GetByteCount(char[] chars, int index, int count) {
    return GetBytes(chars, index, count, bytes: null, byteIndex: 0);
}
/// <summary>
/// Encodes <paramref name="charCount"/> chars starting at <paramref name="charIndex"/> through the mapping.
/// When <paramref name="bytes"/> is null nothing is written and only the byte count is computed.
/// </summary>
/// <returns>The number of bytes produced.</returns>
/// <remarks>
/// A code point that is missing from the mapping, or mapped to null, goes through the encoder fallback
/// (once per UTF-16 char of the code point). Every fallback character must itself be mapped (no
/// recursive fallback), otherwise a UnicodeEncodeError is raised.
/// </remarks>
public override int GetBytes(char[] chars, int charIndex, int charCount, byte[]? bytes, int byteIndex) {
    if (chars == null) throw new ArgumentNullException(nameof(chars));
    int charEnd = charIndex + charCount;
    int byteStart = byteIndex;
    EncoderFallbackBuffer? efb = null;
    while (charIndex < charEnd) {
        object charObj;
        char c = chars[charIndex];
        int nextIndex = charIndex + 1;
        // Combine a valid surrogate pair into a single code point; keys are boxed ints.
        if (char.IsHighSurrogate(c) && nextIndex < charEnd && char.IsLowSurrogate(chars[nextIndex])) {
            charObj = ScriptingRuntimeHelpers.Int32ToObject(char.ConvertToUtf32(c, chars[nextIndex++]));
        } else {
            charObj = ScriptingRuntimeHelpers.Int32ToObject(c);
        }
        if (!_map.TryGetValue(charObj, out object? val) || val == null) {
            efb ??= EncoderFallback.CreateFallbackBuffer();
            try {
                for (int idx = charIndex; idx < nextIndex; idx++) {
                    if (efb.Fallback(chars[idx], idx)) {
                        while (efb.Remaining != 0) {
                            c = efb.GetNextChar();
                            object fbCharObj = ScriptingRuntimeHelpers.Int32ToObject(c);
                            if (char.IsHighSurrogate(c) && efb.Remaining != 0) {
                                char d = efb.GetNextChar();
                                if (char.IsLowSurrogate(d)) {
                                    fbCharObj = ScriptingRuntimeHelpers.Int32ToObject(char.ConvertToUtf32(c, d));
                                } else {
                                    // Not a pair after all: push the char back for the next iteration.
                                    efb.MovePrevious();
                                }
                            }
                            if (!_map.TryGetValue(fbCharObj, out val) || val == null) {
                                throw new EncoderFallbackException(); // no recursive fallback
                            }
                            byteIndex += ProcessEncodingReplacementValue(val, bytes, byteIndex);
                        }
                    }
                }
            } catch (EncoderFallbackException) {
                // Message matches the wording CPython uses for unmapped charmap characters.
                throw PythonOps.UnicodeEncodeError(EncodingName, new string(chars), charIndex, nextIndex, "character maps to <undefined>");
            }
            charIndex = nextIndex;
        } else {
            byteIndex += ProcessEncodingReplacementValue(val, bytes, byteIndex);
            charIndex = nextIndex;
        }
    }
    return byteIndex - byteStart;
}
/// <summary>
/// Writes the bytes represented by a mapping value into <paramref name="bytes"/> at
/// <paramref name="byteIndex"/> (skipped when <paramref name="bytes"/> is null).
/// </summary>
/// <param name="replacement">Either a list of bytes or an int in the range 0..255.</param>
/// <returns>The number of bytes the value stands for.</returns>
/// <remarks>A TypeError is raised for an int outside 0..255 and for any other value type.</remarks>
private static int ProcessEncodingReplacementValue(object replacement, byte[]? bytes, int byteIndex) {
    Debug.Assert(replacement != null);
    switch (replacement) {
        // Element type is byte: the elements are stored straight into the byte[] output.
        case IList<byte> b:
            if (bytes != null) {
                for (int i = 0; i < b.Count; i++, byteIndex++) {
                    bytes[byteIndex] = b[i];
                }
            }
            return b.Count;
        case int n:
            if (n < 0 || n > 0xFF) throw PythonOps.TypeError("character mapping must be in range(256)");
            if (bytes != null) {
                bytes[byteIndex] = unchecked((byte)n);
            }
            return 1;
        default:
            throw PythonOps.TypeError("character mapping must return integer, bytes or None, not {0}", PythonOps.GetPythonTypeName(replacement));
    }
}
// Counts chars by running GetChars without an output array.
public override int GetCharCount(byte[] bytes, int index, int count) {
    return GetChars(bytes, index, count, chars: null, charIndex: 0);
}
/// <summary>
/// Decodes <paramref name="byteCount"/> bytes starting at <paramref name="byteIndex"/> through the mapping.
/// When <paramref name="chars"/> is null nothing is written and only the char count is computed.
/// </summary>
/// <returns>The number of UTF-16 chars produced.</returns>
/// <remarks>
/// A mapping value may be a string (copied as is, unless it starts with U+FFFE) or an int code point
/// (0..0x10FFFF, except 0xFFFE). Missing, null, or U+FFFE entries go through the decoder fallback;
/// a DecoderFallbackException is translated into a UnicodeDecodeError. Any other value type, or an
/// int out of range, raises a TypeError.
/// </remarks>
public override int GetChars(byte[] bytes, int byteIndex, int byteCount, char[]? chars, int charIndex) {
    if (bytes == null) throw new ArgumentNullException(nameof(bytes));
    int byteEnd = byteIndex + byteCount;
    int charStart = charIndex;
    DecoderFallbackBuffer? dfb = null;
    while (byteIndex < byteEnd) {
        // byteIndex is advanced right away; the position of the current byte is byteIndex - 1 below.
        byte b = bytes[byteIndex++];
        object byteObj = ScriptingRuntimeHelpers.Int32ToObject(b);
        if (_map.TryGetValue(byteObj, out object? val) && val != null) {
            if (val is string s) {
                if (s.Length == 0 || s[0] != '\uFFFE') {
                    for (int i = 0; i < s.Length; i++) {
                        if (chars != null) chars[charIndex] = s[i];
                        charIndex++;
                    }
                    continue;
                }
            } else if (val is int n) {
                if (n < 0 || n > 0x10FFFF) {
                    throw PythonOps.TypeError("character mapping must be in range(0x110000)");
                } else if (n > 0xFFFF) {
                    // Supplementary code point: emit it as a surrogate pair.
                    var sp = char.ConvertFromUtf32(n);
                    if (chars != null) chars[charIndex] = sp[0];
                    charIndex++;
                    if (chars != null) chars[charIndex] = sp[1];
                    charIndex++;
                    continue;
                } else if (n != 0xFFFE) {
                    if (chars != null) chars[charIndex] = unchecked((char)n);
                    charIndex++;
                    continue;
                }
            } else {
                throw PythonOps.TypeError("character mapping must return integer, None or str, not {0}", PythonOps.GetPythonTypeName(val));
            }
        }
        // byte unhandled, try fallback
        dfb ??= DecoderFallback.CreateFallbackBuffer();
        byte[] bytesUnknown = new[] { b };
        try {
            if (dfb.Fallback(bytesUnknown, byteIndex - 1)) {
                while (dfb.Remaining != 0) {
                    char c = dfb.GetNextChar();
                    if (chars != null) chars[charIndex] = c;
                    charIndex++;
                }
            }
        } catch (DecoderFallbackException) {
            // Message matches the wording CPython uses for unmapped charmap bytes.
            throw PythonOps.UnicodeDecodeError("character maps to <undefined>", bytesUnknown, byteIndex - 1);
        }
    }
    return charIndex - charStart;
}
/// <summary>
/// Upper bound on the bytes produced for <paramref name="charCount"/> chars: the char count times the
/// longest byte-list value in the mapping (at least 1). The factor is computed once and cached.
/// </summary>
public override int GetMaxByteCount(int charCount) {
    if (_maxEncodingReplacementLength == 0) {
        _maxEncodingReplacementLength = 1;
        foreach (object val in _map.Values) {
            // Same value shape as the byte-list case in ProcessEncodingReplacementValue.
            if (val is IList<byte> b && b.Count > _maxEncodingReplacementLength) {
                _maxEncodingReplacementLength = b.Count;
            }
        }
    }
    return charCount * _maxEncodingReplacementLength;
}
/// <summary>
/// Upper bound on the chars produced for <paramref name="byteCount"/> bytes: the byte count times the
/// longest string value in the mapping (at least 2). The factor is computed once and cached.
/// </summary>
public override int GetMaxCharCount(int byteCount) {
    if (_maxDecodingReplacementLength != 0) {
        return byteCount * _maxDecodingReplacementLength;
    }
    _maxDecodingReplacementLength = 2; // surrogate pair for codepoint
    foreach (object val in _map.Values) {
        if (!(val is string s)) continue;
        _maxDecodingReplacementLength = Math.Max(_maxDecodingReplacementLength, s.Length);
    }
    return byteCount * _maxDecodingReplacementLength;
}
}
}