Skip to content

Commit 103ce12

Browse files
committed
wip: make aiokdb.cli use the reconnecting client, to make it easier to discover any reconnection issues
1 parent 6918163 commit 103ce12

File tree

3 files changed

+53
-23
lines changed

3 files changed

+53
-23
lines changed

aiokdb/cli.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,32 @@
33
import logging
44
import os
55
import traceback
6-
from typing import Any
6+
from typing import Any, Optional
77

88
from prompt_toolkit.history import FileHistory
99
from prompt_toolkit.patch_stdout import patch_stdout
1010
from prompt_toolkit.shortcuts import PromptSession
1111

1212
from aiokdb import TypeEnum, cv
13-
from aiokdb.client import open_qipc_connection
13+
from aiokdb.client import (
14+
ClientContext,
15+
KdbWriter,
16+
maintain_qipc_connection,
17+
)
1418
from aiokdb.format import AsciiFormatter
1519

1620

21+
class CliClientContext(ClientContext):
22+
def __init__(self) -> None:
23+
self.writer: Optional[KdbWriter] = None
24+
25+
async def writer_available(self, dotzw: KdbWriter) -> None:
26+
self.writer = dotzw
27+
28+
def writer_closing(self, dotzw: KdbWriter) -> None:
29+
self.writer = None
30+
31+
1732
async def main(args: Any) -> None:
1833
history = FileHistory(os.path.expanduser("~/.aiokdb-cli-history"))
1934
session: Any = PromptSession("(eval) > ", history=history)
@@ -23,8 +38,12 @@ async def main(args: Any) -> None:
2338
if password is None:
2439
password = await session.prompt_async("Password:", is_password=True)
2540

26-
r, w = await open_qipc_connection(
27-
host=args.host, port=args.port, user=args.user, password=password
41+
cc = CliClientContext()
42+
43+
task = asyncio.create_task(
44+
maintain_qipc_connection(
45+
uri=f"kdb://{args.user}:{password}@{args.host}:{args.port}", context=cc
46+
)
2847
)
2948

3049
# Run echo loop. Read text from stdin, and reply it back.
@@ -33,15 +52,22 @@ async def main(args: Any) -> None:
3352
inp = await session.prompt_async("q)", is_password=False)
3453
if inp == "":
3554
continue
36-
output = await w.sync_req(cv(inp))
37-
if output.t != TypeEnum.NIL:
38-
print(fmt.format(output))
55+
if not cc.writer:
56+
print("Writer not connected, wait for re-connect")
57+
else:
58+
output = await cc.writer.sync_req(cv(inp))
59+
if output.t != TypeEnum.NIL:
60+
print(fmt.format(output))
3961
except KeyboardInterrupt:
4062
return
4163
except EOFError:
4264
break
4365
except Exception:
4466
traceback.print_exc(limit=-2)
67+
68+
task.cancel()
69+
await task
70+
4571
return None
4672

4773

aiokdb/client.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,17 @@ def mask_uri(uri: str) -> str:
100100
Return a version of the URI safe for logging — any password in the
101101
userinfo part (user:pass@) is replaced with '***'.
102102
"""
103-
try:
104-
parsed = urlparse(uri)
105-
# Rebuild netloc with redacted password if needed
106-
userinfo = parsed.username or ""
107-
if parsed.password:
108-
userinfo += ":***"
109-
netloc = f"{parsed.hostname}"
110-
if userinfo:
111-
netloc = f"{userinfo}@{parsed.hostname}"
112-
if parsed.port:
113-
netloc += f":{parsed.port}"
114-
return urlunparse(parsed._replace(netloc=netloc))
115-
except Exception:
116-
return uri
103+
parsed = urlparse(uri)
104+
# Rebuild netloc with redacted password if needed
105+
userinfo = parsed.username or ""
106+
if parsed.password:
107+
userinfo += ":***"
108+
netloc = f"{parsed.hostname}"
109+
if userinfo:
110+
netloc = f"{userinfo}@{parsed.hostname}"
111+
if parsed.port:
112+
netloc += f":{parsed.port}"
113+
return urlunparse(parsed._replace(netloc=netloc))
117114

118115

119116
if __name__ == "__main__":

aiokdb/server.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from functools import partial
88
from typing import Any, Callable, List, Optional, Tuple
99

10-
from aiokdb import KException, KObj, MessageType, TypeEnum, b9, d9, krr, logger
10+
from aiokdb import KException, KObj, MessageType, TypeEnum, b9, d9, kNil, krr, logger
1111

1212

1313
class CredentialsException(Exception):
@@ -308,7 +308,14 @@ async def start_qserver(port: int, context: ServerContext) -> Any:
308308

309309

310310
async def main(qpassword: str, qport: int) -> None:
311-
context = ServerContext(qpassword)
311+
class DelayedServerContext(ServerContext):
312+
async def on_sync_request(self, cmd: KObj, dotzw: KdbWriter) -> KObj: # .z.pg
313+
logging.info("Got request")
314+
await asyncio.sleep(1)
315+
logging.info("sending response")
316+
return kNil
317+
318+
context = DelayedServerContext(qpassword)
312319
server = await start_qserver(qport, context)
313320
async with server:
314321
await server.serve_forever()

0 commit comments

Comments
 (0)