Skip to content

Commit 3e8d1f5

Browse files
oxseanzrlw
andauthored
Support returning JSON content when using SSE (#15464)
* Support returning JSON content when using SSE * Update Http2SseServerChannelObserver.java * Update Http2SseServerChannelObserver.java --------- Co-authored-by: zrlw <zrlw@sina.com>
1 parent 43478c1 commit 3e8d1f5

4 files changed

Lines changed: 61 additions & 3 deletions

File tree

dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,13 +166,13 @@ protected final HttpMetadata buildMetadata(
166166
if (message != null) {
167167
headers.set(HttpHeaderNames.CONTENT_TYPE.getKey(), responseEncoder.contentType());
168168
}
169+
customizeHeaders(headers, throwable, message);
169170
if (data instanceof HttpResult) {
170171
HttpResult<?> result = (HttpResult<?>) data;
171172
if (result.getHeaders() != null) {
172173
headers.set(result.getHeaders());
173174
}
174175
}
175-
customizeHeaders(headers, throwable, message);
176176
return metadata;
177177
}
178178

@@ -198,7 +198,7 @@ protected final void sendMetadata(HttpMetadata metadata) {
198198
}
199199
}
200200

201-
protected final HttpOutputMessage buildMessage(int statusCode, Object data) throws Throwable {
201+
protected HttpOutputMessage buildMessage(int statusCode, Object data) throws Throwable {
202202
if (statusCode < 200 || statusCode == 204 || statusCode == 304) {
203203
return null;
204204
}

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/ServerStreamServerCallListener.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.dubbo.rpc.protocol.tri.h12;
1818

1919
import org.apache.dubbo.common.stream.StreamObserver;
20+
import org.apache.dubbo.remoting.http12.HttpResult;
2021
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
2122
import org.apache.dubbo.rpc.Invoker;
2223
import org.apache.dubbo.rpc.RpcInvocation;
@@ -29,7 +30,12 @@ public ServerStreamServerCallListener(
2930
}
3031

3132
@Override
32-
public void onReturn(Object value) {}
33+
public void onReturn(Object value) {
34+
if (value instanceof HttpResult) {
35+
responseObserver.onNext(value);
36+
responseObserver.onCompleted();
37+
}
38+
}
3339

3440
@Override
3541
public void onMessage(Object message) {

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,24 @@
2020
import org.apache.dubbo.remoting.http12.HttpConstants;
2121
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
2222
import org.apache.dubbo.remoting.http12.HttpMetadata;
23+
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
24+
import org.apache.dubbo.remoting.http12.HttpResult;
2325
import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
2426
import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
2527
import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
2628

2729
public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
2830

31+
private HttpMessageEncoder originalResponseEncoder;
32+
2933
public Http1SseServerChannelObserver(HttpChannel httpChannel) {
3034
super(httpChannel);
3135
}
3236

3337
@Override
3438
public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
3539
super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
40+
this.originalResponseEncoder = responseEncoder;
3641
}
3742

3843
@Override
@@ -49,4 +54,25 @@ protected HttpMetadata encodeHttpMetadata(boolean endStream) {
4954
.header(HttpHeaderNames.TRANSFER_ENCODING.getKey(), HttpConstants.CHUNKED)
5055
.header(HttpHeaderNames.CACHE_CONTROL.getKey(), HttpConstants.NO_CACHE);
5156
}
57+
58+
@Override
59+
protected HttpOutputMessage buildMessage(int statusCode, Object data) throws Throwable {
60+
if (data instanceof HttpResult) {
61+
data = ((HttpResult<?>) data).getBody();
62+
63+
if (data == null && statusCode != 200) {
64+
return null;
65+
}
66+
67+
HttpOutputMessage message = encodeHttpOutputMessage(data);
68+
try {
69+
originalResponseEncoder.encode(message.getBody(), data);
70+
} catch (Throwable t) {
71+
message.close();
72+
throw t;
73+
}
74+
return message;
75+
}
76+
return super.buildMessage(statusCode, data);
77+
}
5278
}

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,25 @@
1919
import org.apache.dubbo.remoting.http12.HttpConstants;
2020
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
2121
import org.apache.dubbo.remoting.http12.HttpMetadata;
22+
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
23+
import org.apache.dubbo.remoting.http12.HttpResult;
2224
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
2325
import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
2426
import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
2527
import org.apache.dubbo.rpc.model.FrameworkModel;
2628

2729
public final class Http2SseServerChannelObserver extends Http2StreamServerChannelObserver {
2830

31+
private HttpMessageEncoder originalResponseEncoder;
32+
2933
public Http2SseServerChannelObserver(FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) {
3034
super(frameworkModel, h2StreamChannel);
3135
}
3236

3337
@Override
3438
public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
3539
super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
40+
this.originalResponseEncoder = responseEncoder;
3641
}
3742

3843
@Override
@@ -41,6 +46,27 @@ protected HttpMetadata encodeHttpMetadata(boolean endStream) {
4146
.header(HttpHeaderNames.CACHE_CONTROL.getKey(), HttpConstants.NO_CACHE);
4247
}
4348

49+
@Override
50+
protected HttpOutputMessage buildMessage(int statusCode, Object data) throws Throwable {
51+
if (data instanceof HttpResult) {
52+
data = ((HttpResult<?>) data).getBody();
53+
54+
if (data == null && statusCode != 200) {
55+
return null;
56+
}
57+
58+
HttpOutputMessage message = encodeHttpOutputMessage(data);
59+
try {
60+
originalResponseEncoder.encode(message.getBody(), data);
61+
} catch (Throwable t) {
62+
message.close();
63+
throw t;
64+
}
65+
return message;
66+
}
67+
return super.buildMessage(statusCode, data);
68+
}
69+
4470
@Override
4571
protected void doOnCompleted(Throwable throwable) {
4672
// if throwable is not null, the header will be flushed by super.doOnCompleted(throwable)

0 commit comments

Comments
 (0)