forked from code-gopher/DnfHelper-Python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththread.py
More file actions
71 lines (54 loc) · 1.83 KB
/
thread.py
File metadata and controls
71 lines (54 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import ctypes
import inspect
import threading
import time
class MyThreadFunc(object):
"""手动终止线程的方法"""
def __init__(self, func, args_tup):
self.myThread = threading.Thread(target=func, args=args_tup)
def start(self):
self.myThread.start()
def stop(self):
try:
for i in range(5):
self.async_raise(self.myThread.ident, SystemExit)
time.sleep(0.01)
except Exception as e:
print(e)
def async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
if __name__ == '__main__':
# 定义一个读秒器
def second_count(arg1, arg2):
i = 0
while True:
i += 1
print("{}-{}: {}".format(arg1, arg2, i))
time.sleep(1)
# 声明一个线程类
mythread = MyThreadFunc(second_count, ("好耶", "haoye"))
# 启动 --------------------------------------
mythread.start()
# 等待三秒
time.sleep(3)
# 查看线程状态
mythread.state()
# 等待三秒
time.sleep(3)
# 终止线程 ----------------------------------
mythread.stop()
# 等待三秒
time.sleep(3)
# 再次查看线程状态
mythread.state()