Skip to content

Commit bfd8669

Browse files
authored
fix: EFM2 abort and stop monitoring on dead connection (#415)
1 parent 17029b4 commit bfd8669

8 files changed

Lines changed: 132 additions & 34 deletions

File tree

common/lib/plugins/connection_tracker/aurora_connection_tracker_plugin_factory.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import { ConnectionPluginFactory } from "../../plugin_factory";
1818
import { PluginService } from "../../plugin_service";
1919
import { ConnectionPlugin } from "../../connection_plugin";
20-
import { logger } from "../../../logutils";
2120
import { AwsWrapperError } from "../../utils/errors";
2221
import { Messages } from "../../utils/messages";
2322

common/lib/plugins/connection_tracker/opened_connection_tracker.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ export class OpenedConnectionTracker {
8585

8686
private async invalidateConnections(connectionQueue: Array<WeakRef<ClientWrapper>>): Promise<void> {
8787
let clientRef: WeakRef<ClientWrapper> | undefined;
88-
while ((clientRef = connectionQueue.shift())) {
89-
const client = clientRef.deref();
88+
while ((clientRef = connectionQueue?.shift()) != null) {
89+
const client = clientRef?.deref() ?? null;
9090
if (!client) {
9191
continue;
9292
}
@@ -101,7 +101,7 @@ export class OpenedConnectionTracker {
101101
for (const queue of OpenedConnectionTracker.openedConnections.values()) {
102102
if (queue.length !== 0) {
103103
for (const connRef of queue) {
104-
const conn = connRef.deref();
104+
const conn = connRef?.deref() ?? null;
105105
if (conn) {
106106
hostList.push(`${conn.id} - ${conn.hostInfo.toString()}`);
107107
}
@@ -124,7 +124,7 @@ export class OpenedConnectionTracker {
124124
for (const [key, queue] of OpenedConnectionTracker.openedConnections) {
125125
OpenedConnectionTracker.openedConnections.set(
126126
key,
127-
queue.filter((connWeakRef: WeakRef<ClientWrapper>) => connWeakRef.deref())
127+
queue.filter((connWeakRef: WeakRef<ClientWrapper>) => connWeakRef?.deref() ?? null)
128128
);
129129
}
130130
}

common/lib/plugins/efm/monitor.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,17 @@ export class MonitorImpl implements Monitor {
123123
let newMonitorContext: MonitorConnectionContext | undefined;
124124
let firstAddedNewMonitorContext: MonitorConnectionContext | null = null;
125125
const currentTimeNano: number = getCurrentTimeNano();
126-
127-
while ((newMonitorContext = this.newContexts.shift()) != null) {
126+
while ((newMonitorContext = this.newContexts?.shift()) != null) {
128127
if (firstAddedNewMonitorContext === newMonitorContext) {
129128
this.newContexts.push(newMonitorContext);
129+
130130
break;
131131
}
132132

133133
if (newMonitorContext.isActiveContext) {
134134
if (newMonitorContext.expectedActiveMonitoringStartTimeNano > currentTimeNano) {
135135
this.newContexts.push(newMonitorContext);
136+
136137
firstAddedNewMonitorContext = firstAddedNewMonitorContext ?? newMonitorContext;
137138
} else {
138139
this.activeContexts.push(newMonitorContext);
@@ -152,7 +153,7 @@ export class MonitorImpl implements Monitor {
152153
let monitorContext: MonitorConnectionContext | undefined;
153154
let firstAddedMonitorContext: MonitorConnectionContext | null = null;
154155

155-
while ((monitorContext = this.activeContexts.shift()) != null) {
156+
while ((monitorContext = this.activeContexts?.shift()) != null) {
156157
// If context is already invalid, just skip it.
157158
if (!monitorContext.isActiveContext) {
158159
continue;
@@ -161,7 +162,8 @@ export class MonitorImpl implements Monitor {
161162
if (firstAddedMonitorContext == monitorContext) {
162163
// This context is already processed by this loop.
163164
// Add it to the array and exit this loop.
164-
this.activeContexts.push(monitorContext);
165+
166+
this.activeContexts?.push(monitorContext);
165167
break;
166168
}
167169

@@ -174,18 +176,19 @@ export class MonitorImpl implements Monitor {
174176
);
175177

176178
if (monitorContext.isActiveContext && !monitorContext.isHostUnhealthy) {
177-
this.activeContexts.push(monitorContext);
179+
this.activeContexts?.push(monitorContext);
180+
178181
if (firstAddedMonitorContext == null) {
179182
firstAddedMonitorContext = monitorContext;
180183
}
181184

182-
if (delayMillis == -1 || delayMillis > monitorContext.failureDetectionIntervalMillis) {
185+
if (delayMillis === -1 || delayMillis > monitorContext.failureDetectionIntervalMillis) {
183186
delayMillis = monitorContext.failureDetectionIntervalMillis;
184187
}
185188
}
186189
}
187190

188-
if (delayMillis == -1) {
191+
if (delayMillis === -1) {
189192
// No active contexts.
190193
delayMillis = this.SLEEP_WHEN_INACTIVE_MILLIS;
191194
} else {

common/lib/plugins/efm2/monitor.ts

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ export interface Monitor {
3737

3838
endMonitoringClient(): Promise<void>;
3939

40-
end(): Promise<void>;
41-
4240
releaseResources(): Promise<void>;
4341
}
4442

@@ -125,13 +123,15 @@ export class MonitorImpl implements Monitor {
125123
for (const [key, val] of MonitorImpl.newContexts.entries()) {
126124
if (key < currentTimeNanos) {
127125
const queue: Array<WeakRef<MonitorConnectionContext>> = val;
126+
128127
processedKeys.push(key);
129128
// Each value of found entry is a queue of monitoring contexts awaiting active monitoring.
130129
// Add all contexts to an active monitoring contexts queue.
131130
// Ignore disposed contexts.
132131
let monitorContextRef: WeakRef<MonitorConnectionContext> | undefined;
133-
while ((monitorContextRef = queue.shift())) {
134-
const monitorContext: MonitorConnectionContext = monitorContextRef.deref();
132+
133+
while ((monitorContextRef = queue?.shift()) != null) {
134+
const monitorContext: MonitorConnectionContext = monitorContextRef?.deref() ?? null;
135135
if (monitorContext && monitorContext.isActive()) {
136136
this.activeContexts.push(monitorContextRef);
137137
}
@@ -176,22 +176,26 @@ export class MonitorImpl implements Monitor {
176176

177177
let monitorContextRef: WeakRef<MonitorConnectionContext> | undefined;
178178

179-
while ((monitorContextRef = this.activeContexts.shift())) {
179+
while ((monitorContextRef = this.activeContexts?.shift()) != null) {
180180
if (this.isStopped()) {
181181
break;
182182
}
183+
183184
const monitorContext: MonitorConnectionContext = monitorContextRef?.deref() ?? null;
185+
184186
if (!monitorContext) {
185187
continue;
186188
}
187189

188190
if (this.hostUnhealthy) {
189191
// Kill connection
190192
monitorContext.setHostUnhealthy(true);
193+
const clientToAbort = monitorContext.getClient();
194+
191195
monitorContext.setInactive();
192-
const connectionToAbort = monitorContext.getClient();
193-
if (connectionToAbort != null) {
194-
await this.endMonitoringClient();
196+
if (clientToAbort != null) {
197+
await this.endMonitoringClient(clientToAbort);
198+
195199
this.abortedConnectionsCounter.inc();
196200
}
197201
} else if (monitorContext && monitorContext.isActive()) {
@@ -219,7 +223,6 @@ export class MonitorImpl implements Monitor {
219223
logger.debug(Messages.get("MonitorImpl.errorDuringMonitoringStop", error.message));
220224
} finally {
221225
await this.endMonitoringClient();
222-
await sleep(3000);
223226
}
224227

225228
logger.debug(Messages.get("MonitorImpl.stopMonitoring", this.hostInfo.host));
@@ -260,13 +263,6 @@ export class MonitorImpl implements Monitor {
260263
return this.stopped;
261264
}
262265

263-
async end(): Promise<void> {
264-
this.stopped = true;
265-
// Waiting for 30s gives a task enough time to exit monitoring loop and close database connection.
266-
await sleep(30000);
267-
logger.debug(Messages.get("MonitorImpl.stopped", this.hostInfo.host));
268-
}
269-
270266
updateHostHealthStatus(connectionValid: boolean, statusCheckStartNano: number, statusCheckEndNano: number): Promise<void> {
271267
if (!connectionValid) {
272268
this.failureCount++;
@@ -278,7 +274,7 @@ export class MonitorImpl implements Monitor {
278274
const invalidHostDurationNano = statusCheckEndNano - this.invalidHostStartTimeNano;
279275
const maxInvalidHostDurationNano = this.failureDetectionIntervalNanos * Math.max(0, this.failureDetectionCount - 1);
280276

281-
if (invalidHostDurationNano >= maxInvalidHostDurationNano) {
277+
if (this.failureCount >= this.failureDetectionCount || invalidHostDurationNano >= maxInvalidHostDurationNano) {
282278
logger.debug(Messages.get("MonitorConnectionContext.hostDead", this.hostInfo.host));
283279
this.hostUnhealthy = true;
284280
return Promise.resolve();
@@ -306,10 +302,18 @@ export class MonitorImpl implements Monitor {
306302
await sleep(500);
307303
}
308304

309-
async endMonitoringClient() {
310-
if (this.monitoringClient) {
311-
await this.pluginService.abortTargetClient(this.monitoringClient);
312-
this.monitoringClient = null;
305+
async endMonitoringClient(clientToAbort?: ClientWrapper) {
306+
try {
307+
if (clientToAbort) {
308+
await this.pluginService.abortTargetClient(clientToAbort);
309+
} else if (this.monitoringClient) {
310+
await this.pluginService.abortTargetClient(this.monitoringClient);
311+
this.monitoringClient = null;
312+
}
313+
this.stopped = true;
314+
} catch (error: any) {
315+
// ignore
316+
logger.debug(Messages.get("MonitorConnectionContext.errorAbortingConnection", error.message));
313317
}
314318
}
315319
}

common/lib/plugins/efm2/monitor_connection_context.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export class MonitorConnectionContext {
4848
}
4949

5050
getClient(): ClientWrapper | null {
51-
return this.clientToAbortRef.deref() ?? null;
51+
return this.clientToAbortRef?.deref() ?? null;
5252
}
5353

5454
isActive() {

tests/integration/container/tests/aurora_failover.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const itIf =
3636
? it
3737
: it.skip;
3838
const itIfTwoInstance = instanceCount == 2 ? itIf : it.skip;
39+
const itIfThreeInstance = instanceCount == 3 ? it : it.skip;
3940

4041
let env: TestEnvironment;
4142
let driver;
@@ -65,6 +66,20 @@ async function initDefaultConfig(host: string, port: number, connectToProxy: boo
6566
return config;
6667
}
6768

69+
async function initConfigWithEFM2(host: string, port: number, connectToProxy: boolean): Promise<any> {
70+
const config: any = await initDefaultConfig(host, port, connectToProxy);
71+
config["plugins"] = "failover,efm2";
72+
config["failoverTimeoutMs"] = 20000;
73+
config["failureDetectionCount"] = 2;
74+
config["failureDetectionInterval"] = 1000;
75+
config["failureDetectionTime"] = 2000;
76+
config["connectTimeout"] = 10000;
77+
config["wrapperQueryTimeout"] = 20000;
78+
config["monitoring_wrapperQueryTimeout"] = 3000;
79+
config["monitoring_wrapperConnectTimeout"] = 3000;
80+
return config;
81+
}
82+
6883
describe("aurora failover", () => {
6984
beforeEach(async () => {
7085
logger.info(`Test started: ${expect.getState().currentTestName}`);
@@ -100,6 +115,45 @@ describe("aurora failover", () => {
100115
logger.info(`Test finished: ${expect.getState().currentTestName}`);
101116
}, 1320000);
102117

118+
itIfThreeInstance(
119+
"writer failover efm",
120+
async () => {
121+
// Connect to writer instance.
122+
const writerConfig = await initDefaultConfig(env.proxyDatabaseInfo.writerInstanceEndpoint, env.proxyDatabaseInfo.instanceEndpointPort, true);
123+
writerConfig["failoverMode"] = "reader-or-writer";
124+
125+
client = initClientFunc(writerConfig);
126+
await client.connect();
127+
128+
const initialWriterId = await auroraTestUtility.queryInstanceId(client);
129+
expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toStrictEqual(true);
130+
const instances = env.databaseInfo.instances;
131+
const readerInstance = instances[1].instanceId;
132+
await ProxyHelper.disableAllConnectivity(env.engine);
133+
134+
try {
135+
await ProxyHelper.enableConnectivity(initialWriterId);
136+
137+
// Sleep query activates monitoring connection after monitoring_wrapperQueryTimeout time is reached.
138+
await auroraTestUtility.queryInstanceIdWithSleep(client);
139+
140+
await ProxyHelper.enableConnectivity(readerInstance);
141+
await ProxyHelper.disableConnectivity(env.engine, initialWriterId);
142+
} catch (error) {
143+
fail("The disable connectivity task was unexpectedly interrupted.");
144+
}
145+
// Failure occurs on connection invocation.
146+
await expect(async () => {
147+
await auroraTestUtility.queryInstanceId(client);
148+
}).rejects.toThrow(FailoverSuccessError);
149+
150+
const currentConnectionId = await auroraTestUtility.queryInstanceId(client);
151+
expect(await auroraTestUtility.isDbInstanceWriter(currentConnectionId)).toBe(false);
152+
expect(currentConnectionId).not.toBe(initialWriterId);
153+
},
154+
1320000
155+
);
156+
103157
itIf(
104158
"fails from writer to new writer on connection invocation",
105159
async () => {

tests/integration/container/tests/utils/aurora_test_utility.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ export class AuroraTestUtility {
220220
return await DriverHelper.executeInstanceQuery(testEnvironment.engine, testEnvironment.deployment, client);
221221
}
222222

223+
async queryInstanceIdWithSleep(client: AwsClient) {
224+
const testEnvironment: TestEnvironment = await TestEnvironment.getCurrent();
225+
return await DriverHelper.executeInstanceQueryWithSleep(testEnvironment.engine, testEnvironment.deployment, client);
226+
}
227+
223228
async isDbInstanceWriter(instanceId: string, clusterId?: string) {
224229
if (clusterId === undefined) {
225230
clusterId = (await TestEnvironment.getCurrent()).info.auroraClusterName;

tests/integration/container/tests/utils/driver_helper.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ export class DriverHelper {
4444
}
4545
}
4646

47+
static getSleepSql(engine: DatabaseEngine, deployment: DatabaseEngineDeployment): string {
48+
switch (deployment) {
49+
case DatabaseEngineDeployment.AURORA:
50+
switch (engine) {
51+
case DatabaseEngine.PG:
52+
return "SELECT pg_sleep(10)";
53+
case DatabaseEngine.MYSQL:
54+
return "SELECT sleep(10)";
55+
default:
56+
throw new Error("invalid engine");
57+
}
58+
}
59+
}
4760
static getInstanceIdSql(engine: DatabaseEngine, deployment: DatabaseEngineDeployment): string {
4861
switch (deployment) {
4962
case DatabaseEngineDeployment.AURORA:
@@ -85,6 +98,26 @@ export class DriverHelper {
8598
}
8699
}
87100

101+
static async executeInstanceQueryWithSleep(engine: DatabaseEngine, deployment: DatabaseEngineDeployment, client: AwsClient) {
102+
const sql1 = DriverHelper.getSleepSql(engine, deployment);
103+
104+
const sql2 = DriverHelper.getInstanceIdSql(engine, deployment);
105+
let result;
106+
switch (engine) {
107+
case DatabaseEngine.PG:
108+
await (client as AwsPGClient).query(sql1);
109+
return await (client as AwsPGClient).query(sql2).then((result) => {
110+
return result.rows[0]["id"];
111+
});
112+
case DatabaseEngine.MYSQL:
113+
await (client as AwsMySQLClient).query({ sql: sql1 });
114+
result = await (client as AwsMySQLClient).query({ sql: sql2 });
115+
return JSON.parse(JSON.stringify(result))[0][0]["id"];
116+
default:
117+
throw new Error("invalid engine");
118+
}
119+
}
120+
88121
static getSleepQuery(engine: DatabaseEngine, seconds: number) {
89122
switch (engine) {
90123
case DatabaseEngine.PG:

0 commit comments

Comments
 (0)