Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions crates/amalthea/src/comm/comm_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,39 @@ impl MessageType for UiFrontendRequest {
String::from("rpc_request")
}
}

/// Create an RPC message nested in a comm message
/// Creates a `CommMsg::Data` with `method` and `params` fields.
///
/// Example usage:
///
/// ```
/// comm_rpc_message!("my_method")
/// comm_rpc_message!("my_method", foo = 1, bar = my_value)
/// ```
#[macro_export]
macro_rules! comm_rpc_message {
($method:expr) => {
CommMsg::Data(serde_json::json!({
"method": $method,
"params": {}
}))
};
($method:expr, $($param_key:ident = $param_value:expr),+ $(,)?) => {
CommMsg::Data(serde_json::json!({
"method": $method,
"params": {
$(
stringify!($param_key): $param_value
),*
}
}))
};
}

pub fn comm_rpc_message(method: &str, params: Value) -> CommMsg {
CommMsg::Data(serde_json::json!({
"method": method,
"params": params
}))
}
16 changes: 6 additions & 10 deletions crates/amalthea/src/kernel.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* kernel.rs
*
* Copyright (C) 2022 Posit Software, PBC. All rights reserved.
* Copyright (C) 2022-2025 Posit Software, PBC. All rights reserved.
*
*/

use core::panic;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;

Expand Down Expand Up @@ -63,8 +63,7 @@ pub fn connect(
registration_file: Option<RegistrationFile>,
shell_handler: Box<dyn ShellHandler>,
control_handler: Arc<Mutex<dyn ControlHandler>>,
lsp_handler: Option<Arc<Mutex<dyn ServerHandler>>>,
dap_handler: Option<Arc<Mutex<dyn ServerHandler>>>,
server_handlers: HashMap<String, Arc<Mutex<dyn ServerHandler>>>,
stream_behavior: StreamBehavior,
iopub_tx: Sender<IOPubMessage>,
iopub_rx: Receiver<IOPubMessage>,
Expand Down Expand Up @@ -111,8 +110,7 @@ pub fn connect(
iopub_tx_clone,
comm_manager_tx,
shell_handler,
lsp_handler,
dap_handler,
server_handlers,
)
});

