@@ -24,6 +24,12 @@ use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
2424use super :: db_factory:: namespace_from_headers;
2525use super :: AppState ;
2626
27+ const LAGGED_MSG : & str = "some changes were lost" ;
28+ const KEEP_ALIVE_INTERVAL : Duration = Duration :: from_secs ( 15 ) ;
29+ const KEEP_ALIVE_TEXT : & str = "keep-alive" ;
30+
31+ type SseStream = Pin < Box < dyn Stream < Item = Result < Event , Infallible > > + Send > > ;
32+
2733#[ derive( Debug , Clone , Deserialize ) ]
2834#[ serde( rename_all = "lowercase" ) ]
2935pub enum Action {
@@ -39,6 +45,28 @@ pub struct ListenQuery {
3945 action : Option < Vec < Action > > ,
4046}
4147
48+ #[ derive( Debug , Serialize ) ]
49+ #[ serde( rename_all = "snake_case" ) ]
50+ enum AggregatorEvent {
51+ Error ( & ' static str ) ,
52+ #[ serde( untagged) ]
53+ Changes ( BroadcastMsg ) ,
54+ }
55+
56+ enum ListenResponse {
57+ SSE ( Sse < SseStream > ) ,
58+ Redirect ( Redirect ) ,
59+ }
60+
61+ impl IntoResponse for ListenResponse {
62+ fn into_response ( self ) -> axum:: response:: Response {
63+ match self {
64+ ListenResponse :: SSE ( sse) => sse. into_response ( ) ,
65+ ListenResponse :: Redirect ( redirect) => redirect. into_response ( ) ,
66+ }
67+ }
68+ }
69+
4270pub ( super ) async fn handle_listen (
4371 auth : Authenticated ,
4472 AxumState ( state) : AxumState < AppState > ,
@@ -56,8 +84,12 @@ pub(super) async fn handle_listen(
5684 return Err ( Error :: NamespaceDoesntExist ( namespace. to_string ( ) ) ) ;
5785 }
5886
59- if let Some ( primary_url) = state. primary_url {
60- let url = primary_url + uri. path_and_query ( ) . map_or ( "" , |x| x. as_str ( ) ) ;
87+ if let Some ( primary_url) = state. primary_url . as_ref ( ) {
88+ let url = format ! (
89+ "{}{}" ,
90+ primary_url,
91+ uri. path_and_query( ) . map_or( "" , |x| x. as_str( ) )
92+ ) ;
6193 return Ok ( ListenResponse :: Redirect ( Redirect :: temporary ( & url) ) ) ;
6294 }
6395
@@ -72,34 +104,12 @@ pub(super) async fn handle_listen(
72104 Ok ( ListenResponse :: SSE (
73105 Sse :: new ( stream) . keep_alive (
74106 axum:: response:: sse:: KeepAlive :: new ( )
75- . interval ( Duration :: from_secs ( 15 ) )
76- . text ( "keep-alive" ) ,
107+ . interval ( KEEP_ALIVE_INTERVAL )
108+ . text ( KEEP_ALIVE_TEXT ) ,
77109 ) ,
78110 ) )
79111}
80112
81- static LAGGED_MSG : & str = "some changes were lost" ;
82-
83- #[ derive( Debug , Serialize ) ]
84- #[ serde( rename_all = "snake_case" ) ]
85- enum AggregatorEvent {
86- Error ( & ' static str ) ,
87- #[ serde( untagged) ]
88- Changes ( BroadcastMsg ) ,
89- }
90-
91- struct Subscription {
92- store : NamespaceStore ,
93- namespace : NamespaceName ,
94- table : String ,
95- }
96-
97- impl Drop for Subscription {
98- fn drop ( & mut self ) {
99- self . store . unsubscribe ( self . namespace . clone ( ) , & self . table ) ;
100- }
101- }
102-
103113async fn sse_stream (
104114 store : NamespaceStore ,
105115 namespace : NamespaceName ,
@@ -128,24 +138,20 @@ async fn listen_stream(
128138 actions : Option < Vec < Action > > ,
129139) -> impl Stream < Item = crate :: Result < AggregatorEvent > > {
130140 async_stream:: try_stream! {
131- let _sub = Subscription {
132- store: store. clone( ) ,
133- namespace: namespace. clone( ) ,
134- table: table. clone( ) ,
135- } ;
136-
137- let mut stream = store. subscribe( namespace. clone( ) , table. clone( ) ) ;
141+ let _sub = Subscription :: new( store. clone( ) , namespace. clone( ) , table. clone( ) ) ;
142+ let mut stream = store. subscribe( namespace, table) ;
138143
139144 while let Some ( item) = stream. next( ) . await {
140145 match item {
141- Ok ( msg) => if filter_actions( & msg, & actions) {
146+ Ok ( msg) if filter_actions( & msg, & actions) => {
142147 LISTEN_EVENTS_SENT . increment( 1 ) ;
143148 yield AggregatorEvent :: Changes ( msg) ;
144149 } ,
145150 Err ( BroadcastStreamRecvError :: Lagged ( n) ) => {
146151 LISTEN_EVENTS_DROPPED . increment( n as u64 ) ;
147152 yield AggregatorEvent :: Error ( LAGGED_MSG ) ;
148153 } ,
154+ _ => { }
149155 }
150156 }
151157 }
@@ -165,18 +171,24 @@ fn filter_actions(msg: &BroadcastMsg, actions: &Option<Vec<Action>>) -> bool {
165171 } )
166172}
167173
168- type SseStream = Pin < Box < dyn Stream < Item = Result < Event , Infallible > > + Send > > ;
169-
170- enum ListenResponse {
171- SSE ( Sse < SseStream > ) ,
172- Redirect ( Redirect ) ,
174+ struct Subscription {
175+ store : NamespaceStore ,
176+ namespace : NamespaceName ,
177+ table : String ,
173178}
174179
175- impl IntoResponse for ListenResponse {
176- fn into_response ( self ) -> axum:: response:: Response {
177- match self {
178- ListenResponse :: SSE ( sse) => sse. into_response ( ) ,
179- ListenResponse :: Redirect ( redirect) => redirect. into_response ( ) ,
180+ impl Subscription {
181+ fn new ( store : NamespaceStore , namespace : NamespaceName , table : String ) -> Self {
182+ Self {
183+ store,
184+ namespace,
185+ table,
180186 }
181187 }
182188}
189+
190+ impl Drop for Subscription {
191+ fn drop ( & mut self ) {
192+ self . store . unsubscribe ( self . namespace . clone ( ) , & self . table ) ;
193+ }
194+ }
0 commit comments