1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
import re
import sys
from typing import List, Optional, Union
__all__ = ["ReceiveBuffer"]
# Operations we want to support:
# - find next \r\n or \r\n\r\n (\n or \n\n are also acceptable),
# or wait until there is one
# - read at-most-N bytes
# Goals:
# - on average, do this fast
# - worst case, do this in O(n) where n is the number of bytes processed
# Plan:
# - store bytearray, offset, how far we've searched for a separator token
# - use the how-far-we've-searched data to avoid rescanning
# - while doing a stream of uninterrupted processing, advance offset instead
# of constantly copying
# WARNING:
# - I haven't benchmarked or profiled any of this yet.
#
# Note that starting in Python 3.4, deleting the initial n bytes from a
# bytearray is amortized O(n), thanks to some excellent work by Antoine
# Pitrou:
#
# https://bugs.python.org/issue19087
#
# This means that if we only supported 3.4+, we could get rid of the code here
# involving self._start and self.compress, because it's doing exactly the same
# thing that bytearray now does internally.
#
# BUT unfortunately, we still support 2.7, and reading short segments out of a
# long buffer MUST be O(bytes read) to avoid DoS issues, so we can't actually
# delete this code. Yet:
#
# https://pythonclock.org/
#
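# (Two things to double-check first though: make sure PyPy also has the
# optimization, and benchmark to make sure it's a win, since we do have a
# slightly clever thing where we delay calling compress() until we've
# processed a whole event, which could in theory be slightly more efficient
# than the internal bytearray support.)
#
# NOTE(review): the discussion above looks historical. The ReceiveBuffer class
# in this file has no self._start or compress(); it deletes consumed bytes
# directly from the bytearray and so already relies on the optimization
# described here. Confirm and prune this comment if so.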
# Matches the end of a header block: a "\n" followed by a blank line whose
# own "\r" is optional, so both "\r\n\r\n" and "\n\n" terminators are found.
blank_line_regex = re.compile(b"\n\r?\n", re.MULTILINE)


class ReceiveBuffer:
    """Accumulate received bytes and hand them back as parsed-off prefixes.

    Data is appended with ``+=``.  Each ``maybe_extract_*`` method removes
    and returns a prefix of the buffered data, or returns None when nothing
    can be extracted yet, in which case the buffer is left untouched.
    """

    def __init__(self) -> None:
        self._data = bytearray()
        # Offsets into self._data recording how far an unsuccessful search
        # has already looked, so the next attempt does not rescan those
        # bytes.  Both are reset to 0 whenever a prefix is removed.
        self._next_line_search = 0
        self._multiple_lines_search = 0

    def __iadd__(self, byteslike: Union[bytes, bytearray]) -> "ReceiveBuffer":
        """Append ``byteslike`` to the end of the buffer and return self."""
        self._data += byteslike
        return self

    def __bool__(self) -> bool:
        """Return True when at least one byte is buffered."""
        return bool(len(self))

    def __len__(self) -> int:
        """Return the number of buffered bytes."""
        return len(self._data)

    # Original note: for @property unprocessed_data (not defined in this
    # class).
    def __bytes__(self) -> bytes:
        """Return a copy of the buffered data as immutable bytes."""
        return bytes(self._data)

    def _consume(self, count: int) -> None:
        # Drop the leading ``count`` bytes.  The saved search offsets refer
        # to positions in the old contents, so they are reset as well.
        del self._data[:count]
        self._next_line_search = 0
        self._multiple_lines_search = 0

    def _extract(self, count: int) -> bytearray:
        # Remove the initial ``count`` bytes of the buffer and return them.
        out = self._data[:count]
        self._consume(count)
        return out

    def maybe_extract_at_most(self, count: int) -> Optional[bytearray]:
        """Extract up to ``count`` bytes from the front of the buffer.

        Returns the extracted bytes, which may be fewer than ``count`` when
        less data is buffered, or None when the requested slice is empty
        (for example an empty buffer, or ``count == 0``).
        """
        # Slice once and reuse the result; going through _extract() here
        # would copy the same bytes a second time.
        out = self._data[:count]
        if not out:
            return None
        self._consume(count)
        return out

    def maybe_extract_next_line(self) -> Optional[bytearray]:
        """Extract the first line, if it is completed in the buffer.

        Returns the line including its trailing ``b"\\r\\n"``, or None when
        no ``b"\\r\\n"`` has been buffered yet.
        """
        # Only search in buffer space that we've not already looked at.
        # Back up one byte so a "\r" that ended the previously searched
        # data can still pair with a newly appended "\n".
        search_start_index = max(0, self._next_line_search - 1)
        partial_idx = self._data.find(b"\r\n", search_start_index)

        if partial_idx == -1:
            self._next_line_search = len(self._data)
            return None

        # + 2 is to compensate len(b"\r\n")
        idx = partial_idx + 2
        return self._extract(idx)

    def maybe_extract_lines(self) -> Optional[List[bytearray]]:
        """Extract everything up to the first blank line as a list of lines.

        Returns the lines with their line endings removed and without the
        terminating blank line, an empty list when the buffer starts with a
        blank line, or None when no blank line has been buffered yet.
        """
        # Handle the case where we have an immediate empty line.
        if self._data[:1] == b"\n":
            self._extract(1)
            return []

        if self._data[:2] == b"\r\n":
            self._extract(2)
            return []

        # Only search in buffer space that we've not already looked at.
        match = blank_line_regex.search(self._data, self._multiple_lines_search)
        if match is None:
            # Keep the last two bytes in range of the next search: they may
            # be the start of a terminator that is completed by later data.
            self._multiple_lines_search = max(0, len(self._data) - 2)
            return None

        # Truncate the buffer and return it.
        idx = match.span(0)[-1]
        out = self._extract(idx)
        lines = out.split(b"\n")

        # The split pieces are bytearrays, so the "\r" is trimmed in place.
        for line in lines:
            if line.endswith(b"\r"):
                del line[-1]

        # The matched terminator always leaves two empty trailing pieces.
        assert lines[-2] == lines[-1] == b""

        del lines[-2:]

        return lines

    # In theory we should wait until `\r\n` before starting to validate
    # incoming data. However it's interesting to detect (very) invalid data
    # early given they might not even contain `\r\n` at all (hence only
    # timeout will get rid of them).
    # This is not a 100% effective detection but more of a cheap sanity check
    # allowing for early abort in some useful cases.
    # This is especially interesting when peer is messing up with HTTPS and
    # sent us a TLS stream where we were expecting plain HTTP given all
    # versions of TLS so far start handshake with a 0x16 message type code.
    def is_next_line_obviously_invalid_request_line(self) -> bool:
        """Return True when the first buffered byte is below 0x21.

        Returns False when the buffer is empty.
        """
        try:
            # HTTP header line must not contain non-printable characters
            # and should not start with a space
            return self._data[0] < 0x21
        except IndexError:
            return False
|