Expand Down Expand Up @@ -357,16 +355,14 @@ fn shell_thread(
iopub_tx: Sender<IOPubMessage>,
comm_manager_tx: Sender<CommManagerEvent>,
shell_handler: Box<dyn ShellHandler>,
lsp_handler: Option<Arc<Mutex<dyn ServerHandler>>>,
dap_handler: Option<Arc<Mutex<dyn ServerHandler>>>,
server_handlers: HashMap<String, Arc<Mutex<dyn ServerHandler>>>,
) -> Result<(), Error> {
let mut shell = Shell::new(
socket,
iopub_tx.clone(),
comm_manager_tx,
shell_handler,
lsp_handler,
dap_handler,
server_handlers,
);
shell.listen();
Ok(())
Expand Down
3 changes: 3 additions & 0 deletions crates/amalthea/src/socket/iopub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct IOPub {

/// Enumeration of possible channels that an IOPub message can be associated
/// with.
#[derive(Debug)]
pub enum IOPubContextChannel {
Shell,
Control,
Expand All @@ -79,6 +80,7 @@ pub enum IOPubContextChannel {
/// Enumeration of all messages that can be delivered from the IOPub XPUB/SUB
/// socket. These messages generally are created on other threads and then sent
/// via a channel to the IOPub thread.
#[derive(Debug)]
pub enum IOPubMessage {
Status(JupyterHeader, IOPubContextChannel, KernelStatus),
ExecuteResult(ExecuteResult),
Expand All @@ -96,6 +98,7 @@ pub enum IOPubMessage {

/// A special IOPub message used to block the sender until the IOPub queue has
/// forwarded all messages before this one on to the frontend.
#[derive(Debug)]
pub struct Wait {
pub wait_tx: Sender<()>,
}
Expand Down
132 changes: 78 additions & 54 deletions crates/amalthea/src/socket/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
*/

use std::cell::RefCell;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;

use crossbeam::channel::Receiver;
use crossbeam::channel::Sender;
use futures::executor::block_on;
use serde_json::json;
use stdext::result::ResultOrLog;

use crate::comm::comm_channel::comm_rpc_message;
use crate::comm::comm_channel::Comm;
use crate::comm::comm_channel::CommMsg;
use crate::comm::event::CommManagerEvent;
Expand Down Expand Up @@ -60,11 +61,8 @@ pub struct Shell {
/// Language-provided shell handler object
shell_handler: RefCell<Box<dyn ShellHandler>>,

/// Language-provided LSP handler object
lsp_handler: Option<Arc<Mutex<dyn ServerHandler>>>,

/// Language-provided DAP handler object
dap_handler: Option<Arc<Mutex<dyn ServerHandler>>>,
/// Map of server handler target names to their handlers
server_handlers: HashMap<String, Arc<Mutex<dyn ServerHandler>>>,

/// Channel used to deliver comm events to the comm manager
comm_manager_tx: Sender<CommManagerEvent>,
Expand All @@ -78,14 +76,13 @@ impl Shell {
/// * `comm_manager_tx` - A channel that delivers messages to the comm manager thread
/// * `comm_changed_rx` - A channel that receives messages from the comm manager thread
/// * `shell_handler` - The language's shell channel handler
/// * `lsp_handler` - The language's LSP handler, if it supports LSP
/// * `server_handlers` - A map of server handler target names to their handlers
pub fn new(
socket: Socket,
iopub_tx: Sender<IOPubMessage>,
comm_manager_tx: Sender<CommManagerEvent>,
shell_handler: Box<dyn ShellHandler>,
lsp_handler: Option<Arc<Mutex<dyn ServerHandler>>>,
dap_handler: Option<Arc<Mutex<dyn ServerHandler>>>,
server_handlers: HashMap<String, Arc<Mutex<dyn ServerHandler>>>,
) -> Self {
// Need a RefCell to allow handler methods to be mutable.
// We only run one handler at a time so this is safe.
Expand All @@ -94,8 +91,7 @@ impl Shell {
socket,
iopub_tx,
shell_handler,
lsp_handler,
dap_handler,
server_handlers,
comm_manager_tx,
}
}
Expand Down Expand Up @@ -306,16 +302,27 @@ impl Shell {
/// request from the frontend to deliver a message to a backend, often as
/// the request side of a request/response pair.
fn handle_comm_msg(&self, header: JupyterHeader, msg: &CommWireMsg) -> crate::Result<()> {
// Store this message as a pending RPC request so that when the comm
// responds, we can match it up
self.comm_manager_tx
.send(CommManagerEvent::PendingRpc(header.clone()))
.unwrap();
// The presence of an `id` field means this is a request, not a notification
// https://github.com/posit-dev/positron/issues/7448
let comm_msg = if msg.data.get("id").is_some() {
// Note that the JSON-RPC `id` field must exactly match the one in
// the Jupyter header
let request_id = header.msg_id.clone();

// Store this message as a pending RPC request so that when the comm
// responds, we can match it up
self.comm_manager_tx
.send(CommManagerEvent::PendingRpc(header))
.unwrap();

CommMsg::Rpc(request_id, msg.data.clone())
} else {
CommMsg::Data(msg.data.clone())
};

// Send the message to the comm
let rpc = CommMsg::Rpc(header.msg_id.clone(), msg.data.clone());
self.comm_manager_tx
.send(CommManagerEvent::Message(msg.comm_id.clone(), rpc))
.send(CommManagerEvent::Message(msg.comm_id.clone(), comm_msg))
.unwrap();

Ok(())
Expand Down Expand Up @@ -374,29 +381,39 @@ impl Shell {
// deliver messages to the frontend without having to store its own
// internal ID or a reference to the IOPub channel.

let mut lsp_comm = false;

let opened = match comm {
// If this is the special LSP or DAP comms, start the server and create
// a comm that wraps it
Comm::Dap => {
server_started_rx = Some(Self::start_server_comm(
msg,
self.dap_handler.clone(),
&comm_socket,
)?);
true
},
Comm::Lsp => {
server_started_rx = Some(Self::start_server_comm(
msg,
self.lsp_handler.clone(),
&comm_socket,
)?);
// If this is an old-style server comm (only the LSP as of now),
// start the server and create a comm that wraps it
lsp_comm = true;

// Extract the target name (strip "positron." prefix if present)
let target_key = if msg.target_name.starts_with("positron.") {
&msg.target_name[9..]
} else {
&msg.target_name
};

let handler = self.server_handlers.get(target_key).cloned();
server_started_rx = Some(Self::start_server_comm(msg, handler, &comm_socket)?);
true
},

// Only the LSP and DAP comms are handled by the Amalthea
// kernel framework itself; all other comms are passed through
// to the shell handler.
Comm::Other(_) => {
// This might be a server comm or a regular comm
if let Some(handler) = self.server_handlers.get(&msg.target_name).cloned() {
server_started_rx =
Some(Self::start_server_comm(msg, Some(handler), &comm_socket)?);
true
} else {
// No server handler found, pass through to shell handler
block_on(shell_handler.handle_comm_open(comm, comm_socket.clone()))?
}
},

// All comms tied to known Positron clients are passed through to the shell handler
_ => {
// Call the shell handler to open the comm
block_on(shell_handler.handle_comm_open(comm, comm_socket.clone()))?
Expand All @@ -421,24 +438,30 @@ impl Shell {
// accept connections. This also sends back the port number to connect on. Failing
// to send or receive this notification is a critical failure for this comm.
if let Some(server_started_rx) = server_started_rx {
match server_started_rx.recv() {
Ok(server_started) => {
let message = CommMsg::Data(json!({
let result = (|| -> anyhow::Result<()> {
let params = server_started_rx.recv()?;

let message = if lsp_comm {
// If this is the LSP comm, use the legacy message structure.
// TODO: Switch LSP comms to new message structure once we've
// kicked the tyres enough with the DAP comm.
CommMsg::Data(serde_json::json!({
"msg_type": "server_started",
"content": server_started
}));

if let Err(error) = comm_socket.outgoing_tx.send(message) {
log::error!(
"For '{comm_name}', failed to send a `server_started` message: {error}"
);
return Err(Error::SendError(format!("{error}")));
}
},
Err(error) => {
log::error!("For '{comm_name}', failed to receive a `server_started_rx` message: {error}");
return Err(Error::ReceiveError(format!("{error}")));
},
"content": params
}))
} else {
comm_rpc_message("server_started", serde_json::to_value(params)?)
};

comm_socket.outgoing_tx.send(message)?;

Ok(())
})();

if let Err(err) = result {
let msg = format!("With comm '{comm_name}': {err}");
log::error!("{msg}");
return Err(Error::SendError(msg));
}
}

Expand Down Expand Up @@ -471,7 +494,8 @@ impl Shell {
} else {
// If we don't have the corresponding handler, return an error
log::error!(
"Client attempted to start LSP or DAP, but no handler was provided by kernel."
"Client attempted to start '{}', but no handler was provided by kernel.",
msg.target_name
);
Err(Error::UnknownCommName(msg.target_name.clone()))
}
Expand Down
3 changes: 2 additions & 1 deletion crates/amalthea/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ fn test_amalthea_comms() {

let comm_req_id = frontend.send_shell(CommWireMsg {
comm_id: comm_id.to_string(),
data: serde_json::Value::Null,
// Include `id` field to signal this is a request
data: serde_json::json!({ "id": "foo" }),
});

frontend.recv_iopub_busy();
Expand Down
5 changes: 3 additions & 2 deletions crates/amalthea/tests/dummy_frontend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*
*/

use std::collections::HashMap;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
Expand Down Expand Up @@ -76,14 +77,14 @@ impl DummyAmaltheaFrontend {
let comm_manager_tx = comm_manager_tx.clone();

move || {
let server_handlers = HashMap::new();
if let Err(err) = kernel::connect(
"amalthea",
connection_file,
Some(registration_file),
shell,
control,
None,
None,
server_handlers,
StreamBehavior::None,
iopub_tx,
iopub_rx,
Expand Down
Loading
Loading