Skip to content

Commit c77bead

Browse files
author
Elias Nygren
committed
add configure_firewall and documentation to firewall feature
1 parent 4031035 commit c77bead

File tree

7 files changed

+226
-113
lines changed

7 files changed

+226
-113
lines changed

test/test_firewall.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
from upcloud import FirewallRule
2+
import responses
3+
import json
4+
from conftest import Mock
5+
import pytest
6+
7+
8+
def firewall_rule_callback(request):
9+
"""
10+
Checks that firewall rule contains required fields.
11+
Returns the same body with 201 Created.
12+
"""
13+
required_fields = [
14+
'position', 'direction', 'family', 'protocol',
15+
'source_address_start', 'source_address_end',
16+
'destination_port_start', 'destination_port_end',
17+
'action'
18+
]
19+
20+
request_body = json.loads(request.body)
21+
22+
23+
def check_fields(body):
24+
"""
25+
Helper for checking a firewall rule body against required_fields.
26+
"""
27+
for field in required_fields:
28+
if field not in request_body["firewall_rule"]:
29+
raise Exception("missing required field: {0}. Body was:{1}".format(field, request_body))
30+
31+
if isinstance(request_body, list):
32+
for body in request_body:
33+
check_fields(body)
34+
else:
35+
check_fields(request_body)
36+
37+
return(201, {}, json.dumps(request_body))
38+
39+
40+
41+
class TestFirewall():
42+
@responses.activate
43+
def test_add_firewall_rule(self, manager):
44+
Mock.mock_get("server/00798b85-efdc-41ca-8021-f6ef457b8531")
45+
server = manager.get_server("00798b85-efdc-41ca-8021-f6ef457b8531")
46+
47+
responses.add_callback(
48+
responses.POST,
49+
Mock.base_url + "/server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule",
50+
content_type='application/json',
51+
callback=firewall_rule_callback
52+
)
53+
54+
returned_firewall = server.add_firewall_rule(FirewallRule(
55+
position = "1",
56+
direction = "in",
57+
family = "IPv4",
58+
protocol = "tcp",
59+
source_address_start = "192.168.1.1",
60+
source_address_end = "192.168.1.255",
61+
destination_port_start = "22",
62+
destination_port_end = "22",
63+
action = "accept"
64+
))
65+
66+
# everything should run without errors, returned created object
67+
assert returned_firewall.position == "1"
68+
assert returned_firewall.direction == "in"
69+
assert returned_firewall.source_address_end == "192.168.1.255"
70+
71+
72+
@responses.activate
73+
def test_remove_firewall_rule(self, manager):
74+
Mock.mock_get("server/00798b85-efdc-41ca-8021-f6ef457b8531")
75+
server = manager.get_server("00798b85-efdc-41ca-8021-f6ef457b8531")
76+
77+
target = "server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule"
78+
Mock.mock_get(target, "firewall_rules.json")
79+
firewall_rules = server.get_firewall_rules()
80+
81+
Mock.mock_delete("server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule/1")
82+
res = firewall_rules[0].destroy()
83+
84+
Mock.mock_delete("server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule/1")
85+
res = server.remove_firewall_rule(firewall_rules[0])
86+
87+
assert res == {}
88+
89+
@responses.activate
90+
def test_list_and_get_firewall_rules(self, manager):
91+
Mock.mock_get("server/00798b85-efdc-41ca-8021-f6ef457b8531")
92+
server = manager.get_server("00798b85-efdc-41ca-8021-f6ef457b8531")
93+
94+
target = "server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule"
95+
Mock.mock_get(target, "firewall_rules.json")
96+
firewall_rules = server.get_firewall_rules()
97+
98+
assert firewall_rules[0].position == "1"
99+
100+
101+
@responses.activate
102+
def test_configure_firewall(self, manager):
103+
Mock.mock_get("server/00798b85-efdc-41ca-8021-f6ef457b8531")
104+
server = manager.get_server("00798b85-efdc-41ca-8021-f6ef457b8531")
105+
106+
responses.add_callback(
107+
responses.POST,
108+
Mock.base_url + "/server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule",
109+
content_type='application/json',
110+
callback=firewall_rule_callback
111+
)
112+
113+
returned_firewall = server.configure_firewall(
114+
[
115+
FirewallRule(
116+
position = "1",
117+
direction = "in",
118+
family = "IPv4",
119+
protocol = "tcp",
120+
source_address_start = "192.168.1.1",
121+
source_address_end = "192.168.1.255",
122+
destination_port_start = "22",
123+
destination_port_end = "22",
124+
action = "accept"
125+
),
126+
FirewallRule(
127+
position = "2",
128+
direction = "out",
129+
family = "IPv4",
130+
protocol = "tcp",
131+
source_address_start = "192.168.1.1",
132+
source_address_end = "192.168.1.255",
133+
destination_port_start = "22",
134+
destination_port_end = "22",
135+
action = "accept"
136+
)
137+
]
138+
)
139+
140+
# everything should run without errors, returned created object
141+
assert returned_firewall[0].position == "1"
142+
assert returned_firewall[0].direction == "in"
143+
assert returned_firewall[0].source_address_end == "192.168.1.255"
144+
assert returned_firewall[1].position == "2"
145+
assert returned_firewall[1].direction == "out"
146+
assert returned_firewall[1].source_address_end == "192.168.1.255"

