Skip to content

Commit b2ca534

Browse files
authored
Merge pull request #18 from nuclearcat/implement-streaming-upload
feat: Implement Async/Streaming for uploads
2 parents 64ee369 + 4a5d21e commit b2ca534

5 files changed

Lines changed: 463 additions & 101 deletions

File tree

CHANGELOG.md

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,72 @@
11
# kernelci-storage ChangeLog
22

3-
## 0.1.0 (2025-01-06)
3+
All notable changes to this project will be documented in this file.
44

5-
- Initial release
5+
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6+
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7+
8+
## [0.2.0] - 2025-10-23
9+
10+
### Added
11+
- Streaming multipart upload support for large files (200MB+)
12+
- New `write_file_streaming()` async method in Driver trait
13+
- `FieldStream` wrapper to convert multipart fields to AsyncRead
14+
- Streaming implementations for both Azure and Local storage backends
15+
- `async-trait` dependency (0.1) for trait async methods
16+
- `bytes` dependency (1.9) for efficient byte buffer handling
17+
18+
### Changed
19+
- Refactored `ax_post_file` handler to stream uploads directly without loading entire file into memory
20+
- Azure backend now streams uploads in 10MB chunks directly to blob storage
21+
- Local backend now streams uploads in 10MB chunks directly to filesystem
22+
- Upload handler processes multipart fields sequentially and starts streaming immediately
23+
24+
### Performance Improvements
25+
- Reduced memory usage: Only 10MB buffer in memory at any time instead of entire file
26+
- Improved upload performance for large files by eliminating full memory buffering
27+
- Removed temporary file creation during Azure uploads (streaming directly from multipart)
28+
- Better scalability: Can handle files larger than available RAM
29+
30+
### Technical Details
31+
- No breaking changes to existing API - fully backward compatible
32+
- Existing upload clients continue to work without modifications
33+
- All existing functionality preserved (permissions, content-type, owner tags, file locking)
34+
- Proper lifetime management for multipart field streaming
35+
- Uses async/await with tokio AsyncRead trait
36+
37+
## [0.1.0] - 2025-01-06
38+
39+
### Added
40+
- Initial release of KernelCI Storage Server
41+
- JWT token-based authentication with HMAC-SHA256
42+
- File upload/download with local caching
43+
- Azure Blob Storage backend support with chunked uploads (10MB blocks)
44+
- Local filesystem backend support
45+
- Range request support for partial content downloads
46+
- Prometheus metrics endpoint (`/metrics`)
47+
- File locking mechanism to prevent concurrent uploads to same path
48+
- Automatic cache cleanup when disk space < 12%
49+
- LRU-style cache deletion (removes files older than 60 minutes)
50+
- User-specific upload path restrictions via configuration
51+
- SSL/TLS support with certificate configuration
52+
- Docker support with Dockerfile
53+
54+
### Features
55+
- RESTful API with endpoints:
56+
- `GET /` - Server status
57+
- `POST /upload` or `POST /v1/file` - File upload (requires JWT)
58+
- `GET /*filepath` - File download (public)
59+
- `GET /v1/checkauth` - Validate JWT token
60+
- `GET /v1/list` - List all files (public)
61+
- `GET /metrics` - Prometheus metrics
62+
- Header preservation for HTTP caching (ETag, Last-Modified)
63+
- Content-type detection with heuristics
64+
- Configurable via TOML configuration file
65+
- Environment variable override for config path (`KCI_STORAGE_CONFIG`)
66+
- Verbose logging support via `--verbose` flag
67+
- Command-line utilities:
68+
- `--generate-jwt-secret` - Generate JWT secret
69+
- `--generate-jwt-token` - Generate JWT token for user
70+
- SHA-512 based cache filenames to avoid path conflicts
71+
- Client IP detection with X-Forwarded-For support
72+
- Disk space monitoring with hysteresis

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
[package]
22
name = "kernelci-storage"
3-
version = "0.1.0"
3+
version = "0.2.0"
44
edition = "2021"
55

66
[dependencies]
7+
async-trait = "0.1"
78
axum = { version = "0.7.9", features = ["tracing", "multipart", "macros"] }
89
axum-server = { version = "0.7.1", features = ["rustls", "rustls-pemfile", "tls-rustls"] }
910
azure_blob_uploader = "0.1.4"
1011
azure_storage = "0.21.0"
1112
azure_storage_blobs = "0.21.0"
13+
bytes = "1.9"
1214
chksum-hash-sha2-512 = "0.0.1"
1315
chrono = "0.4.39"
1416
clap = { version = "4.5.23", features = ["derive"] }

src/azure.rs

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ impl AzureDriver {
1111
}
1212

