Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ struct flb_config {
char *storage_type; /* global storage type */
int storage_inherit; /* apply storage type to inputs */

/* DLQ for non-retriable output failures */
int storage_keep_rejected; /* 0/1 */
char *storage_rejected_path; /* relative to storage_path, default "rejected" */
void *storage_rejected_stream; /* NULL until first use */

/* Embedded SQL Database support (SQLite3) */
#ifdef FLB_HAVE_SQLDB
struct mk_list sqldb_list;
Expand Down Expand Up @@ -411,6 +416,9 @@ enum conf_type {
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
#define FLB_CONF_STORAGE_TYPE "storage.type"
#define FLB_CONF_STORAGE_INHERIT "storage.inherit"
/* Storage DLQ */
#define FLB_CONF_STORAGE_KEEP_REJECTED "storage.keep.rejected"
#define FLB_CONF_STORAGE_REJECTED_PATH "storage.rejected.path"

/* Coroutines */
#define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size"
Expand Down
9 changes: 9 additions & 0 deletions include/fluent-bit/flb_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ static inline char *flb_storage_get_type(int type)
return NULL;
}

struct flb_input_instance;

int flb_storage_create(struct flb_config *ctx);
int flb_storage_input_create(struct cio_ctx *cio,
struct flb_input_instance *in);
Expand All @@ -85,4 +87,11 @@ int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_met

void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks);

/* DLQ */
int flb_storage_quarantine_chunk(struct flb_config *ctx,
struct cio_chunk *ch,
const char *tag,
int status_code,
const char *out_name);

#endif
11 changes: 11 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ struct flb_service_config service_configs[] = {
{FLB_CONF_STORAGE_INHERIT,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, storage_inherit)},
/* Storage / DLQ */
{FLB_CONF_STORAGE_KEEP_REJECTED,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, storage_keep_rejected)},
{FLB_CONF_STORAGE_REJECTED_PATH,
FLB_CONF_TYPE_STR,
offsetof(struct flb_config, storage_rejected_path)},