test/test_server.py

Lines changed: 0 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import json
33
from conftest import Mock
44
import pytest
5-
from upcloud import FirewallRule
65

76
class TestServer():
87
@responses.activate
@@ -141,97 +140,3 @@ def test_update_server_non_updateable_fields(self, manager):
141140
with pytest.raises(Exception) as excinfo:
142141
server.state = "rekt"
143142
assert "'state' is a readonly field" in str(excinfo.value)
144-
145-
@responses.activate
146-
def test_add_firewall_rule(self, manager):
147-
Mock.mock_get("server/00798b85-efdc-41ca-8021-f6ef457b8531")
148-
server = manager.get_server("00798b85-efdc-41ca-8021-f6ef457b8531")
149-
150-
def callback(request):
151-
required_fields = [
152-
'position', 'direction', 'family', 'protocol',
153-
'source_address_start', 'source_address_end',
154-
'destination_port_start', 'destination_port_end',
155-
'action'
156-
]
157-
158-
request_body = json.loads(request.body)
159-
160-
for field in required_fields:
161-
if field not in request_body:
162-
raise Exception('missing required field: {0}'.format(field))
163-
164-
return(201, {}, json.dumps({ 'firewall_rule': request_body }))
165-
166-
167-
responses.add_callback(
168-
responses.POST,
169-
Mock.base_url + "/server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule",
170-
content_type='application/json',
171-
callback=callback
172-
)
173-
174-
returned_firewall = server.add_firewall_rule(FirewallRule(
175-
position = "1",
176-
direction = "in",
177-
family = "IPv4",
178-
protocol = "tcp",
179-
source_address_start = "192.168.1.1",
180-
source_address_end = "192.168.1.255",
181-
destination_port_start = "22",
182-
destination_port_end = "22",
183-
action = "accept"
184-
))
185-
186-
# everything should run without errors, returned created object
187-
assert returned_firewall.position == "1"
188-
assert returned_firewall.direction == "in"
189-
assert returned_firewall.source_address_end == "192.168.1.255"
190-
191-
192-
@responses.activate
193-
def test_remove_firewall_rule(self, manager):
194-
Mock.mock_get("server/00798b85-efdc-41ca-8021-f6ef457b8531")
195-
server = manager.get_server("00798b85-efdc-41ca-8021-f6ef457b8531")
196-
197-
target = "server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule"
198-
Mock.mock_get(target, "firewall_rules.json")
199-
firewall_rules = server.get_firewall_rules()
200-
201-
Mock.mock_delete("server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule/1")
202-
res = firewall_rules[0].destroy()
203-
204-
Mock.mock_delete("server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule/1")
205-
res = server.remove_firewall_rule(firewall_rules[0])
206-
207-
assert res == {}
208-
209-
@responses.activate
210-
def test_list_and_get_firewall_rules(self, manager):
211-
Mock.mock_get("server/00798b85-efdc-41ca-8021-f6ef457b8531")
212-
server = manager.get_server("00798b85-efdc-41ca-8021-f6ef457b8531")
213-
214-
target = "server/00798b85-efdc-41ca-8021-f6ef457b8531/firewall_rule"
215-
Mock.mock_get(target, "firewall_rules.json")
216-
firewall_rules = server.get_firewall_rules()
217-
218-
assert firewall_rules[0].position == "1"
219-
220-
221-
222-
223-
224-
225-
226-
227-
228-
229-
230-
231-
232-
233-
234-
235-
236-
237-

