Skip to content

Commit 64227d1

Browse files
authored
feat: efm2 (#371)
1 parent 068e0c0 commit 64227d1

27 files changed

Lines changed: 853 additions & 83 deletions

common/lib/connection_plugin_chain_builder.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import { LimitlessConnectionPluginFactory } from "./plugins/limitless/limitless_
4141
import { FastestResponseStrategyPluginFactory } from "./plugins/strategy/fastest_response/fastest_respose_strategy_plugin_factory";
4242
import { CustomEndpointPluginFactory } from "./plugins/custom_endpoint/custom_endpoint_plugin_factory";
4343
import { ConfigurationProfile } from "./profile/configuration_profile";
44+
import { HostMonitoring2PluginFactory } from "./plugins/efm2/host_monitoring2_plugin_factory";
4445

4546
/*
4647
Type alias used for plugin factory sorting. It holds a reference to a plugin
@@ -63,6 +64,7 @@ export class ConnectionPluginChainBuilder {
6364
["failover", { factory: FailoverPluginFactory, weight: 700 }],
6465
["failover2", { factory: Failover2PluginFactory, weight: 710 }],
6566
["efm", { factory: HostMonitoringPluginFactory, weight: 800 }],
67+
["efm2", { factory: HostMonitoring2PluginFactory, weight: 810 }],
6668
["fastestResponseStrategy", { factory: FastestResponseStrategyPluginFactory, weight: 900 }],
6769
["limitless", { factory: LimitlessConnectionPluginFactory, weight: 950 }],
6870
["iam", { factory: IamAuthenticationPluginFactory, weight: 1000 }],
@@ -82,6 +84,7 @@ export class ConnectionPluginChainBuilder {
8284
[FailoverPluginFactory, 700],
8385
[Failover2PluginFactory, 710],
8486
[HostMonitoringPluginFactory, 800],
87+
[HostMonitoring2PluginFactory, 810],
8588
[LimitlessConnectionPluginFactory, 950],
8689
[IamAuthenticationPluginFactory, 1000],
8790
[AwsSecretsManagerPluginFactory, 1100],

common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
196196
}
197197

198198
private async openAnyClientAndUpdateTopology(): Promise<HostInfo[] | null> {
199-
let writerVerifiedByThisThread = false;
199+
let writerVerifiedByThisTask = false;
200200
if (!this.monitoringClient) {
201201
let client: ClientWrapper;
202202
try {
@@ -215,7 +215,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
215215
this.isVerifiedWriterConnection = true;
216216
this.writerHostInfo = this.initialHostInfo;
217217
logger.info(Messages.get("ClusterTopologyMonitor.writerMonitoringConnection", this.initialHostInfo.hostId));
218-
writerVerifiedByThisThread = true;
218+
writerVerifiedByThisTask = true;
219219
}
220220
} catch (error) {
221221
// Do nothing.
@@ -228,7 +228,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
228228
}
229229

230230
const hosts: HostInfo[] = await this.fetchTopologyAndUpdateCache(this.monitoringClient);
231-
if (writerVerifiedByThisThread) {
231+
if (writerVerifiedByThisTask) {
232232
if (this.ignoreNewTopologyRequestsEndTimeMs === -1) {
233233
this.ignoreNewTopologyRequestsEndTimeMs = 0;
234234
} else {

common/lib/plugins/efm/host_monitoring_connection_plugin.ts

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
121121
} finally {
122122
if (monitorContext != null) {
123123
await this.monitorService.stopMonitoring(monitorContext);
124+
logger.debug(Messages.get("HostMonitoringConnectionPlugin.monitoringDeactivated", methodName));
124125

125126
if (monitorContext.isHostUnhealthy) {
126127
const monitoringHostInfo = await this.getMonitoringHostInfo();
@@ -148,7 +149,8 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
148149
return result;
149150
}
150151

151-
private throwUnableToIdentifyConnection(host: HostInfo | null, provider: HostListProvider | null): never {
152+
private throwUnableToIdentifyConnection(host: HostInfo | null): never {
153+
const provider: HostListProvider | null = this.pluginService.getHostListProvider();
152154
throw new AwsWrapperError(
153155
Messages.get(
154156
"HostMonitoringConnectionPlugin.unableToIdentifyConnection",
@@ -159,32 +161,31 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
159161
}
160162

161163
async getMonitoringHostInfo(): Promise<HostInfo> {
164+
if (this.monitoringHostInfo) {
165+
return this.monitoringHostInfo;
166+
}
167+
this.monitoringHostInfo = this.pluginService.getCurrentHostInfo();
162168
if (this.monitoringHostInfo == null) {
163-
this.monitoringHostInfo = this.pluginService.getCurrentHostInfo();
164-
const provider: HostListProvider | null = this.pluginService.getHostListProvider();
165-
if (this.monitoringHostInfo == null) {
166-
this.throwUnableToIdentifyConnection(null, provider);
167-
}
168-
const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url);
169-
170-
try {
171-
if (rdsUrlType.isRdsCluster) {
172-
logger.debug("Monitoring host info is associated with a cluster endpoint, plugin needs to identify the cluster connection");
173-
this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!);
174-
if (this.monitoringHostInfo == null) {
175-
const host: HostInfo | null = this.pluginService.getCurrentHostInfo();
176-
this.throwUnableToIdentifyConnection(host, provider);
177-
}
178-
await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo);
179-
}
180-
} catch (error: any) {
181-
if (!(error instanceof AwsWrapperError)) {
182-
logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message));
169+
this.throwUnableToIdentifyConnection(null);
170+
}
171+
const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url);
172+
173+
try {
174+
if (rdsUrlType.isRdsCluster) {
175+
logger.debug(Messages.get("HostMonitoringConnectionPlugin.identifyClusterConnection"));
176+
this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!);
177+
if (this.monitoringHostInfo == null) {
178+
const host: HostInfo | null = this.pluginService.getCurrentHostInfo();
179+
this.throwUnableToIdentifyConnection(host);
183180
}
184-
throw error;
181+
await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo);
185182
}
183+
} catch (error: any) {
184+
if (!(error instanceof AwsWrapperError)) {
185+
logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message));
186+
}
187+
throw error;
186188
}
187-
188189
return this.monitoringHostInfo;
189190
}
190191

common/lib/plugins/efm/monitor.ts

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ import { PluginService } from "../../plugin_service";
2020
import { logger } from "../../../logutils";
2121
import { Messages } from "../../utils/messages";
2222
import { ClientWrapper } from "../../client_wrapper";
23-
import { sleep } from "../../utils/utils";
24-
import { WrapperProperties } from "../../wrapper_property";
23+
import { getCurrentTimeNano, sleep } from "../../utils/utils";
2524
import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory";
2625
import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter";
2726
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
27+
import { HostResponseTimeMonitor } from "../strategy/fastest_response/host_response_time_monitor";
28+
import { WrapperProperties } from "../../wrapper_property";
2829

2930
export interface Monitor {
3031
startMonitoring(context: MonitorConnectionContext): void;
@@ -79,7 +80,7 @@ export class MonitorImpl implements Monitor {
7980
this.properties = properties;
8081
this.hostInfo = hostInfo;
8182
this.monitorDisposalTimeMillis = monitorDisposalTimeMillis;
82-
this.contextLastUsedTimestampNanos = this.getCurrentTimeNano();
83+
this.contextLastUsedTimestampNanos = getCurrentTimeNano();
8384
const instanceId = this.hostInfo.hostId ?? this.hostInfo.host;
8485
this.instanceInvalidCounter = this.telemetryFactory.createCounter(`efm.hostUnhealthy.count.${instanceId}`);
8586
}
@@ -94,7 +95,7 @@ export class MonitorImpl implements Monitor {
9495
logger.warn(Messages.get("MonitorImpl.monitorIsStopped", this.hostInfo.host));
9596
}
9697

97-
const currentTimeNanos: number = this.getCurrentTimeNano();
98+
const currentTimeNanos: number = getCurrentTimeNano();
9899
context.startMonitorTimeNano = currentTimeNanos;
99100
this.contextLastUsedTimestampNanos = currentTimeNanos;
100101
this.newContexts.push(context);
@@ -110,7 +111,7 @@ export class MonitorImpl implements Monitor {
110111
}
111112

112113
context.isActiveContext = false;
113-
this.contextLastUsedTimestampNanos = this.getCurrentTimeNano();
114+
this.contextLastUsedTimestampNanos = getCurrentTimeNano();
114115
}
115116

116117
async run(): Promise<void> {
@@ -121,7 +122,7 @@ export class MonitorImpl implements Monitor {
121122
try {
122123
let newMonitorContext: MonitorConnectionContext | undefined;
123124
let firstAddedNewMonitorContext: MonitorConnectionContext | null = null;
124-
const currentTimeNano: number = this.getCurrentTimeNano();
125+
const currentTimeNano: number = getCurrentTimeNano();
125126

126127
while ((newMonitorContext = this.newContexts.shift()) != null) {
127128
if (firstAddedNewMonitorContext === newMonitorContext) {
@@ -140,9 +141,9 @@ export class MonitorImpl implements Monitor {
140141
}
141142

142143
if (this.activeContexts.length > 0) {
143-
this.contextLastUsedTimestampNanos = this.getCurrentTimeNano();
144+
this.contextLastUsedTimestampNanos = getCurrentTimeNano();
144145

145-
const statusCheckStartTimeNanos: number = this.getCurrentTimeNano();
146+
const statusCheckStartTimeNanos: number = getCurrentTimeNano();
146147
this.contextLastUsedTimestampNanos = statusCheckStartTimeNanos;
147148

148149
const status: ConnectionStatus = await this.checkConnectionStatus();
@@ -199,7 +200,7 @@ export class MonitorImpl implements Monitor {
199200
this.delayMillisTimeoutId = setTimeout(resolve, delayMillis);
200201
});
201202
} else {
202-
if (this.getCurrentTimeNano() - this.contextLastUsedTimestampNanos >= this.monitorDisposalTimeMillis * 1_000_000) {
203+
if (getCurrentTimeNano() - this.contextLastUsedTimestampNanos >= this.monitorDisposalTimeMillis * 1_000_000) {
203204
break;
204205
}
205206
await new Promise((resolve) => {
@@ -229,36 +230,33 @@ export class MonitorImpl implements Monitor {
229230
const connectContext = this.telemetryFactory.openTelemetryContext("Connection status check", TelemetryTraceLevel.FORCE_TOP_LEVEL);
230231
connectContext.setAttribute("url", this.hostInfo.host);
231232
return await connectContext.start(async () => {
232-
const startNanos = this.getCurrentTimeNano();
233+
const startNanos = getCurrentTimeNano();
233234
try {
234235
const clientIsValid = this.monitoringClient && (await this.pluginService.isClientValid(this.monitoringClient));
235236

236237
if (this.monitoringClient !== null && clientIsValid) {
237-
return Promise.resolve(new ConnectionStatus(clientIsValid, this.getCurrentTimeNano() - startNanos));
238+
return Promise.resolve(new ConnectionStatus(clientIsValid, getCurrentTimeNano() - startNanos));
238239
}
239240

240241
await this.endMonitoringClient();
241-
242242
// Open a new connection.
243243
const monitoringConnProperties: Map<string, any> = new Map(this.properties);
244-
245-
for (const key of this.properties.keys()) {
244+
for (const key of monitoringConnProperties.keys()) {
246245
if (!key.startsWith(WrapperProperties.MONITORING_PROPERTY_PREFIX)) {
247246
continue;
248247
}
249-
250248
monitoringConnProperties.set(key.substring(WrapperProperties.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key));
251249
monitoringConnProperties.delete(key);
252250
}
253251

254252
logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`);
255253
this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, monitoringConnProperties);
256254
logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`);
257-
return Promise.resolve(new ConnectionStatus(true, this.getCurrentTimeNano() - startNanos));
255+
return Promise.resolve(new ConnectionStatus(true, getCurrentTimeNano() - startNanos));
258256
} catch (error: any) {
259257
this.instanceInvalidCounter.inc();
260258
await this.endMonitoringClient();
261-
return Promise.resolve(new ConnectionStatus(false, this.getCurrentTimeNano() - startNanos));
259+
return Promise.resolve(new ConnectionStatus(false, getCurrentTimeNano() - startNanos));
262260
}
263261
});
264262
}
@@ -272,10 +270,6 @@ export class MonitorImpl implements Monitor {
272270
return this.stopped || this.cancelled;
273271
}
274272

275-
protected getCurrentTimeNano() {
276-
return Number(process.hrtime.bigint());
277-
}
278-
279273
async releaseResources() {
280274
this.cancelled = true;
281275
clearTimeout(this.delayMillisTimeoutId);

common/lib/plugins/efm/monitor_connection_context.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export class MonitorConnectionContext {
4343

4444
constructor(
4545
monitor: Monitor,
46-
clientToAbort: any,
46+
clientToAbort: ClientWrapper,
4747
failureDetectionTimeMillis: number,
4848
failureDetectionIntervalMillis: number,
4949
failureDetectionCount: number,
@@ -127,15 +127,14 @@ export class MonitorConnectionContext {
127127

128128
const invalidHostDurationNano: number = statusCheckEndNano - this.invalidHostStartTimeNano;
129129
const maxInvalidHostDurationMillis: number = this.failureDetectionIntervalMillis * Math.max(0, this.failureDetectionCount);
130-
131130
if (this.failureCount >= this.failureDetectionCount || invalidHostDurationNano >= maxInvalidHostDurationMillis * 1_000_000) {
132131
logger.debug(Messages.get("MonitorConnectionContext.hostDead", hostName));
133132
this.isHostUnhealthy = true;
134133
await this.abortConnection();
135134
return;
136135
}
137136

138-
logger.debug(Messages.get("MonitorConnectionContext.hostNotResponding", hostName, String(this.failureCount)));
137+
logger.debug(Messages.get("MonitorConnectionContext.hostNotResponding", hostName));
139138
return;
140139
}
141140

common/lib/plugins/efm/monitor_service.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ import { WrapperProperties } from "../../wrapper_property";
2222
import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache";
2323
import { PluginService } from "../../plugin_service";
2424
import { Messages } from "../../utils/messages";
25+
import { ClientWrapper } from "../../client_wrapper";
2526

2627
export interface MonitorService {
2728
startMonitoring(
28-
clientToAbort: any,
29+
clientToAbort: ClientWrapper,
2930
hostKeys: Set<string>,
3031
hostInfo: HostInfo,
3132
properties: Map<string, any>,
@@ -59,7 +60,7 @@ export class MonitorServiceImpl implements MonitorService {
5960
}
6061

6162
async startMonitoring(
62-
clientToAbort: any,
63+
clientToAbort: ClientWrapper,
6364
hostKeys: Set<string>,
6465
hostInfo: HostInfo,
6566
properties: Map<string, any>,
@@ -158,7 +159,7 @@ export class MonitorServiceImpl implements MonitorService {
158159
}
159160

160161
async releaseResources() {
161-
for (const [key, monitor] of MonitorServiceImpl.monitors.entries) {
162+
for (const [_key, monitor] of MonitorServiceImpl.monitors.entries) {
162163
if (monitor.item) {
163164
await monitor.item.releaseResources();
164165
}

0 commit comments

Comments
 (0)