Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ INSTALL_TEMPLATE (ugh/autoconf.h.in DESTINATION include/ugh)

# modules

AUX_SOURCE_DIRECTORY (ugh_example SRC_UGH_EXAMPLE)
ADD_LIBRARY (ugh_example MODULE ${SRC_UGH_EXAMPLE})
ADD_LIBRARY (ugh_example MODULE ugh_example/module.c)
INSTALL (TARGETS ugh_example DESTINATION ${LIBDIR}/ugh)
INSTALL_TEMPLATE (ugh_example/config.cfg.in DESTINATION etc/ugh_example)

ADD_LIBRARY (ugh_external_example MODULE ugh_example/external_module.c)
INSTALL (TARGETS ugh_external_example DESTINATION ${LIBDIR}/ugh)

94 changes: 94 additions & 0 deletions ugh/subreq_external.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include "ugh.h"

static
void ugh_subreq_external_timeout(EV_P_ ev_timer *w, int tev)
{
ugh_subreq_external_t *r = aux_memberof(ugh_subreq_external_t, wev_timeout, w);
ugh_subreq_external_del(r);
}


static
void ugh_subreq_wcb_ready(EV_P_ ev_async *w, int tev)
{
ugh_subreq_external_t *r = aux_memberof(ugh_subreq_external_t, wev_ready, w);

ugh_subreq_external_del(r);
}

int ugh_subreq_external_assign_body(ugh_subreq_external_t *r, char *data, size_t size)
{
r->body.data = aux_pool_nalloc(r->c->pool, size);
r->body.size = size;
memcpy(r->body.data, data, size);
return 0;
}


ugh_subreq_external_t *ugh_subreq_external_add(ugh_client_t *c)
{
ugh_subreq_external_t *r;

aux_pool_link(c->pool); /* use c->pool for subreq allocations */

r = (ugh_subreq_external_t *) aux_pool_calloc(c->pool, sizeof(*r));

if (NULL == r)
{
aux_pool_free(c->pool);
return NULL;
}

r->c = c;

r->timeout = UGH_CONFIG_SUBREQ_TIMEOUT; /* XXX this should be "no timeout" by default */

log_debug("ugh_external_subreq_add");

return r;
}

int ugh_subreq_external_set_timeout(ugh_subreq_external_t *r, ev_tstamp timeout, int timeout_type)
{
r->timeout = timeout;
return 0;
}

void ugh_subreq_external_signal_ready(ugh_subreq_external_t* ex_subreq) {
ev_async_send(loop, &ex_subreq->wev_ready);
}

int ugh_subreq_external_run(ugh_subreq_external_t *r)
{
r->c->wait++;
r->request_done = ugh_subreq_external_signal_ready;
ev_timer_init(&r->wev_timeout, &ugh_subreq_external_timeout, 0, r->timeout);
ev_async_init(&r->wev_ready, &ugh_subreq_wcb_ready);

ev_timer_again(loop, &r->wev_timeout);
ev_async_start(loop, &r->wev_ready);
return 0;
}

