Skip to content

Commit a5382f0

Browse files
authored
feat: connection pool (#533)
1 parent 0221f17 commit a5382f0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2189
-160
lines changed

common/lib/aws_client.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { HostListProvider } from "./host_list_provider/host_list_provider";
2222
import { PluginManager } from "./plugin_manager";
2323

2424
import pkgStream from "stream";
25-
import { DriverConnectionProvider } from "./driver_connection_provider";
2625
import { ClientWrapper } from "./client_wrapper";
2726
import { ConnectionProviderManager } from "./connection_provider_manager";
2827
import { DefaultTelemetryFactory } from "./utils/telemetry/default_telemetry_factory";
@@ -35,10 +34,13 @@ import { AwsWrapperError } from "./utils/errors";
3534
import { Messages } from "./utils/messages";
3635
import { TransactionIsolationLevel } from "./utils/transaction_isolation_level";
3736
import { HostListProviderService } from "./host_list_provider_service";
37+
import { SessionStateClient } from "./session_state_client";
38+
import { ConnectionProvider } from "./connection_provider";
39+
import { DriverConnectionProvider } from "./driver_connection_provider";
3840

3941
const { EventEmitter } = pkgStream;
4042

41-
export abstract class AwsClient extends EventEmitter {
43+
export abstract class AwsClient extends EventEmitter implements SessionStateClient {
4244
private _defaultPort: number = -1;
4345
protected telemetryFactory: TelemetryFactory;
4446
protected pluginManager: PluginManager;
@@ -55,7 +57,8 @@ export abstract class AwsClient extends EventEmitter {
5557
dbType: DatabaseType,
5658
knownDialectsByCode: Map<string, DatabaseDialect>,
5759
parser: ConnectionUrlParser,
58-
driverDialect: DriverDialect
60+
driverDialect: DriverDialect,
61+
connectionProvider?: ConnectionProvider
5962
) {
6063
super();
6164
this.config = config;
@@ -110,7 +113,7 @@ export abstract class AwsClient extends EventEmitter {
110113
this.pluginManager = new PluginManager(
111114
container,
112115
this.properties,
113-
new ConnectionProviderManager(new DriverConnectionProvider(), WrapperProperties.CONNECTION_PROVIDER.get(this.properties)),
116+
new ConnectionProviderManager(connectionProvider ?? new DriverConnectionProvider(), WrapperProperties.CONNECTION_PROVIDER.get(this.properties)),
114117
this.telemetryFactory
115118
);
116119
}

common/lib/aws_pool_client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
limitations under the License.
1515
*/
1616

17-
export interface AwsPoolClient {
17+
export interface AwsInternalPoolClient {
1818
connect(): Promise<any>;
1919

2020
end(): Promise<any>;

common/lib/aws_pool_config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export class AwsPoolConfig {
3030
*/
3131
readonly minConnections?: number | undefined;
3232
readonly allowExitOnIdle?: boolean | undefined;
33+
readonly maxLifetimeSeconds?: number | undefined;
3334

3435
constructor(props?: any) {
3536
this.maxConnections = props.maxConnections ?? 10;
@@ -39,5 +40,6 @@ export class AwsPoolConfig {
3940
this.queueLimit = props.queueLimit ?? 0;
4041
this.minConnections = props.minConnections ?? 0;
4142
this.allowExitOnIdle = props.allowExitOnIdle ?? false;
43+
this.maxLifetimeSeconds = props.maxLifetimeSeconds ?? 0;
4244
}
4345
}

common/lib/client_wrapper.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ export interface ClientWrapper {
2222
readonly properties: Map<string, any>;
2323
readonly id: string;
2424

25+
// Internal method, executes wrapper-specific queries like the topology query.
2526
query(sql: string): Promise<any>;
27+
query(config: any, values?: any, callback?: any): Promise<any>;
2628

2729
end(): Promise<void>;
2830

common/lib/connection_provider_manager.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717
import { ConnectionProvider } from "./connection_provider";
1818
import { HostRole } from "./host_role";
1919
import { HostInfo } from "./host_info";
20+
import { CanReleaseResources } from "./can_release_resources";
21+
import { DriverConnectionProvider } from "./driver_connection_provider";
2022

2123
export class ConnectionProviderManager {
24+
private readonly forceConnectProvider: ConnectionProvider;
2225
private readonly defaultProvider: ConnectionProvider;
2326
private readonly effectiveProvider: ConnectionProvider | null;
2427

2528
constructor(defaultProvider: ConnectionProvider, effectiveProvider: ConnectionProvider | null) {
2629
this.defaultProvider = defaultProvider;
2730
this.effectiveProvider = effectiveProvider;
31+
this.forceConnectProvider = new DriverConnectionProvider();
2832
}
2933

3034
getConnectionProvider(hostInfo: HostInfo | null, props: Map<string, any>): ConnectionProvider {
@@ -64,4 +68,22 @@ export class ConnectionProviderManager {
6468
getDefaultConnectionProvider(): ConnectionProvider {
6569
return this.defaultProvider;
6670
}
71+
72+
getForceConnectionProvider(): ConnectionProvider {
73+
return this.forceConnectProvider;
74+
}
75+
76+
private static implementsCanReleaseResources(provider: any): provider is CanReleaseResources {
77+
return provider.releaseResources !== undefined;
78+
}
79+
80+
async releaseResources(): Promise<void> {
81+
if (this.effectiveProvider && ConnectionProviderManager.implementsCanReleaseResources(this.effectiveProvider)) {
82+
await this.effectiveProvider.releaseResources();
83+
}
84+
85+
if (ConnectionProviderManager.implementsCanReleaseResources(this.defaultProvider)) {
86+
await this.defaultProvider.releaseResources();
87+
}
88+
}
6789
}

common/lib/driver_dialect/driver_dialect.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import { ClientWrapper } from "../client_wrapper";
1818
import { AwsPoolConfig } from "../aws_pool_config";
19-
import { AwsPoolClient } from "../aws_pool_client";
19+
import { AwsInternalPoolClient } from "../aws_pool_client";
2020
import { HostInfo } from "../host_info";
2121

2222
export interface DriverDialect {
@@ -26,11 +26,13 @@ export interface DriverDialect {
2626

2727
preparePoolClientProperties(props: Map<string, any>, poolConfig: AwsPoolConfig | undefined): any;
2828

29-
getAwsPoolClient(props: any): AwsPoolClient;
29+
getAwsPoolClient(props: any): AwsInternalPoolClient;
3030

3131
setConnectTimeout(props: Map<string, any>, wrapperConnectTimeout?: any): void;
3232

3333
setQueryTimeout(props: Map<string, any>, sql?: any, wrapperConnectTimeout?: any): void;
3434

3535
setKeepAliveProperties(props: Map<string, any>, keepAliveProps: any): void;
36+
37+
getQueryFromMethodArg(methodArg: any): string;
3638
}

common/lib/error_handler.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,10 @@ export interface ErrorHandler {
4747
* @param clientWrapper a wrapper containing the target community client.
4848
*/
4949
attachNoOpErrorListener(clientWrapper: ClientWrapper | undefined): void;
50+
51+
/**
52+
* Remove an error event listener from a ClientWrapper.
53+
* @param clientWrapper a wrapper containing the target community client.
54+
*/
55+
removeErrorListener(clientWrapper: ClientWrapper | undefined): void;
5056
}

common/lib/host_list_provider_service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { DatabaseDialect } from "./database_dialect/database_dialect";
2121
import { HostInfoBuilder } from "./host_info_builder";
2222
import { ConnectionUrlParser } from "./utils/connection_url_parser";
2323
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
24-
import { AllowedAndBlockedHosts } from "./AllowedAndBlockedHosts";
24+
import { AllowedAndBlockedHosts } from "./allowed_and_blocked_hosts";
2525

2626
export interface HostListProviderService {
2727
getHostListProvider(): HostListProvider | null;

common/lib/internal_pooled_connection_provider.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { HostSelector } from "./host_selector";
3333
import { RandomHostSelector } from "./random_host_selector";
3434
import { InternalPoolMapping } from "./utils/internal_pool_mapping";
3535
import { RoundRobinHostSelector } from "./round_robin_host_selector";
36-
import { AwsPoolClient } from "./aws_pool_client";
36+
import { AwsInternalPoolClient } from "./aws_pool_client";
3737
import { AwsPoolConfig } from "./aws_pool_config";
3838
import { LeastConnectionsHostSelector } from "./least_connections_host_selector";
3939
import { PoolClientWrapper } from "./pool_client_wrapper";
@@ -59,7 +59,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
5959
private readonly _poolMapping?: InternalPoolMapping;
6060
private readonly _poolConfig?: AwsPoolConfig;
6161
targetClient?: ClientWrapper;
62-
internalPool: AwsPoolClient | undefined;
62+
internalPool: AwsInternalPoolClient | undefined;
6363

6464
private static poolExpirationCheckNanos: bigint = InternalPooledConnectionProvider.POOL_EXPIRATION_NANOS; // 30 minutes
6565

@@ -80,6 +80,12 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
8080
}
8181

8282
async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ClientWrapper> {
83+
const resultProps = new Map(props);
84+
resultProps.set(WrapperProperties.HOST.name, hostInfo.host);
85+
if (hostInfo.isPortSpecified()) {
86+
resultProps.set(WrapperProperties.PORT.name, hostInfo.port);
87+
}
88+
8389
let connectionHostInfo: HostInfo = hostInfo;
8490
if (
8591
WrapperProperties.ENABLE_GREEN_HOST_REPLACEMENT.get(props) &&
@@ -96,6 +102,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
96102
// Green instance DNS doesn't exist
97103

98104
const fixedHost: string = this.rdsUtil.removeGreenInstancePrefix(hostInfo.host);
105+
resultProps.set(WrapperProperties.HOST.name, fixedHost);
99106
connectionHostInfo = new HostInfoBuilder({
100107
hostAvailabilityStrategy: hostInfo.hostAvailabilityStrategy
101108
})
@@ -106,9 +113,9 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
106113
}
107114

108115
const dialect = pluginService.getDriverDialect();
109-
const preparedConfig = dialect.preparePoolClientProperties(props, this._poolConfig);
116+
const preparedConfig = dialect.preparePoolClientProperties(resultProps, this._poolConfig);
110117
this.internalPool = InternalPooledConnectionProvider.databasePools.computeIfAbsent(
111-
new PoolKey(connectionHostInfo.url, this.getPoolKey(connectionHostInfo, props)).getPoolKeyString(),
118+
new PoolKey(connectionHostInfo.url, this.getPoolKey(connectionHostInfo, resultProps)).getPoolKeyString(),
112119
() => dialect.getAwsPoolClient(preparedConfig),
113120
InternalPooledConnectionProvider.poolExpirationCheckNanos
114121
);
@@ -123,6 +130,14 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
123130
}
124131

125132
async releaseResources() {
133+
if (this.internalPool) {
134+
try {
135+
await this.internalPool.releaseResources();
136+
} catch (error) {
137+
// ignore
138+
}
139+
}
140+
126141
await InternalPooledConnectionProvider.databasePools.clear();
127142
}
128143

0 commit comments

Comments
 (0)