|
1 | 1 | import errno |
2 | 2 | import itertools |
3 | 3 | import os |
| 4 | +import signal |
| 5 | +import subprocess |
4 | 6 | import sys |
5 | | -import unittest |
6 | 7 | import termios |
| 8 | +import unittest |
7 | 9 | from functools import partial |
8 | 10 | from test.support import os_helper, force_not_colorized_test_class |
9 | 11 | from test.support import script_helper |
10 | | -import subprocess |
11 | | -import signal |
12 | | -import textwrap |
13 | 12 |
|
14 | 13 | from unittest import TestCase |
15 | 14 | from unittest.mock import MagicMock, call, patch, ANY, Mock |
@@ -331,115 +330,16 @@ def test_eio_error_handling_in_restore(self, mock_tcgetattr, mock_tcsetattr): |
331 | 330 |
|
332 | 331 | mock_tcsetattr.side_effect = termios.error(errno.EIO, "Input/output error") |
333 | 332 |
|
334 | | - try: |
335 | | - console.restore() |
336 | | - except termios.error as e: |
337 | | - if e.args[0] == errno.EIO: |
338 | | - self.fail("EIO error should have been handled gracefully in restore()") |
339 | | - raise |
| 333 | + # EIO error should be handled gracefully in restore() |
| 334 | + console.restore() |
340 | 335 |
|
341 | 336 | @unittest.skipUnless(sys.platform == "linux", "Only valid on Linux") |
342 | 337 | def test_repl_eio(self): |
343 | 338 | # Use the pty-based approach to simulate EIO error |
344 | | - child_code = textwrap.dedent(""" |
345 | | - import os, sys, pty, fcntl, termios, signal, time, errno |
346 | | -
|
347 | | - def handler(sig, f): |
348 | | - pass |
349 | | -
|
350 | | - def create_eio_condition(): |
351 | | - try: |
352 | | - # Try to create a condition that will actually produce EIO |
353 | | - # Method: Use your original script's approach with modifications |
354 | | - master_fd, slave_fd = pty.openpty() |
355 | | - # Fork a child that will manipulate the pty |
356 | | - child_pid = os.fork() |
357 | | - if child_pid == 0: |
358 | | - # Child process |
359 | | - try: |
360 | | - # Set up session and control terminal like your script |
361 | | - os.setsid() |
362 | | - fcntl.ioctl(slave_fd, termios.TIOCSCTTY, 0) |
363 | | - # Get process group |
364 | | - p2_pgid = os.getpgrp() |
365 | | - # Fork grandchild |
366 | | - grandchild_pid = os.fork() |
367 | | - if grandchild_pid == 0: |
368 | | - # Grandchild - set up process group |
369 | | - os.setpgid(0, 0) |
370 | | - # Redirect stdin to slave |
371 | | - os.dup2(slave_fd, 0) |
372 | | - if slave_fd > 2: |
373 | | - os.close(slave_fd) |
374 | | - # Fork great-grandchild for terminal control manipulation |
375 | | - ggc_pid = os.fork() |
376 | | - if ggc_pid == 0: |
377 | | - # Great-grandchild - just exit quickly |
378 | | - sys.exit(0) |
379 | | - else: |
380 | | - # Back to grandchild |
381 | | - try: |
382 | | - os.tcsetpgrp(0, p2_pgid) |
383 | | - except: |
384 | | - pass |
385 | | - sys.exit(0) |
386 | | - else: |
387 | | - # Back to child |
388 | | - try: |
389 | | - os.setpgid(grandchild_pid, grandchild_pid) |
390 | | - except ProcessLookupError: |
391 | | - pass |
392 | | - os.tcsetpgrp(slave_fd, grandchild_pid) |
393 | | - if slave_fd > 2: |
394 | | - os.close(slave_fd) |
395 | | - os.waitpid(grandchild_pid, 0) |
396 | | - # Manipulate terminal control to create EIO condition |
397 | | - os.tcsetpgrp(master_fd, p2_pgid) |
398 | | - # Now try to read from master - this might cause EIO |
399 | | - try: |
400 | | - os.read(master_fd, 1) |
401 | | - except OSError as e: |
402 | | - if e.errno == errno.EIO: |
403 | | - print(f"Setup created EIO condition: {e}", file=sys.stderr) |
404 | | - sys.exit(0) |
405 | | - except Exception as setup_e: |
406 | | - print(f"Setup error: {setup_e}", file=sys.stderr) |
407 | | - sys.exit(1) |
408 | | - else: |
409 | | - # Parent process |
410 | | - os.close(slave_fd) |
411 | | - os.waitpid(child_pid, 0) |
412 | | - # Now replace stdin with master_fd and try to read |
413 | | - os.dup2(master_fd, 0) |
414 | | - os.close(master_fd) |
415 | | - # This should now trigger EIO |
416 | | - result = input() |
417 | | - print(f"Unexpectedly got input: {repr(result)}", file=sys.stderr) |
418 | | - sys.exit(0) |
419 | | - except OSError as e: |
420 | | - if e.errno == errno.EIO: |
421 | | - print(f"Got EIO: {e}", file=sys.stderr) |
422 | | - sys.exit(1) |
423 | | - elif e.errno == errno.ENXIO: |
424 | | - print(f"Got ENXIO (no such device): {e}", file=sys.stderr) |
425 | | - sys.exit(1) # Treat ENXIO as success too |
426 | | - else: |
427 | | - print(f"Got other OSError: errno={e.errno} {e}", file=sys.stderr) |
428 | | - sys.exit(2) |
429 | | - except EOFError as e: |
430 | | - print(f"Got EOFError: {e}", file=sys.stderr) |
431 | | - sys.exit(3) |
432 | | - except Exception as e: |
433 | | - print(f"Got unexpected error: {type(e).__name__}: {e}", file=sys.stderr) |
434 | | - sys.exit(4) |
435 | | - # Set up signal handler for coordination |
436 | | - signal.signal(signal.SIGUSR1, lambda *a: create_eio_condition()) |
437 | | - print("READY", flush=True) |
438 | | - signal.pause() |
439 | | - """).strip() |
| 339 | + script_path = os.path.join(os.path.dirname(__file__), "eio_test_script.py") |
440 | 340 |
|
441 | 341 | proc = script_helper.spawn_python( |
442 | | - "-S", "-c", child_code, |
| 342 | + "-S", script_path, |
443 | 343 | stderr=subprocess.PIPE, |
444 | 344 | text=True |
445 | 345 | ) |
|
0 commit comments