-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
322 lines (259 loc) · 12.8 KB
/
main.py
File metadata and controls
322 lines (259 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
"""
Salesforce Deployment Failure Analyser
---------------------------------------
Analyses Salesforce DevOps metrics and returns a structured risk assessment.
Modes:
python main.py Interactive menu (mocked)
python main.py 1 | 2 | 3 Preset scenario (mocked)
python main.py 1 --live Preset scenario via Claude API
python main.py --input path/to/data.json --live Custom JSON via Claude API
"""
import argparse
import json
import os
import sys
# ── Scenario registry ─────────────────────────────────────────────────────────
# Preset scenarios, keyed by the menu/CLI choice string. Every entry carries a
# human-readable label plus the paths (relative to BASE_DIR) of its sample
# input and of the pre-generated output used in mocked mode.
SCENARIOS = {
    key: {"label": label, "input": input_path, "output": output_path}
    for key, label, input_path, output_path in (
        (
            "1",
            "Failure — coverage 62%, NullPointerException, 120 PMD violations",
            "sample_data/failure_scenario.json",
            "sample_outputs/failure_output.json",
        ),
        (
            "2",
            "Medium — coverage 75%, 0 failures, 40 PMD violations (5 critical)",
            "sample_data/medium_scenario.json",
            "sample_outputs/medium_output.json",
        ),
        (
            "3",
            "Healthy — coverage 90%, 0 failures, 5 PMD violations (0 critical)",
            "sample_data/healthy_scenario.json",
            "sample_outputs/healthy_output.json",
        ),
    )
}

# Absolute directory containing this script; anchor for the relative paths above.
BASE_DIR = os.path.split(os.path.abspath(__file__))[0]
CLAUDE_PROMPT = """\
You are a Salesforce DevOps engineer performing a deployment readiness review.
Analyse the input below and return ONLY a valid JSON object.
No explanation, no markdown, no text outside the JSON.
Input:
{input_json}
Rules:
1. risk_score is an integer 0-10:
0-2 = no blockers | 3-5 = quality issues, no blockers | 6-8 = at least one blocker | 9-10 = multiple blockers
2. risk_level must match risk_score: 0-2 = Low, 3-5 = Medium, 6-10 = High
3. Every risks[], root_causes[], recommendations[] item must name a specific component from the input.
4. risks[].severity: one of Critical, High, Medium, Low
5. recommendations[].priority: one of P0 - Immediate, P1 - High, P2 - Medium, P3 - Low
6. Order recommendations highest to lowest priority.
7. Root causes must state the technical reason, not restate the error.
8. If code_coverage < 75, include a Critical risk and P0 recommendation citing the exact value.
9. If failed_deployments is empty, do not fabricate deployment risks.
10. If code_quality_issues.critical is 0, do not fabricate critical PMD risks.
Required output structure:
{{
"risk_score": <integer 0-10>,
"risk_level": <"Low" | "Medium" | "High">,
"risks": [{{"issue": "", "severity": "", "component": ""}}],
"root_causes": [{{"cause": "", "component": ""}}],
"recommendations": [{{"action": "", "priority": ""}}]
}}"""
# ── Input validation ──────────────────────────────────────────────────────────
def validate_input(data: dict) -> list:
    """Validate a DevOps metrics payload.

    Checks performed:
      * the payload itself is a JSON object (dict);
      * ``code_coverage`` is a number in the range 0-100;
      * ``failed_deployments`` is a list whose entries are objects carrying
        ``component``, ``error`` and ``failed_tests``;
      * ``code_quality_issues`` is an object with integer ``pmd_violations``
        and ``critical`` fields.

    Booleans are rejected wherever a number is required: ``bool`` is a
    subclass of ``int`` in Python, so a JSON ``true`` would otherwise pass.

    Args:
        data: The parsed JSON payload (any type is tolerated and reported).

    Returns:
        A list of human-readable error strings. An empty list means valid.
    """
    # A top-level array/string/number has no fields to inspect; report it
    # instead of failing later on ``data.get``.
    if not isinstance(data, dict):
        return [f"Input must be a JSON object, got {type(data).__name__}"]

    errors = []
    required_top = {
        "code_coverage": (int, float),
        "failed_deployments": (list,),
        "code_quality_issues": (dict,),
    }
    for field, types in required_top.items():
        if field not in data:
            errors.append(f"Missing required field: '{field}'")
        elif isinstance(data[field], bool) or not isinstance(data[field], types):
            type_names = "/".join(t.__name__ for t in types)
            errors.append(f"'{field}' must be {type_names}, got {type(data[field]).__name__}")

    # Range check only applies to a genuine number (type errors reported above).
    coverage = data.get("code_coverage")
    if isinstance(coverage, (int, float)) and not isinstance(coverage, bool):
        if not (0 <= coverage <= 100):
            errors.append(f"'code_coverage' must be 0–100, got {coverage}")

    quality = data.get("code_quality_issues")
    if isinstance(quality, dict):
        for sub in ("pmd_violations", "critical"):
            if sub not in quality:
                errors.append(f"'code_quality_issues' missing field: '{sub}'")
            elif isinstance(quality[sub], bool) or not isinstance(quality[sub], int):
                errors.append(f"'code_quality_issues.{sub}' must be int")

    deployments = data.get("failed_deployments")
    if isinstance(deployments, list):
        for i, item in enumerate(deployments):
            # ``sub not in item`` would raise on numbers and do substring
            # matching on strings, so require an object first.
            if not isinstance(item, dict):
                errors.append(
                    f"'failed_deployments[{i}]' must be dict, got {type(item).__name__}"
                )
                continue
            for sub in ("component", "error", "failed_tests"):
                if sub not in item:
                    errors.append(f"'failed_deployments[{i}]' missing field: '{sub}'")
    return errors
def validate_output(data: dict) -> list:
    """Validate the JSON document returned by Claude.

    Checks performed:
      * the response is a JSON object (dict);
      * ``risk_score`` (int), ``risk_level`` (str), ``risks``,
        ``root_causes`` and ``recommendations`` (lists) are all present with
        the expected type;
      * every entry of the three lists is an object, because print_section
        reads entries through ``item.get``;
      * ``risk_score`` lies in 0-10 and ``risk_level`` is Low/Medium/High.

    Booleans are not accepted as ``risk_score`` even though ``bool`` is a
    subclass of ``int`` in Python.

    Args:
        data: The parsed JSON response (any type is tolerated and reported).

    Returns:
        A list of human-readable error strings. An empty list means valid.
    """
    # A JSON array/string/number has no fields; report it instead of
    # failing later on ``data.get``.
    if not isinstance(data, dict):
        return [f"Response must be a JSON object, got {type(data).__name__}"]

    errors = []
    required = {
        "risk_score": int,
        "risk_level": str,
        "risks": list,
        "root_causes": list,
        "recommendations": list,
    }
    for field, expected_type in required.items():
        if field not in data:
            errors.append(f"Missing field: '{field}'")
            continue
        value = data[field]
        if isinstance(value, bool) or not isinstance(value, expected_type):
            errors.append(
                f"'{field}' must be {expected_type.__name__}, got {type(value).__name__}"
            )
        elif expected_type is list:
            for i, item in enumerate(value):
                if not isinstance(item, dict):
                    errors.append(f"'{field}[{i}]' must be dict, got {type(item).__name__}")

    score = data.get("risk_score")
    if isinstance(score, int) and not isinstance(score, bool) and not (0 <= score <= 10):
        errors.append(f"'risk_score' must be 0–10, got {score}")

    level = data.get("risk_level")
    if isinstance(level, str) and level not in ("Low", "Medium", "High"):
        errors.append(f"'risk_level' must be Low/Medium/High, got '{level}'")
    return errors
# ── Claude API ────────────────────────────────────────────────────────────────
def call_claude(input_data: dict) -> dict:
    """Send the metrics payload to Claude and return its parsed JSON answer.

    The payload is embedded into CLAUDE_PROMPT and sent as a single user
    message. The first text block of the reply is parsed as JSON and checked
    with validate_output.

    Every failure path prints a message and terminates the process with exit
    status 1: the ``anthropic`` package is missing, ANTHROPIC_API_KEY is not
    set, the API request fails, the reply carries no text, the text is not
    valid JSON, or the JSON fails schema validation.

    Args:
        input_data: The validated DevOps metrics payload.

    Returns:
        The parsed and schema-validated response object.
    """
    # Imported lazily so mocked mode works without the SDK installed.
    try:
        import anthropic
    except ImportError:
        print("\n ❌ 'anthropic' package not installed. Run: pip install anthropic\n")
        sys.exit(1)

    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        print("\n ❌ ANTHROPIC_API_KEY environment variable is not set.")
        print(" Set it with: set ANTHROPIC_API_KEY=your_key_here (Windows)")
        print(" Or: export ANTHROPIC_API_KEY=your_key_here (Mac/Linux)\n")
        sys.exit(1)

    client = anthropic.Anthropic(api_key=api_key)
    prompt = CLAUDE_PROMPT.format(input_json=json.dumps(input_data, indent=2))
    print(" 🤖 Calling Claude API...")
    try:
        message = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        )
    except anthropic.APIError as exc:
        # Covers connection, authentication, rate-limit and other HTTP status
        # errors raised by the SDK; report them like every other failure here
        # instead of surfacing a traceback.
        print(f"\n ❌ Claude API request failed: {exc}\n")
        sys.exit(1)

    # Pick the first content block that actually carries text; indexing
    # content[0].text blindly fails on an empty or non-text reply.
    raw = next(
        (
            block.text.strip()
            for block in message.content
            if isinstance(getattr(block, "text", None), str)
        ),
        None,
    )
    if raw is None:
        print("\n ❌ Claude returned no text content.\n")
        sys.exit(1)

    try:
        output = json.loads(raw)
    except json.JSONDecodeError:
        print(f"\n ❌ Claude returned invalid JSON:\n\n{raw}\n")
        sys.exit(1)

    errors = validate_output(output)
    if errors:
        print("\n ⚠️ Claude response failed schema validation:")
        for e in errors:
            print(f" • {e}")
        print(f"\n Raw response:\n{raw}\n")
        sys.exit(1)
    return output
