|
2 | 2 | import unittest |
3 | 3 | import aiomysql |
4 | 4 |
|
| 5 | +import pytest |
| 6 | +from pymysql.err import Warning |
5 | 7 | from tests import base |
6 | 8 | from tests._testutils import run_until_complete |
7 | 9 |
|
@@ -404,3 +406,41 @@ def test_issue_175(self): |
404 | 406 | assert len(cur.description) == length |
405 | 407 | finally: |
406 | 408 | yield from cur.execute('drop table if exists test_field_count') |
| 409 | + |
| 410 | + |
| 411 | +# MySQL will get you to renegotiate if sent a cleartext password |
| 412 | +@pytest.mark.run_loop |
| 413 | +async def test_issue_323(mysql_server, loop, recwarn): |
| 414 | + async with aiomysql.create_pool(**mysql_server['conn_params'], |
| 415 | + loop=loop) as pool: |
| 416 | + async with pool.get() as conn: |
| 417 | + async with conn.cursor() as cur: |
| 418 | + create_db = "CREATE DATABASE IF NOT EXISTS bugtest;" |
| 419 | + await cur.execute(create_db) |
| 420 | + |
| 421 | + create_table = """CREATE TABLE IF NOT EXISTS `bugtest`.`testtable` ( |
| 422 | + `id` INT UNSIGNED NOT NULL AUTO_INCREMENT, |
| 423 | + `bindata` VARBINARY(200) NOT NULL, |
| 424 | + PRIMARY KEY (`id`) |
| 425 | + );""" |
| 426 | + |
| 427 | + await cur.execute(create_table) |
| 428 | + |
| 429 | + try: |
| 430 | + async with conn.cursor() as cur: |
| 431 | + await cur.execute("INSERT INTO `bugtest`.`testtable` " |
| 432 | + "(bindata) VALUES (%s);", |
| 433 | + (b'\xB0\x17',)) |
| 434 | + |
| 435 | + warnings = [warn for warn in recwarn.list |
| 436 | + if warn.category is Warning] |
| 437 | + assert len(warnings) == 0, "Got unexpected MySQL warning" |
| 438 | + |
| 439 | + await cur.execute("SELECT * FROM `bugtest`.`testtable`;") |
| 440 | + rows = await cur.fetchall() |
| 441 | + |
| 442 | + assert len(rows) == 1, "Table should have 1 row" |
| 443 | + |
| 444 | + finally: |
| 445 | + async with conn.cursor() as cur: |
| 446 | + await cur.execute("DELETE FROM `bugtest`.`testtable`;") |
0 commit comments