int ugh_subreq_external_del(ugh_subreq_external_t *r)
{
ev_timer_stop(loop, &r->wev_timeout);
ev_async_stop(loop, &r->wev_ready);

r->c->wait--;

/* We check is_main_coro here, because we could possibly call
* ugh_subreq_del from module coroutine (e.g. when IP-address of
* subrequest was definitely mallformed) and in this case we don't need
* to call coro_transfer
*/
if (0 == r->c->wait && is_main_coro)
{
is_main_coro = 0;
coro_transfer(&ctx_main, &r->c->ctx);
is_main_coro = 1;
}
aux_pool_free(r->c->pool);

return 0;
}
22 changes: 22 additions & 0 deletions ugh/ugh.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ ugh_upstream_t *ugh_upstream_get(ugh_config_t *cfg, const char *name, size_t siz
typedef struct ugh_subreq
ugh_subreq_t;

typedef struct ugh_subreq_external
ugh_subreq_external_t;

typedef struct ugh_channel
ugh_channel_t;

Expand All @@ -267,6 +270,19 @@ typedef int (*ugh_subreq_handle_fp)(ugh_subreq_t *r, char *data, size_t size);
#define UGH_RESPONSE_CHUNKED -1
#define UGH_RESPONSE_CLOSE_AFTER_BODY -2

struct ugh_subreq_external
{
void* external_data;
void(*request_done)(ugh_subreq_external_t*);
void(*run_external)(void* /*external data*/, ugh_subreq_external_t* /*subrequest to run*/);
ugh_client_t *c;
ev_async wev_ready;
ev_timer wev_timeout;
ev_tstamp timeout;
strt body;
size_t body_capacity;
};

struct ugh_subreq
{
ugh_client_t *c;
Expand Down Expand Up @@ -352,6 +368,10 @@ struct ugh_subreq
#define UGH_SUBREQ_PUSH 2

ugh_subreq_t *ugh_subreq_add(ugh_client_t *c, char *url, size_t size, int flags);
ugh_subreq_external_t *ugh_subreq_external_add(ugh_client_t *c);

int ugh_subreq_external_assign_body(ugh_subreq_external_t *r, char *data, size_t size);

int ugh_subreq_set_method(ugh_subreq_t *r, unsigned char method);
int ugh_subreq_set_body(ugh_subreq_t *r, char *body, size_t body_size);
int ugh_subreq_set_timeout(ugh_subreq_t *r, ev_tstamp timeout, int timeout_type);
Expand All @@ -363,8 +383,10 @@ int ugh_subreq_set_timeout_connect(ugh_subreq_t *r, ev_tstamp timeout);
int ugh_subreq_set_channel(ugh_subreq_t *r, ugh_channel_t *ch, unsigned tag);

int ugh_subreq_run(ugh_subreq_t *r);
int ugh_subreq_external_run(ugh_subreq_external_t *r);
int ugh_subreq_gen(ugh_subreq_t *r, strp u_host);
int ugh_subreq_del(ugh_subreq_t *r, uint32_t ft_type, int ft_errno);
int ugh_subreq_external_del(ugh_subreq_external_t *r);

strp ugh_subreq_get_host(ugh_subreq_t *r);
in_port_t ugh_subreq_get_port(ugh_subreq_t *r);
Expand Down
73 changes: 73 additions & 0 deletions ugh_example/external_module.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include <ugh/ugh.h>

typedef struct
{
ugh_template_t unused_configurable_parameter;
} ugh_module_ext_example_conf_t;

extern ugh_module_t ugh_module_ext_example;

int ugh_module_ext_example_handle(ugh_client_t *c, void *data, strp body)
{
// ugh_module_ext_example_conf_t *conf = data; /* get module conf */

ugh_subreq_external_t *e_subreq = ugh_subreq_external_add(c);
ugh_subreq_external_run(e_subreq);

// Here lies any magic by which request_done can be called.
// request done is safe to call from other thread
ugh_subreq_external_assign_body(e_subreq, "It works", sizeof("It works"));
e_subreq->request_done(e_subreq);

/* wait for it (do smth in different coroutine while it is being downloaded) */

ugh_subreq_wait(c);

body->data = e_subreq->body.data;
body->size = e_subreq->body.size;

return UGH_HTTP_OK;
}

int ugh_module_ext_example_init(ugh_config_t *cfg, void *data)
{
ugh_module_ext_example_conf_t *conf = data;

log_info("ugh_module_ext_example_init (called for each added handle, each time server is starting), conf=%p", &conf);

return 0;
}

int ugh_module_ext_example_free(ugh_config_t *cfg, void *data)
{
log_info("ugh_module_ext_example_free (called for each added handle, each time server is stopped)");

return 0;
}

int ugh_command_ext_example(ugh_config_t *cfg, int argc, char **argv, ugh_command_t *cmd)
{
ugh_module_ext_example_conf_t *conf;

conf = aux_pool_malloc(cfg->pool, sizeof(*conf));
if (NULL == conf) return -1;

ugh_module_handle_add(ugh_module_ext_example, conf, ugh_module_ext_example_handle);

return 0;
}

static ugh_command_t ugh_module_ext_example_cmds [] =
{
ugh_make_command(ext_example),
ugh_make_command_template(unused_configurable_parameter, ugh_module_ext_example_conf_t, unused_configurable_parameter),
ugh_null_command
};

ugh_module_t ugh_module_ext_example =
{
ugh_module_ext_example_cmds,
ugh_module_ext_example_init,
ugh_module_ext_example_free
};