2009-11-25 Yuzo Fujishima <yuzo@google.com>
[WebKit-https.git] / WebKitTools / pywebsocket / test / test_dispatch.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2009, Google Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are
8 # met:
9 #
10 #     * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 #     * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following disclaimer
14 # in the documentation and/or other materials provided with the
15 # distribution.
16 #     * Neither the name of Google Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived from
18 # this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32
33 """Tests for dispatch module."""
34
35
36
37 import os
38 import unittest
39
40 import config  # This must be imported before mod_pywebsocket.
41 from mod_pywebsocket import dispatch
42
43 import mock
44
45
46 _TEST_HANDLERS_DIR = os.path.join(
47         os.path.split(__file__)[0], 'testdata', 'handlers')
48
49 _TEST_HANDLERS_SUB_DIR = os.path.join(_TEST_HANDLERS_DIR, 'sub')
50
51 class DispatcherTest(unittest.TestCase):
52     def test_normalize_path(self):
53         self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
54                          dispatch._normalize_path('/a/b'))
55         self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
56                          dispatch._normalize_path('\\a\\b'))
57         self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
58                          dispatch._normalize_path('/a/c/../b'))
59         self.assertEqual(os.path.abspath('abc').replace('\\', '/'),
60                          dispatch._normalize_path('abc'))
61
62     def test_converter(self):
63         converter = dispatch._path_to_resource_converter('/a/b')
64         self.assertEqual('/h', converter('/a/b/h_wsh.py'))
65         self.assertEqual('/c/h', converter('/a/b/c/h_wsh.py'))
66         self.assertEqual(None, converter('/a/b/h.py'))
67         self.assertEqual(None, converter('a/b/h_wsh.py'))
68
69         converter = dispatch._path_to_resource_converter('a/b')
70         self.assertEqual('/h', converter('a/b/h_wsh.py'))
71
72         converter = dispatch._path_to_resource_converter('/a/b///')
73         self.assertEqual('/h', converter('/a/b/h_wsh.py'))
74         self.assertEqual('/h', converter('/a/b/../b/h_wsh.py'))
75
76         converter = dispatch._path_to_resource_converter('/a/../a/b/../b/')
77         self.assertEqual('/h', converter('/a/b/h_wsh.py'))
78
79         converter = dispatch._path_to_resource_converter(r'\a\b')
80         self.assertEqual('/h', converter(r'\a\b\h_wsh.py'))
81         self.assertEqual('/h', converter(r'/a/b/h_wsh.py'))
82
83     def test_source_file_paths(self):
84         paths = list(dispatch._source_file_paths(_TEST_HANDLERS_DIR))
85         paths.sort()
86         self.assertEqual(7, len(paths))
87         expected_paths = [
88                 os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'),
89                 os.path.join(_TEST_HANDLERS_DIR, 'origin_check_wsh.py'),
90                 os.path.join(_TEST_HANDLERS_DIR, 'sub',
91                              'exception_in_transfer_wsh.py'),
92                 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py'),
93                 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'plain_wsh.py'),
94                 os.path.join(_TEST_HANDLERS_DIR, 'sub',
95                              'wrong_handshake_sig_wsh.py'),
96                 os.path.join(_TEST_HANDLERS_DIR, 'sub',
97                              'wrong_transfer_sig_wsh.py'),
98                 ]
99         for expected, actual in zip(expected_paths, paths):
100             self.assertEqual(expected, actual)
101
102     def test_source(self):
103         self.assertRaises(dispatch.DispatchError, dispatch._source, '')
104         self.assertRaises(dispatch.DispatchError, dispatch._source, 'def')
105         self.assertRaises(dispatch.DispatchError, dispatch._source, '1/0')
106         self.failUnless(dispatch._source(
107                 'def web_socket_do_extra_handshake(request):pass\n'
108                 'def web_socket_transfer_data(request):pass\n'))
109
110     def test_source_warnings(self):
111         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
112         warnings = dispatcher.source_warnings()
113         warnings.sort()
114         expected_warnings = [
115                 (os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py') +
116                  ': web_socket_do_extra_handshake is not defined.'),
117                 (os.path.join(_TEST_HANDLERS_DIR, 'sub',
118                               'non_callable_wsh.py') +
119                  ': web_socket_do_extra_handshake is not callable.'),
120                 (os.path.join(_TEST_HANDLERS_DIR, 'sub',
121                               'wrong_handshake_sig_wsh.py') +
122                  ': web_socket_do_extra_handshake is not defined.'),
123                 (os.path.join(_TEST_HANDLERS_DIR, 'sub',
124                               'wrong_transfer_sig_wsh.py') +
125                  ': web_socket_transfer_data is not defined.'),
126                 ]
127         self.assertEquals(4, len(warnings))
128         for expected, actual in zip(expected_warnings, warnings):
129             self.assertEquals(expected, actual)
130
131     def test_do_extra_handshake(self):
132         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
133         request = mock.MockRequest()
134         request.ws_resource = '/origin_check'
135         request.ws_origin = 'http://example.com'
136         dispatcher.do_extra_handshake(request)  # Must not raise exception.
137
138         request.ws_origin = 'http://bad.example.com'
139         self.assertRaises(dispatch.DispatchError,
140                           dispatcher.do_extra_handshake, request)
141
142     def test_transfer_data(self):
143         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
144         request = mock.MockRequest(connection=mock.MockConn(''))
145         request.ws_resource = '/origin_check'
146         request.ws_protocol = 'p1'
147
148         dispatcher.transfer_data(request)
149         self.assertEqual('origin_check_wsh.py is called for /origin_check, p1',
150                          request.connection.written_data())
151
152         request = mock.MockRequest(connection=mock.MockConn(''))
153         request.ws_resource = '/sub/plain'
154         request.ws_protocol = None
155         dispatcher.transfer_data(request)
156         self.assertEqual('sub/plain_wsh.py is called for /sub/plain, None',
157                          request.connection.written_data())
158
159         request = mock.MockRequest(connection=mock.MockConn(''))
160         request.ws_resource = '/sub/plain?'
161         request.ws_protocol = None
162         dispatcher.transfer_data(request)
163         self.assertEqual('sub/plain_wsh.py is called for /sub/plain?, None',
164                          request.connection.written_data())
165
166         request = mock.MockRequest(connection=mock.MockConn(''))
167         request.ws_resource = '/sub/plain?q=v'
168         request.ws_protocol = None
169         dispatcher.transfer_data(request)
170         self.assertEqual('sub/plain_wsh.py is called for /sub/plain?q=v, None',
171                          request.connection.written_data())
172
173     def test_transfer_data_no_handler(self):
174         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
175         for resource in ['/blank', '/sub/non_callable',
176                          '/sub/no_wsh_at_the_end', '/does/not/exist']:
177             request = mock.MockRequest(connection=mock.MockConn(''))
178             request.ws_resource = resource
179             request.ws_protocol = 'p2'
180             try:
181                 dispatcher.transfer_data(request)
182                 self.fail()
183             except dispatch.DispatchError, e:
184                 self.failUnless(str(e).find('No handler') != -1)
185             except Exception:
186                 self.fail()
187
188     def test_transfer_data_handler_exception(self):
189         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
190         request = mock.MockRequest(connection=mock.MockConn(''))
191         request.ws_resource = '/sub/exception_in_transfer'
192         request.ws_protocol = 'p3'
193         try:
194             dispatcher.transfer_data(request)
195             self.fail()
196         except dispatch.DispatchError, e:
197             self.failUnless(str(e).find('Intentional') != -1)
198         except Exception:
199             self.fail()
200
201     def test_scan_dir(self):
202         disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
203         self.assertEqual(3, len(disp._handlers))
204         self.failUnless(disp._handlers.has_key('/origin_check'))
205         self.failUnless(disp._handlers.has_key('/sub/exception_in_transfer'))
206         self.failUnless(disp._handlers.has_key('/sub/plain'))
207
208     def test_scan_sub_dir(self):
209         disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, _TEST_HANDLERS_SUB_DIR)
210         self.assertEqual(2, len(disp._handlers))
211         self.failIf(disp._handlers.has_key('/origin_check'))
212         self.failUnless(disp._handlers.has_key('/sub/exception_in_transfer'))
213         self.failUnless(disp._handlers.has_key('/sub/plain'))
214
215     def test_scan_sub_dir_as_root(self):
216         disp = dispatch.Dispatcher(_TEST_HANDLERS_SUB_DIR,
217                                    _TEST_HANDLERS_SUB_DIR)
218         self.assertEqual(2, len(disp._handlers))
219         self.failIf(disp._handlers.has_key('/origin_check'))
220         self.failIf(disp._handlers.has_key('/sub/exception_in_transfer'))
221         self.failIf(disp._handlers.has_key('/sub/plain'))
222         self.failUnless(disp._handlers.has_key('/exception_in_transfer'))
223         self.failUnless(disp._handlers.has_key('/plain'))
224
225     def test_scan_dir_must_under_root(self):
226         dispatch.Dispatcher('a/b', 'a/b/c')  # OK
227         dispatch.Dispatcher('a/b///', 'a/b')  # OK
228         self.assertRaises(dispatch.DispatchError,
229                           dispatch.Dispatcher, 'a/b/c', 'a/b')
230
231
232 if __name__ == '__main__':
233     unittest.main()
234
235
236 # vi:sts=4 sw=4 et