Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0047854
otel concept
shikokuchuo Aug 7, 2025
f2f444b
Include daemon on which mirai is evaluated as an attribute
shikokuchuo Aug 8, 2025
b7456f7
Enable across all daemons types
shikokuchuo Aug 8, 2025
0579830
Add mirai_map() spans
shikokuchuo Aug 13, 2025
f58b171
Add daemon() span
shikokuchuo Aug 14, 2025
e35a1df
Explicitly end daemon spans
shikokuchuo Aug 15, 2025
d5f03a1
Apply suggestion from @schloerke
shikokuchuo Aug 19, 2025
de8389b
Correct otel span nesting code
shikokuchuo Aug 19, 2025
de53879
Rebase
shikokuchuo Aug 19, 2025
b3b5036
Simplify inheritance for mirai::mirai_map
shikokuchuo Aug 19, 2025
e9c5199
Simplify inheritance for mirai::daemon and mirai::daemon->eval
shikokuchuo Aug 19, 2025
c4ce8f6
Also nest mirai::mirai_map under active spans
shikokuchuo Aug 19, 2025
70e017c
Better attribute labels
shikokuchuo Aug 20, 2025
503a161
Set status for mirai::daemons span
shikokuchuo Aug 20, 2025
7518546
Add mirai id and enable spans only when daemons are used
shikokuchuo Aug 20, 2025
9b24a59
Add news item
shikokuchuo Aug 20, 2025
68277ca
Set status for mirai::daemon->eval result
shikokuchuo Aug 20, 2025
58ee98a
Upgrade to use links
shikokuchuo Aug 21, 2025
efa5c1e
Add events to mirai::daemon span
shikokuchuo Aug 21, 2025
fa7ac08
Eval start
shikokuchuo Aug 21, 2025
02b4523
Add span kinds
shikokuchuo Aug 22, 2025
e08688e
Simplify and avoid long parent spans
shikokuchuo Aug 22, 2025
e5f3219
Update for new otel interfaces
shikokuchuo Sep 1, 2025
5a10821
Add otel tests
shikokuchuo Sep 1, 2025
98cac72
Migrate from dynGet for all otel spans
shikokuchuo Sep 1, 2025
d13a068
Complete otel tests
shikokuchuo Sep 2, 2025
e5ecc92
✨Add OpenTelemetry vignette✨ (#423)
shikokuchuo Sep 2, 2025
7239898
Update package docs and reorg vignettes
shikokuchuo Sep 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/test-coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ jobs:
extra-packages: any::covr, any::xml2
needs: coverage

- name: Install additional packages
run: |
pak::pak(c("r-lib/otelsdk", "rlang"))
shell: Rscript {0}

- name: Test coverage
run: |
cov <- covr::package_coverage(
Expand Down
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ Imports:
nanonext (>= 1.7.0)
Suggests:
cli,
litedown
litedown,
otel
Enhances:
parallel,
promises
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#### New Features

* Complete observability of mirai requests by emitting OpenTelemetry traces when tracing is enabled by the otelsdk package (#394).
* Adds `info()` as an alternative to `status()` for retrieving more succinct information statistics, more convenient for programmatic use (thanks @wlandau, #410).
* Adds `with_daemons()` and `local_daemons()` helper functions for using a particular compute profile.
These work with daemons that are already set up unlike the existing `with.miraiDaemons()` method, which creates a new scope and tears it down when finished (#360).
Expand Down
24 changes: 22 additions & 2 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ daemon <- function(
task <- 1L
timeout <- if (idletime > walltime) walltime else if (is.finite(idletime)) idletime
maxtime <- if (is.finite(walltime)) mclock() + walltime else FALSE
if (otel_tracing) {
spn <- otel::start_local_active_span(
"mirai::daemon",
attributes = otel::as_attributes(list(url = url))
)
}

if (dispatcher) {
aio <- recv_aio(sock, mode = 1L, cv = cv)
Expand Down Expand Up @@ -201,15 +207,29 @@ daemon <- function(

# internals --------------------------------------------------------------------

handle_mirai_error <- function(cnd) invokeRestart("mirai_error", cnd, sys.calls())
handle_mirai_error <- function(cnd) {
if (otel_tracing) otel::get_active_span()$set_status("error")
invokeRestart("mirai_error", cnd, sys.calls())
}

handle_mirai_interrupt <- function(cnd) invokeRestart("mirai_interrupt")
handle_mirai_interrupt <- function(cnd) {
if (otel_tracing) otel::get_active_span()$set_status("unset")
invokeRestart("mirai_interrupt")
}

eval_mirai <- function(._mirai_.) {
withRestarts(
withCallingHandlers(
{
list2env(._mirai_.[["._globals_."]], envir = .GlobalEnv)
if (otel_tracing && length(._mirai_.[["._otel_."]])) {
prtctx <- otel::extract_http_context(._mirai_.[["._otel_."]])
spn <- otel::start_local_active_span(
"mirai::daemon->eval",
links = list(daemon = otel::get_active_span()),
options = list(kind = "server", parent = prtctx)
)
}
eval(._mirai_.[["._expr_."]], envir = ._mirai_., enclos = .GlobalEnv)
},
error = handle_mirai_error,
Expand Down
17 changes: 17 additions & 0 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ daemons <- function(

if (signal) send_signal(envir)
reap(envir[["sock"]])
if (otel_tracing) envir[["otel_span"]]$set_status("ok")$end()
..[[.compute]] <- NULL -> envir
return(invisible(FALSE))
}
Expand Down Expand Up @@ -292,6 +293,22 @@ daemons <- function(
)
})

if (otel_tracing) {
`[[<-`(
envir,
"otel_span",
otel::start_span(
"mirai::daemons",
attributes = otel::as_attributes(list(
url = envir[["url"]],
n = envir[["n"]],
dispatcher = if (is.null(envir[["dispatcher"]])) "false" else "true",
compute_profile = .compute
))
)
)
}

invisible(`class<-`(TRUE, c("miraiDaemons", .compute)))
}

Expand Down
8 changes: 8 additions & 0 deletions R/map.R
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@
mirai_map <- function(.x, .f, ..., .args = list(), .promise = NULL, .compute = NULL) {
require_daemons(.compute = .compute, call = environment())
is.function(.f) || stop(sprintf(._[["function_required"]], typeof(.f)))
if (is.null(.compute)) .compute <- .[["cp"]]

if (otel_tracing) {
spn <- otel::start_local_active_span(
"mirai::mirai_map",
links = list(compute_profile = ..[[.compute]][["otel_span"]])
)
}

dx <- dim(.x)
vec <- if (is.null(dx)) {
Expand Down
11 changes: 11 additions & 0 deletions R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
#' This may be overriden, if desired, by specifying 'url' in the [daemons()]
#' interface and launching daemons using [launch_local()].
#'
#' @section OpenTelemetry:
#'
#' mirai provides comprehensive OpenTelemetry tracing support for observing
#' asynchronous operations and distributed computation. Please refer to the
#' OpenTelemetry vignette for further details:
#' `vignette("v05-opentelemetry", package = "mirai")`
#'
#' @section Reference Manual:
#'
#' `vignette("mirai", package = "mirai")`
Expand All @@ -38,6 +45,7 @@
# tested implicitly

.onLoad <- function(libname, pkgname) {
otel_tracing <<- requireNamespace("otel", quietly = TRUE) && otel::is_tracing_enabled()
switch(
Sys.info()[["sysname"]],
Linux = {
Expand Down Expand Up @@ -85,3 +93,6 @@
),
hash = TRUE
)

otel_tracing <- FALSE
otel_tracer_name <- "org.r-lib.mirai"
26 changes: 18 additions & 8 deletions R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,25 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = NULL)
}
all(nzchar(gn)) || stop(._[["named_dots"]])
}
if (length(envir[["seed"]])) {
globals[[".Random.seed"]] <- next_stream(envir)
}
if (length(envir[["seed"]])) globals[[".Random.seed"]] <- next_stream(envir)

data <- list(
._expr_. = if (
is.symbol(expr) &&
exists(as.character(expr), envir = parent.frame()) &&
is.language(.expr)
) .expr else expr,
._globals_. = globals
._globals_. = globals,
._otel_. = if (otel_tracing && length(envir)) {
spn <- otel::start_local_active_span(
"mirai::mirai",
links = list(compute_profile = envir[["otel_span"]]),
options = list(kind = "client")
)
otel::pack_http_context()
}
)

if (length(.args)) {
if (is.environment(.args)) {
.args <- as.list.environment(.args, all.names = TRUE)
Expand All @@ -177,7 +185,7 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = NULL)

is.null(envir) && return(ephemeral_daemon(data, .timeout))

request(
req <- request(
.context(envir[["sock"]]),
data,
send_mode = 1L,
Expand All @@ -186,6 +194,8 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = NULL)
cv = envir[["cv"]],
id = envir[["dispatcher"]]
)
if (otel_tracing) spn$set_attribute("mirai.id", attr(req, "id"))
invisible(req)
}

#' Evaluate Everywhere
Expand Down Expand Up @@ -597,16 +607,16 @@ ephemeral_daemon <- function(data, timeout) {
stderr = FALSE,
wait = FALSE
)
aio <- request(
req <- request(
.context(sock),
data,
send_mode = 1L,
recv_mode = 1L,
timeout = timeout,
cv = substitute()
)
`attr<-`(.subset2(aio, "aio"), "sock", sock)
invisible(aio)
`attr<-`(.subset2(req, "aio"), "sock", sock)
invisible(req)
}

deparse_safe <- function(x) {
Expand Down
139 changes: 139 additions & 0 deletions dev/vignettes/_v05-opentelemetry.Rmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
---
title: "OpenTelemetry"
vignette: >
%\VignetteIndexEntry{OpenTelemetry}
%\VignetteEngine{litedown::vignette}
%\VignetteEncoding{UTF-8}
---

```{r}
#| include: false
knitr::opts_chunk$set(
collapse = TRUE,
comment = "#>",
out.width = "100%"
)
```

### 1. Introduction

mirai provides comprehensive OpenTelemetry (otel) tracing support for observing asynchronous operations and distributed computation.

When the `otel` and `otelsdk` packages are installed and tracing is enabled, mirai automatically creates spans to track the lifecycle of daemon management, async operations, and task execution.

This enables detailed monitoring of:

- Task submission and completion times
- Daemon lifecycle and performance
- Error tracking and debugging
- Distributed tracing across network boundaries

### 2. Automatic Tracing Setup

Tracing is automatically enabled when:

1. The `otel` and `otelsdk` packages are installed and available
2. OpenTelemetry tracing is configured and enabled (see <https://otelsdk.r-lib.org/reference/collecting.html>)

No additional configuration is required - mirai will automatically detect the presence of OpenTelemetry and begin creating spans.

### 3. Span Types and Hierarchy

mirai creates several types of spans to represent different operations:

#### 3.1 Core Span Types

**`mirai::daemons`** - Root span for daemon management

- **Kind**: `internal`
- **Attributes**: `url`, `n` (number of daemons), `dispatcher` (true/false), `compute_profile`
- **Lifecycle**: Created when `daemons()` is called, ended when daemons are reset

**`mirai::daemon`** - Individual daemon process span

- **Kind**: `internal`
- **Lifecycle**: Tracks the lifetime of a single daemon process

**`mirai::mirai_map`** - Parallel map operation span

- **Kind**: `internal`
- **Lifecycle**: Encompasses the entire map operation across multiple mirai tasks

**`mirai::mirai`** - Client-side async task span

- **Kind**: `client`
- **Attributes**: `mirai.id` (unique task identifier)
- **Lifecycle**: Created when `mirai()` is called, ended as soon as it returns

**`mirai::daemon->eval`** - Server-side task evaluation span

- **Kind**: `server`
- **Lifecycle**: Tracks for the duration of actual mirai evaluation on the daemon

#### 3.2 Span Relationships and Context Propagation

The spans form a distributed structure that traces the complete lifecycle of async operations:

```
mirai::daemons (compute profile - top level)

mirai::daemon (daemon process 1 - top level)
mirai::daemon (daemon process 2 - top level)
mirai::daemon (daemon process N - top level)

mirai::mirai_map (top level)
├── mirai::mirai (task 1) ──link→ mirai::daemons
├── mirai::mirai (task 2) ──link→ mirai::daemons
└── mirai::mirai (task N) ──link→ mirai::daemons
└── mirai::daemon->eval ──link→ mirai::daemon

mirai::mirai (top level) ──link→ mirai::daemons
└── mirai::daemon->eval ──link→ mirai::daemon
```

**Context Propagation**: The context is automatically packaged with each `mirai()` call and extracted on the daemon side, enabling proper parent-child relationships across process boundaries.

**Span Links**: Tasks are linked to their compute profile's `mirai::daemons` span on the client side, and to each `mirai::daemon` span on the server side, showing exactly where each evaluation happened.

### 4. Status and Error Tracking

`mirai::daemon->eval` spans automatically track the success or failure of operations:

**Status Values**:

- `'ok'` - Operation completed successfully
- `'error'` - Operation failed with an error
- `'unset'` - Operation was interrupted

### 5. Monitoring and Observability

The OpenTelemetry spans provide rich observability into mirai operations:

**Performance Monitoring**:

- Track task execution times from submission to completion
- Monitor daemon utilization and load balancing
- Identify bottlenecks in distributed computation

**Error Analysis**:

- Correlate errors with specific tasks and daemons
- Track error rates across different types of operations
- Debug issues in distributed environments

**Distributed Tracing**:

- Follow task execution across network boundaries
- Understand the complete lifecycle of async operations
- Correlate client-side requests with server-side execution

### 6. Integration with Observability Platforms

mirai's OpenTelemetry implementation works seamlessly with any OpenTelemetry-compatible observability platform, including:

- Grafana
- Pydantic Logfire
- Jaeger
- Any of those listed at <https://otelsdk.r-lib.org/reference/collecting.html>.

The tracer name used by mirai is `"org.r-lib.mirai"`, making it easy to filter and identify mirai-related traces in your observability platform.
File renamed without changes.
File renamed without changes.
5 changes: 3 additions & 2 deletions dev/vignettes/precompile.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ knitr::knit("dev/vignettes/_v01-map.Rmd", "vignettes/v01-map.Rmd")
knitr::knit("dev/vignettes/_v02-promises.Rmd", "vignettes/v02-promises.Rmd")
knitr::knit("dev/vignettes/_v03-serialization.Rmd", "vignettes/v03-serialization.Rmd")
knitr::knit("dev/vignettes/_v04-parallel.Rmd", "vignettes/v04-parallel.Rmd")
knitr::knit("dev/vignettes/_v05-packages.Rmd", "vignettes/v05-packages.Rmd")
knitr::knit("dev/vignettes/_v06-questions.Rmd", "vignettes/v06-questions.Rmd")
knitr::knit("dev/vignettes/_v05-opentelemetry.Rmd", "vignettes/v05-opentelemetry.Rmd")
knitr::knit("dev/vignettes/_v06-packages.Rmd", "vignettes/v06-packages.Rmd")
knitr::knit("dev/vignettes/_v07-questions.Rmd", "vignettes/v07-questions.Rmd")
9 changes: 9 additions & 0 deletions man/mirai-package.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading