Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit 11f1a46

Browse files
author
Ian Sturdy
authored
Use asynchronous rpcs in the Stackdriver stats exporter. (#135)
1 parent 6cb57aa commit 11f1a46

1 file changed

Lines changed: 35 additions & 15 deletions

File tree

opencensus/exporters/stats/stackdriver/internal/stackdriver_exporter.cc

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
#include "opencensus/exporters/stats/stackdriver/stackdriver_exporter.h"
1616

17+
#include <algorithm>
18+
#include <cmath>
19+
#include <cstdint>
20+
#include <memory>
21+
#include <vector>
22+
1723
#include "absl/strings/str_cat.h"
1824
#include "absl/strings/string_view.h"
1925
#include "absl/synchronization/mutex.h"
@@ -88,26 +94,40 @@ void StackdriverExporter::Handler::ExportViewData(
8894
view_time_series.end());
8995
}
9096

91-
// TODO: use asynchronous RPCs.
92-
int i = 0;
93-
while (i < time_series.size()) {
97+
const int num_rpcs =
98+
ceil(static_cast<double>(time_series.size()) / kTimeSeriesBatchSize);
99+
100+
std::vector<std::pair<grpc::Status, ::grpc::ClientContext>> responses(
101+
num_rpcs);
102+
// We can safely re-use an empty response--it is never updated.
103+
google::protobuf::Empty response;
104+
grpc::CompletionQueue cq;
105+
106+
for (int rpc_index = 0; rpc_index < num_rpcs; ++rpc_index) {
94107
auto request = google::monitoring::v3::CreateTimeSeriesRequest();
95108
request.set_name(project_id_);
96-
for (int batch_index = 0; batch_index < kTimeSeriesBatchSize;
97-
++batch_index) {
109+
const int batch_end = std::min(static_cast<int>(time_series.size()),
110+
(rpc_index + 1) * kTimeSeriesBatchSize);
111+
for (int i = rpc_index * kTimeSeriesBatchSize; i < batch_end; ++i) {
98112
*request.add_time_series() = time_series[i];
99-
if (++i >= time_series.size()) {
100-
break;
113+
};
114+
auto rpc(stub_->AsyncCreateTimeSeries(&responses[rpc_index].second, request,
115+
&cq));
116+
rpc->Finish(&response, &responses[rpc_index].first,
117+
(void*)(uintptr_t)rpc_index);
118+
}
119+
120+
cq.Shutdown();
121+
void* tag;
122+
bool ok;
123+
while (cq.Next(&tag, &ok)) {
124+
if (ok) {
125+
const auto& status = responses[(uintptr_t)tag].first;
126+
if (!status.ok()) {
127+
std::cerr << "CreateTimeSeries request failed: "
128+
<< opencensus::common::ToString(status) << "\n";
101129
}
102130
}
103-
::grpc::ClientContext context;
104-
google::protobuf::Empty response;
105-
::grpc::Status status =
106-
stub_->CreateTimeSeries(&context, request, &response);
107-
if (!status.ok()) {
108-
std::cerr << "CreateTimeSeries request failed: "
109-
<< opencensus::common::ToString(status) << "\n";
110-
}
111131
}
112132
}
113133

0 commit comments

Comments
 (0)