/* Coroutines */
{FLB_CONF_STR_CORO_STACK_SIZE,
Expand Down Expand Up @@ -312,6 +319,7 @@ struct flb_config *flb_config_init()
config->storage_type = NULL;
config->storage_inherit = FLB_FALSE;
config->storage_bl_flush_on_shutdown = FLB_FALSE;
config->storage_rejected_path = NULL;
config->sched_cap = FLB_SCHED_CAP;
config->sched_base = FLB_SCHED_BASE;
config->json_escape_unicode = FLB_TRUE;
Expand Down Expand Up @@ -573,6 +581,9 @@ void flb_config_exit(struct flb_config *config)
if (config->storage_bl_mem_limit) {
flb_free(config->storage_bl_mem_limit);
}
if (config->storage_rejected_path) {
flb_free(config->storage_rejected_path);
}

#ifdef FLB_HAVE_STREAM_PROCESSOR
if (config->stream_processor_file) {
Expand Down
51 changes: 51 additions & 0 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,50 @@ static inline double calculate_chunk_capacity_percent(struct flb_output_instance
((double)ins->total_limit_size));
}

static void handle_dlq_if_available(struct flb_config *config,
struct flb_task *task,
struct flb_output_instance *ins,
int status_code /* pass 0 if unknown */)
{
const char *tag_buf = NULL;
int tag_len = 0;
flb_sds_t tag_sds = NULL;
const char *tag = NULL;
const char *out = NULL;
struct flb_input_chunk *ic;
struct cio_chunk *cio_ch;

if (!config || !config->storage_keep_rejected || !task || !task->ic || !ins) {
return;
}

ic = (struct flb_input_chunk *) task->ic;

if (!ic || !ic->chunk) {
return;
}

/* Obtain tag from the input chunk API (no direct field available) */
if (flb_input_chunk_get_tag(ic, &tag_buf, &tag_len) == 0 && tag_buf && tag_len > 0) {
tag_sds = flb_sds_create_len(tag_buf, tag_len); /* make it NUL-terminated */
tag = tag_sds;
}
else {
/* Fallback: use input instance name */
tag = flb_input_name(task->i_ins);
}

out = flb_output_name(ins);
cio_ch = (struct cio_chunk *) ic->chunk; /* ic->chunk is a cio_chunk* under the hood */

/* Copy bytes into DLQ stream (filesystem) */
(void) flb_storage_quarantine_chunk(config, cio_ch, tag, status_code, out);

if (tag_sds) {
flb_sds_destroy(tag_sds);
}
}

static inline int handle_output_event(uint64_t ts,
struct flb_config *config,
uint64_t val)
Expand Down Expand Up @@ -353,6 +397,8 @@ static inline int handle_output_event(uint64_t ts,
}
else if (ret == FLB_RETRY) {
if (ins->retry_limit == FLB_OUT_RETRY_NONE) {
handle_dlq_if_available(config, task, ins, 0);

/* cmetrics: output_dropped_records_total */
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
1, (char *[]) {name});
Expand Down Expand Up @@ -388,6 +434,8 @@ static inline int handle_output_event(uint64_t ts,
* - It reached the maximum number of re-tries
*/

handle_dlq_if_available(config, task, ins, 0);

/* cmetrics */
cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
Expand Down Expand Up @@ -429,6 +477,8 @@ static inline int handle_output_event(uint64_t ts,
* memory available or we ran out of file descriptors.
*/
if (retry_seconds == -1) {
handle_dlq_if_available(config, task, ins, 0);

flb_warn("[engine] retry for chunk '%s' could not be scheduled: "
"input=%s > output=%s",
flb_input_chunk_get_name(task->ic),
Expand Down Expand Up @@ -465,6 +515,7 @@ static inline int handle_output_event(uint64_t ts,
}
}
else if (ret == FLB_ERROR) {
handle_dlq_if_available(config, task, ins, 0);
/* cmetrics */
cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
Expand Down
145 changes: 145 additions & 0 deletions src/flb_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,151 @@ void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_ch
*fs_chunks = storage_st.chunks_fs;
}

/* Replace '/', '\\' and ':' with '_' to make filename components safe */
static inline void sanitize_name_component(const char *in, char *out, size_t out_sz)
{
size_t i;

if (out_sz == 0) {
return;
}

if (!in) {
in = "no-tag";
}

for (i = 0; i < out_sz - 1 && in[i] != '\0'; i++) {
out[i] = (in[i] == '/' || in[i] == '\\' || in[i] == ':') ? '_' : in[i];
}
out[i] = '\0';
}

static struct cio_stream *get_or_create_rejected_stream(struct flb_config *ctx)
{
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
struct cio_stream *st;
const char *name;

if (!ctx || !ctx->cio) {
return NULL;
}
if (!ctx->storage_keep_rejected || !ctx->storage_path) {
return NULL;
}

if (ctx->storage_rejected_stream) {
return ctx->storage_rejected_stream;
}

name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected";

st = cio_stream_get(ctx->cio, name);
if (!st) {
st = cio_stream_create(ctx->cio, name, FLB_STORAGE_FS);
}
if (!st) {
flb_warn("[storage] failed to create rejected stream '%s'", name);
return NULL;
}

ctx->storage_rejected_stream = st;
return st;
#else
FLB_UNUSED(ctx);
return NULL;
#endif
}

static inline int flb_storage_chunk_restore_state(struct cio_chunk *src, int was_up, int ret_val)
{
if (!was_up) {
if (cio_chunk_down(src) != CIO_OK) {
flb_debug("[storage] failed to bring chunk back down");
}
}

return ret_val;
}

int flb_storage_quarantine_chunk(struct flb_config *ctx,
struct cio_chunk *src,
const char *tag,
int status_code,
const char *out_name)
{
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
struct cio_stream *dlq;
void *buf = NULL;
int was_up = 0;
size_t size = 0;
int err = 0;
char name[256];
struct cio_chunk *dst;
char safe_tag[128];
char safe_out[64];

if (!ctx || !src) {
return -1;
}
dlq = get_or_create_rejected_stream(ctx);
if (!dlq) {
return -1;
}

/* Remember original state and bring the chunk up if needed */
was_up = (cio_chunk_is_up(src) == CIO_TRUE);
if (!was_up) {
if (cio_chunk_up_force(src) != CIO_OK) {
flb_warn("[storage] cannot bring chunk up to copy into DLQ");
return -1;
}
}

sanitize_name_component(tag, safe_tag, sizeof(safe_tag));
sanitize_name_component(out_name ? out_name : "out", safe_out, sizeof(safe_out));

/* Compose a simple, unique-ish file name with sanitized pieces */
snprintf(name, sizeof(name),
"%s_%d_%s_%p.flb",
safe_tag, status_code, safe_out, (void *) src);

if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) {
flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size);
return flb_storage_chunk_restore_state(src, was_up, -1);
}

/* Create + write the DLQ copy */
dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err);
if (!dst) {
flb_warn("[storage] DLQ open failed (err=%d)", err);
flb_free(buf);
return flb_storage_chunk_restore_state(src, was_up, -1);
}
if (cio_chunk_write(dst, buf, size) != CIO_OK ||
cio_chunk_sync(dst) != CIO_OK) {
flb_warn("[storage] DLQ write/sync failed");
cio_chunk_close(dst, CIO_TRUE);
flb_free(buf);
return flb_storage_chunk_restore_state(src, was_up, -1);
}

cio_chunk_close(dst, CIO_FALSE);
flb_free(buf);

flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size);

return flb_storage_chunk_restore_state(src, was_up, 0);
#else
FLB_UNUSED(ctx);
FLB_UNUSED(src);
FLB_UNUSED(tag);
FLB_UNUSED(status_code);
FLB_UNUSED(out_name);

return -1;
#endif
}

void flb_storage_destroy(struct flb_config *ctx)
{
struct cio_ctx *cio;
Expand Down
1 change: 1 addition & 0 deletions tests/internal/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ set(UNIT_TESTS_FILES
storage_inherit.c
unicode.c
opentelemetry.c
storage_dlq.c
)

# TLS helpers
Expand Down
Loading
Loading