summary | refs | log | tree | commit | diff | stats
path: root/venv/lib/python3.9/site-packages/smmap/test/test_buf.py
blob: 17555afe4c351f26822b298af45c69b50bfb6808 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from .lib import TestBase, FileCreator

from smmap.mman import (
    SlidingWindowMapManager,
    StaticWindowMapManager
)
from smmap.buf import SlidingWindowMapBuffer

from random import randint
from time import time
import sys
import os


# Shared, module-level managers used by the tests below.

# Sliding-window manager constructed with its default limits.
man_optimal = SlidingWindowMapManager()

# Sliding-window manager with deliberately tight limits, all derived from the
# test file size: a window of 1/100th of the file, a memory cap of 1/3rd of
# the file and at most 15 open handles, so only small mappings are produced.
man_worst_case = SlidingWindowMapManager(
    window_size=TestBase.k_window_test_size // 100,
    max_memory_size=TestBase.k_window_test_size // 3,
    max_open_handles=15,
)

# Static-window manager constructed with its default limits.
static_man = StaticWindowMapManager()


class TestBuf(TestBase):
    """Functional and rough throughput checks for ``SlidingWindowMapBuffer``."""

    def test_basics(self):
        """Exercise the buffer's construction, access lifecycle and random reads.

        The first half checks construction errors, ``begin_access`` /
        ``end_access`` transitions and indexing against the raw file content.
        The second half performs random single-byte and slice reads through
        every module-level manager, once for a path and once for a file
        descriptor, verifies each read against the file content and prints
        throughput figures to stderr.
        """
        with FileCreator(self.k_window_test_size, "buffer_test") as fc:

            # invalid paths fail upon construction
            c = man_optimal.make_cursor(fc.path)
            self.assertRaises(ValueError, SlidingWindowMapBuffer, type(c)())    # invalid cursor
            self.assertRaises(ValueError, SlidingWindowMapBuffer, c, fc.size)   # offset too large

            buf = SlidingWindowMapBuffer()      # can create uninitialized buffers
            assert buf.cursor() is None

            # can call end access any time
            buf.end_access()
            buf.end_access()
            assert len(buf) == 0

            # begin access can revive it, if the offset is suitable
            offset = 100
            assert not buf.begin_access(c, fc.size)
            assert buf.begin_access(c, offset)
            assert len(buf) == fc.size - offset
            assert buf.cursor().is_valid()

            # empty begin access keeps it valid on the same path, but alters the offset
            assert buf.begin_access()
            assert len(buf) == fc.size
            assert buf.cursor().is_valid()

            # simple access
            with open(fc.path, 'rb') as fp:
                data = fp.read()
            # NOTE(review): len(buf) == fc.size at this point, so the buffer
            # presumably starts at file offset 0 again; comparing it against
            # data[offset:] would then only hold for uniform file content in
            # that range - confirm the intent against FileCreator.
            assert data[offset] == buf[0]
            assert data[offset:offset * 2] == buf[0:offset]

            # negative indices, partial slices
            assert buf[-1] == buf[len(buf) - 1]
            assert buf[-10:] == buf[len(buf) - 10:len(buf)]

            # end access makes its cursor invalid
            buf.end_access()
            assert not buf.cursor().is_valid()
            assert buf.cursor().is_associated()         # but it remains associated

            # an empty begin access fixes it up again
            assert buf.begin_access()
            assert buf.cursor().is_valid()
            del buf         # ends access automatically
            del c

            assert man_optimal.num_file_handles() == 1

            # PERFORMANCE
            # blast away with random access and a full mapping - we don't want to
            # exaggerate the manager's overhead, but measure the buffer overhead
            # We do it once with an optimal setting, and with a worse manager which
            # will produce small mappings only !
            max_num_accesses = 100
            fsize = fc.size
            mb = float(1000 * 1000)
            fd = os.open(fc.path, os.O_RDONLY)
            try:    # make sure the descriptor is released even if an assertion fails
                for item in (fc.path, fd):
                    for manager, man_id in ((man_optimal, 'optimal'),
                                            (man_worst_case, 'worst case'),
                                            (static_man, 'static optimal')):
                        buf = SlidingWindowMapBuffer(manager.make_cursor(item))
                        assert manager.num_file_handles() == 1
                        for access_mode in range(2):    # 0: single byte, 1: slice
                            num_bytes = 0

                            st = time()
                            buf.begin_access()
                            for _ in range(max_num_accesses):
                                if access_mode:  # multi
                                    # slice bounds may equal fsize: an end index
                                    # is exclusive and empty slices are valid
                                    ofs_start = randint(0, fsize)
                                    ofs_end = randint(ofs_start, fsize)
                                    d = buf[ofs_start:ofs_end]
                                    assert len(d) == ofs_end - ofs_start
                                    assert d == data[ofs_start:ofs_end]
                                    num_bytes += len(d)
                                    del d
                                else:
                                    # randint includes both endpoints, so the last
                                    # valid index is fsize - 1
                                    pos = randint(0, fsize - 1)
                                    assert buf[pos] == data[pos]
                                    num_bytes += 1
                                # END handle mode
                            # END handle num accesses

                            buf.end_access()
                            assert manager.num_file_handles()
                            assert manager.collect()
                            assert manager.num_file_handles() == 0
                            elapsed = max(time() - st, 0.001)  # prevent zero division errors on windows
                            mode_str = "slice" if access_mode else "single byte"
                            print("%s: Made %i random %s accesses to buffer created from %s reading a total of %f mb in %f s (%f mb/s)"
                                  % (man_id, max_num_accesses, mode_str, type(item), num_bytes / mb, elapsed, (num_bytes / mb) / elapsed),
                                  file=sys.stderr)
                        # END handle access mode
                        del buf
                    # END for each manager
                # END for each input
            finally:
                os.close(fd)