1 # Copyright (C) 2009 Google Inc. All rights reserved.
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
7 # * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
13 # * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 class MockFileSystem(object):
36 def __init__(self, files=None):
37 """Initializes a "mock" filesystem that can be used to completely
38 stub out a filesystem.
41 files: a dict of filenames -> file contents. A file contents
42 value of None is used to indicate that the file should
45 self.files = files or {}
46 self.written_files = {}
48 self.current_tmpno = 0
50 def _raise_not_found(self, path):
51 raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT))
53 def _split(self, path):
55 return (path[0:idx], path[idx + 1:])
57 def abspath(self, path):
60 def basename(self, path):
61 return self._split(path)[1]
63 def copyfile(self, source, destination):
64 if not self.exists(source):
65 self._raise_not_found(source)
66 if self.isdir(source):
67 raise IOError(errno.EISDIR, source, os.strerror(errno.ISDIR))
68 if self.isdir(destination):
69 raise IOError(errno.EISDIR, destination, os.strerror(errno.ISDIR))
71 self.files[destination] = self.files[source]
73 def dirname(self, path):
74 return self._split(path)[0]
76 def exists(self, path):
77 return self.isfile(path) or self.isdir(path)
79 def files_under(self, path, dirs_to_skip=[], file_filter=None):
80 def filter_all(fs, dirpath, basename):
83 file_filter = file_filter or filter_all
86 if file_filter(self, self.dirname(path), self.basename(path)):
90 if self.basename(path) in dirs_to_skip:
93 if not path.endswith('/'):
96 dir_substrings = ['/' + d + '/' for d in dirs_to_skip]
97 for filename in self.files:
98 if not filename.startswith(path):
101 suffix = filename[len(path) - 1:]
102 if any(dir_substring in suffix for dir_substring in dir_substrings):
105 dirpath, basename = self._split(filename)
106 if file_filter(self, dirpath, basename):
107 files.append(filename)
111 def glob(self, path):
112 # FIXME: This only handles a wildcard '*' at the end of the path.
113 # Maybe it should handle more?
115 return [f for f in self.files if f.startswith(path[:-1])]
117 return [f for f in self.files if f == path]
119 def isabs(self, path):
120 return path.startswith('/')
122 def isfile(self, path):
123 return path in self.files and self.files[path] is not None
125 def isdir(self, path):
126 if path in self.files:
128 if not path.endswith('/'):
131 # We need to use a copy of the keys here in order to avoid switching
132 # to a different thread and potentially modifying the dict in
134 files = self.files.keys()[:]
135 return any(f.startswith(path) for f in files)
137 def join(self, *comps):
138 return re.sub(re.escape(os.path.sep), '/', os.path.join(*comps))
140 def listdir(self, path):
141 if not self.isdir(path):
142 raise OSError("%s is not a directory" % path)
144 if not path.endswith('/'):
150 if self.exists(f) and f.startswith(path):
151 remaining = f[len(path):]
153 dir = remaining[:remaining.index('/')]
157 files.append(remaining)
160 def mtime(self, path):
161 if self.exists(path):
163 self._raise_not_found(path)
165 def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs):
168 curno = self.current_tmpno
169 self.current_tmpno += 1
170 return self.join(dir, "%s_%u_%s" % (prefix, curno, suffix))
172 def mkdtemp(self, **kwargs):
173 class TemporaryDirectory(object):
174 def __init__(self, fs, **kwargs):
175 self._kwargs = kwargs
176 self._filesystem = fs
177 self._directory_path = fs._mktemp(**kwargs)
178 fs.maybe_make_directory(self._directory_path)
181 return self._directory_path
184 return self._directory_path
186 def __exit__(self, type, value, traceback):
187 # Only self-delete if necessary.
189 # FIXME: Should we delete non-empty directories?
190 if self._filesystem.exists(self._directory_path):
191 self._filesystem.rmtree(self._directory_path)
193 return TemporaryDirectory(fs=self, **kwargs)
195 def maybe_make_directory(self, *path):
196 # FIXME: Implement such that subsequent calls to isdir() work?
199 def move(self, src, dst):
200 if self.files[src] is None:
201 self._raise_not_found(src)
202 self.files[dst] = self.files[src]
203 self.files[src] = None
205 def normpath(self, path):
208 def open_binary_tempfile(self, suffix):
209 path = self._mktemp(suffix)
210 return WritableFileObject(self, path), path
212 def open_text_file_for_writing(self, path, append=False):
213 return WritableFileObject(self, path, append)
215 def read_text_file(self, path):
216 return self.read_binary_file(path)
218 def read_binary_file(self, path):
219 # Intentionally raises KeyError if we don't recognize the path.
220 if self.files[path] is None:
221 self._raise_not_found(path)
222 return self.files[path]
224 def remove(self, path):
225 if self.files[path] is None:
226 self._raise_not_found(path)
227 self.files[path] = None
229 def rmtree(self, path):
230 if not path.endswith('/'):
234 if f.startswith(path):
237 def splitext(self, path):
238 idx = path.rfind('.')
241 return (path[0:idx], path[idx:])
243 def write_text_file(self, path, contents):
244 return self.write_binary_file(path, contents)
246 def write_binary_file(self, path, contents):
247 self.files[path] = contents
248 self.written_files[path] = contents
251 class WritableFileObject(object):
252 def __init__(self, fs, path, append=False, encoding=None):
256 if path not in self.fs.files or not append:
257 self.fs.files[path] = ""
262 def __exit__(self, type, value, traceback):
268 def write(self, str):
269 self.fs.files[self.path] += str
270 self.fs.written_files[self.path] = self.fs.files[self.path]