Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions Lib/test/test_free_threading/test_zlib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import itertools
import unittest

from test.support import import_helper, threading_helper
from test.support.threading_helper import run_concurrently

# zlib is looked up through the test-support import helper instead of a plain
# ``import zlib``.
# NOTE(review): presumably this skips the module's tests when zlib is not
# available in this build -- confirm against test.support.import_helper.
zlib = import_helper.import_module("zlib")

# Sample payload shared with the main zlib test module. It is imported only
# after the zlib lookup above has succeeded.
from test.test_zlib import HAMLET_SCENE


# Thread count handed to run_concurrently() and repeat count for the expected
# round-trip data. test_compressor starts only NTHREADS - 1 workers because the
# calling thread performs the first compress() itself.
NTHREADS = 10


@threading_helper.requires_working_threading()
class TestZlib(unittest.TestCase):
    # Each test shares ONE zlib compressor/decompressor object between
    # several threads and checks that the stream still round-trips.

    def test_compressor(self):
        # HAMLET_SCENE is fed to a single compressobj NTHREADS times in
        # total: once from this thread, NTHREADS - 1 times from workers.
        shared = zlib.compressobj()

        # The very first compress() call emits the zlib header.
        head = shared.compress(HAMLET_SCENE)
        self.assertGreater(len(head), 0)

        def feed_once():
            # Later calls are expected to hand back b"" because the data is
            # buffered inside the compressor until flush().
            self.assertEqual(shared.compress(HAMLET_SCENE), b"")

        run_concurrently(worker_func=feed_once, nthreads=NTHREADS - 1)

        # Header plus the flushed remainder must decompress to NTHREADS
        # copies of the input.
        stream = head + shared.flush()
        self.assertEqual(zlib.decompress(stream), HAMLET_SCENE * NTHREADS)

    def test_decompressor_concurrent_attribute_reads(self):
        expected = HAMLET_SCENE * NTHREADS
        payload = zlib.compress(expected)

        decomp = zlib.decompressobj()
        # Output cap per decompress() call, so the stream is consumed in
        # many small steps instead of a single call.
        max_out = len(expected) // 1000
        pieces = []

        def drain():
            # Every step updates eof, unused_data and unconsumed_tail on the
            # shared object; keep going until no unconsumed input is left.
            pending = payload
            while True:
                pieces.append(decomp.decompress(pending, max_out))
                if not decomp.unconsumed_tail:
                    break
                pending = decomp.unconsumed_tail

        def poll_attributes():
            # Hammer the attributes while another thread is decompressing;
            # the values themselves are deliberately discarded.
            for _ in range(1000):
                snapshot = (
                    decomp.unused_data,
                    decomp.unconsumed_tail,
                    decomp.eof,
                )

        tickets = itertools.count()

        def worker():
            # Whoever draws ticket 0 decompresses; everyone else only reads.
            job = drain if next(tickets) == 0 else poll_attributes
            job()

        run_concurrently(worker_func=worker, nthreads=NTHREADS)

        self.assertTrue(decomp.eof)
        self.assertEqual(decomp.unused_data, b"")
        self.assertEqual(b"".join(pieces), expected)


# Let this test file be executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Make the ``unused_data``, ``unconsumed_tail``, ``eof`` and ``needs_input``
attributes of :mod:`zlib` decompression objects thread-safe on the
:term:`free threaded <free threading>` build.
88 changes: 35 additions & 53 deletions Modules/zlibmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#endif

#include "Python.h"
#include "pycore_object.h" // _PyObject_XSetRefDelayed

#include "zlib.h"
#include "stdbool.h"
Expand Down Expand Up @@ -181,15 +182,6 @@ OutputBuffer_WindowOnError(_BlocksOutputBuffer *buffer, _Uint32Window *window)
}


/* ENTER_ZLIB(obj): acquire obj->lock before working on the object's state.
 *
 * The lock is first tried without waiting (second argument 0). Only when
 * that attempt fails is the waiting acquire (second argument 1) performed,
 * bracketed by Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS so that other
 * Python threads are allowed to run while this one waits for the lock.
 * The do { ... } while (0) wrapper lets the macro be used as one statement.
 */
