first commit

This commit is contained in:
Florian Schmitt 2024-10-08 15:38:46 +03:00
commit 2e5df21095
2308 changed files with 453496 additions and 0 deletions

View file

@ -0,0 +1,201 @@
import os
import pygame
import sys
import tempfile
import time
is_pygame_pkg = __name__.startswith("pygame.tests.")
###############################################################################
def tostring(row):
"""Convert row of bytes to string. Expects `row` to be an
``array``.
"""
return row.tobytes()
def geterror():
return sys.exc_info()[1]
###############################################################################
this_dir = os.path.dirname(os.path.abspath(__file__))
trunk_dir = os.path.split(os.path.split(this_dir)[0])[0]
if is_pygame_pkg:
test_module = "tests"
else:
test_module = "test"
def trunk_relative_path(relative):
return os.path.normpath(os.path.join(trunk_dir, relative))
def fixture_path(path):
return trunk_relative_path(os.path.join(test_module, "fixtures", path))
def example_path(path):
return trunk_relative_path(os.path.join("examples", path))
sys.path.insert(0, trunk_relative_path("."))
################################## TEMP FILES #################################
def get_tmp_dir():
return tempfile.mkdtemp()
###############################################################################
def question(q):
return input(f"\n{q.rstrip(' ')} (y/n): ").lower().strip() == "y"
def prompt(p):
return input(f"\n{p.rstrip(' ')} (press enter to continue): ")
#################################### HELPERS ##################################
def rgba_between(value, minimum=0, maximum=255):
if value < minimum:
return minimum
elif value > maximum:
return maximum
else:
return value
def combinations(seqs):
"""
Recipe 496807 from ActiveState Python CookBook
Non recursive technique for getting all possible combinations of a sequence
of sequences.
"""
r = [[]]
for x in seqs:
r = [i + [y] for y in x for i in r]
return r
def gradient(width, height):
"""
Yields a pt and corresponding RGBA tuple, for every (width, height) combo.
Useful for generating gradients.
Actual gradient may be changed, no tests rely on specific values.
Used in transform.rotate lossless tests to generate a fixture.
"""
for l in range(width):
for t in range(height):
yield (l, t), tuple(map(rgba_between, (l, t, l, l + t)))
def rect_area_pts(rect):
for l in range(rect.left, rect.right):
for t in range(rect.top, rect.bottom):
yield l, t
def rect_perimeter_pts(rect):
"""
Returns pts ((L, T) tuples) encompassing the perimeter of a rect.
The order is clockwise:
topleft to topright
topright to bottomright
bottomright to bottomleft
bottomleft to topleft
Duplicate pts are not returned
"""
clock_wise_from_top_left = (
[(l, rect.top) for l in range(rect.left, rect.right)],
[(rect.right - 1, t) for t in range(rect.top + 1, rect.bottom)],
[(l, rect.bottom - 1) for l in range(rect.right - 2, rect.left - 1, -1)],
[(rect.left, t) for t in range(rect.bottom - 2, rect.top, -1)],
)
for line in clock_wise_from_top_left:
yield from line
def rect_outer_bounds(rect):
"""
Returns topleft outerbound if possible and then the other pts, that are
"exclusive" bounds of the rect
?------O
|RECT| ?|0)uterbound
|----|
O O
"""
return ([(rect.left - 1, rect.top)] if rect.left else []) + [
rect.topright,
rect.bottomleft,
rect.bottomright,
]
def import_submodule(module):
m = __import__(module)
for n in module.split(".")[1:]:
m = getattr(m, n)
return m
class SurfaceSubclass(pygame.Surface):
"""A subclassed Surface to test inheritance."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.test_attribute = True
def test():
"""
Lightweight test for helpers
"""
r = pygame.Rect(0, 0, 10, 10)
assert rect_outer_bounds(r) == [(10, 0), (0, 10), (10, 10)] # tr # bl # br
assert len(list(rect_area_pts(r))) == 100
r = pygame.Rect(0, 0, 3, 3)
assert list(rect_perimeter_pts(r)) == [
(0, 0),
(1, 0),
(2, 0), # tl -> tr
(2, 1),
(2, 2), # tr -> br
(1, 2),
(0, 2), # br -> bl
(0, 1), # bl -> tl
]
print("Tests: OK")

View file

@ -0,0 +1,438 @@
import sys
import ctypes
from ctypes import *
import unittest
__all__ = [
"PAI_CONTIGUOUS",
"PAI_FORTRAN",
"PAI_ALIGNED",
"PAI_NOTSWAPPED",
"PAI_WRITEABLE",
"PAI_ARR_HAS_DESCR",
"ArrayInterface",
]
if sizeof(c_uint) == sizeof(c_void_p):
c_size_t = c_uint
c_ssize_t = c_int
elif sizeof(c_ulong) == sizeof(c_void_p):
c_size_t = c_ulong
c_ssize_t = c_long
elif sizeof(c_ulonglong) == sizeof(c_void_p):
c_size_t = c_ulonglong
c_ssize_t = c_longlong
SIZEOF_VOID_P = sizeof(c_void_p)
if SIZEOF_VOID_P <= sizeof(c_int):
Py_intptr_t = c_int
elif SIZEOF_VOID_P <= sizeof(c_long):
Py_intptr_t = c_long
elif "c_longlong" in globals() and SIZEOF_VOID_P <= sizeof(c_longlong):
Py_intptr_t = c_longlong
else:
raise RuntimeError("Unrecognized pointer size %i" % (SIZEOF_VOID_P,))
class PyArrayInterface(Structure):
_fields_ = [
("two", c_int),
("nd", c_int),
("typekind", c_char),
("itemsize", c_int),
("flags", c_int),
("shape", POINTER(Py_intptr_t)),
("strides", POINTER(Py_intptr_t)),
("data", c_void_p),
("descr", py_object),
]
PAI_Ptr = POINTER(PyArrayInterface)
try:
PyCObject_AsVoidPtr = pythonapi.PyCObject_AsVoidPtr
except AttributeError:
def PyCObject_AsVoidPtr(o):
raise TypeError("Not available")
else:
PyCObject_AsVoidPtr.restype = c_void_p
PyCObject_AsVoidPtr.argtypes = [py_object]
PyCObject_GetDesc = pythonapi.PyCObject_GetDesc
PyCObject_GetDesc.restype = c_void_p
PyCObject_GetDesc.argtypes = [py_object]
try:
PyCapsule_IsValid = pythonapi.PyCapsule_IsValid
except AttributeError:
def PyCapsule_IsValid(capsule, name):
return 0
else:
PyCapsule_IsValid.restype = c_int
PyCapsule_IsValid.argtypes = [py_object, c_char_p]
PyCapsule_GetPointer = pythonapi.PyCapsule_GetPointer
PyCapsule_GetPointer.restype = c_void_p
PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
PyCapsule_GetContext = pythonapi.PyCapsule_GetContext
PyCapsule_GetContext.restype = c_void_p
PyCapsule_GetContext.argtypes = [py_object]
PyCapsule_Destructor = CFUNCTYPE(None, py_object)
PyCapsule_New = pythonapi.PyCapsule_New
PyCapsule_New.restype = py_object
PyCapsule_New.argtypes = [c_void_p, c_char_p, POINTER(PyCapsule_Destructor)]
def capsule_new(p):
return PyCapsule_New(addressof(p), None, None)
PAI_CONTIGUOUS = 0x01
PAI_FORTRAN = 0x02
PAI_ALIGNED = 0x100
PAI_NOTSWAPPED = 0x200
PAI_WRITEABLE = 0x400
PAI_ARR_HAS_DESCR = 0x800
class ArrayInterface:
def __init__(self, arr):
try:
self._cobj = arr.__array_struct__
except AttributeError:
raise TypeError("The array object lacks an array structure")
if not self._cobj:
raise TypeError("The array object has a NULL array structure value")
try:
vp = PyCObject_AsVoidPtr(self._cobj)
except TypeError:
if PyCapsule_IsValid(self._cobj, None):
vp = PyCapsule_GetPointer(self._cobj, None)
else:
raise TypeError("The array object has an invalid array structure")
self.desc = PyCapsule_GetContext(self._cobj)
else:
self.desc = PyCObject_GetDesc(self._cobj)
self._inter = cast(vp, PAI_Ptr)[0]
def __getattr__(self, name):
if name == "typekind":
return self._inter.typekind.decode("latin-1")
return getattr(self._inter, name)
def __str__(self):
if isinstance(self.desc, tuple):
ver = self.desc[0]
else:
ver = "N/A"
return (
"nd: %i\n"
"typekind: %s\n"
"itemsize: %i\n"
"flags: %s\n"
"shape: %s\n"
"strides: %s\n"
"ver: %s\n"
% (
self.nd,
self.typekind,
self.itemsize,
format_flags(self.flags),
format_shape(self.nd, self.shape),
format_strides(self.nd, self.strides),
ver,
)
)
def format_flags(flags):
names = []
for flag, name in [
(PAI_CONTIGUOUS, "CONTIGUOUS"),
(PAI_FORTRAN, "FORTRAN"),
(PAI_ALIGNED, "ALIGNED"),
(PAI_NOTSWAPPED, "NOTSWAPPED"),
(PAI_WRITEABLE, "WRITEABLE"),
(PAI_ARR_HAS_DESCR, "ARR_HAS_DESCR"),
]:
if flag & flags:
names.append(name)
return ", ".join(names)
def format_shape(nd, shape):
return ", ".join([str(shape[i]) for i in range(nd)])
def format_strides(nd, strides):
return ", ".join([str(strides[i]) for i in range(nd)])
class Exporter:
def __init__(
self, shape, typekind=None, itemsize=None, strides=None, descr=None, flags=None
):
if typekind is None:
typekind = "u"
if itemsize is None:
itemsize = 1
if flags is None:
flags = PAI_WRITEABLE | PAI_ALIGNED | PAI_NOTSWAPPED
if descr is not None:
flags |= PAI_ARR_HAS_DESCR
if len(typekind) != 1:
raise ValueError("Argument 'typekind' must be length 1 string")
nd = len(shape)
self.typekind = typekind
self.itemsize = itemsize
self.nd = nd
self.shape = tuple(shape)
self._shape = (c_ssize_t * self.nd)(*self.shape)
if strides is None:
self._strides = (c_ssize_t * self.nd)()
self._strides[self.nd - 1] = self.itemsize
for i in range(self.nd - 1, 0, -1):
self._strides[i - 1] = self.shape[i] * self._strides[i]
strides = tuple(self._strides)
self.strides = strides
elif len(strides) == nd:
self.strides = tuple(strides)
self._strides = (c_ssize_t * self.nd)(*self.strides)
else:
raise ValueError("Mismatch in length of strides and shape")
self.descr = descr
if self.is_contiguous("C"):
flags |= PAI_CONTIGUOUS
if self.is_contiguous("F"):
flags |= PAI_FORTRAN
self.flags = flags
sz = max(shape[i] * strides[i] for i in range(nd))
self._data = (c_ubyte * sz)()
self.data = addressof(self._data)
self._inter = PyArrayInterface(
2,
nd,
typekind.encode("latin_1"),
itemsize,
flags,
self._shape,
self._strides,
self.data,
descr,
)
self.len = itemsize
for i in range(nd):
self.len *= self.shape[i]
__array_struct__ = property(lambda self: capsule_new(self._inter))
def is_contiguous(self, fortran):
if fortran in "CA":
if self.strides[-1] == self.itemsize:
for i in range(self.nd - 1, 0, -1):
if self.strides[i - 1] != self.shape[i] * self.strides[i]:
break
else:
return True
if fortran in "FA":
if self.strides[0] == self.itemsize:
for i in range(0, self.nd - 1):
if self.strides[i + 1] != self.shape[i] * self.strides[i]:
break
else:
return True
return False
class Array(Exporter):
_ctypes = {
("u", 1): c_uint8,
("u", 2): c_uint16,
("u", 4): c_uint32,
("u", 8): c_uint64,
("i", 1): c_int8,
("i", 2): c_int16,
("i", 4): c_int32,
("i", 8): c_int64,
}
def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
try:
if self.flags & PAI_NOTSWAPPED:
ct = self._ctypes[self.typekind, self.itemsize]
elif c_int.__ctype_le__ is c_int:
ct = self._ctypes[self.typekind, self.itemsize].__ctype_be__
else:
ct = self._ctypes[self.typekind, self.itemsize].__ctype_le__
except KeyError:
ct = c_uint8 * self.itemsize
self._ctype = ct
self._ctype_p = POINTER(ct)
def __getitem__(self, key):
return cast(self._addr_at(key), self._ctype_p)[0]
def __setitem__(self, key, value):
cast(self._addr_at(key), self._ctype_p)[0] = value
def _addr_at(self, key):
if not isinstance(key, tuple):
key = (key,)
if len(key) != self.nd:
raise ValueError("wrong number of indexes")
for i in range(self.nd):
if not (0 <= key[i] < self.shape[i]):
raise IndexError(f"index {i} out of range")
return self.data + sum(i * s for i, s in zip(key, self.strides))
class ExporterTest(unittest.TestCase):
def test_strides(self):
self.check_args(0, (10,), "u", (2,), 20, 20, 2)
self.check_args(0, (5, 3), "u", (6, 2), 30, 30, 2)
self.check_args(0, (7, 3, 5), "u", (30, 10, 2), 210, 210, 2)
self.check_args(0, (13, 5, 11, 3), "u", (330, 66, 6, 2), 4290, 4290, 2)
self.check_args(3, (7, 3, 5), "i", (2, 14, 42), 210, 210, 2)
self.check_args(3, (7, 3, 5), "x", (2, 16, 48), 210, 240, 2)
self.check_args(3, (13, 5, 11, 3), "%", (440, 88, 8, 2), 4290, 5720, 2)
self.check_args(3, (7, 5), "-", (15, 3), 105, 105, 3)
self.check_args(3, (7, 5), "*", (3, 21), 105, 105, 3)
self.check_args(3, (7, 5), " ", (3, 24), 105, 120, 3)
def test_is_contiguous(self):
a = Exporter((10,), itemsize=2)
self.assertTrue(a.is_contiguous("C"))
self.assertTrue(a.is_contiguous("F"))
self.assertTrue(a.is_contiguous("A"))
a = Exporter((10, 4), itemsize=2)
self.assertTrue(a.is_contiguous("C"))
self.assertTrue(a.is_contiguous("A"))
self.assertFalse(a.is_contiguous("F"))
a = Exporter((13, 5, 11, 3), itemsize=2, strides=(330, 66, 6, 2))
self.assertTrue(a.is_contiguous("C"))
self.assertTrue(a.is_contiguous("A"))
self.assertFalse(a.is_contiguous("F"))
a = Exporter((10, 4), itemsize=2, strides=(2, 20))
self.assertTrue(a.is_contiguous("F"))
self.assertTrue(a.is_contiguous("A"))
self.assertFalse(a.is_contiguous("C"))
a = Exporter((13, 5, 11, 3), itemsize=2, strides=(2, 26, 130, 1430))
self.assertTrue(a.is_contiguous("F"))
self.assertTrue(a.is_contiguous("A"))
self.assertFalse(a.is_contiguous("C"))
a = Exporter((2, 11, 6, 4), itemsize=2, strides=(576, 48, 8, 2))
self.assertFalse(a.is_contiguous("A"))
a = Exporter((2, 11, 6, 4), itemsize=2, strides=(2, 4, 48, 288))
self.assertFalse(a.is_contiguous("A"))
a = Exporter((3, 2, 2), itemsize=2, strides=(16, 8, 4))
self.assertFalse(a.is_contiguous("A"))
a = Exporter((3, 2, 2), itemsize=2, strides=(4, 12, 24))
self.assertFalse(a.is_contiguous("A"))
def check_args(
self, call_flags, shape, typekind, strides, length, bufsize, itemsize, offset=0
):
if call_flags & 1:
typekind_arg = typekind
else:
typekind_arg = None
if call_flags & 2:
strides_arg = strides
else:
strides_arg = None
a = Exporter(shape, itemsize=itemsize, strides=strides_arg)
self.assertEqual(sizeof(a._data), bufsize)
self.assertEqual(a.data, ctypes.addressof(a._data) + offset)
m = ArrayInterface(a)
self.assertEqual(m.data, a.data)
self.assertEqual(m.itemsize, itemsize)
self.assertEqual(tuple(m.shape[0 : m.nd]), shape)
self.assertEqual(tuple(m.strides[0 : m.nd]), strides)
class ArrayTest(unittest.TestCase):
def __init__(self, *args, **kwds):
unittest.TestCase.__init__(self, *args, **kwds)
self.a = Array((20, 15), "i", 4)
def setUp(self):
# Every test starts with a zeroed array.
memset(self.a.data, 0, sizeof(self.a._data))
def test__addr_at(self):
a = self.a
self.assertEqual(a._addr_at((0, 0)), a.data)
self.assertEqual(a._addr_at((0, 1)), a.data + 4)
self.assertEqual(a._addr_at((1, 0)), a.data + 60)
self.assertEqual(a._addr_at((1, 1)), a.data + 64)
def test_indices(self):
a = self.a
self.assertEqual(a[0, 0], 0)
self.assertEqual(a[19, 0], 0)
self.assertEqual(a[0, 14], 0)
self.assertEqual(a[19, 14], 0)
self.assertEqual(a[5, 8], 0)
a[0, 0] = 12
a[5, 8] = 99
self.assertEqual(a[0, 0], 12)
self.assertEqual(a[5, 8], 99)
self.assertRaises(IndexError, a.__getitem__, (-1, 0))
self.assertRaises(IndexError, a.__getitem__, (0, -1))
self.assertRaises(IndexError, a.__getitem__, (20, 0))
self.assertRaises(IndexError, a.__getitem__, (0, 15))
self.assertRaises(ValueError, a.__getitem__, 0)
self.assertRaises(ValueError, a.__getitem__, (0, 0, 0))
a = Array((3,), "i", 4)
a[1] = 333
self.assertEqual(a[1], 333)
def test_typekind(self):
a = Array((1,), "i", 4)
self.assertTrue(a._ctype is c_int32)
self.assertTrue(a._ctype_p is POINTER(c_int32))
a = Array((1,), "u", 4)
self.assertTrue(a._ctype is c_uint32)
self.assertTrue(a._ctype_p is POINTER(c_uint32))
a = Array((1,), "f", 4) # float types unsupported: size system dependent
ct = a._ctype
self.assertTrue(issubclass(ct, ctypes.Array))
self.assertEqual(sizeof(ct), 4)
def test_itemsize(self):
for size in [1, 2, 4, 8]:
a = Array((1,), "i", size)
ct = a._ctype
self.assertTrue(issubclass(ct, ctypes._SimpleCData))
self.assertEqual(sizeof(ct), size)
def test_oddball_itemsize(self):
for size in [3, 5, 6, 7, 9]:
a = Array((1,), "i", size)
ct = a._ctype
self.assertTrue(issubclass(ct, ctypes.Array))
self.assertEqual(sizeof(ct), size)
def test_byteswapped(self):
a = Array((1,), "u", 4, flags=(PAI_ALIGNED | PAI_WRITEABLE))
ct = a._ctype
self.assertTrue(ct is not c_uint32)
if sys.byteorder == "little":
self.assertTrue(ct is c_uint32.__ctype_be__)
else:
self.assertTrue(ct is c_uint32.__ctype_le__)
i = 0xA0B0C0D
n = c_uint32(i)
a[0] = i
self.assertEqual(a[0], i)
self.assertEqual(a._data[0:4], cast(addressof(n), POINTER(c_uint8))[3:-1:-1])
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,301 @@
################################################################################
"""
Modification of http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
"""
#################################### IMPORTS ###################################
import os
import platform
import subprocess
import errno
import time
import sys
import unittest
import tempfile
def geterror():
return sys.exc_info()[1]
null_byte = "\x00".encode("ascii")
if platform.system() == "Windows":
def encode(s):
return s.encode("ascii")
def decode(b):
return b.decode("ascii")
try:
import ctypes
from ctypes.wintypes import DWORD
kernel32 = ctypes.windll.kernel32
TerminateProcess = ctypes.windll.kernel32.TerminateProcess
def WriteFile(handle, data, ol=None):
c_written = DWORD()
success = ctypes.windll.kernel32.WriteFile(
handle,
ctypes.create_string_buffer(encode(data)),
len(data),
ctypes.byref(c_written),
ol,
)
return ctypes.windll.kernel32.GetLastError(), c_written.value
def ReadFile(handle, desired_bytes, ol=None):
c_read = DWORD()
buffer = ctypes.create_string_buffer(desired_bytes + 1)
success = ctypes.windll.kernel32.ReadFile(
handle, buffer, desired_bytes, ctypes.byref(c_read), ol
)
buffer[c_read.value] = null_byte
return ctypes.windll.kernel32.GetLastError(), decode(buffer.value)
def PeekNamedPipe(handle, desired_bytes):
c_avail = DWORD()
c_message = DWORD()
if desired_bytes > 0:
c_read = DWORD()
buffer = ctypes.create_string_buffer(desired_bytes + 1)
success = ctypes.windll.kernel32.PeekNamedPipe(
handle,
buffer,
desired_bytes,
ctypes.byref(c_read),
ctypes.byref(c_avail),
ctypes.byref(c_message),
)
buffer[c_read.value] = null_byte
return decode(buffer.value), c_avail.value, c_message.value
else:
success = ctypes.windll.kernel32.PeekNamedPipe(
handle,
None,
desired_bytes,
None,
ctypes.byref(c_avail),
ctypes.byref(c_message),
)
return "", c_avail.value, c_message.value
except ImportError:
from win32file import ReadFile, WriteFile
from win32pipe import PeekNamedPipe
from win32api import TerminateProcess
import msvcrt
else:
from signal import SIGINT, SIGTERM, SIGKILL
import select
import fcntl
################################### CONSTANTS ##################################
PIPE = subprocess.PIPE
################################################################################
class Popen(subprocess.Popen):
def recv(self, maxsize=None):
return self._recv("stdout", maxsize)
def recv_err(self, maxsize=None):
return self._recv("stderr", maxsize)
def send_recv(self, input="", maxsize=None):
return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
def read_async(self, wait=0.1, e=1, tr=5, stderr=0):
if tr < 1:
tr = 1
x = time.time() + wait
y = []
r = ""
pr = self.recv
if stderr:
pr = self.recv_err
while time.time() < x or r:
r = pr()
if r is None:
if e:
raise Exception("Other end disconnected!")
else:
break
elif r:
y.append(r)
else:
time.sleep(max((x - time.time()) / tr, 0))
return "".join(y)
def send_all(self, data):
while len(data):
sent = self.send(data)
if sent is None:
raise Exception("Other end disconnected!")
data = memoryview(data, sent)
def get_conn_maxsize(self, which, maxsize):
if maxsize is None:
maxsize = 1024
elif maxsize < 1:
maxsize = 1
return getattr(self, which), maxsize
def _close(self, which):
getattr(self, which).close()
setattr(self, which, None)
if platform.system() == "Windows":
def kill(self):
# Recipes
# http://me.in-berlin.de/doc/python/faq/windows.html#how-do-i-emulate-os-kill-in-windows
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462
"""kill function for Win32"""
TerminateProcess(int(self._handle), 0) # returns None
def send(self, input):
if not self.stdin:
return None
try:
x = msvcrt.get_osfhandle(self.stdin.fileno())
(errCode, written) = WriteFile(x, input)
except ValueError:
return self._close("stdin")
except (subprocess.pywintypes.error, Exception):
if geterror()[0] in (109, errno.ESHUTDOWN):
return self._close("stdin")
raise
return written
def _recv(self, which, maxsize):
conn, maxsize = self.get_conn_maxsize(which, maxsize)
if conn is None:
return None
try:
x = msvcrt.get_osfhandle(conn.fileno())
(read, nAvail, nMessage) = PeekNamedPipe(x, 0)
if maxsize < nAvail:
nAvail = maxsize
if nAvail > 0:
(errCode, read) = ReadFile(x, nAvail, None)
except ValueError:
return self._close(which)
except (subprocess.pywintypes.error, Exception):
if geterror()[0] in (109, errno.ESHUTDOWN):
return self._close(which)
raise
if self.universal_newlines:
# Translate newlines. For Python 3.x assume read is text.
# If bytes then another solution is needed.
read = read.replace("\r\n", "\n").replace("\r", "\n")
return read
else:
def kill(self):
for i, sig in enumerate([SIGTERM, SIGKILL] * 2):
if i % 2 == 0:
os.kill(self.pid, sig)
time.sleep((i * (i % 2) / 5.0) + 0.01)
killed_pid, stat = os.waitpid(self.pid, os.WNOHANG)
if killed_pid != 0:
return
def send(self, input):
if not self.stdin:
return None
if not select.select([], [self.stdin], [], 0)[1]:
return 0
try:
written = os.write(self.stdin.fileno(), input)
except OSError:
if geterror()[0] == errno.EPIPE: # broken pipe
return self._close("stdin")
raise
return written
def _recv(self, which, maxsize):
conn, maxsize = self.get_conn_maxsize(which, maxsize)
if conn is None:
return None
if not select.select([conn], [], [], 0)[0]:
return ""
r = conn.read(maxsize)
if not r:
return self._close(which)
if self.universal_newlines:
r = r.replace("\r\n", "\n").replace("\r", "\n")
return r
################################################################################
def proc_in_time_or_kill(cmd, time_out, wd=None, env=None):
proc = Popen(
cmd,
cwd=wd,
env=env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=1,
)
ret_code = None
response = []
t = time.time()
while ret_code is None and ((time.time() - t) < time_out):
ret_code = proc.poll()
response += [proc.read_async(wait=0.1, e=0)]
if ret_code is None:
ret_code = f'"Process timed out (time_out = {time_out} secs) '
try:
proc.kill()
ret_code += 'and was successfully terminated"'
except Exception:
ret_code += f'and termination failed (exception: {geterror()})"'
return ret_code, "".join(response)
################################################################################
class AsyncTest(unittest.TestCase):
def test_proc_in_time_or_kill(self):
ret_code, response = proc_in_time_or_kill(
[sys.executable, "-c", "while True: pass"], time_out=1
)
self.assertIn("rocess timed out", ret_code)
self.assertIn("successfully terminated", ret_code)
################################################################################
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,607 @@
"""Module pygame.tests.test_utils.array
Export the Exporter and Importer classes.
Class Exporter has configurable shape and strides. Exporter objects
provide a convenient target for unit tests on Pygame objects and functions
that import a new buffer interface.
Class Importer imports a buffer interface with the given PyBUF_* flags.
It returns NULL Py_buffer fields as None. The shape, strides, and suboffsets
arrays are returned as tuples of ints. All Py_buffer field properties are
read-only. This class is useful in comparing exported buffer interfaces
with the actual request. The simular Python builtin memoryview currently
does not support configurable PyBUF_* flags.
This module contains its own unit tests. When Pygame is installed, these tests
can be run with the following command line statement:
python -m pygame.tests.test_utils.array
"""
import pygame
if not pygame.HAVE_NEWBUF:
emsg = "This Pygame build does not support the new buffer protocol"
raise ImportError(emsg)
import pygame.newbuffer
from pygame.newbuffer import (
PyBUF_SIMPLE,
PyBUF_FORMAT,
PyBUF_ND,
PyBUF_WRITABLE,
PyBUF_STRIDES,
PyBUF_C_CONTIGUOUS,
PyBUF_F_CONTIGUOUS,
PyBUF_ANY_CONTIGUOUS,
PyBUF_INDIRECT,
PyBUF_STRIDED,
PyBUF_STRIDED_RO,
PyBUF_RECORDS,
PyBUF_RECORDS_RO,
PyBUF_FULL,
PyBUF_FULL_RO,
PyBUF_CONTIG,
PyBUF_CONTIG_RO,
)
import unittest
import ctypes
import operator
from functools import reduce
__all__ = ["Exporter", "Importer"]
try:
ctypes.c_ssize_t
except AttributeError:
void_p_sz = ctypes.sizeof(ctypes.c_void_p)
if ctypes.sizeof(ctypes.c_short) == void_p_sz:
ctypes.c_ssize_t = ctypes.c_short
elif ctypes.sizeof(ctypes.c_int) == void_p_sz:
ctypes.c_ssize_t = ctypes.c_int
elif ctypes.sizeof(ctypes.c_long) == void_p_sz:
ctypes.c_ssize_t = ctypes.c_long
elif ctypes.sizeof(ctypes.c_longlong) == void_p_sz:
ctypes.c_ssize_t = ctypes.c_longlong
else:
raise RuntimeError("Cannot set c_ssize_t: sizeof(void *) is %i" % void_p_sz)
def _prop_get(fn):
return property(fn)
class Exporter(pygame.newbuffer.BufferMixin):
"""An object that exports a multi-dimension new buffer interface
The only array operation this type supports is to export a buffer.
"""
prefixes = {
"@": "",
"=": "=",
"<": "=",
">": "=",
"!": "=",
"2": "2",
"3": "3",
"4": "4",
"5": "5",
"6": "6",
"7": "7",
"8": "8",
"9": "9",
}
types = {
"c": ctypes.c_char,
"b": ctypes.c_byte,
"B": ctypes.c_ubyte,
"=c": ctypes.c_int8,
"=b": ctypes.c_int8,
"=B": ctypes.c_uint8,
"?": ctypes.c_bool,
"=?": ctypes.c_int8,
"h": ctypes.c_short,
"H": ctypes.c_ushort,
"=h": ctypes.c_int16,
"=H": ctypes.c_uint16,
"i": ctypes.c_int,
"I": ctypes.c_uint,
"=i": ctypes.c_int32,
"=I": ctypes.c_uint32,
"l": ctypes.c_long,
"L": ctypes.c_ulong,
"=l": ctypes.c_int32,
"=L": ctypes.c_uint32,
"q": ctypes.c_longlong,
"Q": ctypes.c_ulonglong,
"=q": ctypes.c_int64,
"=Q": ctypes.c_uint64,
"f": ctypes.c_float,
"d": ctypes.c_double,
"P": ctypes.c_void_p,
"x": ctypes.c_ubyte * 1,
"2x": ctypes.c_ubyte * 2,
"3x": ctypes.c_ubyte * 3,
"4x": ctypes.c_ubyte * 4,
"5x": ctypes.c_ubyte * 5,
"6x": ctypes.c_ubyte * 6,
"7x": ctypes.c_ubyte * 7,
"8x": ctypes.c_ubyte * 8,
"9x": ctypes.c_ubyte * 9,
}
def __init__(self, shape, format=None, strides=None, readonly=None, itemsize=None):
if format is None:
format = "B"
if readonly is None:
readonly = False
prefix = ""
typecode = ""
i = 0
if i < len(format):
try:
prefix = self.prefixes[format[i]]
i += 1
except LookupError:
pass
if i < len(format) and format[i] == "1":
i += 1
if i == len(format) - 1:
typecode = format[i]
if itemsize is None:
try:
itemsize = ctypes.sizeof(self.types[prefix + typecode])
except KeyError:
raise ValueError("Unknown item format '" + format + "'")
self.readonly = bool(readonly)
self.format = format
self._format = ctypes.create_string_buffer(format.encode("latin_1"))
self.ndim = len(shape)
self.itemsize = itemsize
self.len = reduce(operator.mul, shape, 1) * self.itemsize
self.shape = tuple(shape)
self._shape = (ctypes.c_ssize_t * self.ndim)(*self.shape)
if strides is None:
self._strides = (ctypes.c_ssize_t * self.ndim)()
self._strides[self.ndim - 1] = itemsize
for i in range(self.ndim - 1, 0, -1):
self._strides[i - 1] = self.shape[i] * self._strides[i]
self.strides = tuple(self._strides)
elif len(strides) == self.ndim:
self.strides = tuple(strides)
self._strides = (ctypes.c_ssize_t * self.ndim)(*self.strides)
else:
raise ValueError("Mismatch in length of strides and shape")
buflen = max(d * abs(s) for d, s in zip(self.shape, self.strides))
self.buflen = buflen
self._buf = (ctypes.c_ubyte * buflen)()
offset = sum(
(d - 1) * abs(s) for d, s in zip(self.shape, self.strides) if s < 0
)
self.buf = ctypes.addressof(self._buf) + offset
def buffer_info(self):
return (ctypes.addressof(self.buffer), self.shape[0])
def tobytes(self):
return ctypes.cast(self.buffer, ctypes.POINTER(ctypes.c_char))[0 : self._len]
def __len__(self):
return self.shape[0]
def _get_buffer(self, view, flags):
from ctypes import addressof
if (flags & PyBUF_WRITABLE) == PyBUF_WRITABLE and self.readonly:
raise BufferError("buffer is read-only")
if (
flags & PyBUF_C_CONTIGUOUS
) == PyBUF_C_CONTIGUOUS and not self.is_contiguous("C"):
raise BufferError("data is not C contiguous")
if (
flags & PyBUF_F_CONTIGUOUS
) == PyBUF_F_CONTIGUOUS and not self.is_contiguous("F"):
raise BufferError("data is not F contiguous")
if (
flags & PyBUF_ANY_CONTIGUOUS
) == PyBUF_ANY_CONTIGUOUS and not self.is_contiguous("A"):
raise BufferError("data is not contiguous")
view.buf = self.buf
view.readonly = self.readonly
view.len = self.len
if flags | PyBUF_WRITABLE == PyBUF_WRITABLE:
view.ndim = 0
else:
view.ndim = self.ndim
view.itemsize = self.itemsize
if (flags & PyBUF_FORMAT) == PyBUF_FORMAT:
view.format = addressof(self._format)
else:
view.format = None
if (flags & PyBUF_ND) == PyBUF_ND:
view.shape = addressof(self._shape)
elif self.is_contiguous("C"):
view.shape = None
else:
raise BufferError(f"shape required for {self.ndim} dimensional data")
if (flags & PyBUF_STRIDES) == PyBUF_STRIDES:
view.strides = ctypes.addressof(self._strides)
elif view.shape is None or self.is_contiguous("C"):
view.strides = None
else:
raise BufferError("strides required for none C contiguous data")
view.suboffsets = None
view.internal = None
view.obj = self
def is_contiguous(self, fortran):
if fortran in "CA":
if self.strides[-1] == self.itemsize:
for i in range(self.ndim - 1, 0, -1):
if self.strides[i - 1] != self.shape[i] * self.strides[i]:
break
else:
return True
if fortran in "FA":
if self.strides[0] == self.itemsize:
for i in range(0, self.ndim - 1):
if self.strides[i + 1] != self.shape[i] * self.strides[i]:
break
else:
return True
return False
class Importer:
"""An object that imports a new buffer interface
The fields of the Py_buffer C struct are exposed by identically
named Importer read-only properties.
"""
def __init__(self, obj, flags):
self._view = pygame.newbuffer.Py_buffer()
self._view.get_buffer(obj, flags)
@property
def obj(self):
"""return object or None for NULL field"""
return self._view.obj
@property
def buf(self):
"""return int or None for NULL field"""
return self._view.buf
@property
def len(self):
"""return int"""
return self._view.len
@property
def readonly(self):
"""return bool"""
return self._view.readonly
@property
def format(self):
"""return bytes or None for NULL field"""
format_addr = self._view.format
if format_addr is None:
return None
return ctypes.cast(format_addr, ctypes.c_char_p).value.decode("ascii")
@property
def itemsize(self):
"""return int"""
return self._view.itemsize
@property
def ndim(self):
"""return int"""
return self._view.ndim
@property
def shape(self):
"""return int tuple or None for NULL field"""
return self._to_ssize_tuple(self._view.shape)
@property
def strides(self):
"""return int tuple or None for NULL field"""
return self._to_ssize_tuple(self._view.strides)
@property
def suboffsets(self):
"""return int tuple or None for NULL field"""
return self._to_ssize_tuple(self._view.suboffsets)
@property
def internal(self):
"""return int or None for NULL field"""
return self._view.internal
def _to_ssize_tuple(self, addr):
from ctypes import cast, POINTER, c_ssize_t
if addr is None:
return None
return tuple(cast(addr, POINTER(c_ssize_t))[0 : self._view.ndim])
class ExporterTest(unittest.TestCase):
"""Class Exporter unit tests"""
def test_formats(self):
char_sz = ctypes.sizeof(ctypes.c_char)
short_sz = ctypes.sizeof(ctypes.c_short)
int_sz = ctypes.sizeof(ctypes.c_int)
long_sz = ctypes.sizeof(ctypes.c_long)
longlong_sz = ctypes.sizeof(ctypes.c_longlong)
float_sz = ctypes.sizeof(ctypes.c_float)
double_sz = ctypes.sizeof(ctypes.c_double)
voidp_sz = ctypes.sizeof(ctypes.c_void_p)
bool_sz = ctypes.sizeof(ctypes.c_bool)
self.check_args(0, (1,), "B", (1,), 1, 1, 1)
self.check_args(1, (1,), "b", (1,), 1, 1, 1)
self.check_args(1, (1,), "B", (1,), 1, 1, 1)
self.check_args(1, (1,), "c", (char_sz,), char_sz, char_sz, char_sz)
self.check_args(1, (1,), "h", (short_sz,), short_sz, short_sz, short_sz)
self.check_args(1, (1,), "H", (short_sz,), short_sz, short_sz, short_sz)
self.check_args(1, (1,), "i", (int_sz,), int_sz, int_sz, int_sz)
self.check_args(1, (1,), "I", (int_sz,), int_sz, int_sz, int_sz)
self.check_args(1, (1,), "l", (long_sz,), long_sz, long_sz, long_sz)
self.check_args(1, (1,), "L", (long_sz,), long_sz, long_sz, long_sz)
self.check_args(
1, (1,), "q", (longlong_sz,), longlong_sz, longlong_sz, longlong_sz
)
self.check_args(
1, (1,), "Q", (longlong_sz,), longlong_sz, longlong_sz, longlong_sz
)
self.check_args(1, (1,), "f", (float_sz,), float_sz, float_sz, float_sz)
self.check_args(1, (1,), "d", (double_sz,), double_sz, double_sz, double_sz)
self.check_args(1, (1,), "x", (1,), 1, 1, 1)
self.check_args(1, (1,), "P", (voidp_sz,), voidp_sz, voidp_sz, voidp_sz)
self.check_args(1, (1,), "?", (bool_sz,), bool_sz, bool_sz, bool_sz)
self.check_args(1, (1,), "@b", (1,), 1, 1, 1)
self.check_args(1, (1,), "@B", (1,), 1, 1, 1)
self.check_args(1, (1,), "@c", (char_sz,), char_sz, char_sz, char_sz)
self.check_args(1, (1,), "@h", (short_sz,), short_sz, short_sz, short_sz)
self.check_args(1, (1,), "@H", (short_sz,), short_sz, short_sz, short_sz)
self.check_args(1, (1,), "@i", (int_sz,), int_sz, int_sz, int_sz)
self.check_args(1, (1,), "@I", (int_sz,), int_sz, int_sz, int_sz)
self.check_args(1, (1,), "@l", (long_sz,), long_sz, long_sz, long_sz)
self.check_args(1, (1,), "@L", (long_sz,), long_sz, long_sz, long_sz)
self.check_args(
1, (1,), "@q", (longlong_sz,), longlong_sz, longlong_sz, longlong_sz
)
self.check_args(
1, (1,), "@Q", (longlong_sz,), longlong_sz, longlong_sz, longlong_sz
)
self.check_args(1, (1,), "@f", (float_sz,), float_sz, float_sz, float_sz)
self.check_args(1, (1,), "@d", (double_sz,), double_sz, double_sz, double_sz)
self.check_args(1, (1,), "@?", (bool_sz,), bool_sz, bool_sz, bool_sz)
self.check_args(1, (1,), "=b", (1,), 1, 1, 1)
self.check_args(1, (1,), "=B", (1,), 1, 1, 1)
self.check_args(1, (1,), "=c", (1,), 1, 1, 1)
self.check_args(1, (1,), "=h", (2,), 2, 2, 2)
self.check_args(1, (1,), "=H", (2,), 2, 2, 2)
self.check_args(1, (1,), "=i", (4,), 4, 4, 4)
self.check_args(1, (1,), "=I", (4,), 4, 4, 4)
self.check_args(1, (1,), "=l", (4,), 4, 4, 4)
self.check_args(1, (1,), "=L", (4,), 4, 4, 4)
self.check_args(1, (1,), "=q", (8,), 8, 8, 8)
self.check_args(1, (1,), "=Q", (8,), 8, 8, 8)
self.check_args(1, (1,), "=?", (1,), 1, 1, 1)
self.check_args(1, (1,), "<h", (2,), 2, 2, 2)
self.check_args(1, (1,), ">h", (2,), 2, 2, 2)
self.check_args(1, (1,), "!h", (2,), 2, 2, 2)
self.check_args(1, (1,), "<q", (8,), 8, 8, 8)
self.check_args(1, (1,), ">q", (8,), 8, 8, 8)
self.check_args(1, (1,), "!q", (8,), 8, 8, 8)
self.check_args(1, (1,), "1x", (1,), 1, 1, 1)
self.check_args(1, (1,), "2x", (2,), 2, 2, 2)
self.check_args(1, (1,), "3x", (3,), 3, 3, 3)
self.check_args(1, (1,), "4x", (4,), 4, 4, 4)
self.check_args(1, (1,), "5x", (5,), 5, 5, 5)
self.check_args(1, (1,), "6x", (6,), 6, 6, 6)
self.check_args(1, (1,), "7x", (7,), 7, 7, 7)
self.check_args(1, (1,), "8x", (8,), 8, 8, 8)
self.check_args(1, (1,), "9x", (9,), 9, 9, 9)
self.check_args(1, (1,), "1h", (2,), 2, 2, 2)
self.check_args(1, (1,), "=1h", (2,), 2, 2, 2)
self.assertRaises(ValueError, Exporter, (2, 1), "")
self.assertRaises(ValueError, Exporter, (2, 1), "W")
self.assertRaises(ValueError, Exporter, (2, 1), "^Q")
self.assertRaises(ValueError, Exporter, (2, 1), "=W")
self.assertRaises(ValueError, Exporter, (2, 1), "=f")
self.assertRaises(ValueError, Exporter, (2, 1), "=d")
self.assertRaises(ValueError, Exporter, (2, 1), "<f")
self.assertRaises(ValueError, Exporter, (2, 1), "<d")
self.assertRaises(ValueError, Exporter, (2, 1), ">f")
self.assertRaises(ValueError, Exporter, (2, 1), ">d")
self.assertRaises(ValueError, Exporter, (2, 1), "!f")
self.assertRaises(ValueError, Exporter, (2, 1), "!d")
self.assertRaises(ValueError, Exporter, (2, 1), "0x")
self.assertRaises(ValueError, Exporter, (2, 1), "11x")
self.assertRaises(ValueError, Exporter, (2, 1), "BB")
def test_strides(self):
self.check_args(1, (10,), "=h", (2,), 20, 20, 2)
self.check_args(1, (5, 3), "=h", (6, 2), 30, 30, 2)
self.check_args(1, (7, 3, 5), "=h", (30, 10, 2), 210, 210, 2)
self.check_args(1, (13, 5, 11, 3), "=h", (330, 66, 6, 2), 4290, 4290, 2)
self.check_args(3, (7, 3, 5), "=h", (2, 14, 42), 210, 210, 2)
self.check_args(3, (7, 3, 5), "=h", (2, 16, 48), 210, 240, 2)
self.check_args(3, (13, 5, 11, 3), "=h", (440, 88, 8, 2), 4290, 5720, 2)
self.check_args(3, (7, 5), "3x", (15, 3), 105, 105, 3)
self.check_args(3, (7, 5), "3x", (3, 21), 105, 105, 3)
self.check_args(3, (7, 5), "3x", (3, 24), 105, 120, 3)
def test_readonly(self):
a = Exporter((2,), "h", readonly=True)
self.assertTrue(a.readonly)
b = Importer(a, PyBUF_STRIDED_RO)
self.assertRaises(BufferError, Importer, a, PyBUF_STRIDED)
b = Importer(a, PyBUF_STRIDED_RO)
def test_is_contiguous(self):
a = Exporter((10,), "=h")
self.assertTrue(a.is_contiguous("C"))
self.assertTrue(a.is_contiguous("F"))
self.assertTrue(a.is_contiguous("A"))
a = Exporter((10, 4), "=h")
self.assertTrue(a.is_contiguous("C"))
self.assertTrue(a.is_contiguous("A"))
self.assertFalse(a.is_contiguous("F"))
a = Exporter((13, 5, 11, 3), "=h", (330, 66, 6, 2))
self.assertTrue(a.is_contiguous("C"))
self.assertTrue(a.is_contiguous("A"))
self.assertFalse(a.is_contiguous("F"))
a = Exporter((10, 4), "=h", (2, 20))
self.assertTrue(a.is_contiguous("F"))
self.assertTrue(a.is_contiguous("A"))
self.assertFalse(a.is_contiguous("C"))
a = Exporter((13, 5, 11, 3), "=h", (2, 26, 130, 1430))
self.assertTrue(a.is_contiguous("F"))
self.assertTrue(a.is_contiguous("A"))
self.assertFalse(a.is_contiguous("C"))
a = Exporter((2, 11, 6, 4), "=h", (576, 48, 8, 2))
self.assertFalse(a.is_contiguous("A"))
a = Exporter((2, 11, 6, 4), "=h", (2, 4, 48, 288))
self.assertFalse(a.is_contiguous("A"))
a = Exporter((3, 2, 2), "=h", (16, 8, 4))
self.assertFalse(a.is_contiguous("A"))
a = Exporter((3, 2, 2), "=h", (4, 12, 24))
self.assertFalse(a.is_contiguous("A"))
def test_PyBUF_flags(self):
a = Exporter((10, 2), "d")
b = Importer(a, PyBUF_SIMPLE)
self.assertTrue(b.obj is a)
self.assertTrue(b.format is None)
self.assertEqual(b.len, a.len)
self.assertEqual(b.itemsize, a.itemsize)
self.assertTrue(b.shape is None)
self.assertTrue(b.strides is None)
self.assertTrue(b.suboffsets is None)
self.assertTrue(b.internal is None)
self.assertFalse(b.readonly)
b = Importer(a, PyBUF_WRITABLE)
self.assertTrue(b.obj is a)
self.assertTrue(b.format is None)
self.assertEqual(b.len, a.len)
self.assertEqual(b.itemsize, a.itemsize)
self.assertTrue(b.shape is None)
self.assertTrue(b.strides is None)
self.assertTrue(b.suboffsets is None)
self.assertTrue(b.internal is None)
self.assertFalse(b.readonly)
b = Importer(a, PyBUF_ND)
self.assertTrue(b.obj is a)
self.assertTrue(b.format is None)
self.assertEqual(b.len, a.len)
self.assertEqual(b.itemsize, a.itemsize)
self.assertEqual(b.shape, a.shape)
self.assertTrue(b.strides is None)
self.assertTrue(b.suboffsets is None)
self.assertTrue(b.internal is None)
self.assertFalse(b.readonly)
a = Exporter((5, 10), "=h", (24, 2))
b = Importer(a, PyBUF_STRIDES)
self.assertTrue(b.obj is a)
self.assertTrue(b.format is None)
self.assertEqual(b.len, a.len)
self.assertEqual(b.itemsize, a.itemsize)
self.assertEqual(b.shape, a.shape)
self.assertEqual(b.strides, a.strides)
self.assertTrue(b.suboffsets is None)
self.assertTrue(b.internal is None)
self.assertFalse(b.readonly)
b = Importer(a, PyBUF_FULL)
self.assertTrue(b.obj is a)
self.assertEqual(b.format, "=h")
self.assertEqual(b.len, a.len)
self.assertEqual(b.itemsize, a.itemsize)
self.assertEqual(b.shape, a.shape)
self.assertEqual(b.strides, a.strides)
self.assertTrue(b.suboffsets is None)
self.assertTrue(b.internal is None)
self.assertFalse(b.readonly)
self.assertRaises(BufferError, Importer, a, PyBUF_SIMPLE)
self.assertRaises(BufferError, Importer, a, PyBUF_WRITABLE)
self.assertRaises(BufferError, Importer, a, PyBUF_ND)
self.assertRaises(BufferError, Importer, a, PyBUF_C_CONTIGUOUS)
self.assertRaises(BufferError, Importer, a, PyBUF_F_CONTIGUOUS)
self.assertRaises(BufferError, Importer, a, PyBUF_ANY_CONTIGUOUS)
self.assertRaises(BufferError, Importer, a, PyBUF_CONTIG)
def test_negative_strides(self):
self.check_args(3, (3, 5, 4), "B", (20, 4, -1), 60, 60, 1, 3)
self.check_args(3, (3, 5, 3), "B", (20, 4, -1), 45, 60, 1, 2)
self.check_args(3, (3, 5, 4), "B", (20, -4, 1), 60, 60, 1, 16)
self.check_args(3, (3, 5, 4), "B", (-20, -4, -1), 60, 60, 1, 59)
self.check_args(3, (3, 5, 3), "B", (-20, -4, -1), 45, 60, 1, 58)
def test_attributes(self):
a = Exporter((13, 5, 11, 3), "=h", (440, 88, 8, 2))
self.assertEqual(a.ndim, 4)
self.assertEqual(a.itemsize, 2)
self.assertFalse(a.readonly)
self.assertEqual(a.shape, (13, 5, 11, 3))
self.assertEqual(a.format, "=h")
self.assertEqual(a.strides, (440, 88, 8, 2))
self.assertEqual(a.len, 4290)
self.assertEqual(a.buflen, 5720)
self.assertEqual(a.buf, ctypes.addressof(a._buf))
a = Exporter((8,))
self.assertEqual(a.ndim, 1)
self.assertEqual(a.itemsize, 1)
self.assertFalse(a.readonly)
self.assertEqual(a.shape, (8,))
self.assertEqual(a.format, "B")
self.assertTrue(isinstance(a.strides, tuple))
self.assertEqual(a.strides, (1,))
self.assertEqual(a.len, 8)
self.assertEqual(a.buflen, 8)
a = Exporter([13, 5, 11, 3], "=h", [440, 88, 8, 2])
self.assertTrue(isinstance(a.shape, tuple))
self.assertTrue(isinstance(a.strides, tuple))
self.assertEqual(a.shape, (13, 5, 11, 3))
self.assertEqual(a.strides, (440, 88, 8, 2))
def test_itemsize(self):
exp = Exporter((4, 5), format="B", itemsize=8)
imp = Importer(exp, PyBUF_RECORDS)
self.assertEqual(imp.itemsize, 8)
self.assertEqual(imp.format, "B")
self.assertEqual(imp.strides, (40, 8))
exp = Exporter((4, 5), format="weird", itemsize=5)
imp = Importer(exp, PyBUF_RECORDS)
self.assertEqual(imp.itemsize, 5)
self.assertEqual(imp.format, "weird")
self.assertEqual(imp.strides, (25, 5))
def check_args(
self, call_flags, shape, format, strides, length, bufsize, itemsize, offset=0
):
format_arg = format if call_flags & 1 else None
strides_arg = strides if call_flags & 2 else None
a = Exporter(shape, format_arg, strides_arg)
self.assertEqual(a.buflen, bufsize)
self.assertEqual(a.buf, ctypes.addressof(a._buf) + offset)
m = Importer(a, PyBUF_RECORDS_RO)
self.assertEqual(m.buf, a.buf)
self.assertEqual(m.len, length)
self.assertEqual(m.format, format)
self.assertEqual(m.itemsize, itemsize)
self.assertEqual(m.shape, shape)
self.assertEqual(m.strides, strides)
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,20 @@
# Module pygame.tests.test_utils.endian
#
# Machine independent conversion to little-endian and big-endian Python
# integer values.
import struct
def little_endian_uint32(i):
"""Return the 32 bit unsigned integer little-endian representation of i"""
s = struct.pack("<I", i)
return struct.unpack("=I", s)[0]
def big_endian_uint32(i):
"""Return the 32 bit unsigned integer big-endian representation of i"""
s = struct.pack(">I", i)
return struct.unpack("=I", s)[0]

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,350 @@
import sys
if __name__ == "__main__":
raise RuntimeError("This module is for import only")
test_pkg_name = ".".join(__name__.split(".")[0:-2])
is_pygame_pkg = test_pkg_name == "pygame.tests"
test_runner_mod = test_pkg_name + ".test_utils.test_runner"
if is_pygame_pkg:
from pygame.tests.test_utils import import_submodule
from pygame.tests.test_utils.test_runner import (
prepare_test_env,
run_test,
combine_results,
get_test_results,
TEST_RESULTS_START,
)
else:
from test.test_utils import import_submodule
from test.test_utils.test_runner import (
prepare_test_env,
run_test,
combine_results,
get_test_results,
TEST_RESULTS_START,
)
import pygame
import pygame.threads
import os
import re
import shutil
import tempfile
import time
import random
from pprint import pformat
was_run = False
def run(*args, **kwds):
"""Run the Pygame unit test suite and return (total tests run, fails dict)
Positional arguments (optional):
The names of tests to include. If omitted then all tests are run. Test
names need not include the trailing '_test'.
Keyword arguments:
incomplete - fail incomplete tests (default False)
usesubprocess - run all test suites in the current process
(default False, use separate subprocesses)
dump - dump failures/errors as dict ready to eval (default False)
file - if provided, the name of a file into which to dump failures/errors
timings - if provided, the number of times to run each individual test to
get an average run time (default is run each test once)
exclude - A list of TAG names to exclude from the run. The items may be
comma or space separated.
show_output - show silenced stderr/stdout on errors (default False)
all - dump all results, not just errors (default False)
randomize - randomize order of tests (default False)
seed - if provided, a seed randomizer integer
multi_thread - if provided, the number of THREADS in which to run
subprocessed tests
time_out - if subprocess is True then the time limit in seconds before
killing a test (default 30)
fake - if provided, the name of the fake tests package in the
run_tests__tests subpackage to run instead of the normal
Pygame tests
python - the path to a python executable to run subprocessed tests
(default sys.executable)
interactive - allow tests tagged 'interactive'.
Return value:
A tuple of total number of tests run, dictionary of error information. The
dictionary is empty if no errors were recorded.
By default individual test modules are run in separate subprocesses. This
recreates normal Pygame usage where pygame.init() and pygame.quit() are
called only once per program execution, and avoids unfortunate
interactions between test modules. Also, a time limit is placed on test
execution, so frozen tests are killed when there time allotment expired.
Use the single process option if threading is not working properly or if
tests are taking too long. It is not guaranteed that all tests will pass
in single process mode.
Tests are run in a randomized order if the randomize argument is True or a
seed argument is provided. If no seed integer is provided then the system
time is used.
Individual test modules may have a corresponding *_tags.py module,
defining a __tags__ attribute, a list of tag strings used to selectively
omit modules from a run. By default only the 'interactive', 'ignore', and
'subprocess_ignore' tags are ignored. 'interactive' is for modules that
take user input, like cdrom_test.py. 'ignore' and 'subprocess_ignore' for
for disabling modules for foreground and subprocess modes respectively.
These are for disabling tests on optional modules or for experimental
modules with known problems. These modules can be run from the console as
a Python program.
This function can only be called once per Python session. It is not
reentrant.
"""
global was_run
if was_run:
raise RuntimeError("run() was already called this session")
was_run = True
options = kwds.copy()
option_usesubprocess = options.get("usesubprocess", False)
option_dump = options.pop("dump", False)
option_file = options.pop("file", None)
option_randomize = options.get("randomize", False)
option_seed = options.get("seed", None)
option_multi_thread = options.pop("multi_thread", 1)
option_time_out = options.pop("time_out", 120)
option_fake = options.pop("fake", None)
option_python = options.pop("python", sys.executable)
option_exclude = options.pop("exclude", ())
option_interactive = options.pop("interactive", False)
if not option_interactive and "interactive" not in option_exclude:
option_exclude += ("interactive",)
if option_usesubprocess and "subprocess_ignore" not in option_exclude:
option_exclude += ("subprocess_ignore",)
elif "ignore" not in option_exclude:
option_exclude += ("ignore",)
option_exclude += ("python3_ignore",)
option_exclude += ("SDL2_ignore",)
main_dir, test_subdir, fake_test_subdir = prepare_test_env()
###########################################################################
# Compile a list of test modules. If fake, then compile list of fake
# xxxx_test.py from run_tests__tests
TEST_MODULE_RE = re.compile(r"^(.+_test)\.py$")
test_mods_pkg_name = test_pkg_name
working_dir_temp = tempfile.mkdtemp()
if option_fake is not None:
test_mods_pkg_name = ".".join(
[test_mods_pkg_name, "run_tests__tests", option_fake]
)
test_subdir = os.path.join(fake_test_subdir, option_fake)
working_dir = test_subdir
else:
working_dir = working_dir_temp
# Added in because some machines will need os.environ else there will be
# false failures in subprocess mode. Same issue as python2.6. Needs some
# env vars.
test_env = os.environ
fmt1 = "%s.%%s" % test_mods_pkg_name
fmt2 = "%s.%%s_test" % test_mods_pkg_name
if args:
test_modules = [m.endswith("_test") and (fmt1 % m) or (fmt2 % m) for m in args]
else:
test_modules = []
for f in sorted(os.listdir(test_subdir)):
for match in TEST_MODULE_RE.findall(f):
test_modules.append(fmt1 % (match,))
###########################################################################
# Remove modules to be excluded.
tmp = test_modules
test_modules = []
for name in tmp:
tag_module_name = f"{name[0:-5]}_tags"
try:
tag_module = import_submodule(tag_module_name)
except ImportError:
test_modules.append(name)
else:
try:
tags = tag_module.__tags__
except AttributeError:
print(f"{tag_module_name} has no tags: ignoring")
test_modules.append(name)
else:
for tag in tags:
if tag in option_exclude:
print(f"skipping {name} (tag '{tag}')")
break
else:
test_modules.append(name)
del tmp, tag_module_name, name
###########################################################################
# Meta results
results = {}
meta_results = {"__meta__": {}}
meta = meta_results["__meta__"]
###########################################################################
# Randomization
if option_randomize or option_seed is not None:
if option_seed is None:
option_seed = time.time()
meta["random_seed"] = option_seed
print(f"\nRANDOM SEED USED: {option_seed}\n")
random.seed(option_seed)
random.shuffle(test_modules)
###########################################################################
# Single process mode
if not option_usesubprocess:
options["exclude"] = option_exclude
t = time.time()
for module in test_modules:
results.update(run_test(module, **options))
t = time.time() - t
###########################################################################
# Subprocess mode
#
else:
if is_pygame_pkg:
from pygame.tests.test_utils.async_sub import proc_in_time_or_kill
else:
from test.test_utils.async_sub import proc_in_time_or_kill
pass_on_args = ["--exclude", ",".join(option_exclude)] + [
"--" + field
for field in ("randomize", "incomplete", "unbuffered", "verbosity")
if kwds.get(field)
]
def sub_test(module):
print(f"loading {module}")
cmd = [option_python, "-m", test_runner_mod, module] + pass_on_args
return (
module,
(cmd, test_env, working_dir),
proc_in_time_or_kill(
cmd, option_time_out, env=test_env, wd=working_dir
),
)
if option_multi_thread > 1:
def tmap(f, args):
return pygame.threads.tmap(
f, args, stop_on_error=False, num_workers=option_multi_thread
)
else:
tmap = map
t = time.time()
for module, cmd, (return_code, raw_return) in tmap(sub_test, test_modules):
test_file = f"{os.path.join(test_subdir, module)}.py"
cmd, test_env, working_dir = cmd
test_results = get_test_results(raw_return)
if test_results:
results.update(test_results)
else:
results[module] = {}
results[module].update(
{
"return_code": return_code,
"raw_return": raw_return,
"cmd": cmd,
"test_file": test_file,
"test_env": test_env,
"working_dir": working_dir,
"module": module,
}
)
t = time.time() - t
###########################################################################
# Output Results
#
untrusty_total, combined = combine_results(results, t)
total, n_errors, n_failures = count_results(results)
meta["total_tests"] = total
meta["combined"] = combined
meta["total_errors"] = n_errors
meta["total_failures"] = n_failures
results.update(meta_results)
if not option_usesubprocess and total != untrusty_total:
raise AssertionError(
"Something went wrong in the Test Machinery:\n"
"total: %d != untrusty_total: %d" % (total, untrusty_total)
)
if not option_dump:
print(combined)
else:
print(TEST_RESULTS_START)
print(pformat(results))
if option_file is not None:
results_file = open(option_file, "w")
try:
results_file.write(pformat(results))
finally:
results_file.close()
shutil.rmtree(working_dir_temp)
return total, n_errors + n_failures
def count_results(results):
total = errors = failures = 0
for result in results.values():
if result.get("return_code", 0):
total += 1
errors += 1
else:
total += result["num_tests"]
errors += result["num_errors"]
failures += result["num_failures"]
return total, errors, failures
def run_and_exit(*args, **kwargs):
"""Run the tests, and if there are failures, exit with a return code of 1.
This is needed for various buildbots to recognise that the tests have
failed.
"""
total, fails = run(*args, **kwargs)
if fails:
sys.exit(1)
sys.exit(0)

View file

@ -0,0 +1,89 @@
import inspect
import random
import re
import unittest
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from . import import_submodule
class PygameTestLoader(unittest.TestLoader):
def __init__(
self, randomize_tests=False, include_incomplete=False, exclude=("interactive",)
):
super().__init__()
self.randomize_tests = randomize_tests
if exclude is None:
self.exclude = set()
else:
self.exclude = set(exclude)
if include_incomplete:
self.testMethodPrefix = ("test", "todo_")
def getTestCaseNames(self, testCaseClass):
res = []
for name in super().getTestCaseNames(testCaseClass):
tags = get_tags(testCaseClass, getattr(testCaseClass, name))
if self.exclude.isdisjoint(tags):
res.append(name)
if self.randomize_tests:
random.shuffle(res)
return res
# Exclude by tags:
TAGS_RE = re.compile(r"\|[tT]ags:(-?[ a-zA-Z,0-9_\n]+)\|", re.M)
class TestTags:
def __init__(self):
self.memoized = {}
self.parent_modules = {}
def get_parent_module(self, class_):
if class_ not in self.parent_modules:
self.parent_modules[class_] = import_submodule(class_.__module__)
return self.parent_modules[class_]
def __call__(self, parent_class, meth):
key = (parent_class, meth.__name__)
if key not in self.memoized:
parent_module = self.get_parent_module(parent_class)
module_tags = getattr(parent_module, "__tags__", [])
class_tags = getattr(parent_class, "__tags__", [])
tags = TAGS_RE.search(inspect.getdoc(meth) or "")
if tags:
test_tags = [t.strip() for t in tags.group(1).split(",")]
else:
test_tags = []
combined = set()
for tags in (module_tags, class_tags, test_tags):
if not tags:
continue
add = {t for t in tags if not t.startswith("-")}
remove = {t[1:] for t in tags if t not in add}
if add:
combined.update(add)
if remove:
combined.difference_update(remove)
self.memoized[key] = combined
return self.memoized[key]
get_tags = TestTags()

View file

@ -0,0 +1,324 @@
import sys
import os
if __name__ == "__main__":
pkg_dir = os.path.split(os.path.split(os.path.abspath(__file__))[0])[0]
parent_dir, pkg_name = os.path.split(pkg_dir)
is_pygame_pkg = pkg_name == "tests" and os.path.split(parent_dir)[1] == "pygame"
if not is_pygame_pkg:
sys.path.insert(0, parent_dir)
else:
is_pygame_pkg = __name__.startswith("pygame.tests.")
import io
import optparse
import re
import unittest
from pprint import pformat
from .test_machinery import PygameTestLoader
def prepare_test_env():
test_subdir = os.path.split(os.path.split(os.path.abspath(__file__))[0])[0]
main_dir = os.path.split(test_subdir)[0]
sys.path.insert(0, test_subdir)
fake_test_subdir = os.path.join(test_subdir, "run_tests__tests")
return main_dir, test_subdir, fake_test_subdir
main_dir, test_subdir, fake_test_subdir = prepare_test_env()
################################################################################
# Set the command line options
#
# options are shared with run_tests.py so make sure not to conflict
# in time more will be added here
TAG_PAT = r"-?[a-zA-Z0-9_]+"
TAG_RE = re.compile(TAG_PAT)
EXCLUDE_RE = re.compile(rf"({TAG_PAT},?\s*)+$")
def exclude_callback(option, opt, value, parser):
if EXCLUDE_RE.match(value) is None:
raise optparse.OptionValueError(f"{opt} argument has invalid value")
parser.values.exclude = TAG_RE.findall(value)
opt_parser = optparse.OptionParser()
opt_parser.add_option(
"-i", "--incomplete", action="store_true", help="fail incomplete tests"
)
opt_parser.add_option(
"-s",
"--usesubprocess",
action="store_true",
help="run everything in a single process " " (default: use no subprocesses)",
)
opt_parser.add_option(
"-e",
"--exclude",
action="callback",
type="string",
help="exclude tests containing any of TAGS",
callback=exclude_callback,
)
opt_parser.add_option(
"-u",
"--unbuffered",
action="store_true",
help="Show stdout/stderr as tests run, rather than storing it and showing on failures",
)
opt_parser.add_option(
"-v",
"--verbose",
dest="verbosity",
action="store_const",
const=2,
help="Verbose output",
)
opt_parser.add_option(
"-q",
"--quiet",
dest="verbosity",
action="store_const",
const=0,
help="Quiet output",
)
opt_parser.add_option(
"-r", "--randomize", action="store_true", help="randomize order of tests"
)
################################################################################
# If an xxxx_test.py takes longer than TIME_OUT seconds it will be killed
# This is only the default, can be over-ridden on command line
TIME_OUT = 30
# DEFAULTS
################################################################################
# Human readable output
#
COMPLETE_FAILURE_TEMPLATE = """
======================================================================
ERROR: all_tests_for (%(module)s.AllTestCases)
----------------------------------------------------------------------
Traceback (most recent call last):
File "test/%(module)s.py", line 1, in all_tests_for
subprocess completely failed with return code of %(return_code)s
cmd: %(cmd)s
test_env: %(test_env)s
working_dir: %(working_dir)s
return (first 10 and last 10 lines):
%(raw_return)s
""" # Leave that last empty line else build page regex won't match
# Text also needs to be vertically compressed
RAN_TESTS_DIV = (70 * "-") + "\nRan"
DOTS = re.compile("^([FE.sux]*)$", re.MULTILINE)
def extract_tracebacks(output):
"""from test runner output return the tracebacks."""
verbose_mode = " ..." in output
if verbose_mode:
if "ERROR" in output or "FAILURE" in output:
return "\n\n==".join(output.split("\n\n==")[1:])
else:
dots = DOTS.search(output).group(1)
if "E" in dots or "F" in dots:
return output[len(dots) + 1 :].split(RAN_TESTS_DIV)[0]
return ""
def output_into_dots(output):
"""convert the test runner output into dots."""
# verbose_mode = ") ..." in output
verbose_mode = " ..." in output
if verbose_mode:
# a map from the verbose output to the dots output.
reasons = {
"... ERROR": "E",
"... unexpected success": "u",
"... skipped": "s",
"... expected failure": "x",
"... ok": ".",
"... FAIL": "F",
}
results = output.split("\n\n==")[0]
lines = [l for l in results.split("\n") if l and "..." in l]
dotlist = []
for l in lines:
found = False
for reason in reasons:
if reason in l:
dotlist.append(reasons[reason])
found = True
break
if not found:
raise ValueError(f"Not sure what this is. Add to reasons. :{l}")
return "".join(dotlist)
dots = DOTS.search(output).group(1)
return dots
def combine_results(all_results, t):
"""
Return pieced together results in a form fit for human consumption. Don't
rely on results if piecing together subprocessed results (single process
mode is fine). Was originally meant for that purpose but was found to be
unreliable. See the dump option for reliable results.
"""
all_dots = ""
failures = []
for module, results in sorted(all_results.items()):
output, return_code, raw_return = map(
results.get, ("output", "return_code", "raw_return")
)
if not output or (return_code and RAN_TESTS_DIV not in output):
# would this effect the original dict? TODO
output_lines = raw_return.splitlines()
if len(output_lines) > 20:
results["raw_return"] = "\n".join(
output_lines[:10] + ["..."] + output_lines[-10:]
)
failures.append(COMPLETE_FAILURE_TEMPLATE % results)
all_dots += "E"
continue
dots = output_into_dots(output)
all_dots += dots
tracebacks = extract_tracebacks(output)
if tracebacks:
failures.append(tracebacks)
total_fails, total_errors = map(all_dots.count, "FE")
total_tests = len(all_dots)
combined = [all_dots]
if failures:
combined += ["".join(failures).lstrip("\n")[:-1]]
combined += [f"{RAN_TESTS_DIV} {total_tests} tests in {t:.3f}s\n"]
if failures:
infos = ([f"failures={total_fails}"] if total_fails else []) + (
[f"errors={total_errors}"] if total_errors else []
)
combined += [f"FAILED ({', '.join(infos)})\n"]
else:
combined += ["OK\n"]
return total_tests, "\n".join(combined)
################################################################################
TEST_RESULTS_START = "<--!! TEST RESULTS START HERE !!-->"
TEST_RESULTS_END = "<--!! TEST RESULTS END HERE !!-->"
_test_re_str = f"{TEST_RESULTS_START}\n(.*){TEST_RESULTS_END}"
TEST_RESULTS_RE = re.compile(_test_re_str, re.DOTALL | re.M)
def get_test_results(raw_return):
test_results = TEST_RESULTS_RE.search(raw_return)
if test_results:
try:
return eval(test_results.group(1))
except:
print(f"BUGGY TEST RESULTS EVAL:\n {test_results.group(1)}")
raise
################################################################################
def run_test(
module,
incomplete=False,
usesubprocess=True,
randomize=False,
exclude=("interactive",),
buffer=True,
unbuffered=None,
verbosity=1,
):
"""Run a unit test module"""
suite = unittest.TestSuite()
if verbosity is None:
verbosity = 1
if verbosity:
print(f"loading {module}")
loader = PygameTestLoader(
randomize_tests=randomize, include_incomplete=incomplete, exclude=exclude
)
suite.addTest(loader.loadTestsFromName(module))
output = io.StringIO()
runner = unittest.TextTestRunner(stream=output, buffer=buffer, verbosity=verbosity)
results = runner.run(suite)
if verbosity == 2:
output.seek(0)
print(output.read())
output.seek(0)
results = {
module: {
"output": output.getvalue(),
"num_tests": results.testsRun,
"num_errors": len(results.errors),
"num_failures": len(results.failures),
}
}
if usesubprocess:
print(TEST_RESULTS_START)
print(pformat(results))
print(TEST_RESULTS_END)
else:
return results
################################################################################
if __name__ == "__main__":
options, args = opt_parser.parse_args()
if not args:
if is_pygame_pkg:
run_from = "pygame.tests.go"
else:
run_from = os.path.join(main_dir, "run_tests.py")
sys.exit(f"No test module provided; consider using {run_from} instead")
run_test(
args[0],
incomplete=options.incomplete,
usesubprocess=options.usesubprocess,
randomize=options.randomize,
exclude=options.exclude,
buffer=(not options.unbuffered),
)
################################################################################