2009-10-15 Yuzo Fujishima <yuzo@google.com>
[WebKit-https.git] / WebKitTools / pywebsocket / test / test_dispatch.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2009, Google Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are
8 # met:
9 #
10 #     * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 #     * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following disclaimer
14 # in the documentation and/or other materials provided with the
15 # distribution.
16 #     * Neither the name of Google Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived from
18 # this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32
33 """Tests for dispatch module."""
34
35
36
37 import os
38 import unittest
39
40 import config  # This must be imported before mod_pywebsocket.
41 from mod_pywebsocket import dispatch
42
43 import mock
44
45
# Directory holding the sample *_wsh.py handler files used by the tests
# below; resolved relative to the directory that contains this module.
_TEST_HANDLERS_DIR = os.path.join(
        os.path.dirname(__file__), 'testdata', 'handlers')
48
class DispatcherTest(unittest.TestCase):
    """Tests for the path helpers and the Dispatcher class in dispatch.

    Tests that construct a Dispatcher point it at _TEST_HANDLERS_DIR.
    """

    def test_normalize_path(self):
        """_normalize_path yields an absolute path with '/' separators."""
        expected = os.path.abspath('/a/b').replace('\\', '/')
        self.assertEqual(expected, dispatch._normalize_path('/a/b'))
        self.assertEqual(expected, dispatch._normalize_path('\\a\\b'))
        self.assertEqual(expected, dispatch._normalize_path('/a/c/../b'))
        self.assertEqual(os.path.abspath('abc').replace('\\', '/'),
                         dispatch._normalize_path('abc'))

    def test_converter(self):
        """Converters map *_wsh.py paths under the root to resource names."""
        converter = dispatch._path_to_resource_converter('/a/b')
        self.assertEqual('/h', converter('/a/b/h_wsh.py'))
        self.assertEqual('/c/h', converter('/a/b/c/h_wsh.py'))
        # Files without the _wsh.py suffix, or outside the root, map to None.
        self.assertEqual(None, converter('/a/b/h.py'))
        self.assertEqual(None, converter('a/b/h_wsh.py'))

        # Relative root.
        converter = dispatch._path_to_resource_converter('a/b')
        self.assertEqual('/h', converter('a/b/h_wsh.py'))

        # Root with redundant trailing separators.
        converter = dispatch._path_to_resource_converter('/a/b///')
        self.assertEqual('/h', converter('/a/b/h_wsh.py'))
        self.assertEqual('/h', converter('/a/b/../b/h_wsh.py'))

        # Root containing '..' components.
        converter = dispatch._path_to_resource_converter('/a/../a/b/../b/')
        self.assertEqual('/h', converter('/a/b/h_wsh.py'))

        # Backslash-separated root.
        converter = dispatch._path_to_resource_converter(r'\a\b')
        self.assertEqual('/h', converter(r'\a\b\h_wsh.py'))
        self.assertEqual('/h', converter(r'/a/b/h_wsh.py'))

    def test_source_file_paths(self):
        """_source_file_paths yields exactly the seven test handler files."""
        paths = sorted(dispatch._source_file_paths(_TEST_HANDLERS_DIR))
        sub_dir = os.path.join(_TEST_HANDLERS_DIR, 'sub')
        expected_paths = [
                os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'),
                os.path.join(_TEST_HANDLERS_DIR, 'origin_check_wsh.py'),
                os.path.join(sub_dir, 'exception_in_transfer_wsh.py'),
                os.path.join(sub_dir, 'non_callable_wsh.py'),
                os.path.join(sub_dir, 'plain_wsh.py'),
                os.path.join(sub_dir, 'wrong_handshake_sig_wsh.py'),
                os.path.join(sub_dir, 'wrong_transfer_sig_wsh.py'),
                ]
        # Whole-list comparison checks both the count and every element, and
        # cannot silently truncate the way an element-wise zip() loop can.
        self.assertEqual(expected_paths, paths)

    def test_source(self):
        """_source rejects bad handler source and accepts a valid one."""
        self.assertRaises(dispatch.DispatchError, dispatch._source, '')
        self.assertRaises(dispatch.DispatchError, dispatch._source, 'def')
        self.assertRaises(dispatch.DispatchError, dispatch._source, '1/0')
        self.assertTrue(dispatch._source(
                'def web_socket_do_extra_handshake(request):pass\n'
                'def web_socket_transfer_data(request):pass\n'))

    def test_source_warnings(self):
        """source_warnings reports one warning per unusable handler file."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR)
        # sorted() leaves the list returned by the dispatcher untouched.
        actual_warnings = sorted(dispatcher.source_warnings())
        sub_dir = os.path.join(_TEST_HANDLERS_DIR, 'sub')
        expected_warnings = [
                (os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py') +
                 ': web_socket_do_extra_handshake is not defined.'),
                (os.path.join(sub_dir, 'non_callable_wsh.py') +
                 ': web_socket_do_extra_handshake is not callable.'),
                (os.path.join(sub_dir, 'wrong_handshake_sig_wsh.py') +
                 ': web_socket_do_extra_handshake is not defined.'),
                (os.path.join(sub_dir, 'wrong_transfer_sig_wsh.py') +
                 ': web_socket_transfer_data is not defined.'),
                ]
        self.assertEqual(expected_warnings, actual_warnings)

    def test_shake_hand(self):
        """do_extra_handshake accepts one origin and rejects another."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR)
        request = mock.MockRequest()
        request.ws_resource = '/origin_check'
        request.ws_origin = 'http://example.com'
        dispatcher.do_extra_handshake(request)  # Must not raise exception.

        request.ws_origin = 'http://bad.example.com'
        self.assertRaises(dispatch.DispatchError,
                          dispatcher.do_extra_handshake, request)

    def test_transfer_data(self):
        """transfer_data runs the handler selected by ws_resource."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR)
        request = mock.MockRequest(connection=mock.MockConn(''))
        request.ws_resource = '/origin_check'
        request.ws_protocol = 'p1'

        dispatcher.transfer_data(request)
        self.assertEqual('origin_check_wsh.py is called for /origin_check, p1',
                         request.connection.written_data())

        request = mock.MockRequest(connection=mock.MockConn(''))
        request.ws_resource = '/sub/plain'
        request.ws_protocol = None
        dispatcher.transfer_data(request)
        self.assertEqual('sub/plain_wsh.py is called for /sub/plain, None',
                         request.connection.written_data())

    def test_transfer_data_no_handler(self):
        """transfer_data raises a 'No handler' DispatchError when stuck."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR)
        for resource in ['/blank', '/sub/non_callable',
                         '/sub/no_wsh_at_the_end', '/does/not/exist']:
            request = mock.MockRequest(connection=mock.MockConn(''))
            request.ws_resource = resource
            request.ws_protocol = 'p2'
            # Any exception other than DispatchError is deliberately left to
            # propagate so the runner reports it with its own traceback.
            try:
                dispatcher.transfer_data(request)
            except dispatch.DispatchError as e:
                self.assertTrue(
                        'No handler' in str(e),
                        'Unexpected error for %s: %s' % (resource, e))
            else:
                self.fail('DispatchError was not raised for %s' % resource)

    def test_transfer_data_handler_exception(self):
        """transfer_data reports a handler failure as a DispatchError."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR)
        request = mock.MockRequest(connection=mock.MockConn(''))
        request.ws_resource = '/sub/exception_in_transfer'
        request.ws_protocol = 'p3'
        # Any exception other than DispatchError is deliberately left to
        # propagate so the runner reports it with its own traceback.
        try:
            dispatcher.transfer_data(request)
        except dispatch.DispatchError as e:
            self.assertTrue('Intentional' in str(e),
                            'Unexpected error message: %s' % e)
        else:
            self.fail('DispatchError was not raised')
184
185
# Run every test case defined in this module when executed as a script.
if __name__ == '__main__':
    unittest.main()
188
189
190 # vi:sts=4 sw=4 et