[k2] rpc client boost#1635
Conversation
156e9ec to
222adae
Compare
| // prepare and send RPC request | ||
| // 'request_buf' will look like this: | ||
| // [ RpcExtraHeaders (optional) ] [ payload ] | ||
| if (const auto& [opt_new_extra_header, cur_extra_header_size]{kphp::rpc::regularize_extra_headers(tl_storer.view(), ignore_answer)}; opt_new_extra_header) { |
There was a problem hiding this comment.
В новом коде нет вызова regularize_extra_headers(...)
| const auto it_awaiter_task{rpc_client_instance_st.response_awaiter_tasks.find(request_id)}; | ||
| if (it_awaiter_task == rpc_client_instance_st.response_awaiter_tasks.end()) [[unlikely]] { | ||
| kphp::log::warning("could not find rpc query with id {} in pending queries", queue_id); | ||
| const auto it_rpc_request_info{rpc_client_instance_st.rpc_requests_infos.find(request_id)}; |
There was a problem hiding this comment.
кажется, это не будет работать для ignore_answer запросов (если для них, вообще, вызывается rpc_queue_push)
| if (!first_response_size) { | ||
| return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})}; | ||
| } | ||
| string response{reinterpret_cast<char*>(k2::alloc(*first_response_size)), static_cast<string::size_type>(*first_response_size)}; |
There was a problem hiding this comment.
Looks like we can have a single buffer for RPC response to reuse
| std::chrono::nanoseconds now_ns{now_instant.time_point_ns}; | ||
| std::chrono::nanoseconds timeout{rpc_request_info.deadline - now_ns}; | ||
| kphp::coro::io_scheduler& m_scheduler{kphp::coro::io_scheduler::get()}; | ||
| switch (co_await m_scheduler.poll(rpc_request_info.rpc_d, kphp::coro::poll_op::read, timeout)) { |
There was a problem hiding this comment.
It's quite hard to understand what's going on here. Can you please make it more human readable? For example, I think we don't need to directly use the scheduler's API here
There was a problem hiding this comment.
Tried to refactor and improve code readability:
- k2-rpc-api logic refactored to separate
kphp::rpc::query_handleclass - time juggling moved to util functions
| k2::TimePoint now_instant{}; | ||
| k2::instant(std::addressof(now_instant)); | ||
| std::chrono::nanoseconds now_ns{now_instant.time_point_ns}; | ||
| std::chrono::nanoseconds timeout{rpc_request_info.deadline - now_ns}; |
There was a problem hiding this comment.
What will happen if nobody will fetch response? I'm afraid we might hang in this case
…GAIN` errno if response is not ready
a342aa9 to
e77636c
Compare
| * `EAI_MEMORY` => max descriptors count achieved | ||
| * `EINVAL` => invalid `actor_name` or request, or connection pool is empty for this actor. | ||
| */ | ||
| int32_t k2_rpc_send(const char* actor_name, size_t actor_name_len, const void* request_ptr, size_t request_size, enum RpcKind rpc_kind, uint64_t* rpc_d); |
There was a problem hiding this comment.
Let's do either:
- rename
k2_rpc_send->k2_rpc_send_request - rename
k2_rpc_fetch_response->k2_rpc_fetch,k2_rpc_get_response_size->k2_rpc_response_size
| return k2_component_access(component_name.size(), component_name.data()); | ||
| } | ||
|
|
||
| inline std::expected<uint64_t, int32_t> rpc_send_request(std::string_view actor_name, std::span<const std::byte> request_buffer) noexcept { |
There was a problem hiding this comment.
- we have
k2::descriptortype that should be used here instead ofuint64_t - shouldn't we add
rpc_kindparameter?
There was a problem hiding this comment.
Let's put these functions to kphp::time namespace and name them like kphp::time::remaining and kphp::time::expires_at.
P.S. we try to not introduce util as it leads to a bunch of disparate functionality.
No description provided.