#define ENTER_ZLIB(obj) do { \
if (!PyThread_acquire_lock((obj)->lock, 0)) { \
Py_BEGIN_ALLOW_THREADS \
PyThread_acquire_lock((obj)->lock, 1); \
Py_END_ALLOW_THREADS \
} } while (0)
/* LEAVE_ZLIB(obj): release obj->lock, pairing with ENTER_ZLIB(obj).
 *
 * NOTE(review): the expansion already ends in ';', so "LEAVE_ZLIB(x);"
 * yields an extra empty statement. Keep that in mind before using it as the
 * sole body of an unbraced if/else.
 */
#define LEAVE_ZLIB(obj) PyThread_release_lock((obj)->lock);


/* The following parameters are copied from zutil.h, version 0.95 */
#define DEFLATED 8
#if MAX_MEM_LEVEL >= 8
Expand Down Expand Up @@ -228,7 +220,7 @@ typedef struct
char eof;
bool is_initialised;
PyObject *zdict;
PyThread_type_lock lock;
PyMutex mutex;
} compobject;

#define _compobject_CAST(op) ((compobject *)op)
Expand Down Expand Up @@ -291,12 +283,7 @@ newcompobject(PyTypeObject *type)
Py_DECREF(self);
return NULL;
}
self->lock = PyThread_allocate_lock();
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
return NULL;
}
self->mutex = (PyMutex){0};
return self;
}

Expand Down Expand Up @@ -720,10 +707,10 @@ compobject_dealloc_impl(PyObject *op, int (*dealloc)(z_streamp))
PyTypeObject *type = Py_TYPE(op);
PyObject_GC_UnTrack(op);
compobject *self = _compobject_CAST(op);
assert(!PyMutex_IsLocked(&self->mutex));
if (self->is_initialised) {
(void)dealloc(&self->zst);
}
PyThread_free_lock(self->lock);
Py_XDECREF(self->unused_data);
Py_XDECREF(self->unconsumed_tail);
Py_XDECREF(self->zdict);
Expand Down Expand Up @@ -777,7 +764,7 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
_BlocksOutputBuffer buffer = {.writer = NULL};
zlibstate *state = PyType_GetModuleState(cls);

ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);

self->zst.next_in = data->buf;
Py_ssize_t ibuflen = data->len;
Expand Down Expand Up @@ -819,7 +806,7 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
OutputBuffer_OnError(&buffer);
return_value = NULL;
success:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return return_value;
}

Expand Down Expand Up @@ -850,7 +837,7 @@ save_unconsumed_input(compobject *self, Py_buffer *data, int err)
if (new_unused_data == NULL) {
return -1;
}
Py_SETREF(self->unused_data, new_unused_data);
_PyObject_XSetRefDelayed(&self->unused_data, new_unused_data);
Comment thread
yoney marked this conversation as resolved.
Outdated
self->zst.avail_in = 0;
}
}
Expand All @@ -864,7 +851,7 @@ save_unconsumed_input(compobject *self, Py_buffer *data, int err)
(char *)self->zst.next_in, left_size);
if (new_data == NULL)
return -1;
Py_SETREF(self->unconsumed_tail, new_data);
_PyObject_XSetRefDelayed(&self->unconsumed_tail, new_data);
}

return 0;
Expand Down Expand Up @@ -909,7 +896,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
max_length = -1;
}

ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);

self->zst.next_in = data->buf;
ibuflen = data->len;
Expand Down Expand Up @@ -962,7 +949,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
if (err == Z_STREAM_END) {
/* This is the logical place to call inflateEnd, but the old behaviour
of only calling it on flush() is preserved. */
self->eof = 1;
FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
Comment thread
emmatyping marked this conversation as resolved.
} else if (err != Z_OK && err != Z_BUF_ERROR) {
/* We will only get Z_BUF_ERROR if the output buffer was full
but there wasn't more output when we tried again, so it is
Expand All @@ -981,7 +968,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
OutputBuffer_OnError(&buffer);
return_value = NULL;
success:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return return_value;
}

Expand Down Expand Up @@ -1014,7 +1001,7 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
}

ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);

self->zst.avail_in = 0;

Expand Down Expand Up @@ -1070,7 +1057,7 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
OutputBuffer_OnError(&buffer);
return_value = NULL;
success:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return return_value;
}

Expand All @@ -1094,9 +1081,9 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls)
if (!return_value) return NULL;

/* Copy the zstream state
* We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
* We use mutex to make this thread-safe
*/
ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
int err = deflateCopy(&return_value->zst, &self->zst);
switch (err) {
case Z_OK:
Expand All @@ -1120,11 +1107,11 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls)
/* Mark it as being initialized */
return_value->is_initialised = 1;

LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return (PyObject *)return_value;

error:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
Py_XDECREF(return_value);
return NULL;
}
Expand Down Expand Up @@ -1178,9 +1165,9 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls)
if (!return_value) return NULL;

