Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions libs/server-sent-events/include/launchdarkly/sse/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class Builder {
using ErrorCallback = std::function<void(Error)>;
using ConnectionHook =
std::function<void(http::request<http::string_body>*)>;
using ResponseHook =
std::function<void(http::response_header<> const& headers)>;

/**
* Create a builder for the given URL. If the port is omitted, 443 is
Expand Down Expand Up @@ -196,6 +198,20 @@ class Builder {
*/
Builder& on_connect(ConnectionHook hook);

/**
* Register a hook invoked once per (re)connect attempt after the response
* headers have been received, before any branching on status. Fires for
* every HTTP response (any status code, including redirects and errors).
* Does NOT fire if the connection failed before any response arrived
* (e.g. TCP error, read timeout).
*
* The hook is invoked on the client's executor; callers must not block.
*
* @param hook Callback invoked with the response header.
* @return Reference to this builder.
*/
Builder& on_response(ResponseHook hook);

/**
* Builds a Client. The shared pointer is necessary to extend the lifetime
* of the Client to encompass each asynchronous operation that it performs.
Expand All @@ -219,6 +235,7 @@ class Builder {
std::optional<std::string> custom_ca_file_;
std::optional<std::string> proxy_url_;
ConnectionHook connection_hook_;
ResponseHook response_hook_;
};

/**
Expand Down
21 changes: 18 additions & 3 deletions libs/server-sent-events/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class FoxyClient : public Client,
Builder::LogCallback logger,
Builder::ErrorCallback errors,
Builder::ConnectionHook connection_hook,
Builder::ResponseHook response_hook,
std::optional<net::ssl::context> maybe_ssl)
: ssl_context_(std::move(maybe_ssl)),
host_(std::move(host)),
Expand All @@ -87,6 +88,7 @@ class FoxyClient : public Client,
logger_(std::move(logger)),
errors_(std::move(errors)),
connection_hook_(std::move(connection_hook)),
response_hook_(std::move(response_hook)),
body_parser_(std::nullopt),
session_(std::nullopt),
last_event_id_(std::nullopt),
Expand Down Expand Up @@ -256,6 +258,9 @@ class FoxyClient : public Client,

/* headers are finished, body is ready */
auto response = body_parser_->get();
if (response_hook_) {
response_hook_(response.base());
}
auto status_class = beast::http::to_status_class(response.result());

if (status_class == beast::http::status_class::successful) {
Expand Down Expand Up @@ -517,6 +522,10 @@ class FoxyClient : public Client,
// updating query parameters from external state).
Builder::ConnectionHook connection_hook_;

// Optional hook invoked once per (re)connect after the response headers
// have been received, before any branching on status.
Builder::ResponseHook response_hook_;

// Customized parser (see parser.hpp) which repeatedly receives chunks of
// data and parses them into SSE events. It cannot be reused across
// connections, hence the optional so it can be destroyed easily.
Expand Down Expand Up @@ -650,6 +659,11 @@ Builder& Builder::on_connect(ConnectionHook hook) {
return *this;
}

Builder& Builder::on_response(ResponseHook hook) {
response_hook_ = std::move(hook);
return *this;
}

std::shared_ptr<Client> Builder::build() {
auto uri_components = boost::urls::parse_uri(url_);
if (!uri_components) {
Expand Down Expand Up @@ -697,8 +711,8 @@ std::shared_ptr<Client> Builder::build() {
return std::make_shared<CurlClient>(
net::make_strand(executor_), request, host, service, connect_timeout_,
read_timeout_, write_timeout_, initial_reconnect_delay_, receiver_,
logging_cb_, error_cb_, connection_hook_, skip_verify_peer_,
custom_ca_file_, use_https, proxy_url_);
logging_cb_, error_cb_, connection_hook_, response_hook_,
skip_verify_peer_, custom_ca_file_, use_https, proxy_url_);
#else
std::optional<ssl::context> ssl;
if (uri_components->scheme_id() == boost::urls::scheme::https) {
Expand All @@ -720,7 +734,8 @@ std::shared_ptr<Client> Builder::build() {
return std::make_shared<FoxyClient>(
net::make_strand(executor_), request, host, service, connect_timeout_,
read_timeout_, write_timeout_, initial_reconnect_delay_, receiver_,
logging_cb_, error_cb_, connection_hook_, std::move(ssl));
logging_cb_, error_cb_, connection_hook_, response_hook_,
std::move(ssl));
#endif
}

Expand Down
114 changes: 108 additions & 6 deletions libs/server-sent-events/src/curl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
#include "curl_client.hpp"

#include <boost/asio/post.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/url/url.hpp>

#include <charconv>
#include <sstream>
#include <system_error>

namespace launchdarkly::sse {
namespace beast = boost::beast;
Expand Down Expand Up @@ -41,6 +44,7 @@ CurlClient::CurlClient(
Builder::LogCallback logger,
Builder::ErrorCallback errors,
Builder::ConnectionHook connection_hook,
Builder::ResponseHook response_hook,
bool skip_verify_peer,
std::optional<std::string> custom_ca_file,
bool use_https,
Expand All @@ -51,6 +55,7 @@ CurlClient::CurlClient(
logger_(std::move(logger)),
errors_(std::move(errors)),
connection_hook_(std::move(connection_hook)),
response_hook_(std::move(response_hook)),
use_https_(use_https),
backoff_timer_(executor),
multi_manager_(CurlMultiManager::create(executor)),
Expand Down Expand Up @@ -149,6 +154,19 @@ void CurlClient::do_run() {
self->log_message(message);
}
}
},
[weak_self, weak_ctx](http::response_header<> headers) {
if (auto ctx = weak_ctx.lock()) {
if (auto const self = weak_self.lock()) {
if (self->response_hook_) {
boost::asio::post(
self->backoff_timer_.get_executor(),
[self, headers = std::move(headers)]() {
self->response_hook_(headers);
});
}
}
}
}));
// Start request using CURL multi (non-blocking)
PerformRequestWithMulti(multi_manager_, ctx);
Expand Down Expand Up @@ -397,19 +415,98 @@ size_t CurlClient::WriteCallback(char const* data,
// Callback for reading request headers
//
// https://curl.se/libcurl/c/CURLOPT_HEADERFUNCTION.html
//
// libcurl invokes this once per CRLF-terminated response line: the HTTP status
// line, then each header, then an empty terminator. With
// CURLOPT_FOLLOWLOCATION enabled the cycle repeats for each response in the
// redirect chain.
size_t CurlClient::HeaderCallback(char const* buffer,
size_t size,
size_t nitems,
void* userdata) {
size_t const total_size = size * nitems;
auto* context = static_cast<RequestContext*>(userdata);

// Check for Content-Type header
if (std::string const header(buffer, total_size);
header.find("Content-Type:") == 0 ||
header.find("content-type:") == 0) {
if (header.find("text/event-stream") == std::string::npos) {
context->log_message("warning: unexpected Content-Type: " + header);
std::string_view line(buffer, total_size);

// Strip the line terminator. Allow bare LF or bare CR per RFC 9112 §2.2;
// libcurl preserves the original wire bytes for HTTP/1.x (only HTTP/2
// synthesizes CRLF), so a non-compliant origin can deliver bare LF here.
while (!line.empty() && (line.back() == '\r' || line.back() == '\n')) {
line.remove_suffix(1);
}

if (line.empty()) {
// Terminator. If we're between responses (e.g., the line ends a
// chunked-transfer trailer block), there's nothing to emit.
if (context->reading_headers) {
context->response(std::move(context->current_response));
context->current_response = http::response_header<>{};
context->reading_headers = false;
}
return total_size;
Comment thread
cursor[bot] marked this conversation as resolved.
}

if (line.substr(0, 5) == "HTTP/") {
// Status line: "HTTP/X.Y CODE REASON". Only legitimate before any
// header has been seen for this response — an interior HTTP/ line
// would otherwise wipe accumulated state.
if (context->reading_headers) {
return total_size;
Comment thread
cursor[bot] marked this conversation as resolved.
}
// Beast default-constructs result_ to status::ok (200); reset to 0
// so an unparseable status line surfaces as result_int() == 0.
context->current_response = http::response_header<>{};
Comment thread
beekld marked this conversation as resolved.
context->current_response.result(0);
auto const code_start = line.find(' ');
if (code_start != std::string_view::npos) {
unsigned code = 0;
auto const result = std::from_chars(
line.data() + code_start + 1, line.data() + line.size(), code);
// Three-digit status per RFC 7231 §6; the tight bound avoids
// result(unsigned) throwing across the libcurl C frame.
if (result.ec == std::errc{} && code >= 100 && code <= 999) {
context->current_response.result(code);
Comment thread
cursor[bot] marked this conversation as resolved.
}
}
context->reading_headers = true;
return total_size;
}

if (!context->reading_headers) {
// Header line received outside an active response — chunked trailer
// or protocol-level junk. Ignore.
return total_size;
}

auto const colon = line.find(':');
if (colon != std::string_view::npos) {
std::string_view name = line.substr(0, colon);
// HTTP optional whitespace (OWS) per RFC 7230 §3.2.3 is SP or HTAB.
std::string_view value = line.substr(colon + 1);
while (!value.empty() &&
(value.front() == ' ' || value.front() == '\t')) {
value.remove_prefix(1);
}
while (!value.empty() &&
(value.back() == ' ' || value.back() == '\t')) {
value.remove_suffix(1);
}
// insert() preserves duplicate-name headers (Set-Cookie, Via, …);
// set() would collapse them and diverge from the Foxy backend.
try {
context->current_response.insert(std::string(name),
std::string(value));
} catch (std::exception const&) {
// insert() throws if the name isn't a valid RFC 7230 token.
context->log_message("ignoring response header with invalid name");
return total_size;
}

if (beast::iequals(name, "Content-Type") &&
value.find("text/event-stream") == std::string_view::npos) {
context->log_message("warning: unexpected Content-Type: " +
std::string(line));
}
}

Expand All @@ -426,6 +523,11 @@ void CurlClient::PerformRequestWithMulti(
// Initialize parser for new connection (last_event_id is tracked
// separately)
context->init_parser();
// Reset header-accumulator state in case the previous transfer dropped
// mid-headers, which would otherwise leave reading_headers=true and
// cause the new response's HTTP/ status line to be skipped.
context->current_response = http::response_header<>{};
context->reading_headers = false;

std::shared_ptr<CURL> curl(curl_easy_init(), curl_easy_cleanup);
if (!curl) {
Expand Down
29 changes: 27 additions & 2 deletions libs/server-sent-events/src/curl_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,20 @@ class CurlClient final : public Client,
std::function<void(Error)> on_error;
std::function<void()> reset_backoff;
std::function<void(std::string)> log_message;
std::function<void(http::response_header<>)> on_response;

Callbacks(std::function<void(std::string)> do_backoff,
std::function<void(Event)> on_receive,
std::function<void(Error)> on_error,
std::function<void()> reset_backoff,
std::function<void(std::string)> log_message)
std::function<void(std::string)> log_message,
std::function<void(http::response_header<>)> on_response)
: do_backoff(std::move(do_backoff)),
on_receive(std::move(on_receive)),
on_error(std::move(on_error)),
reset_backoff(std::move(reset_backoff)),
log_message(std::move(log_message)) {}
log_message(std::move(log_message)),
on_response(std::move(on_response)) {}
};

/**
Expand Down Expand Up @@ -94,6 +97,16 @@ class CurlClient final : public Client,
std::chrono::steady_clock::time_point last_progress_time;
curl_off_t last_download_amount;

// Accumulator for the current response's headers; reset on each new
// status line, emitted on the empty terminator line.
http::response_header<> current_response;
Comment thread
beekld marked this conversation as resolved.

// True while accumulating headers between an `HTTP/` status line and
// the empty terminator. Gates `HeaderCallback` against chunked
// trailers (which arrive without a fresh status line) and against
// interior `HTTP/` lines that would otherwise wipe accumulated state.
bool reading_headers = false;

// Mutated on the strand in do_run() before each transfer, and read by
// libcurl via raw pointers (CURLOPT_URL, CURLOPT_POSTFIELDS) for the
// duration of the transfer. Safe because the next do_run() only fires
Expand Down Expand Up @@ -158,6 +171,16 @@ class CurlClient final : public Client,
}
}

void response(http::response_header<> headers) {
std::lock_guard lock(mutex_);
if (shutting_down_) {
return;
}
if (callbacks_) {
callbacks_->on_response(std::move(headers));
}
}

void set_callbacks(Callbacks callbacks) {
std::lock_guard lock(mutex_);
callbacks_ = std::move(callbacks);
Expand Down Expand Up @@ -238,6 +261,7 @@ class CurlClient final : public Client,
Builder::LogCallback logger,
Builder::ErrorCallback errors,
Builder::ConnectionHook connection_hook,
Builder::ResponseHook response_hook,
Comment thread
beekld marked this conversation as resolved.
bool skip_verify_peer,
std::optional<std::string> custom_ca_file,
bool use_https,
Expand Down Expand Up @@ -294,6 +318,7 @@ class CurlClient final : public Client,
Builder::LogCallback logger_;
Builder::ErrorCallback errors_;
Builder::ConnectionHook connection_hook_;
Builder::ResponseHook response_hook_;

bool use_https_;
boost::asio::steady_timer backoff_timer_;
Expand Down
Loading
Loading