1313
use crate::{debug_log, get_config_content, ReceivedFile};
14+
use async_trait::async_trait;
1415
use axum::http::{HeaderName, HeaderValue};
1516
use azure_storage::StorageCredentials;
1617
use azure_storage_blobs::container::operations::BlobItem;
@@ -26,6 +27,7 @@ use std::io::Read;
2627
use std::io::Write;
2728
use std::sync::Arc;
2829
use tempfile::Builder;
30+
use tokio::io::AsyncReadExt;
2931
use toml::Table;
3032

3133
#[derive(Deserialize)]
@@ -75,7 +77,110 @@ fn calculate_checksum(filename: &String, data: &[u8]) {
7577
debug_log!("File: {} Checksum: {}", filename, digest.to_hex_lowercase());
7678
}
7779

78-
/// Write file to Azure blob storage
80+
/// Write file to Azure blob storage using streaming (new version)
81+
async fn write_file_to_blob_streaming(
82+
filename: String,
83+
data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
84+
cont_type: String,
85+
owner_email: Option<String>,
86+
) -> (&'static str, usize) {
87+
let azure_cfg = Arc::new(get_azure_credentials("azure"));
88+
89+
let storage_account = azure_cfg.account.as_str();
90+
let storage_key = azure_cfg.key.clone();
91+
let storage_container = azure_cfg.container.as_str();
92+
let storage_blob = filename.as_str();
93+
let storage_credential = StorageCredentials::access_key(storage_account, storage_key);
94+
let blob_client = ClientBuilder::new(storage_account, storage_credential)
95+
.blob_client(storage_container, storage_blob);
96+
97+
let mut total_bytes_uploaded: usize = 0;
98+
let chunk_size = 10 * 1024 * 1024; // 10MB chunks
99+
let mut blocks = BlockList::default();
100+
101+
loop {
102+
let mut buffer = vec![0u8; chunk_size];
103+
let bytes_read = data.read(&mut buffer).await;
104+
match bytes_read {
105+
Ok(bytes_read) => {
106+
if bytes_read == 0 {
107+
break;
108+
}
109+
buffer.truncate(bytes_read);
110+
let block_id = BlockId::new(hex::encode(total_bytes_uploaded.to_le_bytes()));
111+
blocks
112+
.blocks
113+
.push(BlobBlockType::Uncommitted(block_id.clone()));
114+
match blob_client.put_block(block_id, buffer).await {
115+
Ok(_) => {
116+
total_bytes_uploaded += bytes_read;
117+
debug_log!("Uploaded {} bytes", total_bytes_uploaded);
118+
}
119+
Err(e) => {
120+
eprintln!("Error uploading block: {:?}", e);
121+
break;
122+
}
123+
}
124+
}
125+
Err(e) => {
126+
eprintln!("Error reading stream: {:?}", e);
127+
break;
128+
}
129+
}
130+
}
131+
match blob_client
132+
.put_block_list(blocks)
133+
.content_type(cont_type)
134+
.await
135+
{
136+
Ok(_) => {
137+
debug_log!("Block list uploaded");
138+
let blob_url_res = blob_client.url();
139+
match blob_url_res {
140+
Ok(blob_url) => {
141+
debug_log!("Blob URL: {}", blob_url);
142+
}
143+
Err(e) => {
144+
eprintln!("Error getting blob URL: {:?}", e);
145+
}
146+
}
147+
148+
// Set owner tag if email is provided
149+
if let Some(email) = owner_email {
150+
let mut tags = Tags::new();
151+
let sanitized = sanitize_tag_component(&email);
152+
if sanitized != email {
153+
debug_log!(
154+
"Sanitized owner tag value from '{}' to '{}'",
155+
email,
156+
sanitized
157+
);
158+
}
159+
// Ensure non-empty value
160+
let final_value = if sanitized.is_empty() {
161+
"_".to_string()
162+
} else {
163+
sanitized
164+
};
165+
tags.insert("owner".to_string(), final_value);
166+
match blob_client.set_tags(tags).await {
167+
Ok(_) => {
168+
debug_log!("Owner tag set successfully");
169+
}
170+
Err(e) => {
171+
eprintln!("Error setting owner tag: {:?}", e);
172+
}
173+
}
174+
}
175+
}
176+
Err(e) => {
177+
eprintln!("Error uploading block list: {:?}", e);
178+
}
179+
}
180+
("OK", total_bytes_uploaded)
181+
}
182+
183+
/// Write file to Azure blob storage (legacy version using Vec<u8>)
79184
/// TBD: Rework, do not keep whole file as Vec<u8> in memory!!!
80185
async fn write_file_to_blob(
81186
filename: String,
@@ -429,6 +534,7 @@ async fn azure_list_files(directory: String) -> Vec<String> {
429534
}
430535

431536
/// Implement Driver trait for AzureDriver
537+
#[async_trait]
432538
impl super::Driver for AzureDriver {
433539
fn write_file(
434540
&self,
@@ -445,6 +551,16 @@ impl super::Driver for AzureDriver {
445551
});
446552
filenameret
447553
}
554+
async fn write_file_streaming(
555+
&self,
556+
filename: String,
557+
data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
558+
cont_type: String,
559+
owner_email: Option<String>,
560+
) -> (String, usize) {
561+
let (_status, size) = write_file_to_blob_streaming(filename.clone(), data, cont_type, owner_email).await;
562+
(filename, size)
563+
}
448564
fn tag_file(
449565
&self,
450566
filename: String,

src/local.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
// Author: Denys Fedoryshchenko <denys.f@collabora.com>
44

55
use crate::{debug_log, get_config_content, ReceivedFile};
6+
use async_trait::async_trait;
67
use axum::http::{HeaderName, HeaderValue};
78
use chksum_hash_sha2_512 as sha2_512;
89
use headers::HeaderMap;
910
use serde::Deserialize;
1011
use std::fs::{self, File, OpenOptions};
1112
use std::io::Write;
1213
use std::path::{Path, PathBuf};
14+
use tokio::io::AsyncReadExt;
1315
use toml::Table;
1416

1517
pub struct LocalDriver;
@@ -91,7 +93,57 @@ fn calculate_checksum(filename: &str, data: &[u8]) {
9193
debug_log!("File: {} Checksum: {}", filename, digest.to_hex_lowercase());
9294
}
9395

94-
/// Write file to local storage
96+
/// Write file to local storage using streaming (new version)
97+
async fn write_file_to_local_streaming(
98+
filename: String,
99+
data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
100+
cont_type: String,
101+
owner_email: Option<String>,
102+
) -> Result<(String, usize), String> {
103+
let file_path = get_storage_file_path(&filename);
104+
105+
// Ensure directory structure exists
106+
if let Err(e) = ensure_directory_exists(&file_path) {
107+
return Err(format!("Failed to create directory structure: {}", e));
108+
}
109+
110+
// Write the file with streaming
111+
let mut file = tokio::fs::File::create(&file_path)
112+
.await
113+
.map_err(|e| format!("Failed to create file: {}", e))?;
114+
115+
let mut buffer = vec![0u8; 10 * 1024 * 1024]; // 10MB buffer
116+
let mut total_bytes = 0;
117+
118+
loop {
119+
match data.read(&mut buffer).await {
120+
Ok(0) => break, // EOF
121+
Ok(n) => {
122+
tokio::io::AsyncWriteExt::write_all(&mut file, &buffer[..n])
123+
.await
124+
.map_err(|e| format!("Failed to write file: {}", e))?;
125+
total_bytes += n;
126+
debug_log!("Written {} bytes to local storage", total_bytes);
127+
}
128+
Err(e) => return Err(format!("Failed to read stream: {}", e)),
129+
}
130+
}
131+
132+
// Create and write metadata (headers and owner tag)
133+
let metadata_path = get_metadata_file_path(&filename);
134+
if let Ok(mut metadata_file) = File::create(&metadata_path) {
135+
let mut metadata_content = format!("content-type:{}\n", cont_type);
136+
if let Some(email) = owner_email {
137+
metadata_content.push_str(&format!("tag-owner:{}\n", email));
138+
}
139+
let _ = metadata_file.write_all(metadata_content.as_bytes());
140+
}
141+
142+
debug_log!("File written to local storage: {}", file_path.display());
143+
Ok((filename, total_bytes))
144+
}
145+
146+
/// Write file to local storage (legacy version using Vec<u8>)
95147
fn write_file_to_local(
96148
filename: String,
97149
data: Vec<u8>,
@@ -272,6 +324,7 @@ fn set_tags_for_local_file(
272324
}
273325

274326
/// Implement Driver trait for LocalDriver
327+
#[async_trait]
275328
impl super::Driver for LocalDriver {
276329
fn write_file(
277330
&self,
@@ -289,6 +342,22 @@ impl super::Driver for LocalDriver {
289342
}
290343
}
291344

345+
async fn write_file_streaming(
346+
&self,
347+
filename: String,
348+
data: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
349+
cont_type: String,
350+
owner_email: Option<String>,
351+
) -> (String, usize) {
352+
match write_file_to_local_streaming(filename.clone(), data, cont_type, owner_email).await {
353+
Ok((fname, size)) => (fname, size),
354+
Err(e) => {
355+
eprintln!("Local storage streaming write error: {}", e);
356+
(String::new(), 0)
357+
}
358+
}
359+
}
360+
292361
fn get_file(&self, filename: String) -> ReceivedFile {
293362
get_file_from_local(filename)
294363
}

0 commit comments

Comments
 (0)