Skip to content

Commit

Permalink
Add channel query option
Browse files Browse the repository at this point in the history
  • Loading branch information
zwimer committed Oct 9, 2024
1 parent ac34ae7 commit b2ab69e
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 10 deletions.
31 changes: 30 additions & 1 deletion rpipe/client/client/client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from dataclasses import dataclass
from logging import getLogger
from json import dumps

from human_readable import listing

from ...shared import TRACE, QueryEC
from ..config import ConfigFile, Option, PartialConfig
from .util import REQUEST_TIMEOUT, request
from .errors import UsageError
from .errors import UsageError, VersionError
from .delete import delete
from .recv import recv
from .send import send
Expand All @@ -24,6 +26,7 @@ class Mode:
print_config: bool
save_config: bool
server_version: bool
query: bool
# Read/Write/Delete modes
read: bool
delete: bool
Expand Down Expand Up @@ -72,6 +75,28 @@ def tru(x) -> bool:
raise UsageError(fmt(args) + "when deleting data from the pipe")


def _query(conf: PartialConfig) -> None:
log = getLogger(_LOG)
log.info("Mode: Query")
if conf.url is None:
raise UsageError("URL unknown; try again with --url")
if conf.channel is None:
raise UsageError("Channel unknown; try again with --channel")
log.info("Querying channel %s ...", conf.channel)
r = request("GET", f"{conf.url.value}/q/{conf.channel.value}")
log.debug("Got response %s", r)
log.log(TRACE, "Data: %s", r.content)
match r.status_code:
case QueryEC.illegal_version:
raise VersionError(f"Server requires version >= {r.text}")
case QueryEC.no_data:
print("No data on this channel")
return
if not r.ok:
raise RuntimeError(f"Query failed. Error {r.status_code}: {r.text}")
print(f"{conf.channel.value}: {dumps(r.json(), indent=4)}")


def _priority_actions(conf: PartialConfig, mode: Mode, config_file) -> PartialConfig | None:
log = getLogger(_LOG)
# Print config if requested
Expand All @@ -95,6 +120,10 @@ def _priority_actions(conf: PartialConfig, mode: Mode, config_file) -> PartialCo
raise RuntimeError(f"Failed to get version: {r}")
print(f"rpipe_server {r.text}")
return None
if mode.query:
_query(conf)
return None

return conf


Expand Down
6 changes: 6 additions & 0 deletions rpipe/client/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ def cli() -> None:
priority_mode.add_argument(
"-Q", "--server-version", action="store_true", help="Print the server version then exit"
)
priority_mode.add_argument(
"-q",
"--query",
action="store_true",
help="Get information on the given channel",
)
priority_mode.add_argument("-A", "--admin", action="store_true", help="Allow use of admin commands")
# Admin commands
admin = parser.add_subparsers(
Expand Down
9 changes: 7 additions & 2 deletions rpipe/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from ..shared import restrict_umask, log, __version__
from .util import MAX_SIZE_HARD, plaintext
from .channel import channel_handler
from .channel import handler, query
from .server import Server
from .admin import Admin

Expand Down Expand Up @@ -81,7 +81,12 @@ def _show_version() -> Response:

@app.route("/c/<channel>", methods=["DELETE", "GET", "POST", "PUT"])
def _channel(channel: str) -> Response:
return channel_handler(server.state, channel)
return handler(server.state, channel)


@app.route("/q/<channel>")
def _query(channel: str) -> Response:
return query(server.state, channel)


# Admin routes
Expand Down
2 changes: 1 addition & 1 deletion rpipe/server/channel/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .channel import channel_handler
from .channel import handler, query
29 changes: 25 additions & 4 deletions rpipe/server/channel/channel.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from dataclasses import asdict
from logging import getLogger

from flask import request

from ...shared import QueryEC, QueryResponse, total_len
from ..util import plaintext, json_response
from ..server import ServerShutdown
from ..util import plaintext
from .write import write
from .read import read

Expand All @@ -14,7 +16,7 @@
from ..server import State


def _channel_handler(state: State, channel: str) -> Response:
def _handler(state: State, channel: str) -> Response:
log = getLogger("channel")
try:
match request.method:
Expand All @@ -37,11 +39,30 @@ def _channel_handler(state: State, channel: str) -> Response:
return plaintext("Server is shutting down", status=503)


def channel_handler(state: State, channel: str) -> Response:
def handler(state: State, channel: str) -> Response:
log = getLogger("channel")
log.info("Invoking: %s %s", request.method, channel)
ret = _channel_handler(state, channel)
ret = _handler(state, channel)
log.info("Sending: %s", ret)
if ret.status_code >= 400:
log.debug(" body: %s", ret.get_data())
return ret


def query(state: State, channel: str) -> Response:
log = getLogger("query")
log.info("Query %s", channel)
with state as u:
if (s := u.streams.get(channel, None)) is None:
log.debug("Channel not found: %s", channel)
return plaintext("No data on this channel", status=QueryEC.no_data)
q = QueryResponse(
new=s.new,
upload_complete=s.upload_complete,
size=total_len(s.data),
encrypted=s.encrypted,
version=s.version,
expiration=s.expire,
)
log.debug("Channel found: %s", q)
return json_response(asdict(q))
3 changes: 2 additions & 1 deletion rpipe/shared/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
UploadResponseHeaders,
DownloadRequestParams,
DownloadResponseHeaders,
QueryResponse,
)
from .error_code import UploadEC, DownloadEC, AdminEC
from .error_code import UploadEC, DownloadEC, QueryEC, AdminEC
from .admin import AdminMessage, ChannelInfo
from .util import restrict_umask, total_len
from .stats import AdminStats, Stats
Expand Down
10 changes: 10 additions & 0 deletions rpipe/shared/error_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ class DownloadEC(metaclass=_UniqueEnum):
in_use: int = 453 # Someone else is reading from the pipe


class QueryEC(metaclass=_UniqueEnum):
"""
HTTP error codes the rpipe client may be sent when in query mode
Others may be sent, but these are the ones the client should be prepared to handle
"""

illegal_version: int = 426 # Illegal version
no_data: int = 410 # No data on this channel


class AdminEC(metaclass=_UniqueEnum):
"""
HTTP error codes the rpipe client may be sent when in admin mode
Expand Down
16 changes: 16 additions & 0 deletions rpipe/shared/request_response.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import TYPE_CHECKING, TypeVar
from datetime import datetime

from .version_ import Version, WEB_VERSION

Expand Down Expand Up @@ -126,3 +127,18 @@ def _from_dict(cls, d: dict[str, str]) -> DownloadResponseHeaders:
final=d["final"] == "True",
encrypted=d["encrypted"] == "True",
)


#
# Query
#


@dataclass(kw_only=True, frozen=True)
class QueryResponse:
new: bool
upload_complete: bool
size: int
encrypted: bool
version: Version
expiration: datetime
2 changes: 1 addition & 1 deletion rpipe/shared/version_.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations


__version__: str = "8.6.0" # Must be "<major>.<minor>.<patch>", all numbers
__version__: str = "8.7.0" # Must be "<major>.<minor>.<patch>", all numbers


class Version:
Expand Down

0 comments on commit b2ab69e

Please sign in to comment.