d6172053866e0d5d2f3db6d414acc1c0e34268a5
[WebKit-https.git] / WebKitTools / pywebsocket / test / test_dispatch.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2009, Google Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are
8 # met:
9 #
10 #     * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 #     * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following disclaimer
14 # in the documentation and/or other materials provided with the
15 # distribution.
16 #     * Neither the name of Google Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived from
18 # this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32
33 """Tests for dispatch module."""
34
35
36
37 import os
38 import unittest
39
40 import config  # This must be imported before mod_pywebsocket.
41 from mod_pywebsocket import dispatch
42
43 import mock
44
45
46 _TEST_HANDLERS_DIR = os.path.join(
47         os.path.split(__file__)[0], 'testdata', 'handlers')
48
49 _TEST_HANDLERS_SUB_DIR = os.path.join(_TEST_HANDLERS_DIR, 'sub')
50
51 class DispatcherTest(unittest.TestCase):
52     def test_normalize_path(self):
53         self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
54                          dispatch._normalize_path('/a/b'))
55         self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
56                          dispatch._normalize_path('\\a\\b'))
57         self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
58                          dispatch._normalize_path('/a/c/../b'))
59         self.assertEqual(os.path.abspath('abc').replace('\\', '/'),
60                          dispatch._normalize_path('abc'))
61
62     def test_converter(self):
63         converter = dispatch._path_to_resource_converter('/a/b')
64         self.assertEqual('/h', converter('/a/b/h_wsh.py'))
65         self.assertEqual('/c/h', converter('/a/b/c/h_wsh.py'))
66         self.assertEqual(None, converter('/a/b/h.py'))
67         self.assertEqual(None, converter('a/b/h_wsh.py'))
68
69         converter = dispatch._path_to_resource_converter('a/b')
70         self.assertEqual('/h', converter('a/b/h_wsh.py'))
71
72         converter = dispatch._path_to_resource_converter('/a/b///')
73         self.assertEqual('/h', converter('/a/b/h_wsh.py'))
74         self.assertEqual('/h', converter('/a/b/../b/h_wsh.py'))
75
76         converter = dispatch._path_to_resource_converter('/a/../a/b/../b/')
77         self.assertEqual('/h', converter('/a/b/h_wsh.py'))
78
79         converter = dispatch._path_to_resource_converter(r'\a\b')
80         self.assertEqual('/h', converter(r'\a\b\h_wsh.py'))
81         self.assertEqual('/h', converter(r'/a/b/h_wsh.py'))
82
83     def test_source_file_paths(self):
84         paths = list(dispatch._source_file_paths(_TEST_HANDLERS_DIR))
85         paths.sort()
86         self.assertEqual(7, len(paths))
87         expected_paths = [
88                 os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'),
89                 os.path.join(_TEST_HANDLERS_DIR, 'origin_check_wsh.py'),
90                 os.path.join(_TEST_HANDLERS_DIR, 'sub',
91                              'exception_in_transfer_wsh.py'),
92                 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py'),
93                 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'plain_wsh.py'),
94                 os.path.join(_TEST_HANDLERS_DIR, 'sub',
95                              'wrong_handshake_sig_wsh.py'),
96                 os.path.join(_TEST_HANDLERS_DIR, 'sub',
97                              'wrong_transfer_sig_wsh.py'),
98                 ]
99         for expected, actual in zip(expected_paths, paths):
100             self.assertEqual(expected, actual)
101
102     def test_source(self):
103         self.assertRaises(dispatch.DispatchError, dispatch._source, '')
104         self.assertRaises(dispatch.DispatchError, dispatch._source, 'def')
105         self.assertRaises(dispatch.DispatchError, dispatch._source, '1/0')
106         self.failUnless(dispatch._source(
107                 'def web_socket_do_extra_handshake(request):pass\n'
108                 'def web_socket_transfer_data(request):pass\n'))
109
110     def test_source_warnings(self):
111         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
112         warnings = dispatcher.source_warnings()
113         warnings.sort()
114         expected_warnings = [
115                 (os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py') +
116                  ': web_socket_do_extra_handshake is not defined.'),
117                 (os.path.join(_TEST_HANDLERS_DIR, 'sub',
118                               'non_callable_wsh.py') +
119                  ': web_socket_do_extra_handshake is not callable.'),
120                 (os.path.join(_TEST_HANDLERS_DIR, 'sub',
121                               'wrong_handshake_sig_wsh.py') +
122                  ': web_socket_do_extra_handshake is not defined.'),
123                 (os.path.join(_TEST_HANDLERS_DIR, 'sub',
124                               'wrong_transfer_sig_wsh.py') +
125                  ': web_socket_transfer_data is not defined.'),
126                 ]
127         self.assertEquals(4, len(warnings))
128         for expected, actual in zip(expected_warnings, warnings):
129             self.assertEquals(expected, actual)
130
131     def test_do_extra_handshake(self):
132         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
133         request = mock.MockRequest()
134         request.ws_resource = '/origin_check'
135         request.ws_origin = 'http://example.com'
136         dispatcher.do_extra_handshake(request)  # Must not raise exception.
137
138         request.ws_origin = 'http://bad.example.com'
139         self.assertRaises(dispatch.DispatchError,
140                           dispatcher.do_extra_handshake, request)
141
142     def test_transfer_data(self):
143         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
144         request = mock.MockRequest(connection=mock.MockConn(''))
145         request.ws_resource = '/origin_check'
146         request.ws_protocol = 'p1'
147
148         dispatcher.transfer_data(request)
149         self.assertEqual('origin_check_wsh.py is called for /origin_check, p1',
150                          request.connection.written_data())
151
152         request = mock.MockRequest(connection=mock.MockConn(''))
153         request.ws_resource = '/sub/plain'
154         request.ws_protocol = None
155         dispatcher.transfer_data(request)
156         self.assertEqual('sub/plain_wsh.py is called for /sub/plain, None',
157                          request.connection.written_data())
158
159     def test_transfer_data_no_handler(self):
160         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
161         for resource in ['/blank', '/sub/non_callable',
162                          '/sub/no_wsh_at_the_end', '/does/not/exist']:
163             request = mock.MockRequest(connection=mock.MockConn(''))
164             request.ws_resource = resource
165             request.ws_protocol = 'p2'
166             try:
167                 dispatcher.transfer_data(request)
168                 self.fail()
169             except dispatch.DispatchError, e:
170                 self.failUnless(str(e).find('No handler') != -1)
171             except Exception:
172                 self.fail()
173
174     def test_transfer_data_handler_exception(self):
175         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
176         request = mock.MockRequest(connection=mock.MockConn(''))
177         request.ws_resource = '/sub/exception_in_transfer'
178         request.ws_protocol = 'p3'
179         try:
180             dispatcher.transfer_data(request)
181             self.fail()
182         except dispatch.DispatchError, e:
183             self.failUnless(str(e).find('Intentional') != -1)
184         except Exception:
185             self.fail()
186
187     def test_scan_dir(self):
188         disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
189         self.assertEqual(3, len(disp._handlers))
190         self.failUnless(disp._handlers.has_key('/origin_check'))
191         self.failUnless(disp._handlers.has_key('/sub/exception_in_transfer'))
192         self.failUnless(disp._handlers.has_key('/sub/plain'))
193
194     def test_scan_sub_dir(self):
195         disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, _TEST_HANDLERS_SUB_DIR)
196         self.assertEqual(2, len(disp._handlers))
197         self.failIf(disp._handlers.has_key('/origin_check'))
198         self.failUnless(disp._handlers.has_key('/sub/exception_in_transfer'))
199         self.failUnless(disp._handlers.has_key('/sub/plain'))
200
201     def test_scan_sub_dir_as_root(self):
202         disp = dispatch.Dispatcher(_TEST_HANDLERS_SUB_DIR,
203                                    _TEST_HANDLERS_SUB_DIR)
204         self.assertEqual(2, len(disp._handlers))
205         self.failIf(disp._handlers.has_key('/origin_check'))
206         self.failIf(disp._handlers.has_key('/sub/exception_in_transfer'))
207         self.failIf(disp._handlers.has_key('/sub/plain'))
208         self.failUnless(disp._handlers.has_key('/exception_in_transfer'))
209         self.failUnless(disp._handlers.has_key('/plain'))
210
211     def test_scan_dir_must_under_root(self):
212         dispatch.Dispatcher('a/b', 'a/b/c')  # OK
213         dispatch.Dispatcher('a/b///', 'a/b')  # OK
214         self.assertRaises(dispatch.DispatchError,
215                           dispatch.Dispatcher, 'a/b/c', 'a/b')
216
217
218 if __name__ == '__main__':
219     unittest.main()
220
221
222 # vi:sts=4 sw=4 et