/* Copy the zstream state
* We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
* We use mutex to make this thread-safe
*/
ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
int err = inflateCopy(&return_value->zst, &self->zst);
switch (err) {
case Z_OK:
Expand All @@ -1205,11 +1192,11 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls)
/* Mark it as being initialized */
return_value->is_initialised = 1;

LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return (PyObject *)return_value;

error:
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
Py_XDECREF(return_value);
return NULL;
}
Expand Down Expand Up @@ -1282,10 +1269,10 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
return NULL;
}

ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);

if (PyObject_GetBuffer(self->unconsumed_tail, &data, PyBUF_SIMPLE) == -1) {
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return NULL;
}

Expand Down Expand Up @@ -1333,7 +1320,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,

/* If at end of stream, clean up any memory allocated by zlib. */
if (err == Z_STREAM_END) {
self->eof = 1;
FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
self->is_initialised = 0;
err = inflateEnd(&self->zst);
if (err != Z_OK) {
Expand All @@ -1352,7 +1339,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
return_value = NULL;
success:
PyBuffer_Release(&data);
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return return_value;
}

Expand All @@ -1361,7 +1348,7 @@ typedef struct {
PyObject_HEAD
z_stream zst;
PyObject *zdict;
PyThread_type_lock lock;
PyMutex mutex;
PyObject *unused_data;
uint8_t *input_buffer;
Py_ssize_t input_buffer_size;
Expand All @@ -1387,7 +1374,7 @@ ZlibDecompressor_dealloc(PyObject *op)
PyTypeObject *type = Py_TYPE(op);
PyObject_GC_UnTrack(op);
ZlibDecompressor *self = ZlibDecompressor_CAST(op);
PyThread_free_lock(self->lock);
assert(!PyMutex_IsLocked(&self->mutex));
if (self->is_initialised) {
inflateEnd(&self->zst);
}
Expand Down Expand Up @@ -1545,7 +1532,7 @@ decompress_buf(ZlibDecompressor *self, Py_ssize_t max_length)
} while(err != Z_STREAM_END && self->avail_in_real != 0);

if (err == Z_STREAM_END) {
self->eof = 1;
FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
self->is_initialised = 0;
/* Unlike the Decompress object we call inflateEnd here as there are no
backwards compatibility issues */
Expand Down Expand Up @@ -1633,23 +1620,23 @@ decompress(ZlibDecompressor *self, uint8_t *data,
}

if (self->eof) {
self->needs_input = 0;
FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);

if (self->avail_in_real > 0) {
PyObject *unused_data = PyBytes_FromStringAndSize(
(char *)self->zst.next_in, self->avail_in_real);
if (unused_data == NULL) {
goto error;
}
Py_XSETREF(self->unused_data, unused_data);
_PyObject_XSetRefDelayed(&self->unused_data, unused_data);
}
}
else if (self->avail_in_real == 0) {
self->zst.next_in = NULL;
self->needs_input = 1;
FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 1);
}
else {
self->needs_input = 0;
FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);

/* If we did not use the input buffer, we now have
to copy the tail from the caller's buffer into the
Expand Down Expand Up @@ -1718,14 +1705,14 @@ zlib__ZlibDecompressor_decompress_impl(ZlibDecompressor *self,
{
PyObject *result = NULL;

ENTER_ZLIB(self);
PyMutex_Lock(&self->mutex);
if (self->eof) {
PyErr_SetString(PyExc_EOFError, "End of stream already reached");
}
else {
result = decompress(self, data->buf, data->len, max_length);
}
LEAVE_ZLIB(self);
PyMutex_Unlock(&self->mutex);
return result;
}

Expand Down Expand Up @@ -1767,12 +1754,7 @@ zlib__ZlibDecompressor_impl(PyTypeObject *type, int wbits, PyObject *zdict)
self->zst.next_in = NULL;
self->zst.avail_in = 0;
self->unused_data = Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
self->lock = PyThread_allocate_lock();
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
return NULL;
}
self->mutex = (PyMutex){0};
int err = inflateInit2(&(self->zst), wbits);
switch (err) {
case Z_OK:
Expand Down
Loading