@@ -1,6 +1,7 @@
 import csv
 import gzip
 import io
+import itertools
 import json
 import logging
 from typing import Any
@@ -12,24 +13,29 @@

 logger = logging.getLogger("datagouv_mcp")

+MAX_DOWNLOAD_SIZE_MB: int = 50  # TODO: make this an environment variable
+MAX_ROWS_HARD_LIMIT: int = 500
+

 def register_download_and_parse_resource_tool(mcp: FastMCP) -> None:
     @mcp.tool()
     async def download_and_parse_resource(
         resource_id: str,
         max_rows: int = 20,
-        max_size_mb: int = 500,
     ) -> str:
         """
         Download and parse a resource directly (bypasses Tabular API).

-        Use for JSON/JSONL files, or when full dataset analysis is needed.
+        Use for JSON/JSONL files only. For CSV/XLSX, prefer query_resource_data
+        (no download needed, supports pagination and filtering).
         Supports CSV, CSV.GZ, JSON, JSONL.

-        Strategy: Start with default max_rows (20) to preview structure and size.
-        If you need all data, call again with a higher max_rows value.
-        For CSV/XLSX preview, prefer query_resource_data (faster).
+        Strategy: Start with default max_rows (20) to preview structure.
+        Increase max_rows up to 500 for a broader sample.
+        Files larger than 50 MB are rejected.
         """
+        max_rows = min(max(max_rows, 1), MAX_ROWS_HARD_LIMIT)
+
         try:
             # Get full resource data to find URL and metadata
             resource_data = await datagouv_api_client.get_resource_details(resource_id)
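Note on the new clamp: max_rows is normalized before any download happens, so an out-of-range value from the caller can never widen the response. A quick standalone sketch of its behavior (clamp_rows is an illustrative name; the expression itself is the one in the hunk above):

MAX_ROWS_HARD_LIMIT = 500

def clamp_rows(n: int) -> int:
    # Same expression as in the tool body above.
    return min(max(n, 1), MAX_ROWS_HARD_LIMIT)

assert clamp_rows(-5) == 1        # nonsense input floors at 1
assert clamp_rows(20) == 20       # default passes through unchanged
assert clamp_rows(10_000) == 500  # capped at the hard limit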
@@ -52,7 +58,7 @@ async def download_and_parse_resource(

             # Download the file
             try:
-                max_size = max_size_mb * 1024 * 1024
+                max_size = MAX_DOWNLOAD_SIZE_MB * 1024 * 1024
                 content, filename, content_type = await _download_resource(
                     resource_url, max_size
                 )
@@ -87,10 +93,14 @@ async def download_and_parse_resource(
                 file_format == "gzip" and "csv" in filename.lower()
             ):
                 content_parts.append("Format: CSV")
-                rows = _parse_csv(content, is_gzipped=bool(is_gzipped))
+                rows = _parse_csv(
+                    content, is_gzipped=bool(is_gzipped), max_rows=max_rows
+                )
             elif file_format == "json" or file_format == "jsonl":
                 content_parts.append("Format: JSON/JSONL")
-                rows = _parse_json(content, is_gzipped=bool(is_gzipped))
+                rows = _parse_json(
+                    content, is_gzipped=bool(is_gzipped), max_rows=max_rows
+                )
             elif file_format == "xlsx":
                 content_parts.append("Format: XLSX")
                 content_parts.append(
@@ -122,25 +132,22 @@ async def download_and_parse_resource(
                 content_parts.append("⚠️ No data rows found in file.")
                 return "\n".join(content_parts)

-            # Limit rows
             total_rows = len(rows)
-            rows = rows[:max_rows]

             content_parts.append("")
-            content_parts.append(f"Total rows in file: {total_rows}")
-            content_parts.append(f"Returning: {len(rows)} row(s)")
+            content_parts.append(f"Total rows parsed (up to limit): {total_rows}")
+            content_parts.append(f"Returning: {total_rows} row(s)")

             # Show column names
             if rows:
                 columns = [str(k) if k is not None else "" for k in rows[0].keys()]
                 content_parts.append(f"Columns: {', '.join(columns)}")

-            # Show all parsed data (up to max_rows)
             content_parts.append("")
-            if len(rows) == 1:
+            if total_rows == 1:
                 content_parts.append("Data (1 row):")
             else:
-                content_parts.append(f"Data ({len(rows)} rows):")
+                content_parts.append(f"Data ({total_rows} rows):")
             for i, row in enumerate(rows, 1):
                 content_parts.append(f"  Row {i}:")
                 for key, value in row.items():
@@ -149,12 +156,11 @@
                         val_str = val_str[:100] + "..."
                     content_parts.append(f"    {key}: {val_str}")

-            if total_rows > max_rows:
+            if total_rows == max_rows:
                 content_parts.append("")
                 content_parts.append(
-                    f"⚠️ Note: File contains {total_rows} rows, "
-                    f"only showing first {max_rows}. "
-                    "Increase max_rows parameter to see more."
+                    f"⚠️ Row limit reached ({max_rows}). "
+                    "The file may contain more rows."
                 )

             return "\n".join(content_parts)
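Because rows are now truncated at parse time, total_rows == max_rows is only a heuristic: a file with exactly max_rows rows also triggers the warning, which is why it is worded "may contain more rows". If exact detection were ever wanted, one option (not part of this commit; peek_rows is a hypothetical helper) is to parse one row past the limit and use the extra row purely as a truncation flag:

import csv, io, itertools

def peek_rows(text: str, max_rows: int) -> tuple[list[dict], bool]:
    # Hypothetical variant: read max_rows + 1 rows, keep max_rows,
    # and report whether the extra row existed.
    reader = csv.DictReader(io.StringIO(text))
    rows = list(itertools.islice(reader, max_rows + 1))
    truncated = len(rows) > max_rows
    return rows[:max_rows], truncated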
@@ -167,7 +173,7 @@ async def download_and_parse_resource(


 async def _download_resource(
-    resource_url: str, max_size: int = 500 * 1024 * 1024
+    resource_url: str, max_size: int = MAX_DOWNLOAD_SIZE_MB * 1024 * 1024
 ) -> tuple[bytes, str, str | None]:
     """
     Download a resource with size limit.
@@ -189,14 +195,16 @@ async def _download_resource(
                 f"(max: {max_size / (1024 * 1024):.1f} MB)"
             )

-        # Download with size limit
-        content = bytearray()
-        async for chunk in resp.aiter_bytes(chunk_size=8192):
-            content.extend(chunk)
-            if len(content) > max_size:
+        # Accumulate chunks then join once (avoids bytearray → bytes double-copy)
+        chunks: list[bytes] = []
+        total = 0
+        async for chunk in resp.aiter_bytes(chunk_size=65536):
+            total += len(chunk)
+            if total > max_size:
                 raise ValueError(
                     f"File too large: exceeds {max_size / (1024 * 1024):.1f} MB limit"
                 )
+            chunks.append(chunk)

         # Get filename from Content-Disposition or URL
         filename = "resource"
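The rewritten loop checks the running total before appending, so memory never exceeds max_size plus one chunk, and the chunk list is joined into bytes exactly once at the end. A self-contained sketch of the same pattern, assuming httpx for the streaming client (the aiter_bytes iterator matches its API; fetch_capped is an illustrative name):

import httpx

async def fetch_capped(url: str, max_size: int) -> bytes:
    chunks: list[bytes] = []
    total = 0
    async with httpx.AsyncClient(follow_redirects=True) as client:
        async with client.stream("GET", url) as resp:
            resp.raise_for_status()
            async for chunk in resp.aiter_bytes(chunk_size=65536):
                total += len(chunk)
                if total > max_size:
                    raise ValueError(f"File too large: exceeds {max_size} bytes")
                chunks.append(chunk)
    # One copy here, instead of growing a bytearray and copying it again to bytes.
    return b"".join(chunks)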
@@ -208,7 +216,7 @@ async def _download_resource(

         content_type = resp.headers.get("Content-Type", "").split(";")[0]

-        return bytes(content), filename, content_type
+        return b"".join(chunks), filename, content_type


 def _detect_file_format(filename: str, content_type: str | None) -> str:
@@ -251,8 +259,10 @@ def _detect_file_format(filename: str, content_type: str | None) -> str:
     return "unknown"


-def _parse_csv(content: bytes, is_gzipped: bool = False) -> list[dict[str, Any]]:
-    """Parse CSV content with automatic delimiter detection."""
+def _parse_csv(
+    content: bytes, is_gzipped: bool = False, max_rows: int = MAX_ROWS_HARD_LIMIT
+) -> list[dict[str, Any]]:
+    """Parse CSV content with automatic delimiter detection, stopping at max_rows."""
     if is_gzipped:
         content = gzip.decompress(content)

@@ -283,11 +293,13 @@ def _parse_csv(content: bytes, is_gzipped: bool = False) -> list[dict[str, Any]]
         delimiter = best_delimiter[0]

     reader = csv.DictReader(io.StringIO(text), delimiter=delimiter)
-    return list(reader)
+    return list(itertools.islice(reader, max_rows))


-def _parse_json(content: bytes, is_gzipped: bool = False) -> list[dict[str, Any]]:
-    """Parse JSON content (array or JSONL)."""
+def _parse_json(
+    content: bytes, is_gzipped: bool = False, max_rows: int = MAX_ROWS_HARD_LIMIT
+) -> list[dict[str, Any]]:
+    """Parse JSON content (array or JSONL), stopping at max_rows."""
     if is_gzipped:
         content = gzip.decompress(content)

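csv.DictReader is lazy, so wrapping it in itertools.islice means only the first max_rows records ever go through the CSV machinery; the rest of the decoded text is never row-parsed. A minimal illustration:

import csv, io, itertools

text = "a,b\n" + "\n".join(f"{i},{i * i}" for i in range(100_000))
reader = csv.DictReader(io.StringIO(text))
first = list(itertools.islice(reader, 3))  # parses only 3 of 100,000 rows
# [{'a': '0', 'b': '0'}, {'a': '1', 'b': '1'}, {'a': '2', 'b': '4'}]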
@@ -299,15 +311,15 @@ def _parse_json(content: bytes, is_gzipped: bool = False) -> list[dict[str, Any]]
         if isinstance(data, list):
-            return data
+            return data[:max_rows]
         if isinstance(data, dict):
-            # Single object, return as list
             return [data]
     except json.JSONDecodeError:
         pass

-    # Try JSONL (one JSON object per line)
-    lines = text.strip().split("\n")
+    # Try JSONL (one JSON object per line): stop early at max_rows
     result = []
-    for line in lines:
+    for line in text.strip().split("\n"):
+        if len(result) >= max_rows:
+            break
         if line.strip():
             try:
                 result.append(json.loads(line))
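One caveat on the JSONL path: text.strip().split("\n") still builds the full list of lines before the loop can break, so the early stop saves json.loads calls but not the line splitting itself. If that ever matters for very large files, iterating a StringIO keeps the stop lazy end to end. A sketch under that assumption (parse_jsonl_capped is an illustrative name, not part of the module):

import io
import json
from typing import Any

def parse_jsonl_capped(text: str, max_rows: int) -> list[dict[str, Any]]:
    result: list[dict[str, Any]] = []
    for line in io.StringIO(text):  # yields lines lazily, no up-front split
        if len(result) >= max_rows:
            break
        line = line.strip()
        if line:
            result.append(json.loads(line))
    return result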