don't use autoinstall to import pywebsocket but check it in WebKit directly.
[WebKit-https.git] / Tools / Scripts / webkitpy / thirdparty / mod_pywebsocket / _stream_hybi.py
1 # Copyright 2012, Google Inc.
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 #     * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 #     * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following disclaimer
12 # in the documentation and/or other materials provided with the
13 # distribution.
14 #     * Neither the name of Google Inc. nor the names of its
15 # contributors may be used to endorse or promote products derived from
16 # this software without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """This file provides classes and helper functions for parsing/building frames
32 of the WebSocket protocol (RFC 6455).
33
34 Specification:
35 http://tools.ietf.org/html/rfc6455
36 """
37
38
39 from collections import deque
40 import os
41 import struct
42
43 from mod_pywebsocket import common
44 from mod_pywebsocket import util
45 from mod_pywebsocket._stream_base import BadOperationException
46 from mod_pywebsocket._stream_base import ConnectionTerminatedException
47 from mod_pywebsocket._stream_base import InvalidFrameException
48 from mod_pywebsocket._stream_base import InvalidUTF8Exception
49 from mod_pywebsocket._stream_base import StreamBase
50 from mod_pywebsocket._stream_base import UnsupportedFrameException
51
52
53 _NOOP_MASKER = util.NoopMasker()
54
55
56 class Frame(object):
57
58     def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
59                  opcode=None, payload=''):
60         self.fin = fin
61         self.rsv1 = rsv1
62         self.rsv2 = rsv2
63         self.rsv3 = rsv3
64         self.opcode = opcode
65         self.payload = payload
66
67
68 # Helper functions made public to be used for writing unittests for WebSocket
69 # clients.
70
71
72 def create_length_header(length, mask):
73     """Creates a length header.
74
75     Args:
76         length: Frame length. Must be less than 2^63.
77         mask: Mask bit. Must be boolean.
78
79     Raises:
80         ValueError: when bad data is given.
81     """
82
83     if mask:
84         mask_bit = 1 << 7
85     else:
86         mask_bit = 0
87
88     if length < 0:
89         raise ValueError('length must be non negative integer')
90     elif length <= 125:
91         return chr(mask_bit | length)
92     elif length < (1 << 16):
93         return chr(mask_bit | 126) + struct.pack('!H', length)
94     elif length < (1 << 63):
95         return chr(mask_bit | 127) + struct.pack('!Q', length)
96     else:
97         raise ValueError('Payload is too big for one frame')
98
99
100 def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
101     """Creates a frame header.
102
103     Raises:
104         Exception: when bad data is given.
105     """
106
107     if opcode < 0 or 0xf < opcode:
108         raise ValueError('Opcode out of range')
109
110     if payload_length < 0 or (1 << 63) <= payload_length:
111         raise ValueError('payload_length out of range')
112
113     if (fin | rsv1 | rsv2 | rsv3) & ~1:
114         raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')
115
116     header = ''
117
118     first_byte = ((fin << 7)
119                   | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
120                   | opcode)
121     header += chr(first_byte)
122     header += create_length_header(payload_length, mask)
123
124     return header
125
126
127 def _build_frame(header, body, mask):
128     if not mask:
129         return header + body
130
131     masking_nonce = os.urandom(4)
132     masker = util.RepeatedXorMasker(masking_nonce)
133
134     return header + masking_nonce + masker.mask(body)
135
136
137 def _filter_and_format_frame_object(frame, mask, frame_filters):
138     for frame_filter in frame_filters:
139         frame_filter.filter(frame)
140
141     header = create_header(
142         frame.opcode, len(frame.payload), frame.fin,
143         frame.rsv1, frame.rsv2, frame.rsv3, mask)
144     return _build_frame(header, frame.payload, mask)
145
146
147 def create_binary_frame(
148     message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
149     """Creates a simple binary frame with no extension, reserved bit."""
150
151     frame = Frame(fin=fin, opcode=opcode, payload=message)
152     return _filter_and_format_frame_object(frame, mask, frame_filters)
153
154
155 def create_text_frame(
156     message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
157     """Creates a simple text frame with no extension, reserved bit."""
158
159     encoded_message = message.encode('utf-8')
160     return create_binary_frame(encoded_message, opcode, fin, mask,
161                                frame_filters)
162
163
164 class FragmentedFrameBuilder(object):
165     """A stateful class to send a message as fragments."""
166
167     def __init__(self, mask, frame_filters=[]):
168         """Constructs an instance."""
169
170         self._mask = mask
171         self._frame_filters = frame_filters
172
173         self._started = False
174
175         # Hold opcode of the first frame in messages to verify types of other
176         # frames in the message are all the same.
177         self._opcode = common.OPCODE_TEXT
178
179     def build(self, message, end, binary):
180         if binary:
181             frame_type = common.OPCODE_BINARY
182         else:
183             frame_type = common.OPCODE_TEXT
184         if self._started:
185             if self._opcode != frame_type:
186                 raise ValueError('Message types are different in frames for '
187                                  'the same message')
188             opcode = common.OPCODE_CONTINUATION
189         else:
190             opcode = frame_type
191             self._opcode = frame_type
192
193         if end:
194             self._started = False
195             fin = 1
196         else:
197             self._started = True
198             fin = 0
199
200         if binary:
201             return create_binary_frame(
202                 message, opcode, fin, self._mask, self._frame_filters)
203         else:
204             return create_text_frame(
205                 message, opcode, fin, self._mask, self._frame_filters)
206
207
208 def _create_control_frame(opcode, body, mask, frame_filters):
209     frame = Frame(opcode=opcode, payload=body)
210
211     return _filter_and_format_frame_object(frame, mask, frame_filters)
212
213
214 def create_ping_frame(body, mask=False, frame_filters=[]):
215     return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
216
217
218 def create_pong_frame(body, mask=False, frame_filters=[]):
219     return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
220
221
222 def create_close_frame(body, mask=False, frame_filters=[]):
223     return _create_control_frame(
224         common.OPCODE_CLOSE, body, mask, frame_filters)
225
226
227 class StreamOptions(object):
228     """Holds option values to configure Stream objects."""
229
230     def __init__(self):
231         """Constructs StreamOptions."""
232
233         # Enables deflate-stream extension.
234         self.deflate_stream = False
235
236         # Filters applied to frames.
237         self.outgoing_frame_filters = []
238         self.incoming_frame_filters = []
239
240         self.mask_send = False
241         self.unmask_receive = True
242
243
244 class Stream(StreamBase):
245     """A class for parsing/building frames of the WebSocket protocol
246     (RFC 6455).
247     """
248
249     def __init__(self, request, options):
250         """Constructs an instance.
251
252         Args:
253             request: mod_python request.
254         """
255
256         StreamBase.__init__(self, request)
257
258         self._logger = util.get_class_logger(self)
259
260         self._options = options
261
262         if self._options.deflate_stream:
263             self._logger.debug('Setup filter for deflate-stream')
264             self._request = util.DeflateRequest(self._request)
265
266         self._request.client_terminated = False
267         self._request.server_terminated = False
268
269         # Holds body of received fragments.
270         self._received_fragments = []
271         # Holds the opcode of the first fragment.
272         self._original_opcode = None
273
274         self._writer = FragmentedFrameBuilder(
275             self._options.mask_send, self._options.outgoing_frame_filters)
276
277         self._ping_queue = deque()
278
279     def _receive_frame(self):
280         """Receives a frame and return data in the frame as a tuple containing
281         each header field and payload separately.
282
283         Raises:
284             ConnectionTerminatedException: when read returns empty
285                 string.
286             InvalidFrameException: when the frame contains invalid data.
287         """
288
289         received = self.receive_bytes(2)
290
291         first_byte = ord(received[0])
292         fin = (first_byte >> 7) & 1
293         rsv1 = (first_byte >> 6) & 1
294         rsv2 = (first_byte >> 5) & 1
295         rsv3 = (first_byte >> 4) & 1
296         opcode = first_byte & 0xf
297
298         second_byte = ord(received[1])
299         mask = (second_byte >> 7) & 1
300         payload_length = second_byte & 0x7f
301
302         if (mask == 1) != self._options.unmask_receive:
303             raise InvalidFrameException(
304                 'Mask bit on the received frame did\'nt match masking '
305                 'configuration for received frames')
306
307         # The Hybi-13 and later specs disallow putting a value in 0x0-0xFFFF
308         # into the 8-octet extended payload length field (or 0x0-0xFD in
309         # 2-octet field).
310         valid_length_encoding = True
311         length_encoding_bytes = 1
312         if payload_length == 127:
313             extended_payload_length = self.receive_bytes(8)
314             payload_length = struct.unpack(
315                 '!Q', extended_payload_length)[0]
316             if payload_length > 0x7FFFFFFFFFFFFFFF:
317                 raise InvalidFrameException(
318                     'Extended payload length >= 2^63')
319             if self._request.ws_version >= 13 and payload_length < 0x10000:
320                 valid_length_encoding = False
321                 length_encoding_bytes = 8
322         elif payload_length == 126:
323             extended_payload_length = self.receive_bytes(2)
324             payload_length = struct.unpack(
325                 '!H', extended_payload_length)[0]
326             if self._request.ws_version >= 13 and payload_length < 126:
327                 valid_length_encoding = False
328                 length_encoding_bytes = 2
329
330         if not valid_length_encoding:
331             self._logger.warning(
332                 'Payload length is not encoded using the minimal number of '
333                 'bytes (%d is encoded using %d bytes)',
334                 payload_length,
335                 length_encoding_bytes)
336
337         if mask == 1:
338             masking_nonce = self.receive_bytes(4)
339             masker = util.RepeatedXorMasker(masking_nonce)
340         else:
341             masker = _NOOP_MASKER
342
343         bytes = masker.mask(self.receive_bytes(payload_length))
344
345         return opcode, bytes, fin, rsv1, rsv2, rsv3
346
347     def _receive_frame_as_frame_object(self):
348         opcode, bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
349
350         return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
351                      opcode=opcode, payload=bytes)
352
353     def send_message(self, message, end=True, binary=False):
354         """Send message.
355
356         Args:
357             message: text in unicode or binary in str to send.
358             binary: send message as binary frame.
359
360         Raises:
361             BadOperationException: when called on a server-terminated
362                 connection or called with inconsistent message type or binary
363                 parameter.
364         """
365
366         if self._request.server_terminated:
367             raise BadOperationException(
368                 'Requested send_message after sending out a closing handshake')
369
370         if binary and isinstance(message, unicode):
371             raise BadOperationException(
372                 'Message for binary frame must be instance of str')
373
374         try:
375             self._write(self._writer.build(message, end, binary))
376         except ValueError, e:
377             raise BadOperationException(e)
378
379     def receive_message(self):
380         """Receive a WebSocket frame and return its payload as a text in
381         unicode or a binary in str.
382
383         Returns:
384             payload data of the frame
385             - as unicode instance if received text frame
386             - as str instance if received binary frame
387             or None iff received closing handshake.
388         Raises:
389             BadOperationException: when called on a client-terminated
390                 connection.
391             ConnectionTerminatedException: when read returns empty
392                 string.
393             InvalidFrameException: when the frame contains invalid
394                 data.
395             UnsupportedFrameException: when the received frame has
396                 flags, opcode we cannot handle. You can ignore this
397                 exception and continue receiving the next frame.
398         """
399
400         if self._request.client_terminated:
401             raise BadOperationException(
402                 'Requested receive_message after receiving a closing '
403                 'handshake')
404
405         while True:
406             # mp_conn.read will block if no bytes are available.
407             # Timeout is controlled by TimeOut directive of Apache.
408
409             frame = self._receive_frame_as_frame_object()
410
411             for frame_filter in self._options.incoming_frame_filters:
412                 frame_filter.filter(frame)
413
414             if frame.rsv1 or frame.rsv2 or frame.rsv3:
415                 raise UnsupportedFrameException(
416                     'Unsupported flag is set (rsv = %d%d%d)' %
417                     (frame.rsv1, frame.rsv2, frame.rsv3))
418
419             if frame.opcode == common.OPCODE_CONTINUATION:
420                 if not self._received_fragments:
421                     if frame.fin:
422                         raise InvalidFrameException(
423                             'Received a termination frame but fragmentation '
424                             'not started')
425                     else:
426                         raise InvalidFrameException(
427                             'Received an intermediate frame but '
428                             'fragmentation not started')
429
430                 if frame.fin:
431                     # End of fragmentation frame
432                     self._received_fragments.append(frame.payload)
433                     message = ''.join(self._received_fragments)
434                     self._received_fragments = []
435                 else:
436                     # Intermediate frame
437                     self._received_fragments.append(frame.payload)
438                     continue
439             else:
440                 if self._received_fragments:
441                     if frame.fin:
442                         raise InvalidFrameException(
443                             'Received an unfragmented frame without '
444                             'terminating existing fragmentation')
445                     else:
446                         raise InvalidFrameException(
447                             'New fragmentation started without terminating '
448                             'existing fragmentation')
449
450                 if frame.fin:
451                     # Unfragmented frame
452
453                     if (common.is_control_opcode(frame.opcode) and
454                         len(frame.payload) > 125):
455                         raise InvalidFrameException(
456                             'Application data size of control frames must be '
457                             '125 bytes or less')
458
459                     self._original_opcode = frame.opcode
460                     message = frame.payload
461                 else:
462                     # Start of fragmentation frame
463
464                     if common.is_control_opcode(frame.opcode):
465                         raise InvalidFrameException(
466                             'Control frames must not be fragmented')
467
468                     self._original_opcode = frame.opcode
469                     self._received_fragments.append(frame.payload)
470                     continue
471
472             if self._original_opcode == common.OPCODE_TEXT:
473                 # The WebSocket protocol section 4.4 specifies that invalid
474                 # characters must be replaced with U+fffd REPLACEMENT
475                 # CHARACTER.
476                 try:
477                     return message.decode('utf-8')
478                 except UnicodeDecodeError, e:
479                     raise InvalidUTF8Exception(e)
480             elif self._original_opcode == common.OPCODE_BINARY:
481                 return message
482             elif self._original_opcode == common.OPCODE_CLOSE:
483                 self._request.client_terminated = True
484
485                 # Status code is optional. We can have status reason only if we
486                 # have status code. Status reason can be empty string. So,
487                 # allowed cases are
488                 # - no application data: no code no reason
489                 # - 2 octet of application data: has code but no reason
490                 # - 3 or more octet of application data: both code and reason
491                 if len(message) == 1:
492                     raise InvalidFrameException(
493                         'If a close frame has status code, the length of '
494                         'status code must be 2 octet')
495                 elif len(message) >= 2:
496                     self._request.ws_close_code = struct.unpack(
497                         '!H', message[0:2])[0]
498                     self._request.ws_close_reason = message[2:].decode(
499                         'utf-8', 'replace')
500                     self._logger.debug(
501                         'Received close frame (code=%d, reason=%r)',
502                         self._request.ws_close_code,
503                         self._request.ws_close_reason)
504
505                 # Drain junk data after the close frame if necessary.
506                 self._drain_received_data()
507
508                 if self._request.server_terminated:
509                     self._logger.debug(
510                         'Received ack for server-initiated closing '
511                         'handshake')
512                     return None
513
514                 self._logger.debug(
515                     'Received client-initiated closing handshake')
516
517                 code = common.STATUS_NORMAL_CLOSURE
518                 reason = ''
519                 if hasattr(self._request, '_dispatcher'):
520                     dispatcher = self._request._dispatcher
521                     code, reason = dispatcher.passive_closing_handshake(
522                         self._request)
523                 self._send_closing_handshake(code, reason)
524                 self._logger.debug(
525                     'Sent ack for client-initiated closing handshake')
526                 return None
527             elif self._original_opcode == common.OPCODE_PING:
528                 try:
529                     handler = self._request.on_ping_handler
530                     if handler:
531                         handler(self._request, message)
532                         continue
533                 except AttributeError, e:
534                     pass
535                 self._send_pong(message)
536             elif self._original_opcode == common.OPCODE_PONG:
537                 # TODO(tyoshino): Add ping timeout handling.
538
539                 inflight_pings = deque()
540
541                 while True:
542                     try:
543                         expected_body = self._ping_queue.popleft()
544                         if expected_body == message:
545                             # inflight_pings contains pings ignored by the
546                             # other peer. Just forget them.
547                             self._logger.debug(
548                                 'Ping %r is acked (%d pings were ignored)',
549                                 expected_body, len(inflight_pings))
550                             break
551                         else:
552                             inflight_pings.append(expected_body)
553                     except IndexError, e:
554                         # The received pong was unsolicited pong. Keep the
555                         # ping queue as is.
556                         self._ping_queue = inflight_pings
557                         self._logger.debug('Received a unsolicited pong')
558                         break
559
560                 try:
561                     handler = self._request.on_pong_handler
562                     if handler:
563                         handler(self._request, message)
564                         continue
565                 except AttributeError, e:
566                     pass
567
568                 continue
569             else:
570                 raise UnsupportedFrameException(
571                     'Opcode %d is not supported' % self._original_opcode)
572
573     def _send_closing_handshake(self, code, reason):
574         if code >= (1 << 16) or code < 0:
575             raise BadOperationException('Status code is out of range')
576
577         encoded_reason = reason.encode('utf-8')
578         if len(encoded_reason) + 2 > 125:
579             raise BadOperationException(
580                 'Application data size of close frames must be 125 bytes or '
581                 'less')
582
583         frame = create_close_frame(
584             struct.pack('!H', code) + encoded_reason,
585             self._options.mask_send,
586             self._options.outgoing_frame_filters)
587
588         self._request.server_terminated = True
589
590         self._write(frame)
591
592     def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
593         """Closes a WebSocket connection."""
594
595         if self._request.server_terminated:
596             self._logger.debug(
597                 'Requested close_connection but server is already terminated')
598             return
599
600         self._send_closing_handshake(code, reason)
601         self._logger.debug('Sent server-initiated closing handshake')
602
603         if (code == common.STATUS_GOING_AWAY or
604             code == common.STATUS_PROTOCOL_ERROR):
605             # It doesn't make sense to wait for a close frame if the reason is
606             # protocol error or that the server is going away. For some of
607             # other reasons, it might not make sense to wait for a close frame,
608             # but it's not clear, yet.
609             return
610
611         # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
612         # or until a server-defined timeout expires.
613         #
614         # For now, we expect receiving closing handshake right after sending
615         # out closing handshake.
616         message = self.receive_message()
617         if message is not None:
618             raise ConnectionTerminatedException(
619                 'Didn\'t receive valid ack for closing handshake')
620         # TODO: 3. close the WebSocket connection.
621         # note: mod_python Connection (mp_conn) doesn't have close method.
622
623     def send_ping(self, body=''):
624         if len(body) > 125:
625             raise ValueError(
626                 'Application data size of control frames must be 125 bytes or '
627                 'less')
628         frame = create_ping_frame(
629             body,
630             self._options.mask_send,
631             self._options.outgoing_frame_filters)
632         self._write(frame)
633
634         self._ping_queue.append(body)
635
636     def _send_pong(self, body):
637         if len(body) > 125:
638             raise ValueError(
639                 'Application data size of control frames must be 125 bytes or '
640                 'less')
641         frame = create_pong_frame(
642             body,
643             self._options.mask_send,
644             self._options.outgoing_frame_filters)
645         self._write(frame)
646
647     def _drain_received_data(self):
648         """Drains unread data in the receive buffer to avoid sending out TCP
649         RST packet. This is because when deflate-stream is enabled, some
650         DEFLATE block for flushing data may follow a close frame. If any data
651         remains in the receive buffer of a socket when the socket is closed,
652         it sends out TCP RST packet to the other peer.
653
654         Since mod_python's mp_conn object doesn't support non-blocking read,
655         we perform this only when pywebsocket is running in standalone mode.
656         """
657
658         # If self._options.deflate_stream is true, self._request is
659         # DeflateRequest, so we can get wrapped request object by
660         # self._request._request.
661         #
662         # Only _StandaloneRequest has _drain_received_data method.
663         if (self._options.deflate_stream and
664             ('_drain_received_data' in dir(self._request._request))):
665             self._request._request._drain_received_data()
666
667
668 # vi:sts=4 sw=4 et