Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] PR: Add remote-files capability to remote-client plugin #23381

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions binder/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies:
- cookiecutter >=1.6.0
- diff-match-patch >=20181111
- fcitx-qt5 >=1.2.7
- fsspec >=2021.10.0
- fzf >=0.42.0
- importlib-metadata >=4.6.0
- intervaltree >=3.0.2
Expand Down
6 changes: 3 additions & 3 deletions external-deps/spyder-kernels/.gitrepo

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 3 additions & 9 deletions external-deps/spyder-kernels/RELEASE.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions requirements/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies:
- cloudpickle >=0.5.0
- cookiecutter >=1.6.0
- diff-match-patch >=20181111
- fsspec >=2021.10.0
- fzf >=0.42.0
# Need at least some compatibility with python 3.10 features
- importlib-metadata >=4.6.0
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ def run(self):
'cloudpickle>=0.5.0',
'cookiecutter>=1.6.0',
'diff-match-patch>=20181111',
'fsspec>=2021.10.0',
# While this is only required for python <3.10, it is safe enough to
# install in all cases and helps the tests to pass.
'importlib-metadata>=4.6.0',
Expand Down
5 changes: 5 additions & 0 deletions spyder/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
CLOUDPICKLE_REQVER = '>=0.5.0'
COOKIECUTTER_REQVER = '>=1.6.0'
DIFF_MATCH_PATCH_REQVER = '>=20181111'
FSSPEC_REQVER = '>=2021.10.0'
IMPORTLIB_METADATA_REQVER = '>=4.6.0'
INTERVALTREE_REQVER = '>=3.0.2'
IPYTHON_REQVER = ">=8.12.2,<8.13.0" if PY38 else ">=8.13.0,<9.0.0,!=8.17.1"
Expand Down Expand Up @@ -130,6 +131,10 @@
'package_name': "diff-match-patch",
'features': _("Compute text file diff changes during edition"),
'required_version': DIFF_MATCH_PATCH_REQVER},
{'modname': "fsspec",
'package_name': "fsspec",
'features': _("File system abstraction layer for remote file systems"),
'required_version': FSSPEC_REQVER},
{'modname': 'importlib_metadata',
'package_name': 'importlib-metadata',
'features': _('Access the metadata for a Python package'),
Expand Down
263 changes: 263 additions & 0 deletions spyder/plugins/remoteclient/api/jupyterhub/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
import json
import requests
import base64
from websockets.sync.client import connect

from fsspec.utils import stringify_path
from fsspec.spec import AbstractBufferedFile, AbstractFileSystem


class RemoteBufferedFile(AbstractBufferedFile):
    """
    A buffered file-like object for reading/writing over the
    WebSocket protocol. Inherits from AbstractBufferedFile, which
    handles the high-level read/write logic, and only supplies the
    low-level chunk operations:
      - _fetch_range()
      - _initiate_upload()
      - _upload_chunk()
      - _finalize_upload()
    """

    def _fetch_range(self, start, end, **kwargs):
        """
        Download and return bytes ``[start:end]`` of the underlying file.

        Opens a sync WebSocket, sends a ``read_block`` request, then
        gathers the base64-encoded chunks until the server signals EOF.

        Parameters
        ----------
        start, end : int
            Byte offsets delimiting the requested range.

        Returns
        -------
        bytes
            The requested slice (empty for a zero/negative-length range).

        Raises
        ------
        RuntimeError
            If the server reports an error.
        ValueError
            If an unexpected message type is received.
        """
        length = end - start
        if length <= 0:
            # Nothing to fetch; avoid a pointless round-trip.
            return b""

        # Connect to the server's WebSocket.
        with connect(self.fs.ws_url) as ws:
            # Send JSON request: method=read_block.
            # Note: if offset or length is 0, the server should handle
            # it gracefully.
            msg = {
                "method": "read_block",
                "args": [self.path, start, length],
                "kwargs": {},  # e.g. delimiter, if needed
            }
            # BUG FIX: the sync websockets client exposes ``send()``
            # (there is no ``send_or_raise``), and ``_encode_json`` is
            # defined on this class, not on the filesystem object.
            ws.send(self._encode_json(msg))

            # Now collect base64 chunks until EOF.
            chunks = []
            while True:
                resp = self._decode_json(ws.recv())

                if "error" in resp:
                    raise RuntimeError(f"Server error: {resp['error']}")
                msg_type = resp.get("type")

                if msg_type == "data":
                    chunks.append(base64.b64decode(resp["data"]))
                elif msg_type == "eof":
                    break
                else:
                    raise ValueError(f"Unexpected message type: {msg_type}")
        # join() avoids quadratic bytes concatenation for many chunks.
        return b"".join(chunks)

    def _initiate_upload(self):
        """
        Called once when opening in write mode, before the first chunk
        is flushed. Opens a new WebSocket, announces ``write_file`` and
        keeps the socket open for streaming chunks (stored in
        ``self._ws``).
        """
        self._ws = connect(self.fs.ws_url)
        # Announce write_file.
        msg = {
            "method": "write_file",
            "args": [self.path],
            "kwargs": {},
        }
        self._ws.send(self._encode_json(msg))

    def _upload_chunk(self, final=False):
        """
        Called when the internal buffer is flushed to the backend.

        Sends the buffered bytes as ``{"type": "data", "data": <b64>}``.
        When ``final`` is True, also completes the upload handshake —
        fsspec's AbstractBufferedFile never calls ``_finalize_upload``
        itself; it only calls ``_upload_chunk(final=True)`` at close.
        """
        # BUG FIX: ``self.buffer`` is an io.BytesIO (always truthy and
        # not bytes); extract its content with ``getvalue()`` before
        # encoding.
        data = self.buffer.getvalue()
        if data:
            chunk_b64 = base64.b64encode(data).decode()
            msg = {"type": "data", "data": chunk_b64}
            self._ws.send(self._encode_json(msg))
        if final:
            self._finalize_upload()
        return True

    def _finalize_upload(self):
        """
        Called once after all writing is done. Sends 'eof', then waits
        for the server's ``write_complete`` acknowledgement. The socket
        is always closed, even on error.
        """
        # Send 'eof'.
        eof_msg = {"type": "eof"}
        self._ws.send(self._encode_json(eof_msg))

        try:
            # Wait for server's final response.
            while True:
                try:
                    resp_raw = self._ws.recv()
                except Exception:
                    # Socket closed before acknowledgement arrived;
                    # nothing more to read.
                    break
                resp = self._decode_json(resp_raw)
                if "error" in resp:
                    raise RuntimeError(f"Server error: {resp['error']}")
                if resp.get("type") == "write_complete":
                    break
        finally:
            # Close the socket unconditionally to avoid leaking it.
            self._ws.close()

    def close(self):
        """
        Overridden close to ensure the final flush happens (if writing).
        """
        # flush(force=True) -> _upload_chunk(final=True) -> finalize.
        super().close()
        # Additional logic if needed.

    @staticmethod
    def _encode_json(data: dict) -> str:
        """
        Encode a JSON-serializable dict as a string.
        """
        # BUG FIX: declared as @staticmethod — the original had no
        # ``self`` parameter, so ``self._encode_json(msg)`` raised
        # TypeError.
        return json.dumps(data)

    @staticmethod
    def _decode_json(data: str) -> dict:
        """
        Decode a JSON string to a dict.
        """
        return json.loads(data)

class RemoteFileSystem(AbstractFileSystem):
    """
    A custom fsspec FileSystem that:
      - uses REST endpoints for all metadata operations
      - uses a WebSocket for chunked read/write
    """
    # Instances are endpoint-specific; don't serve them from fsspec's
    # instance cache.
    cachable = False

    def __init__(self, rest_url, ws_url, *args, **kwargs):
        """
        Parameters
        ----------
        rest_url : str
            Base URL of the REST metadata API (trailing slash stripped).
        ws_url : str
            WebSocket URL used for chunked file reads/writes.
        """
        super().__init__(*args, **kwargs)
        self.rest_url = rest_url.rstrip("/")
        self.ws_url = ws_url

        # Keep one session for connection pooling/performance.
        self._session = requests.Session()

    # ----------------------------------------------------------------
    # Internally used helpers for REST calls
    # ----------------------------------------------------------------
    def _rest_get(self, endpoint, params=None):
        """GET ``<rest_url>/<endpoint>`` and return the decoded JSON."""
        url = f"{self.rest_url}/{endpoint}"
        r = self._session.get(url, params=params)
        r.raise_for_status()
        return r.json()

    def _rest_post(self, endpoint, params=None):
        """POST to ``<rest_url>/<endpoint>`` and return the decoded JSON."""
        url = f"{self.rest_url}/{endpoint}"
        r = self._session.post(url, params=params)
        r.raise_for_status()
        return r.json()

    def _rest_delete(self, endpoint, params=None):
        """DELETE ``<rest_url>/<endpoint>`` and return the decoded JSON."""
        url = f"{self.rest_url}/{endpoint}"
        r = self._session.delete(url, params=params)
        r.raise_for_status()
        return r.json()

    # ----------------------------------------------------------------
    # fsspec-required API: metadata
    # ----------------------------------------------------------------
    def ls(self, path, detail=True, **kwargs):
        """List files at a path (server returns the fsspec-style listing)."""
        params = {"path": path, "detail": str(detail).lower()}
        return self._rest_get("ls", params=params)

    def info(self, path, **kwargs):
        """Get info about a single file/directory."""
        params = {"path": path}
        return self._rest_get("info", params=params)

    def isfile(self, path):
        """Return True if *path* is a regular file on the server."""
        params = {"path": path}
        out = self._rest_get("isfile", params=params)
        return out["isfile"]

    def isdir(self, path):
        """Return True if *path* is a directory on the server."""
        params = {"path": path}
        out = self._rest_get("isdir", params=params)
        return out["isdir"]

    def exists(self, path):
        """Return True if *path* exists on the server."""
        params = {"path": path}
        out = self._rest_get("exists", params=params)
        return out["exists"]

    def mkdir(self, path, create_parents=True, **kwargs):
        """Create a directory; optionally create parents / tolerate existing."""
        params = {
            "path": path,
            "create_parents": str(bool(create_parents)).lower(),
            "exist_ok": str(bool(kwargs.get("exist_ok", False))).lower(),
        }
        return self._rest_post("mkdir", params=params)

    def rmdir(self, path):
        """Remove an (empty) directory."""
        params = {"path": path}
        return self._rest_delete("rmdir", params=params)

    def rm_file(self, path, missing_ok=False):
        """Remove a single file; *missing_ok* suppresses not-found errors."""
        params = {"path": path, "missing_ok": str(bool(missing_ok)).lower()}
        return self._rest_delete("file", params=params)

    def touch(self, path, truncate=True, **kwargs):
        """Create an empty file or update its mtime."""
        params = {"path": path, "truncate": str(bool(truncate)).lower()}
        return self._rest_post("touch", params=params)

    # ----------------------------------------------------------------
    # fsspec open/read/write
    # ----------------------------------------------------------------
    def _open(self, path, mode="rb", block_size=2**20, **kwargs):
        """
        Return a file-like object that handles reading/writing over the
        WebSocket.

        Parameters
        ----------
        path : str
            Path to the file.
        mode : str
            File mode, e.g., 'rb', 'wb', 'ab', 'r+b', etc.
        block_size : int
            Chunk size for reading/writing. Default is 1 MiB.
        """
        return RemoteBufferedFile(
            fs=self,
            path=stringify_path(path),
            mode=mode,
            block_size=block_size,
            **kwargs
        )

    def cat_file(self, path, start=None, end=None, **kwargs):
        """
        Read an entire file (or a byte range) and return it as bytes.

        Parameters
        ----------
        start, end : int, optional
            Byte offsets; ``None`` means "from the beginning" /
            "to end of file".
        """
        # A straightforward approach: open in read mode, read the data.
        with self._open(path, mode="rb") as f:
            if start:
                f.seek(start)
            # BUG FIX: honor ``end`` even when ``start`` is None.
            # Previously the length was computed only when BOTH were
            # given, so ``cat_file(p, end=100)`` read the whole file.
            length = None
            if end is not None:
                length = end - (start or 0)
            return f.read(length)

    def pipe_file(self, path, data, **kwargs):
        """Write bytes to a file (full overwrite)."""
        with self._open(path, mode="wb") as f:
            f.write(data)
Loading