X Tutup
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Python.Runtime;

namespace Python.EmbeddingTest
{
    /// <summary>
    /// Tests around finalization of Python object wrappers and the release of
    /// CLR objects that were handed over to Python.
    /// </summary>
    public class TestFinalizer
    {
        /// <summary>
        /// Initializes the Python engine once for the whole fixture.
        /// </summary>
        [OneTimeSetUp]
        public void SetUp()
        {
            PythonEngine.Initialize();
        }

        /// <summary>
        /// Shuts the Python engine down once all tests of the fixture have run.
        /// </summary>
        [OneTimeTearDown]
        public void Dispose()
        {
            PythonEngine.Shutdown();
        }

        /// <summary>
        /// Appends a CLR byte array to a Python list on a worker task, drops every
        /// managed reference to both, and asserts that the byte array is no longer
        /// alive after the list's pending decref has been processed and a second
        /// garbage collection has run.
        /// </summary>
        [Test]
        public void TestClrObjectFullRelease()
        {
            var gs = PythonEngine.BeginAllowThreads();

            WeakReference weakRef;
            try
            {
                // The objects are created inside a task so that no local of this
                // method keeps testObject or pyList reachable afterwards.
                var weakRefCreateTask = Task.Factory.StartNew(() =>
                {
                    using (Py.GIL())
                    {
                        byte[] testObject = new byte[100];
                        var testObjectWeakReference = new WeakReference(testObject);

                        dynamic pyList = new PyList();
                        pyList.append(testObject);
                        return testObjectWeakReference;
                    }
                });

                weakRef = weakRefCreateTask.Result;
            }
            finally
            {
                PythonEngine.EndAllowThreads(gs);
            }

            // Triggering C# finalizer (PyList ref should be scheduled to decref).
            GC.Collect();
            GC.WaitForPendingFinalizers();

            // Forcing the dec reference thread to wake up and decref the PyList.
            PythonEngine.EndAllowThreads(PythonEngine.BeginAllowThreads());
            Thread.Sleep(200);
            PythonEngine.CurrentRefDecrementer.WaitForPendingDecReferences();

            // Now Python frees the GCHandle on the CLRObject, so a subsequent GC
            // should fully remove testObject.
            GC.Collect();
            GC.WaitForPendingFinalizers();

            Assert.IsFalse(weakRef.IsAlive, "Clr object should be collected.");
        }

        /// <summary>
        /// Debug-only stress loop (ignored by default): calls
        /// <c>test_raise_exception</c> from the <c>MemoryLeakTest.pyraise</c>
        /// module ten million times, writes every 10000th caught exception message
        /// to the test progress output and forces a GC every 10000 iterations.
        /// The method contains no assertions.
        /// </summary>
        [Test]
        [Ignore("For debug only")]
        public void TestExceptionMemoryLeak()
        {
            dynamic pymodule;
            PyObject gc;
            dynamic testmethod;

            var ts = PythonEngine.BeginAllowThreads();
            try
            {
                IntPtr pylock = PythonEngine.AcquireLock();
                try
                {
#if NETCOREAPP
                    const string s = "../../fixtures";
#else
                    const string s = "../fixtures";
#endif
                    // Make the fixtures directory importable by appending it to sys.path.
                    string testPath = Path.Combine(TestContext.CurrentContext.TestDirectory, s);
                    IntPtr str = Runtime.Runtime.PyString_FromString(testPath);
                    IntPtr path = Runtime.Runtime.PySys_GetObject("path");
                    Runtime.Runtime.PyList_Append(path, str);
                    // NOTE(review): `str` is never released in this method; if
                    // PyString_FromString returns an owned reference this leaks one
                    // string object - confirm against the Runtime API.

                    // `sys` and `gc` are imported but not used further in this method.
                    PyObject sys = PythonEngine.ImportModule("sys");
                    gc = PythonEngine.ImportModule("gc");

                    pymodule = PythonEngine.ImportModule("MemoryLeakTest.pyraise");
                    testmethod = pymodule.test_raise_exception;
                }
                finally
                {
                    // Release the lock even when an import or attribute lookup throws;
                    // otherwise the lock would stay held for the rest of the test run.
                    PythonEngine.ReleaseLock(pylock);
                }

                // 5.1f is a float literal widened to double; kept as-is on purpose.
                double floatarg1 = 5.1f;
                dynamic res = null;

                for (int i = 1; i <= 10000000; i++)
                {
                    using (Py.GIL())
                    {
                        try
                        {
                            res = testmethod(Py.kw("number", floatarg1), Py.kw("astring", "bbc"));
                        }
                        catch (Exception e)
                        {
                            // Exceptions from the call are deliberately swallowed by this
                            // loop; only a sample is reported to keep the output short.
                            if (i % 10000 == 0)
                            {
                                TestContext.Progress.WriteLine(e.Message);
                            }
                        }
                    }

                    if (i % 10000 == 0)
                    {
                        GC.Collect();
                    }
                }
            }
            finally
            {
                // Always restore the thread state saved by BeginAllowThreads, so a
                // failure above cannot leave the engine in the "threads allowed" state
                // when the fixture teardown runs.
                PythonEngine.EndAllowThreads(ts);
            }
        }
    }
}
X Tutup