| import base64 |
| import io |
| import re |
|
|
| import requests |
|
|
| import fsspec |
|
|
|
|
class JupyterFileSystem(fsspec.AbstractFileSystem):
    """View of the files as seen by a Jupyter server (notebook or lab)

    Every operation is an HTTP request against the server's
    ``/api/contents`` endpoint, sent through one shared ``requests.Session``.
    """

    protocol = ("jupyter", "jlab")

    def __init__(self, url, tok=None, **kwargs):
        """

        Parameters
        ----------
        url : str
            Base URL of the server, like "http://127.0.0.1:8888". May include
            token in the string, which is given by the process when starting up
        tok : str
            If the token is obtained separately, can be given here
        kwargs
            Passed on to the ``fsspec.AbstractFileSystem`` constructor.

        Raises
        ------
        ValueError
            If ``url`` contains a query string, ``tok`` is None and no
            ``token=...`` value can be found in ``url``.
        """
        if "?" in url:
            if tok is None:
                try:
                    tok = re.findall(r"token=([a-z0-9]+)", url)[0]
                except IndexError as e:
                    raise ValueError("Could not determine token") from e
            # The query string is only used to obtain the token; drop it so
            # that the API path can be appended to the bare server URL.
            url = url.split("?", 1)[0]
        self.url = url.rstrip("/") + "/api/contents"
        self.session = requests.Session()
        if tok:
            self.session.headers["Authorization"] = f"token {tok}"

        super().__init__(**kwargs)

    def ls(self, path, detail=True, **kwargs):
        """List the entry or entries found at ``path``.

        Parameters
        ----------
        path : str
            Location on the server; the protocol prefix is stripped.
        detail : bool
            If True, return the list of info dicts; otherwise only the names.

        Returns
        -------
        list of dict, or list of str when ``detail`` is False. A path that is
        not a directory gives a single-element list.

        Raises
        ------
        FileNotFoundError
            If the server answers 404 for ``path``.
        requests.HTTPError
            For any other unsuccessful response.
        """
        path = self._strip_protocol(path)
        r = self.session.get(f"{self.url}/{path}")
        if r.status_code == 404:
            # Must be raised: returning the exception object would hand the
            # caller an exception instance in place of a listing.
            raise FileNotFoundError(path)
        r.raise_for_status()
        out = r.json()

        if out["type"] == "directory":
            entries = out["content"]
        else:
            entries = [out]
        for o in entries:
            # fsspec identifies entries by "name"; the server calls it "path"
            o["name"] = o.pop("path")
            # Listings carry metadata only; tolerate entries without the key
            o.pop("content", None)
            if o["type"] == "notebook":
                o["type"] = "file"
        if detail:
            return entries
        return [o["name"] for o in entries]

    def cat_file(self, path, start=None, end=None, **kwargs):
        """Fetch the contents of the file at ``path`` as bytes.

        Parameters
        ----------
        path : str
            Location on the server; the protocol prefix is stripped.
        start, end : int or None
            Slice bounds applied to the complete content after download.

        Returns
        -------
        bytes

        Raises
        ------
        FileNotFoundError
            If the server answers 404 for ``path``.
        requests.HTTPError
            For any other unsuccessful response.
        """
        path = self._strip_protocol(path)
        r = self.session.get(f"{self.url}/{path}")
        if r.status_code == 404:
            # Must be raised: returning the exception object would hand the
            # caller an exception instance in place of the file's bytes.
            raise FileNotFoundError(path)
        r.raise_for_status()
        out = r.json()
        if out["format"] == "text":
            # data should be binary
            b = out["content"].encode()
        else:
            # NOTE(review): every non-text format is assumed to be base64;
            # confirm how notebook responses are encoded by the server.
            b = base64.b64decode(out["content"])
        return b[start:end]

    def pipe_file(self, path, value, **_):
        """Write the bytes ``value`` to ``path`` in a single PUT request.

        The content is sent base64-encoded. Extra keyword arguments are
        ignored.
        """
        path = self._strip_protocol(path)
        payload = {
            "name": path.rsplit("/", 1)[-1],
            "path": path,
            "size": len(value),
            "content": base64.b64encode(value).decode(),
            "format": "base64",
            "type": "file",
        }
        # NOTE(review): the response status is not checked, so a failed
        # upload goes unnoticed by the caller.
        self.session.put(f"{self.url}/{path}", json=payload)

    def mkdir(self, path, create_parents=True, **kwargs):
        """Create a directory at ``path``.

        Parameters
        ----------
        path : str
            Location on the server; the protocol prefix is stripped.
        create_parents : bool
            If True and ``path`` contains "/", first issue the same request
            for each parent directory, outermost first.
        """
        path = self._strip_protocol(path)
        if create_parents and "/" in path:
            self.mkdir(path.rsplit("/", 1)[0], True)
        payload = {
            "name": path.rsplit("/", 1)[-1],
            "path": path,
            "size": None,
            "content": None,
            "type": "directory",
        }
        # NOTE(review): the response status is not checked.
        self.session.put(f"{self.url}/{path}", json=payload)

    def _rm(self, path):
        """Send a DELETE request for ``path``; the response is not checked."""
        path = self._strip_protocol(path)
        self.session.delete(f"{self.url}/{path}")

    def _open(self, path, mode="rb", **kwargs):
        """Open ``path`` for reading or writing.

        Mode "rb" downloads the whole file and returns it wrapped in an
        ``io.BytesIO``. Any other mode returns a ``SimpleFileWriter`` opened
        with mode "wb". Extra keyword arguments are ignored.
        """
        path = self._strip_protocol(path)
        if mode == "rb":
            data = self.cat_file(path)
            return io.BytesIO(data)
        else:
            return SimpleFileWriter(self, path, mode="wb")
|
|
|
|
class SimpleFileWriter(fsspec.spec.AbstractBufferedFile):
    """Buffered writer that sends the whole file in one ``pipe_file`` call."""

    def _upload_chunk(self, final=False):
        """Hold all data back until the final flush, then send it at once.

        Returns False while ``final`` is False, so nothing is uploaded.
        On the final call the complete buffer is read and handed to the
        filesystem's ``pipe_file``.

        Not suitable for large files
        """
        if final is not False:
            # Rewind so that the read covers everything written so far
            self.buffer.seek(0)
            payload = self.buffer.read()
            self.fs.pipe_file(self.path, payload)
            return None
        return False
|
|