Skip to content

Commit bf3a31d

Browse files
authored
fix: plugin chain cache issues (#464)
1 parent 20b27df commit bf3a31d

5 files changed

Lines changed: 31 additions & 41 deletions

File tree

common/lib/plugin_manager.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,15 @@ class PluginChain<T> {
5454
return this;
5555
}
5656

57-
execute(pluginFunc: PluginFunc<T>, targetFunc: () => Promise<T>): Promise<T> {
57+
execute(pluginFunc: PluginFunc<T>): Promise<T> {
5858
if (this.chain === undefined) {
5959
throw new AwsWrapperError(Messages.get("PluginManager.pipelineNone"));
6060
}
61-
return this.chain(pluginFunc, targetFunc);
61+
return this.chain(pluginFunc, this.targetFunc);
6262
}
6363
}
6464

6565
export class PluginManager {
66-
private static readonly PLUGIN_CHAIN_CACHE = new Map<string, PluginChain<any>>();
6766
private static readonly STRATEGY_PLUGIN_CHAIN_CACHE = new Map<ConnectionPlugin[], Set<ConnectionPlugin>>();
6867
private static readonly ALL_METHODS: string = "*";
6968
private static readonly CONNECT_METHOD = "connect";
@@ -189,12 +188,8 @@ export class PluginManager {
189188
pluginFunc: PluginFunc<T>,
190189
methodFunc: () => Promise<T>
191190
): Promise<T> {
192-
let chain = PluginManager.PLUGIN_CHAIN_CACHE.get(methodName);
193-
if (!chain) {
194-
chain = this.makeExecutePipeline(hostInfo, props, methodName, methodFunc);
195-
PluginManager.PLUGIN_CHAIN_CACHE.set(methodName, chain);
196-
}
197-
return chain.execute(pluginFunc, methodFunc);
191+
const chain = this.makeExecutePipeline(hostInfo, props, methodName, methodFunc);
192+
return chain.execute(pluginFunc);
198193
}
199194

200195
makeExecutePipeline<T>(hostInfo: HostInfo, props: Map<string, any>, name: string, methodFunc: () => Promise<T>): PluginChain<T> {

common/lib/plugins/failover2/failover2_plugin.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level
4040
import { HostRole } from "../../host_role";
4141
import { CanReleaseResources } from "../../can_release_resources";
4242
import { ReaderFailoverResult } from "../failover/reader_failover_result";
43-
import { HostListProvider } from "../../host_list_provider/host_list_provider";
43+
import { BlockingHostListProvider, HostListProvider } from "../../host_list_provider/host_list_provider";
4444
import { logTopology } from "../../utils/utils";
4545

4646
export class Failover2Plugin extends AbstractConnectionPlugin implements CanReleaseResources {
@@ -495,7 +495,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
495495
async releaseResources(): Promise<void> {
496496
const hostListProvider: HostListProvider = this.pluginService.getHostListProvider();
497497
if (this.pluginService.isBlockingHostListProvider(hostListProvider)) {
498-
await hostListProvider.clearAll();
498+
await (hostListProvider as BlockingHostListProvider).clearAll();
499499
}
500500
}
501501
}

tests/integration/container/tests/basic_connectivity.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ describe("basic_connectivity", () => {
186186
password: env.databaseInfo.password,
187187
port: env.proxyDatabaseInfo.instanceEndpointPort,
188188
plugins: "",
189-
wrapperConnectTimeout: 120000,
190189
clusterInstanceHostPattern: "?." + env.proxyDatabaseInfo.instanceEndpointSuffix,
191190
enableTelemetry: true,
192191
telemetryTracesBackend: "OTLP",

tests/integration/container/tests/iam_authentication.test.ts

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ const itIf =
3737
: it.skip;
3838

3939
let env: TestEnvironment;
40-
let driver;
40+
let driver: any;
4141
let initClientFunc: (props: any) => any;
42-
42+
let client: any;
4343
const sslCertificate = {
4444
ca: readFileSync("/app/global-bundle.pem").toString()
4545
};
@@ -67,18 +67,10 @@ async function initDefaultConfig(host: string): Promise<any> {
6767
return props;
6868
}
6969

70-
async function validateConnection(client: AwsPGClient | AwsMySQLClient) {
71-
try {
72-
await client.connect();
73-
const res = await DriverHelper.executeQuery(env.engine, client, "select 1");
74-
expect(res).not.toBeNull();
75-
} finally {
76-
try {
77-
await client.end();
78-
} catch (error) {
79-
// pass
80-
}
81-
}
70+
async function validateConnection() {
71+
await client.connect();
72+
const res = await DriverHelper.executeQuery(env.engine, client, "select 1");
73+
expect(res).not.toBeNull();
8274
}
8375

8476
describe("iam authentication", () => {
@@ -87,13 +79,19 @@ describe("iam authentication", () => {
8779
jest.useFakeTimers({
8880
doNotFake: ["nextTick"]
8981
});
82+
client = null;
9083
env = await TestEnvironment.getCurrent();
9184
driver = DriverHelper.getDriverForDatabaseEngine(env.engine);
9285
initClientFunc = DriverHelper.getClient(driver);
9386
IamAuthenticationPlugin.clearCache();
9487
});
9588

9689
afterEach(async () => {
90+
try {
91+
await client.end();
92+
} catch (error) {
93+
// pass
94+
}
9795
await PluginManager.releaseResources();
9896
await TestEnvironment.verifyClusterStatus();
9997
logger.info(`Test finished: ${expect.getState().currentTestName}`);
@@ -109,7 +107,7 @@ describe("iam authentication", () => {
109107
async () => {
110108
const config = await initDefaultConfig(env.databaseInfo.writerInstanceEndpoint);
111109
config["user"] = `WRONG_${env.info.databaseInfo.username}_USER`;
112-
const client: AwsPGClient | AwsMySQLClient = initClientFunc(config);
110+
client = initClientFunc(config);
113111

114112
await expect(client.connect()).rejects.toThrow();
115113
},
@@ -121,7 +119,7 @@ describe("iam authentication", () => {
121119
async () => {
122120
const config = await initDefaultConfig(env.databaseInfo.writerInstanceEndpoint);
123121
config["user"] = undefined;
124-
const client: AwsPGClient | AwsMySQLClient = initClientFunc(config);
122+
client = initClientFunc(config);
125123

126124
await expect(client.connect()).rejects.toBeInstanceOf(AwsWrapperError);
127125
},
@@ -133,7 +131,7 @@ describe("iam authentication", () => {
133131
async () => {
134132
const config = await initDefaultConfig(env.databaseInfo.writerInstanceEndpoint);
135133
config["iamHost"] = "<>";
136-
const client: AwsPGClient | AwsMySQLClient = initClientFunc(config);
134+
client = initClientFunc(config);
137135

138136
await expect(client.connect()).rejects.toBeInstanceOf(AwsWrapperError);
139137
},
@@ -153,9 +151,9 @@ describe("iam authentication", () => {
153151
config["password"] = "anything";
154152
config["iamHost"] = instance.host;
155153

156-
const client: AwsPGClient | AwsMySQLClient = initClientFunc(config);
154+
client = initClientFunc(config);
157155

158-
await validateConnection(client);
156+
await validateConnection();
159157
} else {
160158
throw new AwsWrapperError("Host not found");
161159
}
@@ -169,8 +167,8 @@ describe("iam authentication", () => {
169167
async () => {
170168
const config = await initDefaultConfig(env.databaseInfo.writerInstanceEndpoint);
171169
config["password"] = "anything";
172-
const client: AwsPGClient | AwsMySQLClient = initClientFunc(config);
173-
await validateConnection(client);
170+
client = initClientFunc(config);
171+
await validateConnection();
174172
},
175173
100000
176174
);
@@ -180,8 +178,8 @@ describe("iam authentication", () => {
180178
async () => {
181179
const config = await initDefaultConfig(env.databaseInfo.writerInstanceEndpoint);
182180
config["password"] = undefined;
183-
const client: AwsPGClient | AwsMySQLClient = initClientFunc(config);
184-
await validateConnection(client);
181+
client = initClientFunc(config);
182+
await validateConnection();
185183
},
186184
100000
187185
);

tests/integration/container/tests/initial_connection_strategy.test.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,8 @@ describe("aurora initial connection strategy", () => {
100100
const connectedReaderIds: Set<string> = new Set();
101101
const connectionsSet: Set<any> = new Set();
102102
try {
103-
// TODO: fix round robin strategy cached connection issue on first instance
104-
// remove these lines when fixed
105-
const client = initClientFunc(config);
103+
// first instance is not cached
104+
client = initClientFunc(config);
106105
await client.connect();
107106
const readerId = await auroraTestUtility.queryInstanceId(client);
108107
connectionsSet.add(readerId);
@@ -138,9 +137,8 @@ describe("aurora initial connection strategy", () => {
138137
const config = await initConfig("roundRobin");
139138
config["roundRobinHostWeightPairs"] = `${initialReader}:${numReaders}`;
140139

141-
// TODO: fix round robin strategy cached connection issue on first instance
142-
// remove these lines when fixed
143-
const client = initClientFunc(config);
140+
// first instance is not cached
141+
client = initClientFunc(config);
144142
await client.connect();
145143
const readerId = await auroraTestUtility.queryInstanceId(client);
146144
connectionsSet.add(readerId);

0 commit comments

Comments
 (0)