Skip to content

Commit 53f6e91

Browse files
Optimise decompression size (#12357)
1 parent d3cd6cb commit 53f6e91

5 files changed

Lines changed: 53 additions & 35 deletions

File tree

aiohttp/compression_utils.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,9 +326,15 @@ def decompress_sync(
326326
) -> bytes:
327327
"""Decompress the given data."""
328328
if hasattr(self._obj, "decompress"):
329-
result = cast(bytes, self._obj.decompress(data, max_length))
329+
if max_length == ZLIB_MAX_LENGTH_UNLIMITED:
330+
result = cast(bytes, self._obj.decompress(data))
331+
else:
332+
result = cast(bytes, self._obj.decompress(data, max_length))
330333
else:
331-
result = cast(bytes, self._obj.process(data, max_length))
334+
if max_length == ZLIB_MAX_LENGTH_UNLIMITED:
335+
result = cast(bytes, self._obj.process(data))
336+
else:
337+
result = cast(bytes, self._obj.process(data, max_length))
332338
# Only way to know that brotli has no further data is checking we get no output
333339
self._last_empty = result == b""
334340
return result

aiohttp/http_parser.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import asyncio
33
import re
44
import string
5+
import sys
56
from contextlib import suppress
67
from enum import IntEnum
78
from re import Pattern
@@ -1118,10 +1119,12 @@ def feed_data(self, chunk: bytes) -> bool:
11181119
encoding=self.encoding, suppress_deflate_header=True
11191120
)
11201121

1122+
low_water = self.out._low_water
1123+
max_length = (
1124+
0 if low_water >= sys.maxsize else max(self._max_decompress_size, low_water)
1125+
)
11211126
try:
1122-
chunk = self.decompressor.decompress_sync(
1123-
chunk, max_length=self._max_decompress_size
1124-
)
1127+
chunk = self.decompressor.decompress_sync(chunk, max_length=max_length)
11251128
except Exception:
11261129
raise ContentEncodingError(
11271130
"Can not decode content-encoding: %s" % self.encoding

aiohttp/streams.py

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import collections
3+
import sys
34
import warnings
45
from collections.abc import Awaitable, Callable
56
from typing import Final, Generic, TypeVar
@@ -67,31 +68,7 @@ async def __anext__(self) -> tuple[bytes, bool]:
6768
return rv
6869

6970

70-
class AsyncStreamReaderMixin:
71-
72-
__slots__ = ()
73-
74-
def __aiter__(self) -> AsyncStreamIterator[bytes]:
75-
return AsyncStreamIterator(self.readline) # type: ignore[attr-defined]
76-
77-
def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
78-
"""Returns an asynchronous iterator that yields chunks of size n."""
79-
return AsyncStreamIterator(lambda: self.read(n)) # type: ignore[attr-defined]
80-
81-
def iter_any(self) -> AsyncStreamIterator[bytes]:
82-
"""Yield all available data as soon as it is received."""
83-
return AsyncStreamIterator(self.readany) # type: ignore[attr-defined]
84-
85-
def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
86-
"""Yield chunks of data as they are received by the server.
87-
88-
The yielded objects are tuples
89-
of (bytes, bool) as returned by the StreamReader.readchunk method.
90-
"""
91-
return ChunkTupleAsyncStreamIterator(self) # type: ignore[arg-type]
92-
93-
94-
class StreamReader(AsyncStreamReaderMixin):
71+
class StreamReader:
9572
"""An enhancement of asyncio.StreamReader.
9673
9774
Supports asynchronous iteration by line, chunk or as available::
@@ -174,9 +151,35 @@ def __repr__(self) -> str:
174151
info.append("e=%r" % self._exception)
175152
return "<%s>" % " ".join(info)
176153

154+
def __aiter__(self) -> AsyncStreamIterator[bytes]:
155+
return AsyncStreamIterator(self.readline)
156+
157+
def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
158+
"""Returns an asynchronous iterator that yields chunks of size n."""
159+
self.set_read_chunk_size(n)
160+
return AsyncStreamIterator(lambda: self.read(n))
161+
162+
def iter_any(self) -> AsyncStreamIterator[bytes]:
163+
"""Yield all available data as soon as it is received."""
164+
return AsyncStreamIterator(self.readany)
165+
166+
def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
167+
"""Yield chunks of data as they are received by the server.
168+
169+
The yielded objects are tuples
170+
of (bytes, bool) as returned by the StreamReader.readchunk method.
171+
"""
172+
return ChunkTupleAsyncStreamIterator(self)
173+
177174
def get_read_buffer_limits(self) -> tuple[int, int]:
178175
return (self._low_water, self._high_water)
179176

177+
def set_read_chunk_size(self, n: int) -> None:
178+
"""Raise buffer limits to match the consumer's chunk size."""
179+
if n > self._low_water:
180+
self._low_water = n
181+
self._high_water = n * 2
182+
180183
def exception(self) -> type[BaseException] | BaseException | None:
181184
return self._exception
182185

@@ -410,10 +413,8 @@ async def read(self, n: int = -1) -> bytes:
410413
return b""
411414

412415
if n < 0:
413-
# This used to just loop creating a new waiter hoping to
414-
# collect everything in self._buffer, but that would
415-
# deadlock if the subprocess sends more than self.limit
416-
# bytes. So just call self.readany() until EOF.
416+
# Reading everything — remove decompression chunk limit.
417+
self.set_read_chunk_size(sys.maxsize)
417418
blocks = []
418419
while True:
419420
block = await self.readany()
@@ -422,6 +423,7 @@ async def read(self, n: int = -1) -> bytes:
422423
blocks.append(block)
423424
return b"".join(blocks)
424425

426+
self.set_read_chunk_size(n)
425427
# TODO: should be `if` instead of `while`
426428
# because waiter maybe triggered on chunk end,
427429
# without feeding any data
@@ -595,6 +597,9 @@ async def wait_eof(self) -> None:
595597
def feed_data(self, data: bytes) -> bool:
596598
return False
597599

600+
def set_read_chunk_size(self, n: int) -> None:
601+
return
602+
598603
async def readline(self, *, max_line_length: int | None = None) -> bytes:
599604
return b""
600605

aiohttp/web_request.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,10 @@ async def read(self) -> bytes:
628628
Returns bytes object with full request content.
629629
"""
630630
if self._read_bytes is None:
631+
# Raise the buffer limits so compressed payloads decompress in
632+
# larger chunks instead of many small pause/resume cycles.
633+
if self._client_max_size:
634+
self._payload.set_read_chunk_size(self._client_max_size)
631635
body = bytearray()
632636
while True:
633637
chunk = await self._payload.readany()

tests/test_flowcontrol_streams.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async def test_readexactly(self, stream: streams.StreamReader) -> None:
7777
stream.feed_data(b"data")
7878
res = await stream.readexactly(3)
7979
assert res == b"dat"
80-
assert not stream._protocol.resume_reading.called # type: ignore[attr-defined]
80+
assert stream._protocol.resume_reading.called # type: ignore[attr-defined]
8181

8282
async def test_feed_data(self, stream: streams.StreamReader) -> None:
8383
stream._protocol._reading_paused = False

0 commit comments

Comments (0)