Skip to content

Commit 6a47296

Browse files
sattvikctamassolteszanku255
authored
feat: multithreaded bulk import (#237)
* feat: multithreaded bulk import (#235) * feat: Add BulkImport APIs and cron * fix: PR changes * fix: PR changes * fix: PR changes * fix: PR changes * fix: PR changes * fix: PR changes * fix: Update version and changelog * fix: PR changes * fix: PR changes * fix: PR changes * fix: removing restriction of connection pool size for bulk import * fix: actually closing the connection * fix: add bulk import retry logic for postgres too * fix: fix failing tests * chore: current state save * fix: fixing merge error with changelog * feat: bulk inserting the bulk migration data * fix: fixes and error handling changes * fix: fixing tests * chore: changelog and build version update * fix: handling app/tenant not found * fix: review fix --------- Co-authored-by: Ankit Tiwari <ankucodes@gmail.com> * Update pluginInterfaceSupported.json --------- Co-authored-by: Tamas Soltesz <tamas@supertokens.com> Co-authored-by: Ankit Tiwari <ankucodes@gmail.com>
1 parent 71a33d8 commit 6a47296

22 files changed

Lines changed: 2845 additions & 53 deletions

CHANGELOG.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,33 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

88
## [Unreleased]
99

10+
## [7.3.0]
11+
12+
- Adds tables and queries for Bulk Import
13+
14+
### Migration
15+
16+
```sql
17+
"CREATE TABLE IF NOT EXISTS bulk_import_users (
18+
id CHAR(36),
19+
app_id VARCHAR(64) NOT NULL DEFAULT 'public',
20+
primary_user_id VARCHAR(36),
21+
raw_data TEXT NOT NULL,
22+
status VARCHAR(128) DEFAULT 'NEW',
23+
error_msg TEXT,
24+
created_at BIGINT NOT NULL,
25+
updated_at BIGINT NOT NULL,
26+
CONSTRAINT bulk_import_users_pkey PRIMARY KEY(app_id, id),
27+
CONSTRAINT bulk_import_users__app_id_fkey FOREIGN KEY(app_id) REFERENCES apps(app_id) ON DELETE CASCADE
28+
);
29+
30+
CREATE INDEX IF NOT EXISTS bulk_import_users_status_updated_at_index ON bulk_import_users (app_id, status, updated_at);
31+
32+
CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index1 ON bulk_import_users (app_id, status, created_at DESC,
33+
id DESC);
34+
35+
CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
36+
```
1037
## [7.2.0] - 2024-10-03
1138
1239
- Compatible with plugin interface version 6.3

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ plugins {
22
id 'java-library'
33
}
44

5-
version = "7.2.0"
5+
version = "7.3.0"
66

77
repositories {
88
mavenCentral()

pluginInterfaceSupported.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_comment": "contains a list of plugin interfaces branch names that this core supports",
33
"versions": [
4-
"6.3"
4+
"6.4"
55
]
6-
}
6+
}
Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
/*
2+
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
3+
*
4+
* This software is licensed under the Apache License, Version 2.0 (the
5+
* "License") as published by the Apache Software Foundation.
6+
*
7+
* You may not use this file except in compliance with the License. You may
8+
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package io.supertokens.storage.postgresql;
18+
19+
import java.sql.*;
20+
import java.util.Map;
21+
import java.util.Properties;
22+
import java.util.concurrent.Executor;
23+
24+
/**
25+
* BulkImportProxyConnection is a class implementing the Connection interface, serving as a Connection instance in the bulk import user cronjob.
26+
* This cron extensively utilizes existing queries to import users, all of which internally operate within transactions and those query sometimes
27+
* call the commit/rollback method on the connection.
28+
*
29+
* For the purpose of bulkimport cronjob, we aim to employ a single connection for all queries and rollback any operations in case of query failures.
30+
* To achieve this, we use our own proxy Connection instance and override the commit/rollback/close methods to do nothing.
31+
*/
32+
33+
public class BulkImportProxyConnection implements Connection {
34+
private Connection con = null;
35+
36+
public BulkImportProxyConnection(Connection con) {
37+
this.con = con;
38+
}
39+
40+
@Override
41+
public void close() throws SQLException {
42+
//this.con.close();
43+
//we don't want to close here because we are trying to reuse existing code but also using the same connection
44+
//for bulk importing
45+
}
46+
47+
@Override
48+
public void commit() throws SQLException {
49+
//this.con.commit();
50+
}
51+
52+
@Override
53+
public void rollback() throws SQLException {
54+
//this.con.rollback();
55+
}
56+
57+
public void closeForBulkImportProxyStorage() throws SQLException {
58+
this.con.close();
59+
}
60+
61+
public void commitForBulkImportProxyStorage() throws SQLException {
62+
this.con.commit();
63+
}
64+
65+
public void rollbackForBulkImportProxyStorage() throws SQLException {
66+
this.con.rollback();
67+
}
68+
69+
/* Following methods are unchaged */
70+
71+
@Override
72+
public Statement createStatement() throws SQLException {
73+
return this.con.createStatement();
74+
}
75+
76+
@Override
77+
public PreparedStatement prepareStatement(String sql) throws SQLException {
78+
return this.con.prepareStatement(sql);
79+
}
80+
81+
@Override
82+
public CallableStatement prepareCall(String sql) throws SQLException {
83+
return this.con.prepareCall(sql);
84+
}
85+
86+
@Override
87+
public String nativeSQL(String sql) throws SQLException {
88+
return this.con.nativeSQL(sql);
89+
}
90+
91+
@Override
92+
public void setAutoCommit(boolean autoCommit) throws SQLException {
93+
this.con.setAutoCommit(autoCommit);
94+
}
95+
96+
@Override
97+
public boolean getAutoCommit() throws SQLException {
98+
return this.con.getAutoCommit();
99+
}
100+
101+
@Override
102+
public boolean isClosed() throws SQLException {
103+
return this.con.isClosed();
104+
}
105+
106+
@Override
107+
public DatabaseMetaData getMetaData() throws SQLException {
108+
return this.con.getMetaData();
109+
}
110+
111+
@Override
112+
public void setReadOnly(boolean readOnly) throws SQLException {
113+
this.con.setReadOnly(readOnly);
114+
}
115+
116+
@Override
117+
public boolean isReadOnly() throws SQLException {
118+
return this.con.isReadOnly();
119+
}
120+
121+
@Override
122+
public void setCatalog(String catalog) throws SQLException {
123+
this.con.setCatalog(catalog);
124+
}
125+
126+
@Override
127+
public String getCatalog() throws SQLException {
128+
return this.con.getCatalog();
129+
}
130+
131+
@Override
132+
public void setTransactionIsolation(int level) throws SQLException {
133+
this.con.setTransactionIsolation(level);
134+
}
135+
136+
@Override
137+
public int getTransactionIsolation() throws SQLException {
138+
return this.con.getTransactionIsolation();
139+
}
140+
141+
@Override
142+
public SQLWarning getWarnings() throws SQLException {
143+
return this.con.getWarnings();
144+
}
145+
146+
@Override
147+
public void clearWarnings() throws SQLException {
148+
this.con.clearWarnings();
149+
}
150+
151+
@Override
152+
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
153+
return this.con.createStatement(resultSetType, resultSetConcurrency);
154+
}
155+
156+
@Override
157+
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
158+
throws SQLException {
159+
return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency);
160+
}
161+
162+
@Override
163+
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
164+
return this.con.prepareCall(sql, resultSetType, resultSetConcurrency);
165+
}
166+
167+
@Override
168+
public Map<String, Class<?>> getTypeMap() throws SQLException {
169+
return this.con.getTypeMap();
170+
}
171+
172+
@Override
173+
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
174+
this.con.setTypeMap(map);
175+
}
176+
177+
@Override
178+
public void setHoldability(int holdability) throws SQLException {
179+
this.con.setHoldability(holdability);
180+
}
181+
182+
@Override
183+
public int getHoldability() throws SQLException {
184+
return this.con.getHoldability();
185+
}
186+
187+
@Override
188+
public Savepoint setSavepoint() throws SQLException {
189+
return this.con.setSavepoint();
190+
}
191+
192+
@Override
193+
public Savepoint setSavepoint(String name) throws SQLException {
194+
return this.con.setSavepoint(name);
195+
}
196+
197+
@Override
198+
public void rollback(Savepoint savepoint) throws SQLException {
199+
this.con.rollback(savepoint);
200+
}
201+
202+
@Override
203+
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
204+
this.con.releaseSavepoint(savepoint);
205+
}
206+
207+
@Override
208+
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
209+
throws SQLException {
210+
return this.con.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
211+
}
212+
213+
@Override
214+
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
215+
int resultSetHoldability) throws SQLException {
216+
return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
217+
}
218+
219+
@Override
220+
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
221+
int resultSetHoldability) throws SQLException {
222+
return this.con.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
223+
}
224+
225+
@Override
226+
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
227+
return this.con.prepareStatement(sql, autoGeneratedKeys);
228+
}
229+
230+
@Override
231+
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
232+
return this.con.prepareStatement(sql, columnIndexes);
233+
}
234+
235+
@Override
236+
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
237+
return this.con.prepareStatement(sql, columnNames);
238+
}
239+
240+
@Override
241+
public Clob createClob() throws SQLException {
242+
return this.con.createClob();
243+
}
244+
245+
@Override
246+
public Blob createBlob() throws SQLException {
247+
return this.con.createBlob();
248+
}
249+
250+
@Override
251+
public NClob createNClob() throws SQLException {
252+
return this.con.createNClob();
253+
}
254+
255+
@Override
256+
public SQLXML createSQLXML() throws SQLException {
257+
return this.con.createSQLXML();
258+
}
259+
260+
@Override
261+
public boolean isValid(int timeout) throws SQLException {
262+
return this.con.isValid(timeout);
263+
}
264+
265+
@Override
266+
public void setClientInfo(String name, String value) throws SQLClientInfoException {
267+
this.con.setClientInfo(name, value);
268+
}
269+
270+
@Override
271+
public void setClientInfo(Properties properties) throws SQLClientInfoException {
272+
this.con.setClientInfo(properties);
273+
}
274+
275+
@Override
276+
public String getClientInfo(String name) throws SQLException {
277+
return this.con.getClientInfo(name);
278+
}
279+
280+
@Override
281+
public Properties getClientInfo() throws SQLException {
282+
return this.con.getClientInfo();
283+
}
284+
285+
@Override
286+
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
287+
return this.con.createArrayOf(typeName, elements);
288+
}
289+
290+
@Override
291+
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
292+
return this.con.createStruct(typeName, attributes);
293+
}
294+
295+
@Override
296+
public void setSchema(String schema) throws SQLException {
297+
this.con.setSchema(schema);
298+
}
299+
300+
@Override
301+
public String getSchema() throws SQLException {
302+
return this.con.getSchema();
303+
}
304+
305+
@Override
306+
public void abort(Executor executor) throws SQLException {
307+
this.con.abort(executor);
308+
}
309+
310+
@Override
311+
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
312+
this.con.setNetworkTimeout(executor, milliseconds);
313+
}
314+
315+
@Override
316+
public int getNetworkTimeout() throws SQLException {
317+
return this.con.getNetworkTimeout();
318+
}
319+
320+
@Override
321+
public <T> T unwrap(Class<T> iface) throws SQLException {
322+
return this.con.unwrap(iface);
323+
}
324+
325+
@Override
326+
public boolean isWrapperFor(Class<?> iface) throws SQLException {
327+
return this.con.isWrapperFor(iface);
328+
}
329+
}

0 commit comments

Comments
 (0)