# Source code for jupyter_starters.py_starters.notebook

"""Use a notebook as a starter."""
# pylint: disable=duplicate-code,too-many-locals
import asyncio
import shutil
import tempfile
from collections import defaultdict
from pathlib import Path

from jupyter_server.utils import ensure_async

from ..json_ import JsonSchemaException, dumps, json_validator, loads
from ..types import Status

# Key under a notebook's top-level ``metadata`` where the starter response
# is stored (see ``response_from_nbjson``).
NBFORMAT_KEY = "jupyter_starters"
# File name of the working copy of the starter notebook written into the
# kernel's temporary directory; ``copy_files`` deletes it before copying the
# remaining files out.
MAGIC_NOTEBOOK_NAME = "_jupyter_starter_.ipynb"
# Fields merged, alongside ``code``, into the content of every
# ``execute_request`` sent by ``run_cells``.
DEFAULT_MSG = {
    "silent": False,
    "store_history": False,
    "user_expressions": {},
    "allow_stdin": False,
    "stop_on_error": True,
}


def response_from_nbjson(nbjson):
    """Extract the starter response from parsed notebook JSON.

    The response lives under ``metadata[NBFORMAT_KEY]``; an empty dict is
    returned when either level is missing.
    """
    metadata = nbjson.get("metadata", {})
    return metadata.get(NBFORMAT_KEY, {})
def response_from_notebook(src):
    """Read the notebook at ``src`` and return its starter response.

    The file is read as UTF-8 text, parsed with ``loads`` and handed to
    ``response_from_nbjson``.
    """
    notebook_path = Path(src).resolve()
    raw_text = notebook_path.read_text(encoding="utf-8")
    return response_from_nbjson(loads(raw_text))
def kernel_for_path(src):
    """Return the kernelspec name recorded in the notebook at ``src``.

    Raises ``KeyError`` when the notebook has no
    ``metadata.kernelspec.name`` entry.

    TODO: do better on account of freaky names
    """
    notebook_path = Path(src).resolve()
    metadata = loads(notebook_path.read_text(encoding="utf-8"))["metadata"]
    return metadata["kernelspec"]["name"]
def starter_from_nbjson(nbjson):
    """Return only the ``starter`` entry of the notebook's starter response.

    Falls back to an empty dict when the response has no ``starter`` key.
    """
    response = response_from_nbjson(nbjson)
    return response.get("starter", {})
async def get_kernel_and_tmpdir(name, starter, manager):
    """Use the manager to get a kernel and working directory.

    On the first call for ``name`` a temporary directory is created and a
    kernel (named by the starter notebook's kernelspec) is started with that
    directory as its ``cwd``. The pair is cached in
    ``manager.kernel_dirs[name]`` as ``[kernel_id, tmpdir]`` and reused by
    later calls.

    Returns:
        A ``(kernel, tmpdir)`` tuple.

    Raises:
        Whatever ``start_kernel`` raises; the freshly created temporary
        directory is removed before the error propagates.
    """
    if name not in manager.kernel_dirs:
        kernel_name = kernel_for_path(manager.resolve_src(starter))
        tmpdir = tempfile.mkdtemp()
        try:
            kernel_id = await ensure_async(
                manager.kernel_manager.start_kernel(
                    cwd=tmpdir, kernel_name=kernel_name
                )
            )
        except BaseException:
            # Nothing has recorded ``tmpdir`` yet, so no later cleanup would
            # ever find it. BaseException also covers task cancellation.
            shutil.rmtree(tmpdir, ignore_errors=True)
            raise
        manager.kernel_dirs[name] = [kernel_id, tmpdir]

    kernel_id, tmpdir = manager.kernel_dirs[name]
    kernel = manager.kernel_manager.get_kernel(kernel_id)
    return kernel, tmpdir
async def stop_kernel(name, manager):
    """Stop the kernel registered for ``name`` (and clean the tmpdir).

    Does nothing when no kernel is registered under ``name``.

    Raises:
        Whatever ``shutdown_kernel`` raises; the temporary directory is
        removed regardless.
    """
    kernel_id, tmpdir = manager.kernel_dirs.pop(name, [None, None])
    if not kernel_id:
        return

    try:
        await ensure_async(
            manager.kernel_manager.shutdown_kernel(kernel_id, now=True)
        )
    finally:
        # The entry has already been popped, so this is the last chance to
        # remove the directory even if the shutdown itself failed.
        shutil.rmtree(tmpdir, ignore_errors=True)
async def ensure_notebook(starter, path, body, tmpdir, manager):
    """Ensure a notebook exists in a temporary directory.

    A fresh copy of the starter's source notebook is written to
    ``tmpdir / MAGIC_NOTEBOOK_NAME`` with the request's ``body`` and ``path``
    stored under ``metadata[NBFORMAT_KEY]``. Any previous copy is replaced.

    Returns:
        The ``Path`` of the written notebook.
    """
    nbp = manager.resolve_src(starter)
    tmp_nb = Path(tmpdir) / MAGIC_NOTEBOOK_NAME

    if tmp_nb.exists():
        tmp_nb.unlink()

    # Read and write as UTF-8 explicitly, like the other notebook readers in
    # this module, rather than relying on the platform's locale encoding.
    nbjson = loads(nbp.read_text(encoding="utf-8"))
    meta = nbjson["metadata"].setdefault(NBFORMAT_KEY, {})
    meta["body"] = body
    meta["path"] = path
    tmp_nb.write_text(dumps(nbjson), encoding="utf-8")

    return tmp_nb
