summaryrefslogtreecommitdiff
path: root/config/tests/unitMozZipFile.py
blob: f9c0419ab5422cafea331711c98fac9beb526668 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import unittest

import shutil
import os
import re
import sys
import random
import copy
from string import letters

'''
Test case infrastructure for MozZipFile.

This isn't really a unit test, but a test case generator and runner.
For a given set of files, lengths, and number of writes, we create 
a testcase for every combination of the three. There are some
symmetries used to reduce the number of test cases, the first file
written is always the first file, the second is either the first or
the second, the third is one of the first three. That is, if we
had 4 files, but only three writes, the fourth file would never even
get tried.

The content written to the jars is pseudorandom with a fixed seed.
'''

if not __file__:
  __file__ = sys.argv[0]
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

from MozZipFile import ZipFile
import zipfile

leafs = (
  'firstdir/oneleaf',
  'seconddir/twoleaf',
  'thirddir/with/sub/threeleaf')
_lengths = map(lambda n: n * 64, [16, 64, 80])
lengths = 3
writes = 5

def givenlength(i):
  '''Return a length given in the _lengths array to allow manual
  tuning of which lengths of zip entries to use.
  '''
  return _lengths[i]


def prod(*iterables):
  ''''Tensor product of a list of iterables.

  This generator returns lists of items, one of each given
  iterable. It iterates over all possible combinations.
  '''
  for item in iterables[0]:
    if len(iterables) == 1:
      yield [item]
    else:
      for others in prod(*iterables[1:]):
        yield [item] + others


def getid(descs):
  'Convert a list of ints to a string.'
  return reduce(lambda x,y: x+'{0}{1}'.format(*tuple(y)), descs,'')


def getContent(length):
  'Get pseudo random content of given length.'
  rv = [None] * length
  for i in xrange(length):
    rv[i] = random.choice(letters)
  return ''.join(rv)


def createWriter(sizer, *items):
  'Helper method to fill in tests, one set of writes, one for each item'
  locitems = copy.deepcopy(items)
  for item in locitems:
    item['length'] = sizer(item.pop('length', 0))
  def helper(self):
    mode  = 'w'
    if os.path.isfile(self.f):
      mode = 'a'
    zf = ZipFile(self.f, mode, self.compression)
    for item in locitems:
      self._write(zf, **item)
    zf = None
    pass
  return helper

def createTester(name, *writes):
  '''Helper method to fill in tests, calls into a list of write
  helper methods.
  '''
  _writes = copy.copy(writes)
  def tester(self):
    for w in _writes:
      getattr(self, w)()
    self._verifyZip()
    pass
  # unit tests get confused if the method name isn't test...
  tester.__name__ = name
  return tester

class TestExtensiveStored(unittest.TestCase):
  '''Unit tests for MozZipFile

  The testcase are actually populated by code following the class
  definition.
  '''
  
  stage = "mozzipfilestage"
  compression = zipfile.ZIP_STORED

  def leaf(self, *leafs):
    return os.path.join(self.stage, *leafs)
  def setUp(self):
    if os.path.exists(self.stage):
      shutil.rmtree(self.stage)
    os.mkdir(self.stage)
    self.f = self.leaf('test.jar')
    self.ref = {}
    self.seed = 0
  
  def tearDown(self):
    self.f = None
    self.ref = None
  
  def _verifyZip(self):
    zf = zipfile.ZipFile(self.f)
    badEntry = zf.testzip()
    self.failIf(badEntry, badEntry)
    zlist = zf.namelist()
    zlist.sort()
    vlist = self.ref.keys()
    vlist.sort()
    self.assertEqual(zlist, vlist)
    for leaf, content in self.ref.iteritems():
      zcontent = zf.read(leaf)
      self.assertEqual(content, zcontent)
  
  def _write(self, zf, seed=None, leaf=0, length=0):
    if seed is None:
      seed = self.seed
      self.seed += 1
    random.seed(seed)
    leaf = leafs[leaf]
    content = getContent(length)
    self.ref[leaf] = content
    zf.writestr(leaf, content)
    dir = os.path.dirname(self.leaf('stage', leaf))
    if not os.path.isdir(dir):
      os.makedirs(dir)
    open(self.leaf('stage', leaf), 'w').write(content)

# all leafs in all lengths
atomics = list(prod(xrange(len(leafs)), xrange(lengths)))

# populate TestExtensiveStore with testcases
for w in xrange(writes):
  # Don't iterate over all files for the the first n passes,
  # those are redundant as long as w < lengths.
  # There are symmetries in the trailing end, too, but I don't know
  # how to reduce those out right now.
  nonatomics = [list(prod(range(min(i,len(leafs))), xrange(lengths)))
                for i in xrange(1, w+1)] + [atomics]
  for descs in prod(*nonatomics):
    suffix = getid(descs)
    dicts = [dict(leaf=leaf, length=length) for leaf, length in descs]
    setattr(TestExtensiveStored, '_write' + suffix,
            createWriter(givenlength, *dicts))
    setattr(TestExtensiveStored, 'test' + suffix,
            createTester('test' + suffix, '_write' + suffix))

# now create another round of tests, with two writing passes
# first, write all file combinations into the jar, close it,
# and then write all atomics again.
# This should catch more or less all artifacts generated
# by the final ordering step when closing the jar.
files = [list(prod([i], xrange(lengths))) for i in xrange(len(leafs))]
allfiles = reduce(lambda l,r:l+r,
                  [list(prod(*files[:(i+1)])) for i in xrange(len(leafs))])

for first in allfiles:
  testbasename = 'test{0}_'.format(getid(first))
  test = [None, '_write' + getid(first), None]
  for second in atomics:
    test[0] = testbasename + getid([second])
    test[2] = '_write' + getid([second])
    setattr(TestExtensiveStored, test[0], createTester(*test))

class TestExtensiveDeflated(TestExtensiveStored):
  'Test all that has been tested with ZIP_STORED with DEFLATED, too.'
  compression = zipfile.ZIP_DEFLATED

if __name__ == '__main__':
  unittest.main()