summaryrefslogtreecommitdiffstats
path: root/venv/lib/python3.9/site-packages/smmap/test
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.9/site-packages/smmap/test')
-rw-r--r--venv/lib/python3.9/site-packages/smmap/test/__init__.py0
-rw-r--r--venv/lib/python3.9/site-packages/smmap/test/lib.py72
-rw-r--r--venv/lib/python3.9/site-packages/smmap/test/test_buf.py126
-rw-r--r--venv/lib/python3.9/site-packages/smmap/test/test_mman.py224
-rw-r--r--venv/lib/python3.9/site-packages/smmap/test/test_tutorial.py75
-rw-r--r--venv/lib/python3.9/site-packages/smmap/test/test_util.py105
6 files changed, 602 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/smmap/test/__init__.py b/venv/lib/python3.9/site-packages/smmap/test/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/smmap/test/__init__.py
diff --git a/venv/lib/python3.9/site-packages/smmap/test/lib.py b/venv/lib/python3.9/site-packages/smmap/test/lib.py
new file mode 100644
index 00000000..ca91ee91
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/smmap/test/lib.py
@@ -0,0 +1,72 @@
+"""Provide base classes for the test system"""
+from unittest import TestCase
+import os
+import tempfile
+
+__all__ = ['TestBase', 'FileCreator']
+
+
+#{ Utilities
+
+class FileCreator:
+
+ """A instance which creates a temporary file with a prefix and a given size
+ and provides this info to the user.
+ Once it gets deleted, it will remove the temporary file as well."""
+ __slots__ = ("_size", "_path")
+
+ def __init__(self, size, prefix=''):
+ assert size, "Require size to be larger 0"
+
+ self._path = tempfile.mktemp(prefix=prefix)
+ self._size = size
+
+ with open(self._path, "wb") as fp:
+ fp.seek(size - 1)
+ fp.write(b'1')
+
+ assert os.path.getsize(self.path) == size
+
+ def __del__(self):
+ try:
+ os.remove(self.path)
+ except OSError:
+ pass
+ # END exception handling
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.__del__()
+
+ @property
+ def path(self):
+ return self._path
+
+ @property
+ def size(self):
+ return self._size
+
+#} END utilities
+
+
+class TestBase(TestCase):
+
+ """Foundation used by all tests"""
+
+ #{ Configuration
+ k_window_test_size = 1000 * 1000 * 8 + 5195
+ #} END configuration
+
+ #{ Overrides
+ @classmethod
+ def setUpAll(cls):
+ # nothing for now
+ pass
+
+ # END overrides
+
+ #{ Interface
+
+ #} END interface
diff --git a/venv/lib/python3.9/site-packages/smmap/test/test_buf.py b/venv/lib/python3.9/site-packages/smmap/test/test_buf.py
new file mode 100644
index 00000000..17555afe
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/smmap/test/test_buf.py
@@ -0,0 +1,126 @@
+from .lib import TestBase, FileCreator
+
+from smmap.mman import (
+ SlidingWindowMapManager,
+ StaticWindowMapManager
+)
+from smmap.buf import SlidingWindowMapBuffer
+
+from random import randint
+from time import time
+import sys
+import os
+
+
+man_optimal = SlidingWindowMapManager()
+man_worst_case = SlidingWindowMapManager(
+ window_size=TestBase.k_window_test_size // 100,
+ max_memory_size=TestBase.k_window_test_size // 3,
+ max_open_handles=15)
+static_man = StaticWindowMapManager()
+
+
+class TestBuf(TestBase):
+
+ def test_basics(self):
+ with FileCreator(self.k_window_test_size, "buffer_test") as fc:
+
+ # invalid paths fail upon construction
+ c = man_optimal.make_cursor(fc.path)
+ self.assertRaises(ValueError, SlidingWindowMapBuffer, type(c)()) # invalid cursor
+ self.assertRaises(ValueError, SlidingWindowMapBuffer, c, fc.size) # offset too large
+
+ buf = SlidingWindowMapBuffer() # can create uninitailized buffers
+ assert buf.cursor() is None
+
+ # can call end access any time
+ buf.end_access()
+ buf.end_access()
+ assert len(buf) == 0
+
+ # begin access can revive it, if the offset is suitable
+ offset = 100
+ assert buf.begin_access(c, fc.size) == False
+ assert buf.begin_access(c, offset) == True
+ assert len(buf) == fc.size - offset
+ assert buf.cursor().is_valid()
+
+ # empty begin access keeps it valid on the same path, but alters the offset
+ assert buf.begin_access() == True
+ assert len(buf) == fc.size
+ assert buf.cursor().is_valid()
+
+ # simple access
+ with open(fc.path, 'rb') as fp:
+ data = fp.read()
+ assert data[offset] == buf[0]
+ assert data[offset:offset * 2] == buf[0:offset]
+
+ # negative indices, partial slices
+ assert buf[-1] == buf[len(buf) - 1]
+ assert buf[-10:] == buf[len(buf) - 10:len(buf)]
+
+ # end access makes its cursor invalid
+ buf.end_access()
+ assert not buf.cursor().is_valid()
+ assert buf.cursor().is_associated() # but it remains associated
+
+ # an empty begin access fixes it up again
+ assert buf.begin_access() == True and buf.cursor().is_valid()
+ del(buf) # ends access automatically
+ del(c)
+
+ assert man_optimal.num_file_handles() == 1
+
+ # PERFORMANCE
+ # blast away with random access and a full mapping - we don't want to
+ # exaggerate the manager's overhead, but measure the buffer overhead
+ # We do it once with an optimal setting, and with a worse manager which
+ # will produce small mappings only !
+ max_num_accesses = 100
+ fd = os.open(fc.path, os.O_RDONLY)
+ for item in (fc.path, fd):
+ for manager, man_id in ((man_optimal, 'optimal'),
+ (man_worst_case, 'worst case'),
+ (static_man, 'static optimal')):
+ buf = SlidingWindowMapBuffer(manager.make_cursor(item))
+ assert manager.num_file_handles() == 1
+ for access_mode in range(2): # single, multi
+ num_accesses_left = max_num_accesses
+ num_bytes = 0
+ fsize = fc.size
+
+ st = time()
+ buf.begin_access()
+ while num_accesses_left:
+ num_accesses_left -= 1
+ if access_mode: # multi
+ ofs_start = randint(0, fsize)
+ ofs_end = randint(ofs_start, fsize)
+ d = buf[ofs_start:ofs_end]
+ assert len(d) == ofs_end - ofs_start
+ assert d == data[ofs_start:ofs_end]
+ num_bytes += len(d)
+ del d
+ else:
+ pos = randint(0, fsize)
+ assert buf[pos] == data[pos]
+ num_bytes += 1
+ # END handle mode
+ # END handle num accesses
+
+ buf.end_access()
+ assert manager.num_file_handles()
+ assert manager.collect()
+ assert manager.num_file_handles() == 0
+ elapsed = max(time() - st, 0.001) # prevent zero division errors on windows
+ mb = float(1000 * 1000)
+ mode_str = (access_mode and "slice") or "single byte"
+ print("%s: Made %i random %s accesses to buffer created from %s reading a total of %f mb in %f s (%f mb/s)"
+ % (man_id, max_num_accesses, mode_str, type(item), num_bytes / mb, elapsed, (num_bytes / mb) / elapsed),
+ file=sys.stderr)
+ # END handle access mode
+ del buf
+ # END for each manager
+ # END for each input
+ os.close(fd)
diff --git a/venv/lib/python3.9/site-packages/smmap/test/test_mman.py b/venv/lib/python3.9/site-packages/smmap/test/test_mman.py
new file mode 100644
index 00000000..d88316b8
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/smmap/test/test_mman.py
@@ -0,0 +1,224 @@
+from .lib import TestBase, FileCreator
+
+from smmap.mman import (
+ WindowCursor,
+ SlidingWindowMapManager,
+ StaticWindowMapManager
+)
+from smmap.util import align_to_mmap
+
+from random import randint
+from time import time
+import os
+import sys
+from copy import copy
+
+
+class TestMMan(TestBase):
+
+ def test_cursor(self):
+ with FileCreator(self.k_window_test_size, "cursor_test") as fc:
+ man = SlidingWindowMapManager()
+ ci = WindowCursor(man) # invalid cursor
+ assert not ci.is_valid()
+ assert not ci.is_associated()
+ assert ci.size() == 0 # this is cached, so we can query it in invalid state
+
+ cv = man.make_cursor(fc.path)
+ assert not cv.is_valid() # no region mapped yet
+ assert cv.is_associated() # but it know where to map it from
+ assert cv.file_size() == fc.size
+ assert cv.path() == fc.path
+
+ # copy module
+ cio = copy(cv)
+ assert not cio.is_valid() and cio.is_associated()
+
+ # assign method
+ assert not ci.is_associated()
+ ci.assign(cv)
+ assert not ci.is_valid() and ci.is_associated()
+
+ # unuse non-existing region is fine
+ cv.unuse_region()
+ cv.unuse_region()
+
+ # destruction is fine (even multiple times)
+ cv._destroy()
+ WindowCursor(man)._destroy()
+
+ def test_memory_manager(self):
+ slide_man = SlidingWindowMapManager()
+ static_man = StaticWindowMapManager()
+
+ for man in (static_man, slide_man):
+ assert man.num_file_handles() == 0
+ assert man.num_open_files() == 0
+ winsize_cmp_val = 0
+ if isinstance(man, StaticWindowMapManager):
+ winsize_cmp_val = -1
+ # END handle window size
+ assert man.window_size() > winsize_cmp_val
+ assert man.mapped_memory_size() == 0
+ assert man.max_mapped_memory_size() > 0
+
+ # collection doesn't raise in 'any' mode
+ man._collect_lru_region(0)
+ # doesn't raise if we are within the limit
+ man._collect_lru_region(10)
+
+ # doesn't fail if we over-allocate
+ assert man._collect_lru_region(sys.maxsize) == 0
+
+ # use a region, verify most basic functionality
+ with FileCreator(self.k_window_test_size, "manager_test") as fc:
+ fd = os.open(fc.path, os.O_RDONLY)
+ try:
+ for item in (fc.path, fd):
+ c = man.make_cursor(item)
+ assert c.path_or_fd() is item
+ assert c.use_region(10, 10).is_valid()
+ assert c.ofs_begin() == 10
+ assert c.size() == 10
+ with open(fc.path, 'rb') as fp:
+ assert c.buffer()[:] == fp.read(20)[10:]
+
+ if isinstance(item, int):
+ self.assertRaises(ValueError, c.path)
+ else:
+ self.assertRaises(ValueError, c.fd)
+ # END handle value error
+ # END for each input
+ finally:
+ os.close(fd)
+ # END for each manasger type
+
+ def test_memman_operation(self):
+ # test more access, force it to actually unmap regions
+ with FileCreator(self.k_window_test_size, "manager_operation_test") as fc:
+ with open(fc.path, 'rb') as fp:
+ data = fp.read()
+ fd = os.open(fc.path, os.O_RDONLY)
+ try:
+ max_num_handles = 15
+ # small_size =
+ for mtype, args in ((StaticWindowMapManager, (0, fc.size // 3, max_num_handles)),
+ (SlidingWindowMapManager, (fc.size // 100, fc.size // 3, max_num_handles)),):
+ for item in (fc.path, fd):
+ assert len(data) == fc.size
+
+ # small windows, a reasonable max memory. Not too many regions at once
+ man = mtype(window_size=args[0], max_memory_size=args[1], max_open_handles=args[2])
+ c = man.make_cursor(item)
+
+ # still empty (more about that is tested in test_memory_manager()
+ assert man.num_open_files() == 0
+ assert man.mapped_memory_size() == 0
+
+ base_offset = 5000
+ # window size is 0 for static managers, hence size will be 0. We take that into consideration
+ size = man.window_size() // 2
+ assert c.use_region(base_offset, size).is_valid()
+ rr = c.region()
+ assert rr.client_count() == 2 # the manager and the cursor and us
+
+ assert man.num_open_files() == 1
+ assert man.num_file_handles() == 1
+ assert man.mapped_memory_size() == rr.size()
+
+ # assert c.size() == size # the cursor may overallocate in its static version
+ assert c.ofs_begin() == base_offset
+ assert rr.ofs_begin() == 0 # it was aligned and expanded
+ if man.window_size():
+ # but isn't larger than the max window (aligned)
+ assert rr.size() == align_to_mmap(man.window_size(), True)
+ else:
+ assert rr.size() == fc.size
+ # END ignore static managers which dont use windows and are aligned to file boundaries
+
+ assert c.buffer()[:] == data[base_offset:base_offset + (size or c.size())]
+
+ # obtain second window, which spans the first part of the file - it is a still the same window
+ nsize = (size or fc.size) - 10
+ assert c.use_region(0, nsize).is_valid()
+ assert c.region() == rr
+ assert man.num_file_handles() == 1
+ assert c.size() == nsize
+ assert c.ofs_begin() == 0
+ assert c.buffer()[:] == data[:nsize]
+
+ # map some part at the end, our requested size cannot be kept
+ overshoot = 4000
+ base_offset = fc.size - (size or c.size()) + overshoot
+ assert c.use_region(base_offset, size).is_valid()
+ if man.window_size():
+ assert man.num_file_handles() == 2
+ assert c.size() < size
+ assert c.region() is not rr # old region is still available, but has not curser ref anymore
+ assert rr.client_count() == 1 # only held by manager
+ else:
+ assert c.size() < fc.size
+ # END ignore static managers which only have one handle per file
+ rr = c.region()
+ assert rr.client_count() == 2 # manager + cursor
+ assert rr.ofs_begin() < c.ofs_begin() # it should have extended itself to the left
+ assert rr.ofs_end() <= fc.size # it cannot be larger than the file
+ assert c.buffer()[:] == data[base_offset:base_offset + (size or c.size())]
+
+ # unising a region makes the cursor invalid
+ c.unuse_region()
+ assert not c.is_valid()
+ if man.window_size():
+ # but doesn't change anything regarding the handle count - we cache it and only
+ # remove mapped regions if we have to
+ assert man.num_file_handles() == 2
+ # END ignore this for static managers
+
+ # iterate through the windows, verify data contents
+ # this will trigger map collection after a while
+ max_random_accesses = 5000
+ num_random_accesses = max_random_accesses
+ memory_read = 0
+ st = time()
+
+ # cache everything to get some more performance
+ includes_ofs = c.includes_ofs
+ max_mapped_memory_size = man.max_mapped_memory_size()
+ max_file_handles = man.max_file_handles()
+ mapped_memory_size = man.mapped_memory_size
+ num_file_handles = man.num_file_handles
+ while num_random_accesses:
+ num_random_accesses -= 1
+ base_offset = randint(0, fc.size - 1)
+
+ # precondition
+ if man.window_size():
+ assert max_mapped_memory_size >= mapped_memory_size()
+ # END statics will overshoot, which is fine
+ assert max_file_handles >= num_file_handles()
+ assert c.use_region(base_offset, (size or c.size())).is_valid()
+ csize = c.size()
+ assert c.buffer()[:] == data[base_offset:base_offset + csize]
+ memory_read += csize
+
+ assert includes_ofs(base_offset)
+ assert includes_ofs(base_offset + csize - 1)
+ assert not includes_ofs(base_offset + csize)
+ # END while we should do an access
+ elapsed = max(time() - st, 0.001) # prevent zero divison errors on windows
+ mb = float(1000 * 1000)
+ print("%s: Read %i mb of memory with %i random on cursor initialized with %s accesses in %fs (%f mb/s)\n"
+ % (mtype, memory_read / mb, max_random_accesses, type(item), elapsed, (memory_read / mb) / elapsed),
+ file=sys.stderr)
+
+ # an offset as large as the size doesn't work !
+ assert not c.use_region(fc.size, size).is_valid()
+
+ # collection - it should be able to collect all
+ assert man.num_file_handles()
+ assert man.collect()
+ assert man.num_file_handles() == 0
+ # END for each item
+ # END for each manager type
+ finally:
+ os.close(fd)
diff --git a/venv/lib/python3.9/site-packages/smmap/test/test_tutorial.py b/venv/lib/python3.9/site-packages/smmap/test/test_tutorial.py
new file mode 100644
index 00000000..31c272ab
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/smmap/test/test_tutorial.py
@@ -0,0 +1,75 @@
+from .lib import TestBase
+
+
+class TestTutorial(TestBase):
+
+ def test_example(self):
+ # Memory Managers
+ ##################
+ import smmap
+ # This instance should be globally available in your application
+ # It is configured to be well suitable for 32-bit or 64 bit applications.
+ mman = smmap.SlidingWindowMapManager()
+
+ # the manager provides much useful information about its current state
+ # like the amount of open file handles or the amount of mapped memory
+ assert mman.num_file_handles() == 0
+ assert mman.mapped_memory_size() == 0
+ # and many more ...
+
+ # Cursors
+ ##########
+ import smmap.test.lib
+ with smmap.test.lib.FileCreator(1024 * 1024 * 8, "test_file") as fc:
+ # obtain a cursor to access some file.
+ c = mman.make_cursor(fc.path)
+
+ # the cursor is now associated with the file, but not yet usable
+ assert c.is_associated()
+ assert not c.is_valid()
+
+ # before you can use the cursor, you have to specify a window you want to
+ # access. The following just says you want as much data as possible starting
+ # from offset 0.
+ # To be sure your region could be mapped, query for validity
+ assert c.use_region().is_valid() # use_region returns self
+
+ # once a region was mapped, you must query its dimension regularly
+ # to assure you don't try to access its buffer out of its bounds
+ assert c.size()
+ c.buffer()[0] # first byte
+ c.buffer()[1:10] # first 9 bytes
+ c.buffer()[c.size() - 1] # last byte
+
+ # you can query absolute offsets, and check whether an offset is included
+ # in the cursor's data.
+ assert c.ofs_begin() < c.ofs_end()
+ assert c.includes_ofs(100)
+
+ # If you are over out of bounds with one of your region requests, the
+ # cursor will be come invalid. It cannot be used in that state
+ assert not c.use_region(fc.size, 100).is_valid()
+ # map as much as possible after skipping the first 100 bytes
+ assert c.use_region(100).is_valid()
+
+ # You can explicitly free cursor resources by unusing the cursor's region
+ c.unuse_region()
+ assert not c.is_valid()
+
+ # Buffers
+ #########
+ # Create a default buffer which can operate on the whole file
+ buf = smmap.SlidingWindowMapBuffer(mman.make_cursor(fc.path))
+
+ # you can use it right away
+ assert buf.cursor().is_valid()
+
+ buf[0] # access the first byte
+ buf[-1] # access the last ten bytes on the file
+ buf[-10:] # access the last ten bytes
+
+ # If you want to keep the instance between different accesses, use the
+ # dedicated methods
+ buf.end_access()
+ assert not buf.cursor().is_valid() # you cannot use the buffer anymore
+ assert buf.begin_access(offset=10) # start using the buffer at an offset
diff --git a/venv/lib/python3.9/site-packages/smmap/test/test_util.py b/venv/lib/python3.9/site-packages/smmap/test/test_util.py
new file mode 100644
index 00000000..e6ac10f3
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/smmap/test/test_util.py
@@ -0,0 +1,105 @@
+from .lib import TestBase, FileCreator
+
+from smmap.util import (
+ MapWindow,
+ MapRegion,
+ MapRegionList,
+ ALLOCATIONGRANULARITY,
+ is_64_bit,
+ align_to_mmap
+)
+
+import os
+import sys
+
+
+class TestMMan(TestBase):
+
+ def test_window(self):
+ wl = MapWindow(0, 1) # left
+ wc = MapWindow(1, 1) # center
+ wc2 = MapWindow(10, 5) # another center
+ wr = MapWindow(8000, 50) # right
+
+ assert wl.ofs_end() == 1
+ assert wc.ofs_end() == 2
+ assert wr.ofs_end() == 8050
+
+ # extension does nothing if already in place
+ maxsize = 100
+ wc.extend_left_to(wl, maxsize)
+ assert wc.ofs == 1 and wc.size == 1
+ wl.extend_right_to(wc, maxsize)
+ wl.extend_right_to(wc, maxsize)
+ assert wl.ofs == 0 and wl.size == 1
+
+ # an actual left extension
+ pofs_end = wc2.ofs_end()
+ wc2.extend_left_to(wc, maxsize)
+ assert wc2.ofs == wc.ofs_end() and pofs_end == wc2.ofs_end()
+
+ # respects maxsize
+ wc.extend_right_to(wr, maxsize)
+ assert wc.ofs == 1 and wc.size == maxsize
+ wc.extend_right_to(wr, maxsize)
+ assert wc.ofs == 1 and wc.size == maxsize
+
+ # without maxsize
+ wc.extend_right_to(wr, sys.maxsize)
+ assert wc.ofs_end() == wr.ofs and wc.ofs == 1
+
+ # extend left
+ wr.extend_left_to(wc2, maxsize)
+ wr.extend_left_to(wc2, maxsize)
+ assert wr.size == maxsize
+
+ wr.extend_left_to(wc2, sys.maxsize)
+ assert wr.ofs == wc2.ofs_end()
+
+ wc.align()
+ assert wc.ofs == 0 and wc.size == align_to_mmap(wc.size, True)
+
+ def test_region(self):
+ with FileCreator(self.k_window_test_size, "window_test") as fc:
+ half_size = fc.size // 2
+ rofs = align_to_mmap(4200, False)
+ rfull = MapRegion(fc.path, 0, fc.size)
+ rhalfofs = MapRegion(fc.path, rofs, fc.size)
+ rhalfsize = MapRegion(fc.path, 0, half_size)
+
+ # offsets
+ assert rfull.ofs_begin() == 0 and rfull.size() == fc.size
+ assert rfull.ofs_end() == fc.size # if this method works, it works always
+
+ assert rhalfofs.ofs_begin() == rofs and rhalfofs.size() == fc.size - rofs
+ assert rhalfsize.ofs_begin() == 0 and rhalfsize.size() == half_size
+
+ assert rfull.includes_ofs(0) and rfull.includes_ofs(fc.size - 1) and rfull.includes_ofs(half_size)
+ assert not rfull.includes_ofs(-1) and not rfull.includes_ofs(sys.maxsize)
+
+ # auto-refcount
+ assert rfull.client_count() == 1
+ rfull2 = rfull
+ assert rfull.client_count() == 1, "no auto-counting"
+
+ # window constructor
+ w = MapWindow.from_region(rfull)
+ assert w.ofs == rfull.ofs_begin() and w.ofs_end() == rfull.ofs_end()
+
+ def test_region_list(self):
+ with FileCreator(100, "sample_file") as fc:
+ fd = os.open(fc.path, os.O_RDONLY)
+ try:
+ for item in (fc.path, fd):
+ ml = MapRegionList(item)
+
+ assert len(ml) == 0
+ assert ml.path_or_fd() == item
+ assert ml.file_size() == fc.size
+ finally:
+ os.close(fd)
+
+ def test_util(self):
+ assert isinstance(is_64_bit(), bool) # just call it
+ assert align_to_mmap(1, False) == 0
+ assert align_to_mmap(1, True) == ALLOCATIONGRANULARITY