Commit ff67772

feat: Implement Async/Streaming for uploads

Uploads were previously very ugly: the whole file was buffered in memory and then written to a temporary file. We now implement a proper async multipart stream to Azure, buffering at most 10MB at a time (this might become configurable in the near future). More details in CHANGELOG.md.

Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>

1 parent 64ee369 commit ff67772

5 files changed

Lines changed: 463 additions & 101 deletions

CHANGELOG.md

Lines changed: 69 additions & 2 deletions
@@ -1,5 +1,72 @@
 # kernelci-storage ChangeLog
 
-## 0.1.0 (2025-01-06)
+All notable changes to this project will be documented in this file.
 
-- Initial release
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [0.2.0] - 2025-10-23
+
+### Added
+- Streaming multipart upload support for large files (200MB+)
+- New `write_file_streaming()` async method in Driver trait
+- `FieldStream` wrapper to convert multipart fields to AsyncRead
+- Streaming implementations for both Azure and Local storage backends
+- `async-trait` dependency (0.1) for trait async methods
+- `bytes` dependency (1.9) for efficient byte buffer handling
+
+### Changed
+- Refactored `ax_post_file` handler to stream uploads directly without loading entire file into memory
+- Azure backend now streams uploads in 10MB chunks directly to blob storage
+- Local backend now streams uploads in 10MB chunks directly to filesystem
+- Upload handler processes multipart fields sequentially and starts streaming immediately
+
+### Performance Improvements
+- Reduced memory usage: only 10MB buffer in memory at any time instead of entire file
+- Improved upload performance for large files by eliminating full memory buffering
+- Removed temporary file creation during Azure uploads (streaming directly from multipart)
+- Better scalability: can handle files larger than available RAM
+
+### Technical Details
+- No breaking changes to existing API - fully backward compatible
+- Existing upload clients continue to work without modifications
+- All existing functionality preserved (permissions, content-type, owner tags, file locking)
+- Proper lifetime management for multipart field streaming
+- Uses async/await with tokio AsyncRead trait
+
+## [0.1.0] - 2025-01-06
+
+### Added
+- Initial release of KernelCI Storage Server
+- JWT token-based authentication with HMAC-SHA256
+- File upload/download with local caching
+- Azure Blob Storage backend support with chunked uploads (10MB blocks)
+- Local filesystem backend support
+- Range request support for partial content downloads
+- Prometheus metrics endpoint (`/metrics`)
+- File locking mechanism to prevent concurrent uploads to same path
+- Automatic cache cleanup when disk space < 12%
+- LRU-style cache deletion (removes files older than 60 minutes)
+- User-specific upload path restrictions via configuration
+- SSL/TLS support with certificate configuration
+- Docker support with Dockerfile
+
+### Features
+- RESTful API with endpoints:
+  - `GET /` - Server status
+  - `POST /upload` or `POST /v1/file` - File upload (requires JWT)
+  - `GET /*filepath` - File download (public)
+  - `GET /v1/checkauth` - Validate JWT token
+  - `GET /v1/list` - List all files (public)
+  - `GET /metrics` - Prometheus metrics
+- Header preservation for HTTP caching (ETag, Last-Modified)
+- Content-type detection with heuristics
+- Configurable via TOML configuration file
+- Environment variable override for config path (`KCI_STORAGE_CONFIG`)
+- Verbose logging support via `--verbose` flag
+- Command-line utilities:
+  - `--generate-jwt-secret` - Generate JWT secret
+  - `--generate-jwt-token` - Generate JWT token for user
+- SHA-512 based cache filenames to avoid path conflicts
+- Client IP detection with X-Forwarded-For support
+- Disk space monitoring with hysteresis
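The changelog above mentions a `FieldStream` wrapper that converts multipart fields into an AsyncRead source. As a rough illustration of that adapter pattern, here is a synchronous, std-only sketch: a plain iterator of byte chunks stands in for the multipart field, and std's `Read` stands in for tokio's `AsyncRead`. The `ChunkStream` name and all details are illustrative, not the committed implementation.

```rust
use std::io::Read;

// Illustrative analog of the FieldStream idea: adapt a source that yields
// byte chunks (as a multipart field does) into a Read, so downstream code
// can consume it through a bounded buffer instead of collecting all chunks.
struct ChunkStream<I: Iterator<Item = Vec<u8>>> {
    chunks: I,
    pending: Vec<u8>,
    pos: usize,
}

impl<I: Iterator<Item = Vec<u8>>> ChunkStream<I> {
    fn new(chunks: I) -> Self {
        Self { chunks, pending: Vec::new(), pos: 0 }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Read for ChunkStream<I> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Refill from the next chunk once the pending one is exhausted.
        while self.pos == self.pending.len() {
            match self.chunks.next() {
                Some(chunk) => {
                    self.pending = chunk;
                    self.pos = 0;
                }
                None => return Ok(0), // no more chunks: EOF
            }
        }
        let n = (self.pending.len() - self.pos).min(buf.len());
        buf[..n].copy_from_slice(&self.pending[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

fn main() {
    let chunks = vec![b"hello ".to_vec(), b"world".to_vec()];
    let mut stream = ChunkStream::new(chunks.into_iter());
    let mut out = String::new();
    stream.read_to_string(&mut out).unwrap();
    assert_eq!(out, "hello world");
    println!("{}", out);
}
```

The real wrapper must additionally deal with async polling and field lifetimes, which is what the "proper lifetime management" bullet refers to.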

Cargo.toml

Lines changed: 3 additions & 1 deletion
@@ -1,14 +1,16 @@
 [package]
 name = "kernelci-storage"
-version = "0.1.0"
+version = "0.2.0"
 edition = "2021"
 
 [dependencies]
+async-trait = "0.1"
 axum = { version = "0.7.9", features = ["tracing", "multipart", "macros"] }
 axum-server = { version = "0.7.1", features = ["rustls", "rustls-pemfile", "tls-rustls"] }
 azure_blob_uploader = "0.1.4"
 azure_storage = "0.21.0"
 azure_storage_blobs = "0.21.0"
+bytes = "1.9"
 chksum-hash-sha2-512 = "0.0.1"
 chrono = "0.4.39"
 clap = { version = "4.5.23", features = ["derive"] }

src/azure.rs

Lines changed: 117 additions & 1 deletion
@@ -11,6 +11,7 @@ impl AzureDriver {
 }
 
 use crate::{debug_log, get_config_content, ReceivedFile};
+use async_trait::async_trait;
 use axum::http::{HeaderName, HeaderValue};
 use azure_storage::StorageCredentials;
 use azure_storage_blobs::container::operations::BlobItem;
@@ -26,6 +27,7 @@ use std::io::Read;
 use std::io::Write;
 use std::sync::Arc;
 use tempfile::Builder;
+use tokio::io::AsyncReadExt;
 use toml::Table;
 
 #[derive(Deserialize)]
@@ -75,7 +77,110 @@ fn calculate_checksum(filename: &String, data: &[u8]) {
     debug_log!("File: {} Checksum: {}", filename, digest.to_hex_lowercase());
 }
 
-/// Write file to Azure blob storage
+/// Write file to Azure blob storage using streaming (new version)
+async fn write_file_to_blob_streaming(
+    filename: String,
+    data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
+    cont_type: String,
+    owner_email: Option<String>,
+) -> &'static str {
+    let azure_cfg = Arc::new(get_azure_credentials("azure"));
+
+    let storage_account = azure_cfg.account.as_str();
+    let storage_key = azure_cfg.key.clone();
+    let storage_container = azure_cfg.container.as_str();
+    let storage_blob = filename.as_str();
+    let storage_credential = StorageCredentials::access_key(storage_account, storage_key);
+    let blob_client = ClientBuilder::new(storage_account, storage_credential)
+        .blob_client(storage_container, storage_blob);
+
+    let mut total_bytes_uploaded: usize = 0;
+    let chunk_size = 10 * 1024 * 1024; // 10MB chunks
+    let mut blocks = BlockList::default();
+
+    loop {
+        let mut buffer = vec![0u8; chunk_size];
+        let bytes_read = data.read(&mut buffer).await;
+        match bytes_read {
+            Ok(bytes_read) => {
+                if bytes_read == 0 {
+                    break;
+                }
+                buffer.truncate(bytes_read);
+                let block_id = BlockId::new(hex::encode(total_bytes_uploaded.to_le_bytes()));
+                blocks
+                    .blocks
+                    .push(BlobBlockType::Uncommitted(block_id.clone()));
+                match blob_client.put_block(block_id, buffer).await {
+                    Ok(_) => {
+                        total_bytes_uploaded += bytes_read;
+                        debug_log!("Uploaded {} bytes", total_bytes_uploaded);
+                    }
+                    Err(e) => {
+                        eprintln!("Error uploading block: {:?}", e);
+                        break;
+                    }
+                }
+            }
+            Err(e) => {
+                eprintln!("Error reading stream: {:?}", e);
+                break;
+            }
+        }
+    }
+    match blob_client
+        .put_block_list(blocks)
+        .content_type(cont_type)
+        .await
+    {
+        Ok(_) => {
+            debug_log!("Block list uploaded");
+            let blob_url_res = blob_client.url();
+            match blob_url_res {
+                Ok(blob_url) => {
+                    debug_log!("Blob URL: {}", blob_url);
+                }
+                Err(e) => {
+                    eprintln!("Error getting blob URL: {:?}", e);
+                }
+            }
+
+            // Set owner tag if email is provided
+            if let Some(email) = owner_email {
+                let mut tags = Tags::new();
+                let sanitized = sanitize_tag_component(&email);
+                if sanitized != email {
+                    debug_log!(
+                        "Sanitized owner tag value from '{}' to '{}'",
+                        email,
+                        sanitized
+                    );
+                }
+                // Ensure non-empty value
+                let final_value = if sanitized.is_empty() {
+                    "_".to_string()
+                } else {
+                    sanitized
+                };
+                tags.insert("owner".to_string(), final_value);
+                match blob_client.set_tags(tags).await {
+                    Ok(_) => {
+                        debug_log!("Owner tag set successfully");
+                    }
+                    Err(e) => {
+                        eprintln!("Error setting owner tag: {:?}", e);
+                    }
+                }
+            }
+        }
+        Err(e) => {
+            eprintln!("Error uploading block list: {:?}", e);
+        }
+    }
+    "OK"
+}
+
+/// Write file to Azure blob storage (legacy version using Vec<u8>)
 /// TBD: Rework, do not keep whole file as Vec<u8> in memory!!!
 async fn write_file_to_blob(
     filename: String,
@@ -429,6 +534,7 @@ async fn azure_list_files(directory: String) -> Vec<String> {
 }
 
 /// Implement Driver trait for AzureDriver
+#[async_trait]
 impl super::Driver for AzureDriver {
     fn write_file(
         &self,
@@ -445,6 +551,16 @@ impl super::Driver for AzureDriver {
         });
         filenameret
     }
+    async fn write_file_streaming(
+        &self,
+        filename: String,
+        data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
+        cont_type: String,
+        owner_email: Option<String>,
+    ) -> String {
+        write_file_to_blob_streaming(filename.clone(), data, cont_type, owner_email).await;
+        filename
+    }
     fn tag_file(
         &self,
         filename: String,
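The streaming uploader derives each Azure block ID from the byte offset at which the block starts: `hex::encode(total_bytes_uploaded.to_le_bytes())`. The sketch below reproduces that naming scheme with only the standard library, inlining the hex encoding that the `hex` crate provides and fixing the offset type to `u64` so the result is platform-independent (the committed code uses `usize`):

```rust
// Sketch of the block-naming scheme used by write_file_to_blob_streaming:
// each block is identified by the little-endian bytes of its starting
// offset, hex-encoded. Distinct offsets yield distinct, equal-length IDs,
// which is what Azure's block list requires.
fn block_id_for_offset(offset: u64) -> String {
    offset
        .to_le_bytes()
        .iter()
        .map(|b| format!("{:02x}", b))
        .collect()
}

fn main() {
    let chunk: u64 = 10 * 1024 * 1024; // 10MB chunks, as in the uploader
    for i in 0..3 {
        println!("block {} starts at {} -> id {}", i, i * chunk, block_id_for_offset(i * chunk));
    }
    // Offset 0 encodes to eight zero bytes:
    assert_eq!(block_id_for_offset(0), "0000000000000000");
}
```

Note that all block IDs in one blob must have the same length, which this offset-based scheme guarantees for a fixed-width integer type.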

src/local.rs

Lines changed: 70 additions & 1 deletion
@@ -3,13 +3,15 @@
 // Author: Denys Fedoryshchenko <denys.f@collabora.com>
 
 use crate::{debug_log, get_config_content, ReceivedFile};
+use async_trait::async_trait;
 use axum::http::{HeaderName, HeaderValue};
 use chksum_hash_sha2_512 as sha2_512;
 use headers::HeaderMap;
 use serde::Deserialize;
 use std::fs::{self, File, OpenOptions};
 use std::io::Write;
 use std::path::{Path, PathBuf};
+use tokio::io::AsyncReadExt;
 use toml::Table;
 
 pub struct LocalDriver;
@@ -91,7 +93,57 @@ fn calculate_checksum(filename: &str, data: &[u8]) {
     debug_log!("File: {} Checksum: {}", filename, digest.to_hex_lowercase());
 }
 
-/// Write file to local storage
+/// Write file to local storage using streaming (new version)
+async fn write_file_to_local_streaming(
+    filename: String,
+    data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
+    cont_type: String,
+    owner_email: Option<String>,
+) -> Result<String, String> {
+    let file_path = get_storage_file_path(&filename);
+
+    // Ensure directory structure exists
+    if let Err(e) = ensure_directory_exists(&file_path) {
+        return Err(format!("Failed to create directory structure: {}", e));
+    }
+
+    // Write the file with streaming
+    let mut file = tokio::fs::File::create(&file_path)
+        .await
+        .map_err(|e| format!("Failed to create file: {}", e))?;
+
+    let mut buffer = vec![0u8; 10 * 1024 * 1024]; // 10MB buffer
+    let mut total_bytes = 0;
+
+    loop {
+        match data.read(&mut buffer).await {
+            Ok(0) => break, // EOF
+            Ok(n) => {
+                tokio::io::AsyncWriteExt::write_all(&mut file, &buffer[..n])
+                    .await
+                    .map_err(|e| format!("Failed to write file: {}", e))?;
+                total_bytes += n;
+                debug_log!("Written {} bytes to local storage", total_bytes);
+            }
+            Err(e) => return Err(format!("Failed to read stream: {}", e)),
+        }
+    }
+
+    // Create and write metadata (headers and owner tag)
+    let metadata_path = get_metadata_file_path(&filename);
+    if let Ok(mut metadata_file) = File::create(&metadata_path) {
+        let mut metadata_content = format!("content-type:{}\n", cont_type);
+        if let Some(email) = owner_email {
+            metadata_content.push_str(&format!("tag-owner:{}\n", email));
+        }
+        let _ = metadata_file.write_all(metadata_content.as_bytes());
+    }
+
+    debug_log!("File written to local storage: {}", file_path.display());
+    Ok(filename)
+}
+
+/// Write file to local storage (legacy version using Vec<u8>)
 fn write_file_to_local(
     filename: String,
     data: Vec<u8>,
@@ -272,6 +324,7 @@ fn set_tags_for_local_file(
 }
 
 /// Implement Driver trait for LocalDriver
+#[async_trait]
 impl super::Driver for LocalDriver {
     fn write_file(
         &self,
@@ -289,6 +342,22 @@ impl super::Driver for LocalDriver {
         }
     }
 
+    async fn write_file_streaming(
+        &self,
+        filename: String,
+        data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
+        cont_type: String,
+        owner_email: Option<String>,
+    ) -> String {
+        match write_file_to_local_streaming(filename.clone(), data, cont_type, owner_email).await {
+            Ok(_) => filename,
+            Err(e) => {
+                eprintln!("Local storage streaming write error: {}", e);
+                String::new()
+            }
+        }
+    }
+
     fn get_file(&self, filename: String) -> ReceivedFile {
         get_file_from_local(filename)
     }
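Both backends share the same bounded-buffer loop as the local writer above: read at most one chunk from the source, write exactly what was read, and stop at EOF, so memory stays at one buffer's worth regardless of file size. A synchronous, std-only sketch of that loop (`stream_copy` is an illustrative name; the committed code does the same with tokio's `AsyncReadExt`/`AsyncWriteExt`):

```rust
use std::io::{Read, Write};

// Bounded-buffer copy: memory use is capped at `chunk` bytes no matter how
// large the payload is, which is the core property of the streaming uploads.
fn stream_copy<R: Read, W: Write>(src: &mut R, dst: &mut W, chunk: usize) -> std::io::Result<usize> {
    let mut buffer = vec![0u8; chunk];
    let mut total = 0;
    loop {
        let n = src.read(&mut buffer)?;
        if n == 0 {
            break; // EOF
        }
        // Write only the bytes actually read; short reads are normal.
        dst.write_all(&buffer[..n])?;
        total += n;
    }
    Ok(total)
}

fn main() {
    let payload = vec![7u8; 25]; // stands in for a large upload
    let mut src = &payload[..];
    let mut dst = Vec::new();
    let copied = stream_copy(&mut src, &mut dst, 8).unwrap(); // tiny chunk for demo
    assert_eq!(copied, 25);
    assert_eq!(dst, payload);
    println!("copied {} bytes", copied);
}
```

In the committed code the chunk is 10MB and the destination is either a tokio file (local backend) or an Azure block upload (Azure backend), but the loop shape is the same.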
