Skip to content

Commit 734cbe7

Browse files
committed
azure: remove blocking and convert to proper async
Initially i misinterpret Azure as partially sync, but seems everything we use async, so we dont need to use block_in_place() and we can stay fully async. But local driver (which is sync) was running directly on tokio's async worker threads, and its recommended to put it on separate blocking pool via spawn_blocking. Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent fc987ac commit 734cbe7

3 files changed

Lines changed: 51 additions & 50 deletions

File tree

src/azure.rs

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -600,19 +600,15 @@ async fn azure_list_files(directory: String) -> Vec<String> {
600600
/// Implement Driver trait for AzureDriver
601601
#[async_trait]
602602
impl super::Driver for AzureDriver {
603-
fn write_file(
603+
async fn write_file(
604604
&self,
605605
filename: String,
606606
data: Vec<u8>,
607607
cont_type: String,
608608
owner_email: Option<String>,
609609
) -> String {
610610
let filenameret = filename.clone();
611-
/* Call async write_file_to_blob use tokio::task::block_in_place */
612-
tokio::task::block_in_place(|| {
613-
let rt = tokio::runtime::Runtime::new().unwrap();
614-
rt.block_on(write_file_to_blob(filename, data, cont_type, owner_email));
615-
});
611+
write_file_to_blob(filename, data, cont_type, owner_email).await;
616612
filenameret
617613
}
618614
async fn write_file_streaming(
@@ -625,40 +621,21 @@ impl super::Driver for AzureDriver {
625621
let (_status, size) = write_file_to_blob_streaming(filename.clone(), data, cont_type, owner_email).await;
626622
(filename, size)
627623
}
628-
fn tag_file(
624+
async fn tag_file(
629625
&self,
630626
filename: String,
631627
user_tags: Vec<(String, String)>,
632628
) -> Result<String, String> {
633-
let rt = tokio::runtime::Runtime::new().unwrap();
634-
let ret = rt.block_on(azure_set_filename_tags(filename, user_tags));
635-
ret
629+
azure_set_filename_tags(filename, user_tags).await
636630
}
637-
fn get_file(&self, filename: String) -> ReceivedFile {
638-
/* Call async get_file_from_blob use tokio::task::block_in_place */
639-
let mut received_file = ReceivedFile {
640-
original_filename: "".to_string(),
641-
cached_filename: "".to_string(),
642-
headers: HeaderMap::new(),
643-
valid: false,
644-
};
645-
tokio::task::block_in_place(|| {
646-
let rt = tokio::runtime::Runtime::new().unwrap();
647-
received_file = rt.block_on(get_file_from_blob(filename));
648-
});
649-
received_file
631+
async fn get_file(&self, filename: String) -> ReceivedFile {
632+
get_file_from_blob(filename).await
650633
}
651634
// Disabled: listing files on Azure Blob Storage is extremely slow due to
652635
// the flat namespace requiring enumeration of all blobs with prefix filtering.
653636
// For large containers this can take minutes and time out.
654637
// The HTTP handler (ax_list_files) returns 403 Forbidden for Azure backends.
655-
fn list_files(&self, _directory: String) -> Vec<String> {
656-
// let mut ret = Vec::new();
657-
// tokio::task::block_in_place(|| {
658-
// let rt = tokio::runtime::Runtime::new().unwrap();
659-
// ret = rt.block_on(azure_list_files(directory));
660-
// });
661-
// ret
638+
async fn list_files(&self, _directory: String) -> Vec<String> {
662639
Vec::new()
663640
}
664641
}

src/local.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -326,19 +326,26 @@ fn set_tags_for_local_file(
326326
/// Implement Driver trait for LocalDriver
327327
#[async_trait]
328328
impl super::Driver for LocalDriver {
329-
fn write_file(
329+
async fn write_file(
330330
&self,
331331
filename: String,
332332
data: Vec<u8>,
333333
cont_type: String,
334334
owner_email: Option<String>,
335335
) -> String {
336-
match write_file_to_local(filename.clone(), data, cont_type, owner_email) {
337-
Ok(_) => filename,
338-
Err(e) => {
336+
let fname = filename.clone();
337+
match tokio::task::spawn_blocking(move || {
338+
write_file_to_local(fname, data, cont_type, owner_email)
339+
}).await {
340+
Ok(Ok(_)) => filename,
341+
Ok(Err(e)) => {
339342
eprintln!("Local storage write error: {}", e);
340343
String::new()
341344
}
345+
Err(e) => {
346+
eprintln!("Local storage write task panicked: {}", e);
347+
String::new()
348+
}
342349
}
343350
}
344351

@@ -358,19 +365,36 @@ impl super::Driver for LocalDriver {
358365
}
359366
}
360367

361-
fn get_file(&self, filename: String) -> ReceivedFile {
362-
get_file_from_local(filename)
368+
async fn get_file(&self, filename: String) -> ReceivedFile {
369+
tokio::task::spawn_blocking(move || get_file_from_local(filename))
370+
.await
371+
.unwrap_or_else(|e| {
372+
eprintln!("Local storage get_file task panicked: {}", e);
373+
ReceivedFile {
374+
original_filename: String::new(),
375+
cached_filename: String::new(),
376+
headers: HeaderMap::new(),
377+
valid: false,
378+
}
379+
})
363380
}
364381

365-
fn tag_file(
382+
async fn tag_file(
366383
&self,
367384
filename: String,
368385
user_tags: Vec<(String, String)>,
369386
) -> Result<String, String> {
370-
set_tags_for_local_file(filename, user_tags)
387+
tokio::task::spawn_blocking(move || set_tags_for_local_file(filename, user_tags))
388+
.await
389+
.unwrap_or_else(|e| Err(format!("Local storage tag_file task panicked: {}", e)))
371390
}
372391

373-
fn list_files(&self, directory: String) -> Vec<String> {
374-
list_files_in_local(directory)
392+
async fn list_files(&self, directory: String) -> Vec<String> {
393+
tokio::task::spawn_blocking(move || list_files_in_local(directory))
394+
.await
395+
.unwrap_or_else(|e| {
396+
eprintln!("Local storage list_files task panicked: {}", e);
397+
Vec::new()
398+
})
375399
}
376400
}

src/main.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl<'a> tokio::io::AsyncRead for FieldStream<'a> {
192192

193193
#[async_trait]
194194
trait Driver: Send + Sync {
195-
fn write_file(
195+
async fn write_file(
196196
&self,
197197
filename: String,
198198
data: Vec<u8>,
@@ -206,13 +206,13 @@ trait Driver: Send + Sync {
206206
cont_type: String,
207207
owner_email: Option<String>,
208208
) -> (String, usize);
209-
fn get_file(&self, filename: String) -> ReceivedFile;
210-
fn tag_file(
209+
async fn get_file(&self, filename: String) -> ReceivedFile;
210+
async fn tag_file(
211211
&self,
212212
filename: String,
213213
user_tags: Vec<(String, String)>,
214214
) -> Result<String, String>;
215-
fn list_files(&self, directory: String) -> Vec<String>;
215+
async fn list_files(&self, directory: String) -> Vec<String>;
216216
}
217217

218218
fn init_driver(driver_type: &str) -> Box<dyn Driver> {
@@ -984,7 +984,7 @@ async fn ax_get_file(
984984
};
985985

986986
// IMPORTANT! Headers in cache must be stored in lowercase
987-
let received_file = driver_get_file(filepath.clone());
987+
let received_file = driver_get_file(filepath.clone()).await;
988988

989989
// Release the semaphore now that the file is resolved
990990
drop(_permit);
@@ -1147,21 +1147,21 @@ async fn ax_get_file(
11471147
}
11481148
}
11491149

1150-
fn driver_get_file(filepath: String) -> ReceivedFile {
1150+
async fn driver_get_file(filepath: String) -> ReceivedFile {
11511151
let driver_name = get_driver_type();
11521152
let driver = init_driver(&driver_name);
1153-
driver.get_file(filepath)
1153+
driver.get_file(filepath).await
11541154
}
11551155

1156-
fn write_file_driver(
1156+
async fn write_file_driver(
11571157
filename: String,
11581158
data: Vec<u8>,
11591159
cont_type: String,
11601160
owner_email: Option<String>,
11611161
) -> String {
11621162
let driver_name = get_driver_type();
11631163
let driver = init_driver(&driver_name);
1164-
driver.write_file(filename, data, cont_type, owner_email);
1164+
driver.write_file(filename, data, cont_type, owner_email).await;
11651165
"".to_string()
11661166
}
11671167

@@ -1228,7 +1228,7 @@ async fn ax_list_files() -> (StatusCode, String) {
12281228
return (StatusCode::FORBIDDEN, "Listing files is disabled for Azure storage backend".to_string());
12291229
}
12301230
let driver = init_driver(&driver_name);
1231-
let files = driver.list_files("/".to_string());
1231+
let files = driver.list_files("/".to_string()).await;
12321232
// generate nice list of files, with one file per line
12331233
let files_str = files.join("\n");
12341234
(StatusCode::OK, files_str)

0 commit comments

Comments
 (0)