# ── Display ───────────────────────────────────────────────────────────────────
def load_json(relative_path: str) -> dict:
    """Parse and return the JSON file at ``relative_path`` under BASE_DIR."""
    full_path = os.path.join(BASE_DIR, relative_path)
    with open(full_path, mode="r", encoding="utf-8") as handle:
        return json.loads(handle.read())
def print_section(title: str, items: list, fields: list) -> None:
    """Print a titled, numbered listing of ``items``.

    The title is framed by two horizontal rules. Each item is then printed on
    its own line as ``[n] field: value | field: value`` for the requested
    ``fields``; a field missing from an item is shown with an empty value.
    """
    rule = "─" * 54
    print(f"\n {rule}")
    print(f" {title}")
    print(f" {rule}")
    for number, entry in enumerate(items, 1):
        cells = [f"{name}: {entry.get(name, '')}" for name in fields]
        row = " | ".join(cells)
        print(f" [{number}] {row}")
def display_output(output: dict) -> None:
    """Render a risk assessment to stdout.

    Prints a banner with the risk score and a colour-badged risk level, then
    the risks, root causes and recommendations sections. Missing keys fall
    back to ``?`` / ``Unknown`` / an empty section.
    """
    border = "═" * 58
    badges = {"High": "🔴", "Medium": "🟡", "Low": "🟢"}

    score = output.get("risk_score", "?")
    level = output.get("risk_level", "Unknown")
    badge = badges.get(level, "⚪")

    print(f"\n{border}")
    print(" SALESFORCE DEPLOYMENT FAILURE ANALYSER — Result")
    print(border)
    print(f" Risk Score : {score}/10")
    print(f" Risk Level : {badge} {level}")

    # (section heading, key in ``output``, fields shown per entry)
    sections = (
        ("RISKS IDENTIFIED", "risks", ["severity", "component", "issue"]),
        ("ROOT CAUSES", "root_causes", ["component", "cause"]),
        ("RECOMMENDATIONS", "recommendations", ["priority", "action"]),
    )
    for heading, key, columns in sections:
        print_section(heading, output.get(key, []), columns)

    print(f"\n{border}\n")
