Skip to content

Commit c0adb19

Browse files
feat: add messagebus cloudevents (#1384)
* feat: add messagebus cloudevents --------- Co-authored-by: Michael Jacoby <michael.jacoby@iosb.fraunhofer.de>
1 parent 2d12294 commit c0adb19

40 files changed

Lines changed: 2886 additions & 40 deletions

File tree

core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/config/CoreConfig.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class CoreConfig {
5353
private RegistrySynchronizationConfig registrySynchronization;
5454
private double minInflateRatio;
5555
private long operationTimeout;
56+
private String callbackAddress;
5657

5758
public CoreConfig() {
5859
this.assetConnectionRetryInterval = DEFAULT_ASSET_CONNECTION_RETRY_INTERVAL;
@@ -237,6 +238,16 @@ public void setOperationTimeout(long operationTimeout) {
237238
}
238239

239240

241+
public String getCallbackAddress() {
242+
return callbackAddress;
243+
}
244+
245+
246+
public void setCallbackAddress(String callbackAddress) {
247+
this.callbackAddress = callbackAddress;
248+
}
249+
250+
240251
@Override
241252
public int hashCode() {
242253
return Objects.hash(assetConnectionRetryInterval,
@@ -251,7 +262,8 @@ public int hashCode() {
251262
submodelRegistries,
252263
registrySynchronization,
253264
minInflateRatio,
254-
operationTimeout);
265+
operationTimeout,
266+
callbackAddress);
255267
}
256268

257269

@@ -279,7 +291,8 @@ public boolean equals(Object obj) {
279291
&& Objects.equals(this.submodelRegistries, other.submodelRegistries)
280292
&& Objects.equals(this.registrySynchronization, other.registrySynchronization)
281293
&& Objects.equals(this.minInflateRatio, other.minInflateRatio)
282-
&& Objects.equals(this.operationTimeout, other.operationTimeout);
294+
&& Objects.equals(this.operationTimeout, other.operationTimeout)
295+
&& Objects.equals(callbackAddress, other.callbackAddress);
283296
}
284297

285298
public static class Builder extends ExtendableBuilder<CoreConfig, Builder> {
@@ -398,6 +411,12 @@ public Builder operationTimeout(long value) {
398411
}
399412

400413

414+
public Builder callbackAddress(String value) {
415+
getBuildingInstance().setCallbackAddress(value);
416+
return getSelf();
417+
}
418+
419+
401420
@Override
402421
protected Builder getSelf() {
403422
return this;

docs/source/basics/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ The `core` configuration block contains properties not related to the implementa
4444
| assetConnectionReadTimeout<br>*(optional)* | Long | Timeout in ms for reading all asset connections for a single AAS command | 5000 |
4545
| assetConnectionRetryInterval<br>*(optional)* | Long | Interval in ms in which to retry establishing asset connections | 1000 |
4646
| assetConnectionWriteMaxThreadPoolSize<br>*(optional)* | Integer | Size of thread pool used to write to asset connections | 1000 |
47+
| callbackAddress<br>*(optional)* | String | The external URI the FA³ST Service is reachable from. Used in registry synchronization and cloud events message bus. | |
4748
| minInflateRatio<br>*(optional)* | Double | Ratio between de- and inflated bytes to detect zipbomb when loading AASX files | 0.001 |
4849
| operationTimeout<br>*(optional)* | Long | Timeout in ms for executing AAS operations. Set to 0 for no timeout. | 3600000 |
4950
| requestHandlerThreadPoolSize<br>*(optional)* | Integer | Number of concurrent thread that can execute API requests | 2 |

docs/source/interfaces/endpoint.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ The HTTP Endpoint is based on the document [Details of the Asset Administration
4848
:::{table} Configuration properties of HTTP Endpoint.
4949
| Name | Allowed Value | Description | Default Value |
5050
| --------------------------------------- | ----------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------- |
51-
| callbackAddress<br>*(optional)* | String | The callback URI to be used for automated registration of descriptors at a registry. **Overrides `hostname`.** A descriptor will contain the following `href`-endpoints: [{callbackAddress}, {callbackAddress}/{id}] | |
5251
| certificate<br>*(optional)* | [CertificateInfo](#providing-certificates-in-configuration) | The HTTPS certificate to use.<br> | self-signed certificate |
5352
| corsAllowCredentials<br>*(optional)* | Boolean | Sets the `Access-Control-Allow-Credentials` response header. | false |
5453
| corsAllowedHeaders<br>*(optional)* | String (comma-separated list) | Sets the `Access-Control-Allow-Headers` response header. | * |

docs/source/interfaces/message-bus.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,63 @@ For deserialization of events the class `JsonEventDeserializer` in module `dataf
161161
//...
162162
}
163163
```
164+
165+
166+
167+
## CloudEvents
168+
169+
This implementation of the `MessageBus` interface publishes CloudEvent messages via MQTT by using an externally hosted MQTT broker.
170+
171+
### Topics & Payload
172+
173+
Each message type is published on its own topic in the form of `[topicPrefix]`.
174+
The payload is a JSON serialization of a CloudEvent as specified in the async-aas specification: https://factory-x-contributions.github.io/async-aas-helm
175+
176+
### Configuration
177+
178+
:::{table} Configuration properties of MQTT MessageBus.
179+
| Name | Allowed Value | Description | Default Value |
180+
| ----------------------------------- | ----------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------- |
181+
| host<br> | String | The host name of the MQTT server with scheme, port and path segments if relevant. | |
182+
| clientId<br>*(optional)* | String | ClientId to use when connecting to the MQTT server. This clientId will only be used to identify as a MQTT publisher, not for oauth-flows or as a username. | FA3ST MQTT MessageBus |
183+
| username<br>*(optional)* | String | Username used to connect to the MQTT server. | |
184+
| password<br>*(optional)* | String | Password used to connect to the MQTT server. | |
185+
| identityProviderUrl<br>*(optional)* | String | Oauth2 IdP URL. Obtained tokens are placed as the MQTT broker password, the username is obtained from the username config. Token refresh happens automatically. | |
186+
| oauth2ClientId<br>*(optional)* | String | Oauth2 client id to use when retrieving an oauth2-token from the IdP. | |
187+
| oauth2ClientSecret<br>*(optional)* | String | Oauth2 client secret to use when retrieving an oauth2-token from the IdP. | |
188+
| topicPrefix<br>*(optional)* | String | Prefix to use for the topic names. | noauth |
189+
| slimEvents<br>*(optional)* | boolean | Whether the full referable is sent in a CloudEvent's data-field. If true, the data-field will be empty. | true |
190+
| eventTypePrefix<br>*(optional)* | String | Prefix to use in the CloudEvents type-field. Should end with a ".". | io.admin-shell.events.v1. |
191+
| dataSchemaPrefix<br>*(optional)* | String | Prefix to use in the CloudEvents dataschema-field. Should end with a "/". | https://api.swaggerhub.com/domains/Plattform_i40/Part1-MetaModel-Schemas/V3.1.0#/components/schemas/ |
192+
| clientCertificate<br>*(optional)* | [CertificateInfo](#providing-certificates-in-configuration) | The client certificate to use. If not set, SSL will be disabled. | |
193+
:::
194+
195+
```{code-block} json
196+
:caption: Example configuration for MQTT MessageBus.
197+
:lineno-start: 1
198+
{
199+
"messageBus": {
200+
"@class": "de.fraunhofer.iosb.ilt.faaast.service.messagebus.mqtt.MessageBusMqtt",
201+
"useInternalServer": true,
202+
"host": "tcp://localhost:1883",
203+
"clientCertificate": {
204+
"keyStoreType": "PKCS12",
205+
"keyStorePath": "C:\faaast\MyKeyStore.p12",
206+
"keyStorePassword": "changeit",
207+
"keyAlias": "client-key",
208+
"keyPassword": "changeit"
209+
},
210+
"username": "broker-user",
211+
"password": "broker-password",
212+
"identityProviderUrl": "http://localhost:12345/realms/fa3st/protocol/openid-connect/token",
213+
"oauth2ClientId": "my-keycloak-clientid",
214+
"oauth2ClientSecret": "my-keycloak-clientsecret",
215+
"clientId": "FA³ST Service client",
216+
"topicPrefix": "noauth",
217+
"slimEvents": true,
218+
"eventTypePrefix": "io.admin-shell.events.v1.",
219+
"dataSchemaPrefix": "https://api.swaggerhub.com/domains/Plattform_i40/Part1-MetaModel-Schemas/V3.1.0#/components/schemas/"
220+
},
221+
//...
222+
}
223+
```

docs/source/other/release-notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
- HTTP
3838
- Add logging of raw HTTP requests & responses with level `TRACE`
3939
- MessageBus
40+
- CloudEventsMessageBus now available implementing the async-aas specification in https://factory-x-contributions.github.io/async-aas-helm/
4041
- MQTT
4142
- Changed default value for `host` from localhost to 0.0.0.0
4243
- SMT Processor

endpoint/http/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpoint.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
import static de.fraunhofer.iosb.ilt.faaast.service.certificate.util.KeyStoreHelper.DEFAULT_ALIAS;
1818

19+
import de.fraunhofer.iosb.ilt.faaast.service.ServiceContext;
1920
import de.fraunhofer.iosb.ilt.faaast.service.certificate.CertificateData;
2021
import de.fraunhofer.iosb.ilt.faaast.service.certificate.CertificateInformation;
2122
import de.fraunhofer.iosb.ilt.faaast.service.certificate.util.KeyStoreHelper;
23+
import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig;
2224
import de.fraunhofer.iosb.ilt.faaast.service.endpoint.AbstractEndpoint;
2325
import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.util.HttpHelper;
2426
import de.fraunhofer.iosb.ilt.faaast.service.exception.EndpointException;
@@ -77,6 +79,7 @@ public class HttpEndpoint extends AbstractEndpoint<HttpEndpointConfig> {
7779
private static final String ENDPOINT_PROTOCOL = "HTTP";
7880
private static final String ENDPOINT_PROTOCOL_VERSION = "1.1";
7981
private Server server;
82+
private String callbackAddress;
8083

8184
@Override
8285
public HttpEndpointConfig asConfig() {
@@ -121,6 +124,13 @@ public void start() throws EndpointException {
121124
}
122125

123126

127+
@Override
128+
public void init(CoreConfig coreConfig, HttpEndpointConfig config, ServiceContext serviceContext) {
129+
callbackAddress = coreConfig.getCallbackAddress();
130+
super.init(coreConfig, config, serviceContext);
131+
}
132+
133+
124134
private void configureHttpServer() throws EndpointException {
125135
HttpConfiguration httpConfig = new HttpConfiguration();
126136
httpConfig.setSendServerVersion(false);
@@ -298,9 +308,9 @@ private String render(String subprotocolBodyTemplate, String identifiableId) {
298308
private URI getEndpointUri() {
299309
URI result = server.getURI();
300310
try {
301-
if (Objects.nonNull(config.getCallbackAddress())) {
311+
if (Objects.nonNull(callbackAddress)) {
302312
result = buildUri(
303-
config.getCallbackAddress(),
313+
callbackAddress,
304314
// server URI path comes before configured prefix
305315
result.getPath(),
306316
config.getPathPrefix());
@@ -319,7 +329,7 @@ else if (Objects.nonNull(config.getHostname())) {
319329
}
320330
catch (URISyntaxException e) {
321331
LOGGER.error("error creating endpoint URI for HTTP endpoint based on hostname from configuration (callbackAddress: {}, hostname: {}): {}",
322-
config.getCallbackAddress(), config.getHostname(), e.getMessage());
332+
callbackAddress, config.getHostname(), e.getMessage());
323333
}
324334
return result;
325335
}

endpoint/http/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointConfig.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public static Builder builder() {
4444
return new Builder();
4545
}
4646

47-
private String callbackAddress;
4847
private CertificateConfig certificate;
4948
private boolean corsEnabled;
5049
private boolean corsAllowCredentials;
@@ -81,16 +80,6 @@ public HttpEndpointConfig() {
8180
}
8281

8382

84-
public String getCallbackAddress() {
85-
return callbackAddress;
86-
}
87-
88-
89-
public void setCallbackAddress(String callbackAddress) {
90-
this.callbackAddress = callbackAddress;
91-
}
92-
93-
9483
public CertificateConfig getCertificate() {
9584
return certificate;
9685
}
@@ -277,7 +266,6 @@ public boolean equals(Object o) {
277266
}
278267
HttpEndpointConfig that = (HttpEndpointConfig) o;
279268
return super.equals(o)
280-
&& Objects.equals(callbackAddress, that.callbackAddress)
281269
&& Objects.equals(certificate, that.certificate)
282270
&& Objects.equals(corsEnabled, that.corsEnabled)
283271
&& Objects.equals(corsAllowCredentials, that.corsAllowCredentials)
@@ -302,7 +290,6 @@ public boolean equals(Object o) {
302290
public int hashCode() {
303291
return Objects.hash(
304292
super.hashCode(),
305-
callbackAddress,
306293
certificate,
307294
corsEnabled,
308295
corsAllowCredentials,
@@ -331,12 +318,6 @@ private void validatePathPrefix(String pathPrefix) {
331318

332319
private abstract static class AbstractBuilder<T extends HttpEndpointConfig, B extends AbstractBuilder<T, B>> extends EndpointConfig.AbstractBuilder<HttpEndpoint, T, B> {
333320

334-
public B callbackAddress(String value) {
335-
getBuildingInstance().setCallbackAddress(value);
336-
return getSelf();
337-
}
338-
339-
340321
public B certificate(CertificateConfig value) {
341322
getBuildingInstance().setCertificate(value);
342323
return getSelf();

endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/AbstractHttpEndpointTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.fasterxml.jackson.core.type.TypeReference;
2323
import de.fraunhofer.iosb.ilt.faaast.service.Service;
24+
import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig;
2425
import de.fraunhofer.iosb.ilt.faaast.service.dataformat.DeserializationException;
2526
import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.request.mapper.QueryParameters;
2627
import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.serialization.HttpJsonApiDeserializer;
@@ -132,6 +133,7 @@ public abstract class AbstractHttpEndpointTest {
132133
protected static HttpJsonApiDeserializer deserializer;
133134
protected static HttpJsonApiSerializer serializer;
134135
protected static Server server;
136+
protected static CoreConfig coreConfig = CoreConfig.builder().callbackAddress("http://invalid.local").build();
135137

136138
@Before
137139
public void setUp() {
@@ -306,9 +308,8 @@ public void testGetAasEndpointInformationWithCallbackAddress() {
306308
ProtocolInformation protocolInformation = actual.get(0).getProtocolInformation();
307309

308310
HttpEndpointConfig config = endpoint.asConfig();
309-
if (config.getCallbackAddress() != null) {
310-
Assert.assertEquals(config.getCallbackAddress().concat(endpoint.getPathPrefix()).concat("/shells"), protocolInformation.getHref());
311-
}
311+
312+
Assert.assertEquals(coreConfig.getCallbackAddress().concat(endpoint.getPathPrefix()).concat("/shells"), protocolInformation.getHref());
312313
Assert.assertEquals(config.getSubprotocol(), protocolInformation.getSubprotocol());
313314
Assert.assertEquals(config.getSubprotocolBody(), protocolInformation.getSubprotocolBody());
314315
Assert.assertEquals(config.getSubprotocolBodyEncoding(), protocolInformation.getSubprotocolBodyEncoding());

0 commit comments

Comments
 (0)