async def copy_files(tmp_nb, path, manager):
    """Handle retrieving the files from the temporary directory.

    The working notebook ``tmp_nb`` is deleted first, then every remaining
    top-level entry of its parent directory is passed, in sorted order, to
    ``manager.just_copy(entry, path)``.

    Returns:
        The first entry that was copied, or ``None`` when the directory held
        nothing besides the working notebook.

    Raises:
        FileNotFoundError: if ``tmp_nb`` does not exist.
    """
    first_copied = None

    tmp_nb.unlink()

    for root in sorted(tmp_nb.parent.glob("*")):
        # Only remember the first entry; reassigning on every iteration
        # would report the last one instead.
        if first_copied is None:
            first_copied = root
        await manager.just_copy(root, path)

    return first_copied
async def run_cells(nbjson, kernel, manager):
    """Actually run the cells.

    Every code cell of ``nbjson`` is sent to ``kernel`` as an
    ``execute_request`` over a freshly connected shell stream. The coroutine
    then waits until an ``execute_reply`` has arrived for each request.
    Messages seen on the iopub stream are collected per parent request and
    logged, together with the reply, when a reply's status is neither
    ``"ok"`` nor ``"busy"``.

    Returns:
        The list of ``execute_reply`` messages, in the order the requests
        were sent.
    """
    futures = {}
    pubs = defaultdict(list)

    shell = kernel.connect_shell()
    iopub = kernel.connect_iopub()

    # Flipped off once all replies are in, so late callbacks become no-ops.
    listening = True

    def on_shell(msg):
        """Resolve the future of the request an ``execute_reply`` answers."""
        if not listening:
            return
        _ident, smsg = kernel.session.feed_identities(msg)
        msg = kernel.session.deserialize(smsg)
        if msg["msg_type"] == "execute_reply":
            status = msg["content"]["status"]
            parent_id = msg["parent_header"]["msg_id"]
            futures[parent_id].set_result(msg)
            if status not in ["ok", "busy"]:
                manager.log.error(f"[{status}]: {msg}")
                manager.log.error(pubs[parent_id])

    def on_iopub(msg):
        """Collect iopub messages under the request that caused them."""
        if not listening:
            return
        _ident, smsg = kernel.session.feed_identities(msg)
        msg = kernel.session.deserialize(smsg)
        parent_id = msg.get("parent_header", {}).get("msg_id")
        if parent_id:
            pubs[parent_id].append(msg)

    shell.on_recv(on_shell)
    iopub.on_recv(on_iopub)

    try:
        for cell in nbjson["cells"]:
            if cell["cell_type"] == "code":
                code = "".join(cell["source"])
                msg = kernel.session.send(
                    shell,
                    "execute_request",
                    content={"code": code, **DEFAULT_MSG},
                )
                # No await between send and registration, so the reply
                # callback cannot run before the future exists.
                futures[msg["msg_id"]] = asyncio.Future()

        results = await asyncio.gather(*futures.values(), return_exceptions=True)
    finally:
        listening = False
        # Each call connects new streams while the kernel is reused across
        # reruns; close them so they do not pile up on a long-lived kernel.
        # NOTE(review): assumes connect_shell/connect_iopub return
        # ZMQStream-like objects (they already provide ``on_recv``).
        shell.close()
        iopub.close()

    return results
async def notebook_starter(name, starter, path, body, manager):
    """(Re)run a notebook until its schema is correct.

    Steps:

    1. Get (or start) the kernel and temporary directory cached for ``name``.
    2. Write a working copy of the starter notebook carrying ``body`` and
       ``path`` in its metadata, and run its code cells.
    3. Re-read the working copy and take the starter response from its
       metadata.
    4. If the response declares a schema, validate ``body`` against it. A
       valid body (or no schema at all) marks the response ``Status.DONE``
       unless the notebook already set a status.
    5. Copy files out when done, or when continuing with ``copy`` set. Stop
       the kernel when done.

    Returns:
        The response dict, updated with ``body``, ``name`` and ``path``. Its
        ``status`` falls back to ``Status.CONTINUING`` when nothing set one.
    """
    kernel, tmpdir = await get_kernel_and_tmpdir(name, starter, manager)
    tmp_nb = await ensure_notebook(starter, path, body, tmpdir, manager)

    # Read as UTF-8 explicitly, matching ``response_from_notebook`` below,
    # instead of relying on the platform's locale encoding.
    nbjson = loads(tmp_nb.read_text(encoding="utf-8"))
    await run_cells(nbjson, kernel, manager)

    # The run may have rewritten the working copy's metadata on disk.
    nb_response = response_from_notebook(tmp_nb)
    nb_response.update(body=body, name=name, path=path)

    schema = nb_response.get("starter", {}).get("schema")

    if schema:
        validator = json_validator(schema)
        try:
            validator(body)
        except JsonSchemaException as err:
            # Invalid body: leave the status alone so the starter continues.
            manager.log.debug(f"[not valid]: {err}")
        else:
            if nb_response.get("status") is None:
                nb_response.update(status=Status.DONE)
    elif nb_response.get("status") is None:
        nb_response.update(status=Status.DONE)

    status = nb_response.get("status")
    copy = nb_response.get("copy", False)

    if status == Status.DONE or (status == Status.CONTINUING and copy):
        await copy_files(tmp_nb, path, manager)

    if status == Status.DONE:
        await stop_kernel(name, manager)

    if status is None:
        nb_response["status"] = Status.CONTINUING

    return nb_response