Skip to content

Commit 9cb9cf2

Browse files
authored
refactor: sliding expiration cache with cleanup task (#389)
1 parent 4c5aa14 commit 9cb9cf2

20 files changed

Lines changed: 315 additions & 92 deletions

common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import { RdsHostListProvider } from "../rds_host_list_provider";
1818
import { HostInfo } from "../../host_info";
19-
import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache";
2019
import { ClusterTopologyMonitor, ClusterTopologyMonitorImpl } from "./cluster_topology_monitor";
2120
import { PluginService } from "../../plugin_service";
2221
import { HostListProviderService } from "../../host_list_provider_service";
@@ -27,23 +26,25 @@ import { Messages } from "../../utils/messages";
2726
import { WrapperProperties } from "../../wrapper_property";
2827
import { BlockingHostListProvider } from "../host_list_provider";
2928
import { logger } from "../../../logutils";
29+
import { SlidingExpirationCacheWithCleanupTask } from "../../utils/sliding_expiration_cache_with_cleanup_task";
3030
import { isDialectTopologyAware } from "../../utils/utils";
3131

3232
export class MonitoringRdsHostListProvider extends RdsHostListProvider implements BlockingHostListProvider {
3333
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(60_000_000_000); // 1 minute.
3434
static readonly MONITOR_EXPIRATION_NANOS: bigint = BigInt(15 * 60_000_000_000); // 15 minutes.
3535
static readonly DEFAULT_TOPOLOGY_QUERY_TIMEOUT_MS = 5000; // 5 seconds.
3636

37-
private static monitors: SlidingExpirationCache<string, ClusterTopologyMonitor> = new SlidingExpirationCache(
37+
private static monitors: SlidingExpirationCacheWithCleanupTask<string, ClusterTopologyMonitor> = new SlidingExpirationCacheWithCleanupTask(
3838
MonitoringRdsHostListProvider.CACHE_CLEANUP_NANOS,
3939
() => true,
40-
async (monitor: ClusterTopologyMonitor) => {
40+
async (item: ClusterTopologyMonitor) => {
4141
try {
42-
await monitor.close();
42+
await item.close();
4343
} catch {
4444
// Ignore.
4545
}
46-
}
46+
},
47+
"MonitoringRdsHostListProvider.monitors"
4748
);
4849

4950
private readonly pluginService: PluginService;
@@ -55,13 +56,7 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement
5556

5657
async clearAll(): Promise<void> {
5758
RdsHostListProvider.clearAll();
58-
// TODO: refactor when sliding-expiration-cache refactoring is merged.
59-
for (const [key, monitor] of MonitoringRdsHostListProvider.monitors.entries) {
60-
if (monitor !== undefined) {
61-
await monitor.item.close();
62-
}
63-
}
64-
MonitoringRdsHostListProvider.monitors.clear();
59+
await MonitoringRdsHostListProvider.monitors.clear();
6560
}
6661

6762
async queryForTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {

common/lib/internal_pooled_connection_provider.ts

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import { PluginService } from "./plugin_service";
1818
import { WrapperProperties } from "./wrapper_property";
1919
import { CanReleaseResources } from "./can_release_resources";
20-
import { SlidingExpirationCache } from "./utils/sliding_expiration_cache";
2120
import { PoolKey } from "./utils/pool_key";
2221
import { PooledConnectionProvider } from "./pooled_connection_provider";
2322
import { HostInfo } from "./host_info";
@@ -39,14 +38,16 @@ import { AwsPoolConfig } from "./aws_pool_config";
3938
import { LeastConnectionsHostSelector } from "./least_connections_host_selector";
4039
import { PoolClientWrapper } from "./pool_client_wrapper";
4140
import { logger } from "../logutils";
41+
import { SlidingExpirationCacheWithCleanupTask } from "./utils/sliding_expiration_cache_with_cleanup_task";
4242

4343
export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources {
4444
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(10 * 60_000_000_000); // 10 minutes
4545
static readonly POOL_EXPIRATION_NANOS: bigint = BigInt(30 * 60_000_000_000); // 30 minutes
46-
protected static databasePools: SlidingExpirationCache<string, any> = new SlidingExpirationCache(
46+
protected static databasePools: SlidingExpirationCacheWithCleanupTask<string, any> = new SlidingExpirationCacheWithCleanupTask(
4747
InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS,
4848
(pool: any) => pool.getActiveCount() === 0,
49-
(pool: any) => pool.end()
49+
async (pool: any) => await pool.end(),
50+
"InternalPooledConnectionProvider.databasePools"
5051
);
5152

5253
private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
@@ -122,16 +123,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
122123
}
123124

124125
async releaseResources() {
125-
for (const [_key, value] of InternalPooledConnectionProvider.databasePools.entries) {
126-
if (value.item) {
127-
await value.item.releaseResources();
128-
}
129-
}
130-
InternalPooledConnectionProvider.clearDatabasePools();
131-
}
132-
133-
static clearDatabasePools() {
134-
InternalPooledConnectionProvider.databasePools.clear();
126+
await InternalPooledConnectionProvider.databasePools.clear();
135127
}
136128

137129
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {
@@ -177,7 +169,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
177169
}
178170

179171
// for testing only
180-
setDatabasePools(connectionPools: SlidingExpirationCache<string, any>): void {
172+
setDatabasePools(connectionPools: SlidingExpirationCacheWithCleanupTask<string, any>): void {
181173
InternalPooledConnectionProvider.databasePools = connectionPools;
182174
LeastConnectionsHostSelector.setDatabasePools(connectionPools);
183175
}

common/lib/plugins/efm/host_monitoring_connection_plugin.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,6 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
203203
}
204204

205205
async releaseResources(): Promise<void> {
206-
return this.monitorService.releaseResources();
206+
await this.monitorService.releaseResources();
207207
}
208208
}

common/lib/plugins/efm/monitor_service.ts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ import { HostInfo } from "../../host_info";
1919
import { AwsWrapperError, IllegalArgumentError } from "../../utils/errors";
2020
import { Monitor, MonitorImpl } from "./monitor";
2121
import { WrapperProperties } from "../../wrapper_property";
22-
import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache";
2322
import { PluginService } from "../../plugin_service";
2423
import { Messages } from "../../utils/messages";
24+
import { SlidingExpirationCacheWithCleanupTask } from "../../utils/sliding_expiration_cache_with_cleanup_task";
2525
import { ClientWrapper } from "../../client_wrapper";
2626

2727
export interface MonitorService {
@@ -44,10 +44,13 @@ export interface MonitorService {
4444

4545
export class MonitorServiceImpl implements MonitorService {
4646
private static readonly CACHE_CLEANUP_NANOS = BigInt(60_000_000_000);
47-
protected static readonly monitors: SlidingExpirationCache<string, Monitor> = new SlidingExpirationCache(
47+
protected static readonly monitors: SlidingExpirationCacheWithCleanupTask<string, Monitor> = new SlidingExpirationCacheWithCleanupTask(
4848
MonitorServiceImpl.CACHE_CLEANUP_NANOS,
4949
undefined,
50-
() => {}
50+
async (monitor: Monitor) => {
51+
await monitor.releaseResources();
52+
},
53+
"efm/MonitorServiceImpl.monitors"
5154
);
5255
private readonly pluginService: PluginService;
5356
private cachedMonitorHostKeys: Set<string> | undefined;
@@ -108,7 +111,7 @@ export class MonitorServiceImpl implements MonitorService {
108111
}
109112

110113
stopMonitoringForAllConnections(hostKeys: Set<string>) {
111-
let monitor;
114+
let monitor: Monitor;
112115
for (const hostKey of hostKeys) {
113116
monitor = MonitorServiceImpl.monitors.get(hostKey);
114117
if (monitor) {
@@ -119,8 +122,8 @@ export class MonitorServiceImpl implements MonitorService {
119122
}
120123

121124
async getMonitor(hostKeys: Set<string>, hostInfo: HostInfo, properties: Map<string, any>): Promise<Monitor | null> {
122-
let monitor;
123-
let anyHostKey;
125+
let monitor: Monitor;
126+
let anyHostKey: string;
124127
for (const hostKey of hostKeys) {
125128
monitor = MonitorServiceImpl.monitors.get(hostKey);
126129
anyHostKey = hostKey;
@@ -159,16 +162,13 @@ export class MonitorServiceImpl implements MonitorService {
159162
}
160163

161164
async releaseResources() {
162-
for (const [_key, monitor] of MonitorServiceImpl.monitors.entries) {
163-
if (monitor.item) {
164-
await monitor.item.releaseResources();
165-
}
166-
}
165+
await MonitorServiceImpl.monitors.clear();
167166
this.cachedMonitorHostKeys = undefined;
168167
this.cachedMonitorRef = undefined;
169168
}
170169

171-
static clearMonitors() {
172-
MonitorServiceImpl.monitors.clear();
170+
// Used for performance testing.
171+
static async clearMonitors() {
172+
await MonitorServiceImpl.monitors.clear();
173173
}
174174
}

common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,6 @@ export class HostMonitoring2ConnectionPlugin extends AbstractConnectionPlugin im
171171
}
172172

173173
async releaseResources(): Promise<void> {
174-
return this.monitorService.releaseResources();
174+
await this.monitorService.releaseResources();
175175
}
176176
}

common/lib/plugins/efm2/monitor_service.ts

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ import { HostInfo } from "../../host_info";
1919
import { AwsWrapperError } from "../../utils/errors";
2020
import { Monitor, MonitorImpl } from "./monitor";
2121
import { WrapperProperties } from "../../wrapper_property";
22-
import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache";
2322
import { PluginService } from "../../plugin_service";
2423
import { Messages } from "../../utils/messages";
2524
import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter";
2625
import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory";
2726
import { ClientWrapper } from "../../client_wrapper";
2827
import { logger } from "../../../logutils";
28+
import { SlidingExpirationCacheWithCleanupTask } from "../../utils/sliding_expiration_cache_with_cleanup_task";
2929

3030
export interface MonitorService {
3131
startMonitoring(
@@ -52,18 +52,19 @@ export interface MonitorService {
5252
export class MonitorServiceImpl implements MonitorService {
5353
private static readonly CACHE_CLEANUP_NANOS = BigInt(60_000_000_000);
5454

55-
protected static readonly monitors: SlidingExpirationCache<string, Monitor> = new SlidingExpirationCache(
55+
protected static readonly monitors: SlidingExpirationCacheWithCleanupTask<string, Monitor> = new SlidingExpirationCacheWithCleanupTask(
5656
MonitorServiceImpl.CACHE_CLEANUP_NANOS,
5757
(monitor: Monitor) => monitor.canDispose(),
5858
async (monitor: Monitor) => {
5959
{
6060
try {
61-
await monitor.endMonitoringClient();
61+
await monitor.releaseResources();
6262
} catch (error) {
6363
// ignore
6464
}
6565
}
66-
}
66+
},
67+
"efm2/MonitorServiceImpl.monitors"
6768
);
6869
private readonly pluginService: PluginService;
6970
private telemetryFactory: TelemetryFactory;
@@ -133,7 +134,6 @@ export class MonitorServiceImpl implements MonitorService {
133134
failureDetectionCount: number
134135
): Promise<Monitor | null> {
135136
const monitorKey: string = `${failureDetectionTimeMillis.toString()} ${failureDetectionIntervalMillis.toString()} ${failureDetectionCount.toString()} ${hostInfo.host}`;
136-
137137
const cacheExpirationNanos = BigInt(WrapperProperties.MONITOR_DISPOSAL_TIME_MS.get(properties) * 1_000_000);
138138
return MonitorServiceImpl.monitors.computeIfAbsent(
139139
monitorKey,
@@ -152,15 +152,6 @@ export class MonitorServiceImpl implements MonitorService {
152152
}
153153

154154
async releaseResources() {
155-
for (const [_key, monitor] of MonitorServiceImpl.monitors.entries) {
156-
if (monitor.item) {
157-
await monitor.item.releaseResources();
158-
}
159-
}
160-
MonitorServiceImpl.clearMonitors();
161-
}
162-
163-
static clearMonitors() {
164-
MonitorServiceImpl.monitors.clear();
155+
await MonitorServiceImpl.monitors.clear();
165156
}
166157
}

common/lib/plugins/limitless/limitless_router_service.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import { HostRole } from "../../host_role";
3131
import { HostAvailability } from "../../host_availability/host_availability";
3232
import { HighestWeightHostSelector } from "../../highest_weight_host_selector";
3333
import { sleep } from "../../utils/utils";
34+
import { SlidingExpirationCacheWithCleanupTask } from "../../utils/sliding_expiration_cache_with_cleanup_task";
3435

3536
export interface LimitlessRouterService {
3637
startMonitor(hostInfo: HostInfo, properties: Map<string, any>): void;
@@ -40,11 +41,13 @@ export interface LimitlessRouterService {
4041

4142
export class LimitlessRouterServiceImpl implements LimitlessRouterService {
4243
protected static readonly CACHE_CLEANUP_NANOS = BigInt(60_000_000_000); // 1 min
43-
protected static readonly monitors: SlidingExpirationCache<string, LimitlessRouterMonitor> = new SlidingExpirationCache(
44-
LimitlessRouterServiceImpl.CACHE_CLEANUP_NANOS,
45-
undefined,
46-
async (monitor) => await monitor.close()
47-
);
44+
protected static readonly monitors: SlidingExpirationCacheWithCleanupTask<string, LimitlessRouterMonitor> =
45+
new SlidingExpirationCacheWithCleanupTask(
46+
LimitlessRouterServiceImpl.CACHE_CLEANUP_NANOS,
47+
undefined,
48+
async (monitor: LimitlessRouterMonitor) => await monitor.close(),
49+
"LimitlessRouterServiceImpl.monitors"
50+
);
4851
protected static readonly limitlessRouterCache: SlidingExpirationCache<string, HostInfo[]> = new SlidingExpirationCache(
4952
LimitlessRouterServiceImpl.CACHE_CLEANUP_NANOS,
5053
undefined,
@@ -254,7 +257,7 @@ export class LimitlessRouterServiceImpl implements LimitlessRouterService {
254257
}
255258
}
256259

257-
static clearMonitors() {
258-
LimitlessRouterServiceImpl.monitors.clear();
260+
static async clearMonitors() {
261+
await LimitlessRouterServiceImpl.monitors.clear();
259262
}
260263
}

common/lib/plugins/strategy/fastest_response/host_response_time_service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ export class HostResponseTimeServiceImpl implements HostResponseTimeService {
4545
readonly intervalMs: number;
4646
protected hosts: HostInfo[];
4747
private readonly telemetryFactory: TelemetryFactory;
48-
protected static monitoringHosts: SlidingExpirationCache<string, any> = new SlidingExpirationCache(
48+
protected static monitoringHosts: SlidingExpirationCache<string, HostResponseTimeMonitor> = new SlidingExpirationCache(
4949
HostResponseTimeServiceImpl.CACHE_CLEANUP_NANOS,
5050
undefined,
5151
async (monitor: HostResponseTimeMonitor) => {

common/lib/utils/locales/en.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,11 @@
233233
"HostMonitor.detectedWriter": "Detected writer: '%s'.",
234234
"HostMonitor.endMonitoring": "Host monitor '%s' completed in '%s'.",
235235
"HostMonitor.writerHostChanged": "Writer host has changed from '%s' to '%s'.",
236+
"SlidingExpirationCacheWithCleanupTask.cleaningUp": "Cleanup interval of '%s' minutes has passed, cleaning up sliding expiration cache '%s'.",
237+
"SlidingExpirationCacheWithCleanupTask.cleanUpTaskInterrupted": "Sliding expiration cache '%s' cleanup task has been interrupted and is exiting.",
238+
"SlidingExpirationCacheWithCleanupTask.cleanUpTaskStopped": "Sliding expiration cache '%s' cleanup task has been stopped and is exiting.",
239+
"SlidingExpirationCacheWithCleanupTask.clear": "Sliding expiration cache '%s' has been cleared, all resources are released.",
240+
"SlidingExpirationCacheWithCleanupTask.cleanUpTaskInitialized": "Sliding expiration cache '%s' cleanup task has been initialized.",
236241
"HostMonitoringConnectionPlugin.monitoringDeactivated": "Monitoring deactivated for method '%s'.",
237242
"CustomEndpointPlugin.connectionRequestToCustomEndpoint": "Detected a connection request to a custom endpoint URL: '%s'.",
238243
"CustomEndpointPlugin.errorParsingEndpointIdentifier": "Unable to parse custom endpoint identifier from URL: '%s'.",

common/lib/utils/sliding_expiration_cache.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class CacheItem<V> {
3737
}
3838

3939
export class SlidingExpirationCache<K, V> {
40-
private _cleanupIntervalNanos: bigint = BigInt(10 * 60_000_000_000); // 10 minutes
40+
protected _cleanupIntervalNanos: bigint = BigInt(10 * 60_000_000_000); // 10 minutes
4141
private readonly _shouldDisposeFunc?: (item: V) => boolean;
4242
private readonly _itemDisposalFunc?: (item: V) => void;
4343
map: Map<K, CacheItem<V>> = new Map<K, CacheItem<V>>();
@@ -116,7 +116,7 @@ export class SlidingExpirationCache<K, V> {
116116
return cacheItem;
117117
});
118118

119-
if (item != undefined && item != null && this._itemDisposalFunc != null) {
119+
if (item != undefined && this._itemDisposalFunc != null) {
120120
this._itemDisposalFunc(item);
121121
}
122122
}

0 commit comments

Comments
 (0)