Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/coro/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class socket

/**
* @param how Shuts the socket down with the given operations.
* @param Returns true if the sockets given operations were shutdown.
* @return Returns true if the sockets given operations were shutdown.
*/
auto shutdown(poll_op how = poll_op::read_write) -> bool;

Expand Down
10 changes: 9 additions & 1 deletion include/coro/net/tcp/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,19 @@ class server

/**
* Accepts an incoming tcp client connection. On failure the tls clients socket will be set to
* and invalid state, use the socket.is_value() to verify the client was correctly accepted.
* and invalid state, use the socket.is_valid() to verify the client was correctly accepted.
* @return The newly connected tcp client connection.
*/
auto accept() -> coro::net::tcp::client;

/**
* @return The tcp accept socket this server is using.
* @{
**/
[[nodiscard]] auto accept_socket() -> net::socket& { return m_accept_socket; }
[[nodiscard]] auto accept_socket() const -> const net::socket& { return m_accept_socket; }
/** @} */

private:
friend client;
/// The io scheduler for awaiting new connections.
Expand Down
8 changes: 8 additions & 0 deletions include/coro/net/tls/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ class server
*/
auto accept(std::chrono::milliseconds timeout = std::chrono::seconds{30}) -> coro::task<coro::net::tls::client>;

/**
* @return The tcp accept socket this server is using.
* @{
**/
[[nodiscard]] auto accept_socket() -> net::socket& { return m_accept_socket; }
[[nodiscard]] auto accept_socket() const -> const net::socket& { return m_accept_socket; }
/** @} */

private:
/// The io scheduler for awaiting new connections.
std::shared_ptr<io_scheduler> m_io_scheduler{nullptr};
Expand Down
2 changes: 1 addition & 1 deletion src/detail/io_notifier_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ auto io_notifier_epoll::watch(fd_t fd, coro::poll_op op, void* data, bool keep)
auto io_notifier_epoll::watch(detail::poll_info& pi) -> bool
{
auto event_data = event_t{};
event_data.events = static_cast<uint32_t>(pi.m_op) | EPOLLONESHOT | EPOLLRDHUP;
event_data.events = static_cast<uint32_t>(pi.m_op) | EPOLLONESHOT | EPOLLRDHUP | EPOLLHUP;
event_data.data.ptr = static_cast<void*>(&pi);
return ::epoll_ctl(m_fd, EPOLL_CTL_ADD, pi.m_fd, &event_data) != -1;
}
Expand Down
11 changes: 7 additions & 4 deletions src/detail/io_notifier_kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,17 @@ auto io_notifier_kqueue::event_to_poll_status(const event_t& event) -> poll_stat
{
return poll_status::event;
}
else if (event.flags & EV_ERROR)

if (event.flags & EV_EOF)
{
return poll_status::error;
return poll_status::closed;
}
else if (event.flags & EV_EOF)

if (event.flags & EV_ERROR)
{
return poll_status::closed;
return poll_status::error;
}

throw std::runtime_error{"invalid kqueue state"};
}

Expand Down
14 changes: 14 additions & 0 deletions test/bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,13 @@ TEST_CASE("benchmark tcp::server echo server thread pool", "[benchmark]")
while (true)
{
auto pstatus = co_await client.poll(coro::poll_op::read);
if (pstatus != coro::poll_status::event)
{
REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::closed);
// the socket has been closed
break;
}

REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::event);

auto [rstatus, rspan] = client.recv(in);
Expand Down Expand Up @@ -597,6 +604,13 @@ TEST_CASE("benchmark tcp::server echo server inline", "[benchmark]")
while (true)
{
auto pstatus = co_await client.poll(coro::poll_op::read);
if (pstatus != coro::poll_status::event)
{
REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::closed);
// the socket has been closed
break;
}

REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::event);

auto [rstatus, rspan] = client.recv(in);
Expand Down
53 changes: 53 additions & 0 deletions test/net/test_tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@

#include <iostream>

TEST_CASE("tcp_server", "[tcp_server]")
{
std::cerr << "[tcp_server]\n\n";
}

TEST_CASE("tcp_server ping server", "[tcp_server]")
{
std::cerr << "BEGIN tcp_server ping server\n";
const std::string client_msg{"Hello from client"};
const std::string server_msg{"Reply from server!"};

Expand Down Expand Up @@ -89,10 +95,12 @@ TEST_CASE("tcp_server ping server", "[tcp_server]")

coro::sync_wait(coro::when_all(
make_server_task(scheduler, client_msg, server_msg), make_client_task(scheduler, client_msg, server_msg)));
std::cerr << "END tcp_server ping server\n";
}

TEST_CASE("tcp_server concurrent polling on the same socket", "[tcp_server]")
{
std::cerr << "BEGIN tcp_server concurrent polling on the same socket\n";
// Issue 224: This test duplicates a client and issues two different poll operations per coroutine.

using namespace std::chrono_literals;
Expand Down Expand Up @@ -173,6 +181,51 @@ TEST_CASE("tcp_server concurrent polling on the same socket", "[tcp_server]")
auto response = std::move(std::get<1>(result).return_value());

REQUIRE(request == response);
std::cerr << "END tcp_server concurrent polling on the same socket\n";
}

#ifndef __APPLE__
// This test is known to not work on kqueue style systems (e.g. apple) because the socket shutdown()
// call does not properly trigger an EV_EOF flag on the accept socket.

TEST_CASE("tcp_server graceful shutdown via socket", "[tcp_server]")
{
std::cerr << "BEGIN tcp_server graceful shutdown via socket\n";
auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
.execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline});
coro::net::tcp::server server{scheduler};
coro::event started{};

auto make_accept_task = [](coro::net::tcp::server& server, coro::event& started) -> coro::task<void>
{
std::cerr << "make accept task start\n";
started.set();
auto poll_result = co_await server.poll();
REQUIRE(poll_result == coro::poll_status::closed);
auto client = server.accept();
REQUIRE_FALSE(client.socket().is_valid());
std::cerr << "make accept task completed\n";
};

scheduler->spawn(make_accept_task(server, started));

coro::sync_wait(started);
// we'll wait a bit to make sure the server.poll() is fully registered.
std::this_thread::sleep_for(std::chrono::milliseconds(500));

server.accept_socket().shutdown(coro::poll_op::read_write);

scheduler->shutdown();
std::cerr << "END tcp_server graceful shutdown via socket\n";
}

#endif

TEST_CASE("~tcp_server", "[tcp_server]")
{
std::cerr << "[~tcp_server]\n\n";
}


#endif // LIBCORO_FEATURE_NETWORKING

Loading