This repository was archived by the owner on Sep 17, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 247
Expand file tree
/
Copy pathutils.py
More file actions
142 lines (106 loc) · 4.27 KB
/
utils.py
File metadata and controls
142 lines (106 loc) · 4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from datetime import UTC, datetime
from grpc.framework.foundation import future
from grpc.framework.interfaces.face import face
from opencensus.trace import execution_context, time_event
def extract_byte_size(proto_message):
    """Return the serialized byte size of a protobuf message, if known.

    Two message flavours are recognised:

    * google.protobuf messages, which expose ``ByteSize()`` themselves;
    * proto-plus messages, whose class exposes ``pb()`` to obtain the
      underlying protobuf object that is then asked for ``ByteSize()``.

    :param proto_message: message object to measure

    :returns: the value reported by ``ByteSize()``, or ``None`` when the
              object matches neither flavour.
    """
    message_cls = type(proto_message)

    if hasattr(proto_message, "ByteSize"):
        # google.protobuf message: ask the instance directly.
        size = proto_message.ByteSize()
    elif hasattr(message_cls, "pb"):
        # proto-plus message: convert through the class-level ``pb`` hook.
        size = message_cls.pb(proto_message).ByteSize()
    else:
        size = None

    return size
def add_message_event(proto_message, span, message_event_type, message_id=1):
    """Add a MessageEvent to the span based on the given protobuf message.

    The event is stamped with the current timezone-aware UTC time and
    carries the message size reported by ``extract_byte_size`` (which is
    ``None`` when the size cannot be determined).

    :param proto_message: google.protobuf or proto-plus message the event
                          describes

    :param span: span to annotate; its ``add_message_event`` method is
                 called with the new event

    :param message_event_type: forwarded as the event's ``type``

    :param message_id: identifier recorded on the event; defaults to 1
    """
    # ``datetime`` is imported as the class itself
    # (``from datetime import UTC, datetime``), so ``now`` must be called on
    # it directly; ``datetime.datetime.now`` would raise AttributeError.
    timestamp = datetime.now(UTC)

    span.add_message_event(
        time_event.MessageEvent(
            timestamp,
            message_id,
            type=message_event_type,
            uncompressed_size_bytes=extract_byte_size(proto_message),
        )
    )
def wrap_iter_with_message_events(request_or_response_iter, span,
                                  message_event_type):
    """Yield every message of an iterator, recording an event for each.

    Before a message is handed to the consumer, a message event of the given
    type is added to the span. Events are numbered consecutively starting
    at 1, in the order the messages are produced.

    :param request_or_response_iter: iterable of proto messages

    :param span: span that receives the message events

    :param message_event_type: event type recorded for every message
    """
    sequence_number = 0
    for proto in request_or_response_iter:
        sequence_number += 1
        # Record first, then yield: the event exists by the time the
        # consumer sees the message.
        add_message_event(proto, span, message_event_type, sequence_number)
        yield proto
def wrap_iter_with_end_span(response_iter):
    """Yield every response, then end the current span once exhausted.

    The span is ended only after the wrapped iterator finishes normally; if
    iteration raises or the consumer stops early, ``end_span`` is not
    called by this wrapper.

    :param response_iter: iterable of responses to pass through
    """
    responses = iter(response_iter)
    while True:
        try:
            item = next(responses)
        except StopIteration:
            break
        yield item

    # Reached only on normal exhaustion of the wrapped iterator.
    tracer = execution_context.get_opencensus_tracer()
    tracer.end_span()
class WrappedResponseIterator(future.Future, face.Call):
    """Call/future wrapper around an rpc response iterator.

    The grpc.StreamStreamClientInterceptor abstract class states that a
    stream interceptor method should return an object that is both a call
    (implementing the response iterator) and a future. This class is a thin
    wrapper around the rpc response that adds the opencensus behaviour:
    every received message is recorded on the span, and the current span is
    ended when the wrapped iterator is exhausted.

    Only iteration, ``add_done_callback``, ``cancel`` and ``is_active`` are
    implemented; the remaining future/call methods raise
    ``NotImplementedError``.

    :type iterator: (future.Future, face.Call)
    :param iterator: rpc response iterator

    :type span: opencensus.trace.Span
    :param span: rpc span
    """

    def __init__(self, iterator, span):
        self._span = span
        self._iterator = iterator
        # Count of messages received so far; also used as the id of the
        # most recent message event.
        self._messages_received = 0

    def add_done_callback(self, fn):
        """Register ``fn`` on the wrapped iterator, invoked with this wrapper.

        The argument the wrapped iterator passes to the callback is
        discarded; ``fn`` receives this wrapper instead.
        """
        def _notify(ignored_callback):
            return fn(self)

        self._iterator.add_done_callback(_notify)

    def __iter__(self):
        """Return the wrapper itself; it is its own iterator."""
        return self

    def __next__(self):
        """Return the next message and record a RECEIVED event for it.

        On ``StopIteration`` the current span is ended before the exception
        is re-raised. Any other exception raised by the wrapped iterator
        propagates without ending the span.
        """
        try:
            received = next(self._iterator)
        except StopIteration:
            tracer = execution_context.get_opencensus_tracer()
            tracer.end_span()
            raise

        self._messages_received += 1
        add_message_event(
            received,
            self._span,
            time_event.Type.RECEIVED,
            self._messages_received,
        )
        return received

    def next(self):
        """Alias that forwards to ``__next__``."""
        return self.__next__()

    # -- Delegated to the wrapped iterator ---------------------------------

    def cancel(self):
        """Forward to the wrapped iterator's ``cancel`` and return its result."""
        return self._iterator.cancel()

    def is_active(self):
        """Forward to the wrapped iterator's ``is_active``."""
        return self._iterator.is_active()

    # -- Not implemented by this wrapper -----------------------------------

    def cancelled(self):
        raise NotImplementedError()  # pragma: NO COVER

    def running(self):
        raise NotImplementedError()  # pragma: NO COVER

    def done(self):
        raise NotImplementedError()  # pragma: NO COVER

    def result(self, timeout=None):
        raise NotImplementedError()  # pragma: NO COVER

    def exception(self, timeout=None):
        raise NotImplementedError()  # pragma: NO COVER

    def traceback(self, timeout=None):
        raise NotImplementedError()  # pragma: NO COVER

    def initial_metadata(self):
        raise NotImplementedError()  # pragma: NO COVER

    def terminal_metadata(self):
        raise NotImplementedError()  # pragma: NO COVER

    def code(self):
        raise NotImplementedError()  # pragma: NO COVER

    def details(self):
        raise NotImplementedError()  # pragma: NO COVER

    def time_remaining(self):
        raise NotImplementedError()  # pragma: NO COVER

    def add_abortion_callback(self, abortion_callback):
        raise NotImplementedError()  # pragma: NO COVER

    def protocol_context(self):
        raise NotImplementedError()  # pragma: NO COVER