88"""
99
1010try :
11- from typing import Dict , Tuple , Union , TYPE_CHECKING
11+ from typing import List , Dict , Tuple , Union , Any , TYPE_CHECKING
1212 from socket import socket
1313 from socketpool import SocketPool
1414
2222from .headers import Headers
2323
2424
25+ class FormData :
26+ """
27+ Class for parsing and storing form data from POST requests.
28+
29+ Supports ``application/x-www-form-urlencoded``, ``multipart/form-data`` and ``text/plain``
30+ content types.
31+
32+ Examples::
33+
34+ form_data = FormData(b"foo=bar&baz=qux", "application/x-www-form-urlencoded")
35+
36+ # or
37+
38+ form_data = FormData(b"foo=bar\\ r\\ nbaz=qux", "text/plain")
39+
40+ # FormData({"foo": "bar", "baz": "qux"})
41+
42+ form_data.get("foo") # "bar"
43+ form_data["foo"] # "bar"
44+ form_data.get("non-existent-key") # None
45+ form_data.get_list("baz") # ["qux"]
46+ "unknown-key" in form_data # False
47+ form_data.fields # ["foo", "baz"]
48+ """
49+
50+ _storage : Dict [str , List [Union [str , bytes ]]]
51+
52+ def __init__ (self , data : bytes , content_type : str ) -> None :
53+ self .content_type = content_type
54+ self ._storage = {}
55+
56+ if content_type .startswith ("application/x-www-form-urlencoded" ):
57+ self ._parse_x_www_form_urlencoded (data )
58+
59+ elif content_type .startswith ("multipart/form-data" ):
60+ boundary = content_type .split ("boundary=" )[1 ]
61+ self ._parse_multipart_form_data (data , boundary )
62+
63+ elif content_type .startswith ("text/plain" ):
64+ self ._parse_text_plain (data )
65+
66+ def _add_field_value (self , field_name : str , value : Union [str , bytes ]) -> None :
67+ if field_name not in self ._storage :
68+ self ._storage [field_name ] = [value ]
69+ else :
70+ self ._storage [field_name ].append (value )
71+
72+ def _parse_x_www_form_urlencoded (self , data : bytes ) -> None :
73+ decoded_data = data .decode ()
74+
75+ for field_name , value in [
76+ key_value .split ("=" , 1 ) for key_value in decoded_data .split ("&" )
77+ ]:
78+ self ._add_field_value (field_name , value )
79+
80+ def _parse_multipart_form_data (self , data : bytes , boundary : str ) -> None :
81+ blocks = data .split (b"--" + boundary .encode ())[1 :- 1 ]
82+
83+ for block in blocks :
84+ disposition , content = block .split (b"\r \n \r \n " , 1 )
85+ field_name = disposition .split (b'"' , 2 )[1 ].decode ()
86+ value = content [:- 2 ]
87+
88+ self ._add_field_value (field_name , value )
89+
90+ def _parse_text_plain (self , data : bytes ) -> None :
91+ lines = data .split (b"\r \n " )[:- 1 ]
92+
93+ for line in lines :
94+ field_name , value = line .split (b"=" , 1 )
95+
96+ self ._add_field_value (field_name .decode (), value .decode ())
97+
98+ def get (self , field_name : str , default : Any = None ) -> Union [str , bytes , None ]:
99+ """Get the value of a field."""
100+ return self ._storage .get (field_name , [default ])[0 ]
101+
102+ def get_list (self , field_name : str ) -> List [Union [str , bytes ]]:
103+ """Get the list of values of a field."""
104+ return self ._storage .get (field_name , [])
105+
106+ @property
107+ def fields (self ):
108+ """Returns a list of field names."""
109+ return list (self ._storage .keys ())
110+
111+ def __getitem__ (self , field_name : str ):
112+ return self .get (field_name )
113+
114+ def __iter__ (self ):
115+ return iter (self ._storage )
116+
117+ def __len__ (self ):
118+ return len (self ._storage )
119+
120+ def __contains__ (self , key : str ):
121+ return key in self ._storage
122+
123+ def __repr__ (self ) -> str :
124+ return f"FormData({ repr (self ._storage )} )"
125+
126+
25127class Request :
26128 """
27129 Incoming request, constructed from raw incoming bytes.
@@ -91,6 +193,7 @@ def __init__(
91193 self .connection = connection
92194 self .client_address = client_address
93195 self .raw_request = raw_request
196+ self ._form_data = None
94197
95198 if raw_request is None :
96199 raise ValueError ("raw_request cannot be None" )
@@ -117,6 +220,13 @@ def body(self) -> bytes:
117220 def body (self , body : bytes ) -> None :
118221 self .raw_request = self ._raw_header_bytes + b"\r \n \r \n " + body
119222
223+ @property
224+ def form_data (self ) -> Union [FormData , None ]:
225+ """POST data of the request"""
226+ if self ._form_data is None and self .method == "POST" :
227+ self ._form_data = FormData (self .body , self .headers ["Content-Type" ])
228+ return self ._form_data
229+
120230 def json (self ) -> Union [dict , None ]:
121231 """Body of the request, as a JSON-decoded dictionary."""
122232 return json .loads (self .body ) if self .body else None
0 commit comments