Skip to content

Commit f753eca

Browse files
committed
multipart: remove field ordering requirement with some overhead
Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent affc7c0 commit f753eca

1 file changed

Lines changed: 171 additions & 8 deletions

File tree

src/main.rs

Lines changed: 171 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use headers::HeaderMap;
3333
use std::path;
3434
use std::sync::OnceLock;
3535
use std::{net::SocketAddr, path::PathBuf};
36-
use tokio::io::AsyncSeekExt;
36+
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
3737

3838
use std::{collections::HashMap, sync::Arc};
3939
use std::pin::Pin;
@@ -558,6 +558,7 @@ async fn ax_post_file(
558558
let mut path: String = "".to_string();
559559
let mut file0_filename: String = "".to_string();
560560
let mut upload_result: Option<(StatusCode, Vec<u8>)> = None;
561+
let mut buffered_file: Option<tempfile::NamedTempFile> = None;
561562

562563
while let Some(field) = multipart.next_field().await.unwrap() {
563564
let name = field.name().unwrap().to_string();
@@ -576,17 +577,79 @@ async fn ax_post_file(
576577
}
577578
}
578579
} else if name == "file0" {
579-
if path.is_empty() {
580-
let error_msg = "Missing path field before file0".to_string();
581-
eprintln!("{}", error_msg);
582-
upload_result = Some((StatusCode::BAD_REQUEST, error_msg.into_bytes()));
583-
break;
584-
}
585580
match filename {
586581
Some(fname) => {
587582
file0_filename = fname.to_string();
588583

589-
// At this point we have path and filename, so we can verify permissions and start streaming
584+
if path.is_empty() {
585+
// BUFFERED PATH: file0 arrived before path, buffer to temp file
586+
debug_log!("file0 arrived before path, buffering to temp file");
587+
let tmp = match tempfile::NamedTempFile::new() {
588+
Ok(f) => f,
589+
Err(e) => {
590+
eprintln!("Failed to create temp file: {:?}", e);
591+
upload_result = Some((
592+
StatusCode::INTERNAL_SERVER_ERROR,
593+
"Failed to create temp file".to_string().into_bytes(),
594+
));
595+
break;
596+
}
597+
};
598+
let tmp_path = tmp.path().to_path_buf();
599+
let mut async_file = match tokio::fs::File::create(&tmp_path).await {
600+
Ok(f) => f,
601+
Err(e) => {
602+
eprintln!("Failed to open temp file for writing: {:?}", e);
603+
upload_result = Some((
604+
StatusCode::INTERNAL_SERVER_ERROR,
605+
"Failed to open temp file".to_string().into_bytes(),
606+
));
607+
break;
608+
}
609+
};
610+
let mut field_stream = FieldStream::new(field);
611+
let mut buf = [0u8; 64 * 1024];
612+
loop {
613+
use tokio::io::AsyncReadExt;
614+
let n = match field_stream.read(&mut buf).await {
615+
Ok(0) => break,
616+
Ok(n) => n,
617+
Err(e) => {
618+
eprintln!("Error reading file0 field: {:?}", e);
619+
upload_result = Some((
620+
StatusCode::BAD_REQUEST,
621+
"Error reading file data".to_string().into_bytes(),
622+
));
623+
break;
624+
}
625+
};
626+
if let Err(e) = async_file.write_all(&buf[..n]).await {
627+
eprintln!("Error writing temp file: {:?}", e);
628+
upload_result = Some((
629+
StatusCode::INTERNAL_SERVER_ERROR,
630+
"Error writing temp file".to_string().into_bytes(),
631+
));
632+
break;
633+
}
634+
}
635+
if upload_result.is_some() {
636+
break;
637+
}
638+
if let Err(e) = async_file.flush().await {
639+
eprintln!("Error flushing temp file: {:?}", e);
640+
upload_result = Some((
641+
StatusCode::INTERNAL_SERVER_ERROR,
642+
"Error flushing temp file".to_string().into_bytes(),
643+
));
644+
break;
645+
}
646+
drop(async_file);
647+
buffered_file = Some(tmp);
648+
// Continue loop to find path field
649+
continue;
650+
}
651+
652+
// FAST PATH: path already set, stream directly
590653
// if path ends on /, remove it
591654
if path.ends_with("/") {
592655
debug_log!("Removing trailing /, workaround");
@@ -689,6 +752,106 @@ async fn ax_post_file(
689752
}
690753
}
691754

755+
// Handle buffered file0 case (file0 arrived before path)
756+
if upload_result.is_none() {
757+
if let Some(tmp) = buffered_file {
758+
if path.is_empty() {
759+
return (
760+
StatusCode::BAD_REQUEST,
761+
b"Missing path field in upload".to_vec(),
762+
);
763+
}
764+
765+
if path.ends_with("/") {
766+
debug_log!("Removing trailing /, workaround");
767+
path.pop();
768+
}
769+
770+
let full_path = format!("{}/{}", path, file0_filename);
771+
772+
match verify_upload_permissions(&owner, &path) {
773+
Ok(_) => (),
774+
Err(e) => {
775+
return (StatusCode::FORBIDDEN, e.to_string().into_bytes());
776+
}
777+
}
778+
779+
let hdr_content_type = headers.get("Content-Type-Upstream");
780+
let semaphore = get_or_create_semaphore(&state.file_locks, &full_path).await;
781+
782+
let _permit = match semaphore.try_acquire() {
783+
Ok(permit) => permit,
784+
Err(_) => {
785+
return (
786+
StatusCode::CONFLICT,
787+
"Upload already in progress".to_string().into_bytes(),
788+
);
789+
}
790+
};
791+
792+
let content_type: String = match hdr_content_type {
793+
Some(content_type) => content_type.to_str().unwrap().to_string(),
794+
None => {
795+
let heuristic_ctype = heuristic_filetype(file0_filename.clone());
796+
debug_log!(
797+
"Content-Type not found, using heuristics: {}",
798+
heuristic_ctype
799+
);
800+
heuristic_ctype
801+
}
802+
};
803+
804+
debug_log!("Starting buffered upload for {}", full_path);
805+
let mut async_file = match tokio::fs::File::open(tmp.path()).await {
806+
Ok(f) => f,
807+
Err(e) => {
808+
eprintln!("Failed to reopen temp file: {:?}", e);
809+
return (
810+
StatusCode::INTERNAL_SERVER_ERROR,
811+
"Failed to reopen temp file".to_string().into_bytes(),
812+
);
813+
}
814+
};
815+
816+
let driver_name = get_driver_type();
817+
let driver = init_driver(&driver_name);
818+
819+
let (result, file_size) = driver
820+
.write_file_streaming(
821+
full_path.clone(),
822+
&mut async_file,
823+
content_type.to_string(),
824+
Some(owner.clone()),
825+
)
826+
.await;
827+
828+
// tmp (NamedTempFile) is dropped here, auto-deleting the temp file
829+
830+
if result.is_empty() {
831+
return (StatusCode::CONFLICT, Vec::new());
832+
}
833+
834+
let status = StatusCode::OK;
835+
let client_ip = client_ip_from_headers(&headers, remote_addr);
836+
let timestamp = std::time::SystemTime::now();
837+
let human_time = chrono::DateTime::<chrono::Utc>::from(timestamp);
838+
let request_target = original_uri.to_string();
839+
println!(
840+
"{} {} {} {} {} {} {} {} buggyclient",
841+
client_ip,
842+
status.as_u16(),
843+
file_size,
844+
human_time,
845+
Method::POST,
846+
request_target,
847+
full_path,
848+
owner
849+
);
850+
851+
return (status, Vec::new());
852+
}
853+
}
854+
692855
// Return the upload result if we processed the file
693856
if let Some(result) = upload_result {
694857
return result;

0 commit comments

Comments
 (0)