# ── Argument parsing ──────────────────────────────────────────────────────────
def parse_args():
    """Build the command-line parser and parse ``sys.argv``.

    Recognised arguments:
      * ``scenario`` - optional positional, one of 1/2/3;
      * ``--input FILE`` - path to a custom metrics JSON file;
      * ``--live`` - flag that switches from mocked output to the Claude API.

    Returns:
        The ``argparse.Namespace`` with ``scenario``, ``input`` and ``live``.
    """
    # Same text as a triple-quoted block: leading and trailing newline included.
    examples = "\n".join(
        (
            "",
            "examples:",
            "python main.py interactive menu (mocked)",
            "python main.py 1 preset failure scenario (mocked)",
            "python main.py 2 --live preset + live Claude API",
            "python main.py --input mydata.json --live custom JSON + Claude API",
            "",
        )
    )
    cli = argparse.ArgumentParser(
        prog="main.py",
        description="Salesforce Deployment Failure Analyser — powered by Claude",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    cli.add_argument(
        "scenario",
        nargs="?",
        choices=["1", "2", "3"],
        help="Preset scenario: 1=Failure, 2=Medium, 3=Healthy",
    )
    cli.add_argument(
        "--input",
        metavar="FILE",
        help="Path to a custom DevOps metrics JSON file",
    )
    cli.add_argument(
        "--live",
        action="store_true",
        help="Call Claude API (requires ANTHROPIC_API_KEY env var)",
    )
    return cli.parse_args()
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Entry point: print the banner, then run one of two flows.

    * Custom input (``--input FILE``): load and validate the file, echo it,
      and - only when ``--live`` is given - send it to Claude and display the
      result. Without ``--live`` the run stops after echoing the data.
    * Preset scenario: take the scenario from the command line or prompt for
      it, echo its sample input, then display either the live Claude result
      or the pre-generated mocked output.

    Invalid files, payloads or choices print a message and exit with status 1.
    """

    def indented(payload) -> str:
        # Pretty-print JSON with one leading space on every line.
        return " " + json.dumps(payload, indent=4).replace("\n", "\n ")

    args = parse_args()

    if args.live:
        mode_label = "Live — Claude API"
    else:
        mode_label = "Mocked — pre-generated output"
    banner = (
        "\n╔════════════════════════════════════════════════════════╗",
        "║ Salesforce Deployment Failure Analyser ║",
        f"║ Mode: {mode_label:<47}║",
        "╚════════════════════════════════════════════════════════╝\n",
    )
    for line in banner:
        print(line)

    # ── Custom input file ──────────────────────────────────────────────────
    if args.input:
        input_path = os.path.abspath(args.input)
        if not os.path.exists(input_path):
            print(f" ❌ File not found: {input_path}\n")
            sys.exit(1)

        with open(input_path, "r", encoding="utf-8") as source:
            text = source.read()
        try:
            input_data = json.loads(text)
        except json.JSONDecodeError as exc:
            print(f" ❌ Invalid JSON in {input_path}:\n {exc}\n")
            sys.exit(1)

        problems = validate_input(input_data)
        if problems:
            print(" ❌ Input validation failed:")
            for problem in problems:
                print(f" • {problem}")
            print()
            sys.exit(1)

        print(f" ✅ Input file: {input_path}")
        print("\n 📥 Data:")
        print(indented(input_data))

        if not args.live:
            # There is no mocked output for arbitrary data, so stop here.
            print("\n ⚠️ Custom input requires --live to call Claude.")
            print(" Run: python main.py --input <file> --live\n")
            sys.exit(0)

        display_output(call_claude(input_data))
        return

    # ── Preset scenario ────────────────────────────────────────────────────
    selected = args.scenario
    if not selected:
        print(" Select a scenario:\n")
        for key in SCENARIOS:
            print(f" [{key}] {SCENARIOS[key]['label']}")
        print()
        selected = input(" Enter choice (1 / 2 / 3): ").strip()

    if selected not in SCENARIOS:
        print(f"\n ❌ Invalid choice '{selected}'. Enter 1, 2, or 3.\n")
        sys.exit(1)

    preset = SCENARIOS[selected]
    print(f" ✅ Scenario: {preset['label']}")
    input_data = load_json(preset["input"])
    print("\n 📥 Input:")
    print(indented(input_data))

    if not args.live:
        print("\n 🤖 Loading mocked Claude analysis...")
        output_data = load_json(preset["output"])
    else:
        output_data = call_claude(input_data)
    display_output(output_data)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()