-
-
Notifications
You must be signed in to change notification settings - Fork 34.5k
gh-133390: Support table, index, trigger, view, column, function, and schema completion in the sqlite3 CLI #136101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
7164755
a641e59
0bfc0fd
47d7fb4
393e24c
dfd3721
62b9dc5
164a9e1
2c99f21
088e741
445e279
10f09e0
658d41c
48c60bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| from _sqlite3 import OperationalError | ||
| from contextlib import contextmanager | ||
|
|
||
| try: | ||
|
|
@@ -8,29 +9,70 @@ | |
| _completion_matches = [] | ||
|
|
||
|
|
||
| def _complete(text, state): | ||
| def _complete(con, text, state): | ||
| global _completion_matches | ||
|
|
||
| if state == 0: | ||
| text_upper = text.upper() | ||
| _completion_matches = [c for c in SQLITE_KEYWORDS if c.startswith(text_upper)] | ||
| text_lower = text.lower() | ||
| _completion_matches = [c + " " for c in SQLITE_KEYWORDS if c.startswith(text_upper)] | ||
| cursor = con.cursor() | ||
| schemata = tuple(row[1] for row | ||
| in cursor.execute("PRAGMA database_list")) | ||
| # tables, indexes, triggers, and views | ||
| select_clauses = (f"SELECT name FROM \"{schema}\".sqlite_master" | ||
| for schema in schemata) | ||
| tables = (row[0] for row | ||
| in cursor.execute(" UNION ".join(select_clauses))) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you consider using I think it might be OK to simply not support completions for 3.37 (2021-11-27) and below, if it would mean less code to maintain. But, @erlend-aasland's opinion would would be more important than mine :)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that |
||
| _completion_matches.extend(c + " " for c in tables | ||
| if c.lower().startswith(text_lower)) | ||
| # columns | ||
| try: | ||
| select_clauses = (f"""\ | ||
| SELECT pti.name FROM "{schema}".sqlite_master AS sm | ||
| JOIN pragma_table_xinfo(sm.name,'{schema}') AS pti | ||
| WHERE sm.type='table'""" for schema in schemata) | ||
| columns = (row[0] for row | ||
| in cursor.execute(" UNION ".join(select_clauses))) | ||
| _completion_matches.extend(c + " " for c in columns | ||
| if c.lower().startswith(text_lower)) | ||
| except OperationalError: | ||
| # skip on SQLite<3.16.0 where pragma table-valued function is not | ||
| # supported yet | ||
| pass | ||
| # functions | ||
| try: | ||
| funcs = (row[0] for row in cursor.execute("""\ | ||
| SELECT DISTINCT UPPER(name) FROM pragma_function_list() | ||
| WHERE name NOT IN ('->', '->>')""")) | ||
| _completion_matches.extend(c + "(" for c in funcs | ||
| if c.startswith(text_upper)) | ||
| except OperationalError: | ||
| # skip on SQLite<3.30.0 where function_list is not supported yet | ||
| pass | ||
| # schemata | ||
| _completion_matches.extend(c for c in schemata | ||
| if c.lower().startswith(text_lower)) | ||
| _completion_matches = sorted(set(_completion_matches)) | ||
| try: | ||
| return _completion_matches[state] + " " | ||
| return _completion_matches[state] | ||
| except IndexError: | ||
| return None | ||
|
|
||
|
|
||
| @contextmanager | ||
| def completer(): | ||
| def completer(con): | ||
| try: | ||
| import readline | ||
| except ImportError: | ||
| yield | ||
| return | ||
|
|
||
| old_completer = readline.get_completer() | ||
| def complete(text, state): | ||
| return _complete(con, text, state) | ||
| try: | ||
| readline.set_completer(_complete) | ||
| readline.set_completer(complete) | ||
| if readline.backend == "editline": | ||
| # libedit uses "^I" instead of "tab" | ||
| command_string = "bind ^I rl_complete" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -216,10 +216,6 @@ class Completion(unittest.TestCase): | |
|
|
||
| @classmethod | ||
| def setUpClass(cls): | ||
| _sqlite3 = import_module("_sqlite3") | ||
| if not hasattr(_sqlite3, "SQLITE_KEYWORDS"): | ||
| raise unittest.SkipTest("unable to determine SQLite keywords") | ||
|
|
||
| readline = import_module("readline") | ||
| if readline.backend == "editline": | ||
| raise unittest.SkipTest("libedit readline is not supported") | ||
|
|
@@ -229,12 +225,24 @@ def write_input(self, input_, env=None): | |
| import readline | ||
| from sqlite3.__main__ import main | ||
|
|
||
| # Configure readline to ...: | ||
| # - hide control sequences surrounding each candidate | ||
| # - hide "Display all xxx possibilities? (y or n)" | ||
| # - show candidates one per line | ||
| readline.parse_and_bind("set colored-completion-prefix off") | ||
| readline.parse_and_bind("set completion-query-items 0") | ||
| readline.parse_and_bind("set page-completions off") | ||
| readline.parse_and_bind("set completion-display-width 0") | ||
|
|
||
| main() | ||
| """) | ||
| return run_pty(script, input_, env) | ||
|
|
||
| def test_complete_sql_keywords(self): | ||
| _sqlite3 = import_module("_sqlite3") | ||
| if not hasattr(_sqlite3, "SQLITE_KEYWORDS"): | ||
| raise unittest.SkipTest("unable to determine SQLite keywords") | ||
|
|
||
| # List candidates starting with 'S', there should be multiple matches. | ||
| input_ = b"S\t\tEL\t 1;\n.quit\n" | ||
| output = self.write_input(input_) | ||
|
|
@@ -249,6 +257,103 @@ def test_complete_sql_keywords(self): | |
| self.assertIn(b"SELECT", output) | ||
| self.assertIn(b"(1,)", output) | ||
|
|
||
| def test_complete_table_indexes_triggers_views(self): | ||
| input_ = textwrap.dedent("""\ | ||
| CREATE TABLE _table (id); | ||
| CREATE INDEX _index ON _table (id); | ||
| CREATE TRIGGER _trigger BEFORE INSERT | ||
| ON _table BEGIN SELECT 1; END; | ||
| CREATE VIEW _view AS SELECT 1; | ||
|
|
||
| CREATE TEMP TABLE _temp_table (id); | ||
| CREATE INDEX temp._temp_index ON _temp_table (id); | ||
| CREATE TEMP TRIGGER _temp_trigger BEFORE INSERT | ||
| ON _table BEGIN SELECT 1; END; | ||
| CREATE TEMP VIEW _temp_view AS SELECT 1; | ||
|
|
||
| ATTACH ':memory:' AS attached; | ||
| CREATE TABLE attached._attached_table (id); | ||
| CREATE INDEX attached._attached_index ON _attached_table (id); | ||
| CREATE TRIGGER attached._attached_trigger BEFORE INSERT | ||
| ON _attached_table BEGIN SELECT 1; END; | ||
| CREATE VIEW attached._attached_view AS SELECT 1; | ||
|
|
||
| SELECT id FROM _\t\tta\t; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The last
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your review! I will add it. |
||
| .quit\n""").encode() | ||
| output = self.write_input(input_) | ||
| lines = output.decode().splitlines() | ||
| indices = [i for i, line in enumerate(lines) | ||
| if line.startswith(self.PS1)] | ||
| start, end = indices[-3], indices[-2] | ||
| candidates = [l.strip() for l in lines[start+1:end]] | ||
| self.assertEqual(candidates, | ||
| [ | ||
| "_attached_index", | ||
| "_attached_table", | ||
| "_attached_trigger", | ||
| "_attached_view", | ||
| "_index", | ||
| "_table", | ||
| "_temp_index", | ||
| "_temp_table", | ||
| "_temp_trigger", | ||
| "_temp_view", | ||
| "_trigger", | ||
| "_view", | ||
| ], | ||
| ) | ||
|
|
||
| def test_complete_columns(self): | ||
| input_ = textwrap.dedent("""\ | ||
| CREATE TABLE _table (_col_table); | ||
| CREATE TEMP TABLE _temp_table (_col_temp); | ||
| ATTACH ':memory:' AS attached; | ||
| CREATE TABLE attached._attached_table (_col_attached); | ||
|
|
||
| SELECT _col_\t\tta\tFROM _table; | ||
| .quit\n""").encode() | ||
| output = self.write_input(input_) | ||
| lines = output.decode().splitlines() | ||
| indices = [ | ||
| i for i, line in enumerate(lines) if line.startswith(self.PS1) | ||
| ] | ||
| start, end = indices[-3], indices[-2] | ||
| candidates = [l.strip() for l in lines[start+1:end]] | ||
|
|
||
| self.assertEqual( | ||
| candidates, ["_col_attached", "_col_table", "_col_temp"] | ||
| ) | ||
|
|
||
| def test_complete_functions(self): | ||
| input_ = b"SELECT AV\t1);\n.quit\n" | ||
| output = self.write_input(input_) | ||
| self.assertIn(b"AVG(1);", output) | ||
| self.assertIn(b"(1.0,)", output) | ||
|
|
||
| # Functions are completed in upper case for even lower case user input. | ||
| input_ = b"SELECT av\t1);\n.quit\n" | ||
| output = self.write_input(input_) | ||
| self.assertIn(b"AVG(1);", output) | ||
| self.assertIn(b"(1.0,)", output) | ||
|
|
||
| def test_complete_schemata(self): | ||
| input_ = textwrap.dedent("""\ | ||
| ATTACH ':memory:' AS _attached; | ||
| CREATE TEMP TABLE _table (id); | ||
|
|
||
| SELECT * FROM \t\t_att\t.sqlite_master; | ||
| .quit\n""").encode() | ||
| output = self.write_input(input_) | ||
| lines = output.decode().splitlines() | ||
| indices = [ | ||
| i for i, line in enumerate(lines) if line.startswith(self.PS1) | ||
| ] | ||
| start, end = indices[-3], indices[-2] | ||
| candidates = [l.strip() for l in lines[start+1:end]] | ||
| self.assertIn("_attached", candidates) | ||
| self.assertIn("main", candidates) | ||
| self.assertIn("temp", candidates) | ||
|
|
||
| @unittest.skipIf(sys.platform.startswith("freebsd"), | ||
| "Two actual tabs are inserted when there are no matching" | ||
| " completions in the pseudo-terminal opened by run_pty()" | ||
|
|
@@ -269,8 +374,6 @@ def test_complete_no_match(self): | |
| self.assertEqual(line_num, len(lines)) | ||
|
|
||
| def test_complete_no_input(self): | ||
| from _sqlite3 import SQLITE_KEYWORDS | ||
|
|
||
| script = textwrap.dedent(""" | ||
| import readline | ||
| from sqlite3.__main__ import main | ||
|
|
@@ -301,7 +404,7 @@ def test_complete_no_input(self): | |
| self.assertEqual(len(indices), 2) | ||
| start, end = indices | ||
| candidates = [l.strip() for l in lines[start+1:end]] | ||
| self.assertEqual(candidates, sorted(SQLITE_KEYWORDS)) | ||
| self.assertEqual(candidates, sorted(candidates)) | ||
| except: | ||
| if verbose: | ||
| print(' PTY output: '.center(30, '-')) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| Support table, index, trigger, view, column, function, and schema completion | ||
| for :mod:`sqlite3`'s :ref:`command-line interface <sqlite3-cli>`. |
Uh oh!
There was an error while loading. Please reload this page.