upcloud/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from .server import Server
1313
from .storage import Storage
1414
from .ip_address import IP_address
15-
from .firewall import Firewall
1615
from .firewall import FirewallRule
1716
from .cloud_manager import CloudManager
1817
from .tools import OperatingSystems

upcloud/cloud_manager/cloud_manager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from ..base import BaseAPI
2-
from ..firewall import Firewall
32

43
from .server_mixin import ServerManager
54
from .ip_address_mixin import IPManager
Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,66 @@
11
from .. import FirewallRule
22

33
class FirewallManager():
4+
"""
5+
Provides get / list / create / delete functionality for firewall rules.
6+
These functions are used by the FirewallRule class but may also be used
7+
directly.
8+
"""
49

510
def get_firewall_rule(self, server_uuid, firewall_rule_position):
11+
"""
12+
Returns a FirewallRule object based on server uuid and rule position.
13+
"""
614
url = "/server/" + server_uuid + "/firewall_rule/" + str(firewall_rule_position)
715
res = self.get_request(url)
8-
return FirewallRule(**res['firewall_rule'])
16+
return FirewallRule(**res["firewall_rule"])
917

1018
def get_firewall_rules(self, server_uuid):
19+
"""
20+
Returns all FirewallRule objects based on a server uuid.
21+
"""
1122
url = "/server/" + server_uuid + "/firewall_rule"
1223

1324
res = self.get_request(url)
1425

15-
1626
firewall_rules = []
1727
for firewall_rule in res['firewall_rules']['firewall_rule']:
1828
firewall_rules.append(FirewallRule(**firewall_rule))
1929

2030
return firewall_rules
2131

2232
def create_firewall_rule(self, server_uuid, firewall_rule_body):
33+
"""
34+
Creates a new firewall rule for a given server uuid. The rule can be
35+
given as a dict or with FirewallRule.prepare_post_body().
36+
37+
Returns a FirewallRule object.
38+
"""
2339
url = "/server/" + server_uuid + "/firewall_rule"
24-
res = self.post_request(url, firewall_rule_body)
40+
41+
body = { "firewall_rule": firewall_rule_body }
42+
res = self.post_request(url, body)
2543
return FirewallRule(**res['firewall_rule'])
2644

2745
def delete_firewall_rule(self, server_uuid, firewall_rule_position):
46+
"""
47+
Deletes a firewall rule based on a server uuid and rule position.
48+
"""
2849
url = "/server/" + server_uuid + "/firewall_rule/" + str(firewall_rule_position)
2950
return self.request('DELETE', url)
51+
52+
53+
def configure_firewall(self, UUID, firewall_rule_bodies):
54+
"""
55+
Helper for calling create_firewall_rule in series for a list of firewall_rule_bodies.
56+
"""
57+
58+
url = "/server/" + UUID + "/firewall_rule"
59+
60+
FirewallRules = []
61+
62+
for rule in firewall_rule_bodies:
63+
FirewallRule = self.create_firewall_rule(UUID, rule)
64+
FirewallRules.append(FirewallRule)
65+
66+
return FirewallRules

0 commit comments

Comments
 (0)