-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_usage_analyzer.py
More file actions
958 lines (896 loc) · 48.5 KB
/
data_usage_analyzer.py
File metadata and controls
958 lines (896 loc) · 48.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
#!/usr/bin/env python3
"""
Salesforce Data Usage Analyzer
Analyzes object and field usage across Salesforce orgs: describes all sobjects/fields,
queries data to compute filled counts per field, and outputs one CSV per object (tabs)
with rows = fields, columns = usage (FieldApiName, FieldLabel, TotalRecords, Filled, UsagePct),
plus a combined JSON keyed by object. Uses OAuth with local callback server.
Requirements: Python 3.7+, requests
"""
import argparse
import base64
import csv
import errno
import glob
import hashlib
import json
import os
import re
import secrets
import shutil
import sys
import threading
import time
import urllib.parse
import webbrowser
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler, SimpleHTTPRequestHandler
from typing import List, Dict, Optional, Tuple, Any

import requests
# Objects analyzed in "quick" mode when the config's "objects" list is empty.
# These are the core standard CRM objects most orgs populate.
QUICK_DEFAULT_OBJECTS = ["Lead", "Account", "Contact", "Case", "Opportunity"]
# Objects to exclude from analysis by default (not queryable via normal SOQL or not useful for field usage).
# Only standard Salesforce objects are listed here. Managed-package and customer custom objects are NOT excluded
# by default; use config "exclusion_objects" to add org-specific exclusions (e.g. from temp/failed_objects.json).
# Pattern-based exclusions (see DEFAULT_EXCLUDE_OBJECT_PATTERNS) handle *ChangeEvent, *__e, *History, *Share,
# *Snapshot, *Feed, *Tag, *TeamMember, *Relation, *Definition, *Metric, *Access, *Link, *Member, *View, *Log, *Stream, *Type, *Info.
# Explicit list below = standard objects that don't match those suffixes.
# NOTE: membership is tested by exact API name (see should_exclude in get_objects_to_analyze).
DEFAULT_EXCLUDE_OBJECTS = {
    "AccountContactRole", "AccountPartner", "ActionLinkGroupTemplate", "ActionLinkTemplate",
    "ActionPlan", "ActionPlanItem", "ActionPlanItemDependency", "ActionPlanTemplate", "ActionPlanTemplateItem",
    "ActionPlanTemplateItemValue", "ActionPlanTemplateVersion", "ActionPlanTmplPkgConfig", "ActionPlnTmplItmDependency",
    "ActivityUsrConnectionStatus", "AdditionalNumber", "AgentWork", "AgentWorkSkill", "AggregateResult",
    "AIApplication", "AIApplicationConfig", "AIInsightAction", "AIInsightFeedback", "AIInsightReason", "AIInsightValue",
    "AIRecordInsight", "AIPredictionEvent", "Announcement", "ApiEvent", "AppAnalyticsQueryRequest", "AppDefinition",
    "AppMenuItem", "AsyncOperationEvent", "AsyncOperationStatus", "AssignmentRule", "AssociatedLocation", "Attachment",
    "Audience", "AuraDefinitionBundle", "AuthConfig", "AuthConfigProviders", "AuthProvider", "AuthSession",
    "AuthorizationForm", "AuthorizationFormConsent", "AuthorizationFormDataUse", "AuthorizationFormText",
    "BackgroundOperation", "BatchDataSource", "BatchDataSrcFilterCriteria", "BatchJob", "BatchJobPart",
    "BatchJobPartFailedRecord", "BatchApexErrorEvent", "BatchJobStatusChangedEvent", "BrandTemplate", "BrandingSet",
    "BrandingSetProperty", "BriefcaseAssignment", "BriefcaseRule", "BriefcaseRuleFilter", "BroadcastTopic",
    "BroadcastTopicCollabRoom", "BroadcastTopicUserRole", "BusinessBrand", "BusinessHours", "BusinessProcess",
    "CombinedAttachment", "ContentFolderItem", "DataObjectDataChgEvent", "DataStatistics",
    "EmailBounceEvent", "EmailStatus", "EntityParticle", "ExperienceDiagnosticEvent", "FeedLike", "FeedSignal",
    "FeedTrackedChange", "FlexQueueItem", "FlowExecutionErrorEvent", "FlowOrchestrationEvent", "FlowInterview",
    "FolderedContentDocument", "ListViewChartInstance", "LookedUpFromActivity", "MLEngagementEvent", "Name",
    "NetworkUserHistoryRecent", "NoteAndAttachment", "OpenActivity", "OrgLifecycleNotification", "Organization",
    "OwnedContentDocument", "PlatformAction", "PlatformStatusAlertEvent", "QuoteTemplateRichTextData", "RecentFieldChange",
    "RelationshipDomain", "RelationshipInfo", "Report", "ReportEvent", "SalesforceContract", "SalesforceInvoice",
    "SalesforceQuote", "SearchLayout", "SetupAuditTrail", "SiteDetail", "SlackAppConversationEvent",
    "UserAppMenuCustomization", "Vote", "ApexClass", "ApexComponent", "ApexEmailNotification", "ApexPage",
    "ApexTestQueueItem", "ApexTestResultLimits", "ApexTestRunResult", "ApexTestSuite", "ApexTrigger", "ApexTypeImplementor",
    "ApexTestResult", "ApexCodeCoverage", "ApexCodeCoverageAggregate", "ApexExecutionOverlayAction", "Dashboard",
    "CronTrigger", "CronJobDetail", "LogFile", "UriEvent", "TabDefinition", "LoginGeo", "LoginEvent", "UnifiedActivity",
    "CallCenter", "CallCoachingMediaProvider", "CallDisposition", "CallDispositionCategory",
    "CaseSolution", "CaseStatus", "CaseSubjectParticle", "CaseTeamRole", "CaseTeamTemplate", "CaseTeamTemplateRecord",
    "CategoryData", "CategoryNode", "ChatterActivity", "ChatterExtension", "ChatterExtensionConfig", "ClientBrowser",
    "CollaborationGroup", "CollaborationGroupMemberRequest", "CollaborationGroupRecord", "CollaborationInvitation",
    "CollaborationRoom", "CommSubscription", "CommSubscriptionConsent", "CommSubscriptionTiming", "Community",
    "ConferenceNumber", "ConnectedApplication", "ContactPointAddress", "ContactPointConsent", "ContactPointEmail",
    "ContactPointPhone", "ContactPointTypeConsent", "ContactRequest", "ContentAsset", "ContentBody", "ContentDistribution",
    "ContentDocument", "ContentDocumentSubscription", "ContentFolder", "ContentNote", "ContentNotification",
    "ContentTagSubscription", "ContentUserSubscription", "ContentVersion", "ContentVersionComment", "ContentVersionRating",
    "ContentWorkspace", "ContentWorkspaceDoc", "ContentWorkspacePermission", "ContentWorkspaceSubscription",
    "ContractContactRole", "ContractStatus", "CorsWhitelistEntry", "CspTrustedSite", "CustomBrand", "CustomBrandAsset",
    "CustomHelpMenuItem", "CustomHelpMenuSection", "CustomHttpHeader", "CustomPermission", "CustomPermissionDependency",
    "CustomSetup", "DashboardComponent", "DataIntegrationRecordPurchasePermission", "DataUseLegalBasis", "DataUsePurpose",
    "DataWeaveResource", "DeleteEvent", "Document", "DocumentAttachmentMap", "Domain", "DomainSite",
    "DuplicateRecordItem", "DuplicateRecordSet", "DuplicateRule", "EmailCapture", "EmailDomainFilter", "EmailDomainKey",
    "EmailMessage", "EmailRelay", "EmailRoutingAddress", "EmailServicesAddress", "EmailServicesFunction", "EmailTemplate",
    "EmbeddedServiceDetail", "EmbeddedServiceLabel", "EngmtChannelTypeSettings", "EnhancedLetterhead", "EntitySubscription",
    "FeedAttachment", "FeedComment", "FeedItem", "FeedPollChoice", "FeedPollVote", "FeedRevision",
    "FieldPermissions", "FieldSecurityClassification", "FileSearchActivity", "FiscalYearSettings",
    "FlowInterviewLogEntry", "FlowOrchestrationInstance", "FlowOrchestrationStageInstance", "FlowOrchestrationStepInstance",
    "FlowOrchestrationWorkItem", "FlowRecord", "FlowRecordElement", "FlowRecordVersion", "FlowRecordVersionOccurrence",
    "FlowTestResult", "Folder", "ForecastingAdjustment", "ForecastingCategoryMapping", "ForecastingCustomData",
    "ForecastingDisplayedFamily", "ForecastingFact", "ForecastingFilter", "ForecastingFilterCondition", "ForecastingGroup",
    "ForecastingGroupItem", "ForecastingItem", "ForecastingOwnerAdjustment", "ForecastingQuota", "ForecastingTypeSource",
    "ForecastingTypeToCategory", "ForecastingUserPreference", "FormulaFunction", "FormulaFunctionCategory",
    "GrantedByLicense", "Group", "Holiday", "IPAddressRange", "IframeWhiteListUrl", "Image", "Individual",
    "InstalledMobileApp", "KnowledgeArticle", "KnowledgeArticleVersion", "KnowledgeArticleViewStat", "KnowledgeArticleVoteStat",
    "KnowledgeableUser", "LightningExperienceTheme", "LightningOnboardingConfig", "LinkedArticle", "ListEmail",
    "ListEmailIndividualRecipient", "ListEmailRecipientSource", "ListViewChart", "Location", "LocationTrustMeasure",
    "LoginIp", "MLModel", "MLModelFactor", "MLModelFactorComponent", "Macro", "MacroInstruction", "MacroUsage",
    "MailmergeTemplate", "ManagedContent", "ManagedContentChannel", "ManagedContentSpace", "ManagedContentVariant",
    "MatchingInformation", "MatchingRule", "MatchingRuleItem",
    "MobSecurityCertPinConfig", "MobileApplicationDetail", "MobileSecurityAssignment", "MobileSecurityPolicy",
    "MutingPermissionSet", "MyDomainDiscoverableLogin", "NamedCredential", "NavigationLinkSet", "NavigationMenuItem",
    "Network", "NetworkActivityAudit", "NetworkAffinity", "NetworkAuthApiSettings", "NetworkDataCategory",
    "NetworkDiscoverableLogin", "NetworkEmailTmplAllowlist", "NetworkMemberGroup", "NetworkModeration",
    "NetworkPageOverride", "NetworkSelfRegistration", "OauthCustomScope", "OauthCustomScopeApp", "OauthToken",
    "OauthTokenExchHandlerApp", "OauthTokenExchangeHandler", "ObjectDataImport", "ObjectDataImportReference",
    "ObjectPermissions", "ObjectRelatedUrl", "OmniRoutingEventStore", "OmniSupervisorConfig", "OmniSupervisorConfigAction",
    "OmniSupervisorConfigGroup", "OmniSupervisorConfigProfile", "OmniSupervisorConfigQueue", "OmniSupervisorConfigSkill",
    "OmniSupervisorConfigTab", "OmniSupervisorConfigUser", "OnboardingMetrics", "OperatingHours", "OperatingHoursHoliday",
    "OpportunityCompetitor", "OpportunityContactRole", "OpportunityPartner", "OpportunityStage",
    "OrgEmailAddressSecurity", "OrgWideEmailAddress", "PackageLicense", "Partner", "PartnerRole", "PartyConsent",
    "PendingServiceRouting", "Period", "PermissionSet", "PermissionSetAssignment", "PermissionSetGroup",
    "PermissionSetGroupComponent", "PermissionSetLicense", "PermissionSetLicenseAssign", "PermissionSetTabSetting",
    "PipelineInspMetricConfig", "PipelineInspectionSumField", "PlatformCachePartition",
    "PresenceConfigDeclineReason", "PresenceDeclineReason", "PresenceUserConfig", "PresenceUserConfigProfile",
    "PresenceUserConfigUser", "ProcessException", "ProcessExceptionEvent", "ProcessFlowMigration", "ProcessInstance", "ProcessInstanceNode",
    "ProcessInstanceStep", "ProcessInstanceWorkitem", "ProcessNode", "Profile", "ProfileCallLogQuickAction",
    "ProfileSkill", "ProfileSkillEndorsement", "ProfileSkillUser", "Prompt", "PromptAction", "PromptError",
    "PromptVersion", "Publisher", "PushTopic", "QueueRoutingConfig", "QueueSobject", "QuickText", "QuickTextUsage",
    "QuoteDocument", "RecentlyViewed", "Recommendation", "RecommendationResponse", "RecordAction", "RedirectWhitelistUrl",
    "ReputationLevel", "ReputationPointsRule", "SPSamlAttributes", "SalesWorkQueueSettings", "SamlSsoConfig",
    "Scontrol", "SearchActivity", "SearchPromotionRule", "SecurityCustomBaseline", "Seller", "ServiceChannel",
    "ServiceChannelStatus", "ServicePresenceStatus", "ServiceResource", "ServiceResourceSkill", "ServiceSetupProvisioning",
    "SessionPermSetActivation", "SetupAssistantStep", "SharingRecordCollection", "SharingRecordCollectionItem",
    "Site", "SiteIframeWhiteListUrl", "SiteRedirectMapping", "Skill", "SkillRequirement", "SocialPersona", "SocialPost",
    "Solution", "SolutionStatus", "Stamp", "StampAssignment", "StaticResource", "TaskPriority", "TaskStatus",
    "TenantUsageEntitlement", "TestSuiteMembership", "TimeSlot", "TodayGoal", "Topic", "TopicAssignment", "Translation",
    "UserAccessChange", "UserAccessPolicy", "UserAppMenuItem", "UserCustomBadge", "UserDefinedLabel",
    "UserDefinedLabelAssignment", "UserEmailCalendarSync", "UserEmailPreferredPerson", "UserLicense",
    "UserListViewCriterion", "UserLogin", "UserPackageLicense", "UserPreference", "UserPrioritizedRecord",
    "UserProvAccount", "UserProvAccountStaging", "UserProvMockTarget", "UserProvisioningConfig", "UserProvisioningRequest",
    "UserRole", "UserServicePresence", "VideoCall", "VideoCallParticipant", "VideoCallRecording", "VideoVendorAdminConsent",
    "VisualforceAccessMetrics", "VoiceCall", "VoiceCallRecording", "VoiceOrgSetting", "WaveAutoInstallRequest",
    "WaveCompatibilityCheckItem",
}
# Suffix patterns excluded so we don't hardcode managed-package or customer-specific names.
# Custom objects end with __c so they are never matched by these patterns.
# Each regex is applied with .search() against the object's API name (see should_exclude).
DEFAULT_EXCLUDE_OBJECT_PATTERNS = [
    re.compile(r"ChangeEvent$"),  # CDC
    re.compile(r"__e$"),  # platform events
    re.compile(r"History$"),  # standard history (AccountHistory, etc.)
    re.compile(r"Share$"),  # manual share tables (AccountShare, etc.)
    re.compile(r"Snapshot$"),  # reporting/field snapshots
    re.compile(r"Feed$"),  # Chatter feed tables (AccountFeed, etc.)
    re.compile(r"Tag$"),  # tag tables (AccountTag, etc.)
    re.compile(r"TeamMember$"),  # team member tables (AccountTeamMember, etc.)
    re.compile(r"Relation$"),  # relation tables (EventRelation, AcceptedEventRelation, etc.)
    re.compile(r"Definition$"),  # metadata definitions (FieldDefinition, ColorDefinition, etc.)
    re.compile(r"Metric$"),  # metric objects (ActivityMetric, etc.)
    re.compile(r"Access$"),  # access tables (UserRecordAccess, UserEntityAccess, etc.)
    re.compile(r"Link$"),  # link tables (ContentDocumentLink, etc.)
    re.compile(r"Member$"),  # member tables (AppTabMember, ContentFolderMember, etc.)
    re.compile(r"View$"),  # list/flow views (ListView, FlowVersionView, etc.)
    re.compile(r"Log$"),  # log objects (ApexLog, etc.)
    re.compile(r"Stream$"),  # event streams (LogoutEventStream, etc.)
    re.compile(r"Type$"),  # type metadata (DataType, RecordType, etc.)
    re.compile(r"Info$"),  # info metadata (PicklistValueInfo, OwnerChangeOptionInfo, etc.)
]
# Built-in PII field type names and name patterns to mask (config pii_fields extends this).
# NOTE(review): these constants are not referenced in the visible portion of this file;
# presumably consumed by field-usage masking further down — confirm against the full file.
DEFAULT_PII_FIELD_TYPES = {"email", "phone", "url"}
DEFAULT_PII_NAME_PATTERNS = [
    re.compile(r"email", re.I),
    re.compile(r"phone", re.I),
    re.compile(r"ssn|social", re.I),
]
def _prompt(prompt: str, default: Optional[str] = None) -> str:
"""Prompt for input; return value or default. If user types 'exit', exit the tool."""
value = input(prompt).strip()
if value.lower() == "exit":
print("Exiting.")
sys.exit(0)
if default is not None and value == "":
return default
return value
class CallbackHandler(BaseHTTPRequestHandler):
    """Handles the OAuth redirect hitting the local callback server.

    Communicates results back through attributes set on the server instance
    by authenticate(): auth_code, auth_error, auth_event, auth_received_cb.
    """

    def do_GET(self):
        # Normalize away the query string and any trailing slash.
        path_only = self.path.split("?")[0].rstrip("/") or "/"
        if path_only == "/callback":
            query_params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
            if "code" in query_params:
                # Success: hand the auth code to the waiting authenticate() thread.
                self.server.auth_code = query_params["code"][0]
                if self.server.auth_received_cb:
                    self.server.auth_received_cb()
                self.server.auth_event.set()
                self.send_response(200)
                self.send_header("Content-type", "text/html")
                self.end_headers()
                self.wfile.write(
                    b"<html><body><h1>Authentication Successful!</h1>"
                    b"<p>You can close this window and return to the terminal.</p></body></html>"
                )
            elif "error" in query_params:
                # Salesforce reported an OAuth error; surface it to the waiter.
                error = query_params["error"][0]
                error_desc = query_params.get("error_description", ["Unknown error"])[0]
                self.server.auth_error = f"{error}: {error_desc}"
                if self.server.auth_received_cb:
                    self.server.auth_received_cb()
                self.server.auth_event.set()
                self.send_response(400)
                self.send_header("Content-type", "text/html")
                self.end_headers()
                self.wfile.write(
                    f"<html><body><h1>Authentication Failed</h1><p>{error}: {error_desc}</p></body></html>".encode()
                )
            else:
                # Callback hit without code or error — malformed redirect.
                self.send_response(400)
                self.send_header("Content-type", "text/html")
                self.end_headers()
                self.wfile.write(
                    b"<html><body><h1>Unexpected callback</h1><p>No code or error in URL.</p></body></html>"
                )
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        # Silence the default per-request stderr logging.
        pass
class DataUsageAnalyzer:
    def __init__(self):
        """Initialize per-session state; auth fields are filled in by authenticate()."""
        self.instance_url = None  # set after successful OAuth
        self.access_token = None  # set after successful OAuth
        self.api_version = "v60.0"  # Salesforce REST API version used for all calls
        self.log_file = None  # path of the session log (set by setup_logging)
        self._log_handle = None  # open file handle for the log (set by setup_logging)
        self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")  # timestamped run id
        self.client_id = None  # connected-app consumer key (from config)
        self.client_secret = None  # connected-app secret (from config, optional)
        self.session = requests.Session()  # shared HTTP session; auth header added later
        self._describe_global_cache = None  # cached describe-global sobject list
        self.ensure_configs_folder()
def ensure_configs_folder(self) -> None:
configs_dir = "configs"
os.makedirs(configs_dir, exist_ok=True)
def setup_logging(self) -> None:
logs_dir = "logs"
os.makedirs(logs_dir, exist_ok=True)
log_filename = os.path.join(logs_dir, f"data_usage_{self.session_id}.log")
if self._log_handle:
try:
self._log_handle.close()
except OSError:
pass
self._log_handle = None
self.log_file = log_filename
self._log_handle = open(log_filename, "w", encoding="utf-8")
self._log_handle.write(f"=== Salesforce Data Usage Analyzer Log ===\n")
self._log_handle.write(f"Session ID: {self.session_id}\n")
self._log_handle.write(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
self._log_handle.write("=" * 50 + "\n\n")
self._log_handle.flush()
self._print_and_log(f"Logging to: {log_filename}", mask=False)
def _print_and_log(self, message: str, mask: bool = True) -> None:
print(message)
if not self._log_handle:
return
if mask:
message = self.mask_sensitive_data(message)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self._log_handle.write(f"[{timestamp}] {message}\n")
self._log_handle.flush()
def log_message(self, message: str, mask_sensitive: bool = True) -> None:
if not self._log_handle:
return
if mask_sensitive:
message = self.mask_sensitive_data(message)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self._log_handle.write(f"[{timestamp}] {message}\n")
self._log_handle.flush()
def mask_sensitive_data(self, text: str, pii_fields_set: Optional[set] = None) -> str:
"""Mask tokens, client credentials, and PII. Config pii_fields extends built-in."""
# Auth
text = re.sub(
r'client_id["\']?\s*[:=]\s*["\']?[A-Za-z0-9]{15,}',
'client_id="***MASKED***"',
text,
)
text = re.sub(
r'client_secret["\']?\s*[:=]\s*["\']?[A-Za-z0-9]{15,}',
'client_secret="***MASKED***"',
text,
)
text = re.sub(
r'access_token["\']?\s*[:=]\s*["\']?[A-Za-z0-9]{50,}',
'access_token="***MASKED***"',
text,
)
text = re.sub(
r'code["\']?\s*[:=]\s*["\']?[A-Za-z0-9]{20,}',
'code="***MASKED***"',
text,
)
# Generic email/phone/SSN in log text
text = re.sub(
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
"***EMAIL***",
text,
)
text = re.sub(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "***PHONE***", text)
text = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "***SSN***", text)
return text
def load_config_file(self, config_path: str) -> Optional[Dict]:
"""Load and validate single-org config. Returns config dict or None."""
try:
path = config_path
if not os.path.isabs(config_path) and not os.path.isfile(config_path):
path = os.path.join("configs", config_path)
with open(path, "r", encoding="utf-8") as f:
config = json.load(f)
# Required
for key in ("instance", "client_id"):
if not config.get(key):
raise ValueError(f"Missing required field: {key}")
# Normalize instance URL
instance = config["instance"].strip()
if not instance.startswith("http"):
instance = "https://" + instance
if ".my.salesforce.com" not in instance:
instance = instance.rstrip("/") + ".my.salesforce.com"
config["instance"] = instance
config.setdefault("client_secret", "")
config.setdefault("callback_port", 8080)
config.setdefault("mode", "quick")
config.setdefault("objects", [])
config.setdefault("object_scope", ["all"])
config.setdefault("pii_fields", [])
config.setdefault("exclusion_fields", [])
config.setdefault("exclusion_objects", [])
if isinstance(config["object_scope"], str):
config["object_scope"] = [config["object_scope"]]
return config
except FileNotFoundError:
self._print_and_log(f"Configuration file not found: {config_path}", mask=False)
return None
except json.JSONDecodeError as e:
self._print_and_log(f"Invalid JSON in config: {e}", mask=False)
return None
except ValueError as e:
self._print_and_log(f"Config validation error: {e}", mask=False)
return None
def list_existing_configs(self) -> List[str]:
configs_dir = "configs"
if not os.path.exists(configs_dir):
return []
return sorted(
f for f in os.listdir(configs_dir)
if f.endswith(".json") and f != "config_example.json"
)
    def get_user_input(self, silent: bool = False, config_path: Optional[str] = None) -> Optional[Dict]:
        """Resolve the run configuration.

        Three paths: silent mode loads *config_path* directly; interactive mode
        with no saved configs walks the user through setup (optionally saving);
        interactive mode with saved configs offers a numbered menu.
        Returns the config dict, or None on any failure.
        """
        print("=== Salesforce Data Usage Analyzer ===")
        print("Type 'exit' at any prompt to quit.\n")
        existing = self.list_existing_configs()
        if silent:
            # Silent runs must name a config file explicitly.
            if not config_path:
                print("--silent requires --config. Example: python data_usage_analyzer.py --silent --config configs/your_config.json")
                return None
            config = self.load_config_file(config_path)
            return config
        # Interactive: no configs -> walk through and offer save or run without saving
        if not existing:
            print("No configuration files found. Walking through setup.\n")
            instance = _prompt("Salesforce instance URL (e.g. mycompany.my.salesforce.com): ").strip()
            if not instance:
                print("Instance is required.")
                return None
            # Same URL normalization as load_config_file.
            if not instance.startswith("http"):
                instance = "https://" + instance
            if ".my.salesforce.com" not in instance:
                instance = instance.rstrip("/") + ".my.salesforce.com"
            client_id = _prompt("Client ID (Consumer Key): ").strip()
            if not client_id:
                print("Client ID is required.")
                return None
            client_secret = _prompt("Client Secret [optional]: ", default="")
            port_str = _prompt("Callback port [default 8080]: ", default="8080")
            try:
                port = int(port_str) if port_str else 8080
            except ValueError:
                # Non-numeric input silently falls back to the default port.
                port = 8080
            mode = _prompt("Mode: quick or full [default quick]: ", default="quick").lower() or "quick"
            if mode not in ("quick", "full"):
                mode = "quick"
            objects_str = _prompt("Object API names (comma-separated, empty for defaults): ", default="")
            objects = [x.strip() for x in objects_str.split(",") if x.strip()] if objects_str else []
            config = {
                "instance": instance,
                "client_id": client_id,
                "client_secret": client_secret,
                "callback_port": port,
                "mode": mode,
                "objects": objects,
                "object_scope": ["all"],
                "pii_fields": [],
                "exclusion_fields": [],
            }
            # Optionally persist the freshly built config under configs/.
            save = _prompt("Save this configuration for next time? (y/n): ", default="n").lower()
            if save in ("y", "yes"):
                name = _prompt("Config filename (e.g. myorg): ", default="").strip()
                if name:
                    if not name.endswith(".json"):
                        name += ".json"
                    path = os.path.join("configs", name)
                    with open(path, "w", encoding="utf-8") as f:
                        json.dump(config, f, indent=2)
                    self._print_and_log(f"Saved to {path}", mask=False)
            return config
        # Has configs: offer to use one or enter path
        print("Available configuration files:")
        for i, cfg in enumerate(existing, 1):
            print(f" {i}. {cfg}")
        print(f" {len(existing) + 1}. Enter path")
        choice = _prompt(f"Select (1-{len(existing) + 1}): ", default="1")
        try:
            idx = int(choice)
            if 1 <= idx <= len(existing):
                config = self.load_config_file(os.path.join("configs", existing[idx - 1]))
                return config
            if idx == len(existing) + 1:
                path = _prompt("Path to config file: ", default="").strip()
                if path:
                    return self.load_config_file(path)
        except ValueError:
            pass
        # Fall back to the first listed config on invalid/unusable input.
        return self.load_config_file(os.path.join("configs", existing[0]))
def authenticate(self, instance_url: str, client_id: str, client_secret: str, port: int = 8080, callback_host: str = "localhost", silent: bool = False) -> bool:
"""OAuth 2.0 Web Server Flow with PKCE and local callback server.
Use callback_host='127.0.0.1' in config if auth gets stuck (e.g. localhost resolving to IPv6)."""
if not silent:
print("\n=== Authentication ===")
print(f"Starting local callback server on port {port}...")
host_for_uri = callback_host if callback_host else "localhost"
redirect_uri = f"http://{host_for_uri}:{port}/callback"
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=")
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest()).decode("utf-8").rstrip("=")
def on_callback_received():
if not silent:
print("Callback received from Salesforce; completing authentication...")
try:
# Bind: use 127.0.0.1 when callback_host is set to avoid IPv6 localhost issues
bind_host = "127.0.0.1" if (callback_host and callback_host.strip() == "127.0.0.1") else ""
server = HTTPServer((bind_host, port), CallbackHandler)
server.auth_code = None
server.auth_error = None
server.auth_received_cb = on_callback_received
server.auth_event = threading.Event()
thread = threading.Thread(target=server.serve_forever)
thread.daemon = True
thread.start()
except OSError as e:
if e.errno == 48:
print(f"Port {port} is already in use. Try another port (e.g. set callback_port in config).")
else:
print(f"Failed to start server: {e}")
return False
try:
auth_params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": "api refresh_token",
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
auth_url = instance_url + "/services/oauth2/authorize?" + urllib.parse.urlencode(auth_params)
if not silent:
print("Opening browser for authentication...")
print("If the browser did not open, copy this URL into your browser:")
print(f" {auth_url}")
print("Complete sign-in in the browser; this window will continue when done.")
sys.stdout.flush()
webbrowser.open(auth_url)
timeout = 300
server.auth_event.wait(timeout=timeout)
if server.auth_error:
print(f"Authentication failed: {server.auth_error}")
return False
if server.auth_code is None:
if not silent:
print("Authentication timed out or cancelled.")
print("Tip: In your Salesforce Connected App, set Callback URL to exactly:")
print(f" http://localhost:{port}/callback")
print(" (or http://127.0.0.1:{port}/callback if localhost does not work)")
return False
auth_code = server.auth_code
token_url = instance_url + "/services/oauth2/token"
token_data = {
"grant_type": "authorization_code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"code": auth_code,
"code_verifier": code_verifier,
}
if client_secret:
token_data["client_secret"] = client_secret
resp = self.session.post(token_url, data=token_data)
resp.raise_for_status()
token_response = resp.json()
self.access_token = token_response["access_token"]
self.instance_url = instance_url
self._update_session_headers()
print("Authentication successful.")
self.log_message("Authentication successful")
return True
except requests.exceptions.RequestException as e:
print(f"Token exchange failed: {e}")
return False
except KeyError:
print("Invalid response from Salesforce.")
return False
finally:
server.shutdown()
server.server_close()
def clear_temp(self) -> None:
"""Clear temp folder at run start."""
temp_dir = "temp"
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
os.makedirs(temp_dir, exist_ok=True)
os.makedirs(os.path.join(temp_dir, "fields"), exist_ok=True)
os.makedirs(os.path.join(temp_dir, "usage"), exist_ok=True)
self._print_and_log("Cleared temp folder.")
def _headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}
def _update_session_headers(self) -> None:
"""Update the session's default headers after token is set."""
self.session.headers.update(self._headers())
    def get_objects_to_analyze(self, config: Dict) -> List[str]:
        """Resolve the list of object API names to analyze.

        Fetches describe-global (caching it on self), filters by the config's
        object_scope and the default/config exclusion rules, then applies
        quick/full mode: quick uses config objects or QUICK_DEFAULT_OBJECTS,
        full uses config objects or every in-scope object. Returns [] on error.
        """
        mode = config.get("mode", "full").lower()
        objects_config = config.get("objects") or []
        # Get all sobjects from describe global
        url = f"{self.instance_url}/services/data/{self.api_version}/sobjects"
        try:
            r = self.session.get(url)
            r.raise_for_status()
            data = r.json()
        except Exception as e:
            self._print_and_log(f"Describe global failed: {e}")
            return []
        # Cache so describe_global_and_save_objects can skip a second fetch.
        self._describe_global_cache = data.get("sobjects", [])
        sobjects = self._describe_global_cache
        object_scope = config.get("object_scope") or ["all"]
        if isinstance(object_scope, str):
            object_scope = [object_scope]

        def in_scope(sob: Dict) -> bool:
            # Scope filter by custom/managed flags; note custom settings are
            # excluded even when scope is "all" (checked after the branch).
            if "all" in object_scope:
                pass
            else:
                custom = sob.get("custom", False)
                manageable = (sob.get("manageableState") or "").lower()
                if "standard" in object_scope and custom:
                    return False
                if "custom" in object_scope and not custom:
                    return False
                if "managed" in object_scope and manageable != "installed":
                    return False
                if "unmanaged" in object_scope and (not custom or manageable == "installed"):
                    return False
            if sob.get("customSetting"):
                return False
            return True

        def should_exclude(name: str) -> bool:
            # Exclusion order: built-in name list, built-in suffix patterns,
            # then the config's org-specific exclusion_objects.
            if name in DEFAULT_EXCLUDE_OBJECTS:
                return True
            if any(p.search(name) for p in DEFAULT_EXCLUDE_OBJECT_PATTERNS):
                return True
            config_exclude = config.get("exclusion_objects") or []
            if name in config_exclude:
                return True
            return False

        all_names = [s["name"] for s in sobjects if in_scope(s) and not should_exclude(s["name"])]
        if mode == "quick":
            # Quick mode: explicit config objects, else the built-in core set;
            # either way only names that survived scope/exclusion filtering.
            if objects_config:
                return [n for n in objects_config if n in all_names]
            return [n for n in QUICK_DEFAULT_OBJECTS if n in all_names]
        # full
        if objects_config:
            return [n for n in objects_config if n in all_names]
        return all_names
def describe_global_and_save_objects(self, object_names: List[str], config: Dict) -> List[Dict]:
    """Describe global, filter to object_names, save object list to temp.

    Reuses the cache populated by get_objects_to_analyze when available to
    avoid a second describe-global round trip. Writes the filtered list to
    temp/objects_list.json and returns it (empty on describe failure).
    """
    if self._describe_global_cache is not None:
        by_name = {s["name"]: s for s in self._describe_global_cache}
    else:
        describe_url = f"{self.instance_url}/services/data/{self.api_version}/sobjects"
        try:
            resp = self.session.get(describe_url)
            resp.raise_for_status()
            payload = resp.json()
        except Exception as e:
            self._print_and_log(f"Describe global failed: {e}")
            return []
        by_name = {s["name"]: s for s in payload.get("sobjects", [])}
    saved: List[Dict] = []
    for name in object_names:
        sob = by_name.get(name)
        if sob is None:
            continue  # name not present in this org's describe-global listing
        entry = {"name": name, "label": sob.get("label", name), "custom": sob.get("custom", False)}
        customizable = sob.get("customizable")
        if customizable is not None:
            entry["customizable"] = customizable
        saved.append(entry)
    out_path = os.path.join("temp", "objects_list.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(saved, f, indent=2)
    self._print_and_log(f"Saved {len(saved)} objects to temp/objects_list.json")
    return saved
def describe_sobject_fields(self, object_name: str) -> Optional[Dict]:
    """Fetch the sobject describe for *object_name* and persist it.

    Saves the raw describe payload to temp/fields/<Object>.json and returns
    it; returns None (after logging) when the request fails.
    """
    describe_url = (
        f"{self.instance_url}/services/data/{self.api_version}"
        f"/sobjects/{object_name}/describe"
    )
    try:
        resp = self.session.get(describe_url)
        resp.raise_for_status()
        describe = resp.json()
    except Exception as e:
        self._print_and_log(f"Describe {object_name} failed: {e}")
        return None
    out_path = os.path.join("temp", "fields", f"{object_name}.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(describe, f, indent=2)
    return describe
def get_queryable_fields(self, describe: Dict) -> List[Dict]:
    """Return list of field dicts that can be included in SELECT (not compound).

    Skips the Id field (it is always added to the SELECT clause explicitly)
    and compound field types: both "address" and "location" (geolocation)
    compound fields are rejected by the REST query endpoint when selected
    directly — the original code only filtered "address", so any object
    with a geolocation field produced an invalid SOQL query.
    """
    compound_types = {"address", "location"}
    queryable: List[Dict] = []
    for field in describe.get("fields", []):
        if field.get("type") in compound_types:
            continue
        name = field.get("name")
        if name and name != "Id":
            queryable.append(field)
    return queryable
def query_object_usage(self, object_name: str, field_list: List[Dict], config: Dict) -> Optional[Dict]:
    """Run SELECT for all queryable fields, stream results, count total and per-field filled.

    Writes the summary to temp/usage/<Object>.json and returns it.
    Every request — the initial query AND each nextRecordsUrl pagination
    page — is retried once with a short backoff on failure (e.g. transient
    server errors); the original code retried only the first request.
    Returns None when field_list is empty or a request fails twice.
    """
    if not field_list:
        return None
    names = [f["name"] for f in field_list]
    soql = f"SELECT Id,{','.join(names)} FROM {object_name}"
    query_url = f"{self.instance_url}/services/data/{self.api_version}/query"

    def fetch_page(url: str, params: Optional[Dict] = None) -> Dict:
        # One retry with a 2s backoff; re-raises the last error on failure.
        last_err: Optional[Exception] = None
        for attempt in range(2):
            try:
                resp = self.session.get(url, params=params)
                resp.raise_for_status()
                return resp.json()
            except Exception as e:
                last_err = e
                if attempt == 0:
                    time.sleep(2)
        raise last_err  # pragma: no cover - always set when loop exhausts

    total = 0
    filled = {n: 0 for n in names}
    next_url: Optional[str] = None
    while True:
        try:
            if next_url:
                # nextRecordsUrl may be absolute or instance-relative.
                page_url = next_url if next_url.startswith("http") else self.instance_url + next_url
                data = fetch_page(page_url)
            else:
                data = fetch_page(query_url, {"q": soql})
        except Exception as e:
            self._print_and_log(f"Query {object_name} failed: {e}")
            return None
        records = data.get("records", [])
        total += len(records)
        for rec in records:
            for fn in names:
                val = rec.get(fn)
                # None and whitespace-only values count as empty.
                if val is not None and str(val).strip() != "":
                    filled[fn] += 1
        next_url = data.get("nextRecordsUrl")
        if not next_url or not records:
            break
        # Progress heartbeat while paginating large objects.
        if total % 10000 == 0 and total > 0:
            self._print_and_log(f"  {object_name}: {total} records so far...")
    result = {
        "object": object_name,
        "total_records": total,
        "field_usage": {
            fn: {"filled": filled[fn], "pct": (filled[fn] / total * 100) if total else 0}
            for fn in names
        },
    }
    out_path = os.path.join("temp", "usage", f"{object_name}.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    return result
def build_csv_and_json(self, config: Dict) -> Tuple[Optional[str], Optional[str]]:
    """Build CSV and JSON: one tab per object; rows = fields, columns = usage. Apply exclusion_fields and exclusion_objects.

    Reads per-object usage summaries from temp/usage/ and field describes
    from temp/fields/, writes one CSV per object plus a combined JSON and a
    manifest.json into output/. Returns (first CSV path, JSON path), or
    (None, None) when there is no usage data or everything was excluded.
    """
    # exclusion_fields entries are keyed "Object.Field"; exclusion_objects by API name.
    exclusion = set(config.get("exclusion_fields") or [])
    exclusion_objects = set(config.get("exclusion_objects") or [])
    usage_dir = os.path.join("temp", "usage")
    fields_dir = os.path.join("temp", "fields")
    if not os.path.exists(usage_dir):
        return None, None
    # Per-object: rows = fields, columns = FieldApiName, FieldLabel, TotalRecords, Filled, UsagePct
    headers = ["FieldApiName", "FieldLabel", "TotalRecords", "Filled", "UsagePct"]
    os.makedirs("output", exist_ok=True)
    # Remove old dashboard assets from output/ (app is served from static/, output/ is data only)
    for name in ("index.html", "dashboard.css", "dashboard.js", "chart.min.js"):
        path = os.path.join("output", name)
        if os.path.isfile(path):
            try:
                os.remove(path)
            except OSError:
                pass  # best-effort cleanup; a leftover asset is harmless
    # Timestamp shared by all CSVs and the JSON so one run groups together.
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    json_by_object: Dict[str, Dict[str, Any]] = {}
    csv_paths: List[str] = []
    for fn in sorted(os.listdir(usage_dir)):
        if not fn.endswith(".json"):
            continue
        obj_name = fn[:-5]  # strip ".json" to recover the object API name
        if obj_name in exclusion_objects:
            continue
        usage_path = os.path.join(usage_dir, fn)
        with open(usage_path, "r", encoding="utf-8") as f:
            usage = json.load(f)
        field_usage = usage.get("field_usage", {})
        total_records = usage.get("total_records", 0)
        # Field labels and object metadata (label, icon, customizable) from describe
        field_labels: Dict[str, str] = {}
        object_label = obj_name
        icon_url: Optional[str] = None
        customizable: Optional[bool] = None
        meta_path = os.path.join(fields_dir, f"{obj_name}.json")
        if os.path.exists(meta_path):
            with open(meta_path, "r", encoding="utf-8") as f:
                meta = json.load(f)
            for fmeta in meta.get("fields", []):
                name = fmeta.get("name")
                if name:
                    field_labels[name] = fmeta.get("label", name)
            object_label = meta.get("label", obj_name)
            theme_info = meta.get("themeInfo") or {}
            icon_url = theme_info.get("iconUrl") or None
            if meta.get("customizable") is not None:
                customizable = meta.get("customizable")
        # Fall back to the describe-global cache for the customizable flag
        # when the per-object describe did not carry it.
        if customizable is None and self._describe_global_cache:
            for s in self._describe_global_cache:
                if s.get("name") == obj_name and s.get("customizable") is not None:
                    customizable = s.get("customizable")
                    break
        rows: List[Dict[str, Any]] = []
        for fname in sorted(field_usage.keys()):
            key = f"{obj_name}.{fname}"
            if key in exclusion:
                continue
            stats = field_usage[fname]
            filled = stats.get("filled", 0)
            pct = stats.get("pct", 0)
            rows.append({
                "FieldApiName": fname,
                "FieldLabel": field_labels.get(fname, fname),
                "TotalRecords": total_records,
                "Filled": filled,
                "UsagePct": round(pct, 2),
            })
        if not rows:
            continue  # every field excluded -> skip the object entirely
        obj_payload: Dict[str, Any] = {"headers": headers, "rows": rows, "label": object_label}
        if icon_url:
            obj_payload["iconUrl"] = icon_url
        if customizable is not None:
            obj_payload["customizable"] = customizable
        json_by_object[obj_name] = obj_payload
        # One CSV per object (one "tab" per object)
        csv_path = os.path.join("output", f"field_usage_{obj_name}_{ts}.csv")
        csv_paths.append(csv_path)
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=headers, extrasaction="ignore")
            w.writeheader()
            for row in rows:
                w.writerow(row)
    if not json_by_object:
        return None, None
    json_path = os.path.join("output", f"field_usage_{ts}.json")
    json_filename = os.path.basename(json_path)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(json_by_object, f, indent=2)
    # Option C: write manifest.json for dashboard dropdown (lists latest + all available runs)
    manifest = {"latest": json_filename, "files": []}
    # Newest-first listing; the regex keeps only run files (not per-object CS:V-era leftovers).
    for path in sorted(glob.glob(os.path.join("output", "field_usage_*.json")), reverse=True):
        name = os.path.basename(path)
        if re.match(r"field_usage_\d{8}_\d{6}\.json", name):
            manifest["files"].append(name)
    if json_filename not in manifest["files"]:
        manifest["files"].insert(0, json_filename)
    manifest_path = os.path.join("output", "manifest.json")
    with open(manifest_path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)
    self._print_and_log(f"Wrote manifest {manifest_path}")
    self._print_and_log(f"Wrote {len(csv_paths)} object CSVs and {json_path}")
    return (csv_paths[0] if csv_paths else None), json_path
def run(self, config: Dict) -> None:
    """Execute a full analysis: clear temp, authenticate, resolve objects,
    describe and query each one, then build the CSV/JSON outputs and
    launch the local dashboard."""
    self.clear_temp()
    self.instance_url = config["instance"]
    self.setup_logging()
    authenticated = self.authenticate(
        config["instance"],
        config["client_id"],
        config.get("client_secret", ""),
        port=config.get("callback_port", 8080),
        callback_host=config.get("callback_host", "localhost"),
        silent=False,
    )
    if not authenticated:
        self._print_and_log("Authentication failed. Exiting.")
        return
    self._print_and_log("Fetching object list...")
    object_names = self.get_objects_to_analyze(config)
    if not object_names:
        self._print_and_log("No objects to analyze.")
        return
    preview = ", ".join(object_names[:10])
    ellipsis = "..." if len(object_names) > 10 else ""
    self._print_and_log(f"Analyzing {len(object_names)} objects: {preview}{ellipsis}")
    self.describe_global_and_save_objects(object_names, config)
    failed_objects: List[str] = []
    count = len(object_names)
    for idx, obj_name in enumerate(object_names, 1):
        self._print_and_log(f"[{idx}/{count}] {obj_name}: describing...")
        describe = self.describe_sobject_fields(obj_name)
        if not describe:
            failed_objects.append(obj_name)
            continue
        queryable = self.get_queryable_fields(describe)
        self._print_and_log(f"  {obj_name}: querying {len(queryable)} fields...")
        if self.query_object_usage(obj_name, queryable, config) is None:
            failed_objects.append(obj_name)
    if failed_objects:
        shown = ", ".join(sorted(failed_objects)[:30])
        overflow = "..." if len(failed_objects) > 30 else ""
        self._print_and_log(
            f"The following {len(failed_objects)} objects failed (describe or query). "
            "Add them to exclusion_objects in your config to skip in future runs: "
            f"{shown}{overflow}"
        )
        failed_path = os.path.join("temp", "failed_objects.json")
        try:
            # Persist the failed list so the user can copy it into config.
            with open(failed_path, "w", encoding="utf-8") as f:
                json.dump(
                    {
                        "failed_objects": sorted(failed_objects),
                        "message": "Copy exclusion_objects from this list into your config to skip these objects on the next run.",
                    },
                    f,
                    indent=2,
                )
            self._print_and_log(f"Failed object list written to {failed_path}", mask=False)
        except OSError:
            pass  # best-effort; the console message above already lists them
    self._print_and_log("Building CSV and JSON...")
    self.build_csv_and_json(config)
    self._print_and_log("Done.")
    self._serve_dashboard()
def _serve_dashboard(self, port: int = 8000) -> None:
    """Start a local HTTP server (project root) and open the dashboard (static/) in the browser. Data is in output/.

    Blocks until the user presses Enter, then shuts the server down.
    Skips silently (with a log line) when output/ or static/index.html
    is missing.
    """
    import errno  # local import: file header is elsewhere; stdlib only

    output_dir = os.path.abspath("output")
    project_root = os.path.dirname(output_dir)
    static_index = os.path.join(project_root, "static", "index.html")
    if not os.path.isdir(output_dir):
        self._print_and_log("Output folder not found. Skipping dashboard.")
        return
    if not os.path.isfile(static_index):
        self._print_and_log("static/index.html not found. Skipping dashboard.")
        return
    url = f"http://localhost:{port}/static/"
    try:
        class RootHandler(SimpleHTTPRequestHandler):
            # Serve from the project root so both /static/ and /output/ resolve.
            def __init__(self, *args, **kwargs):
                super().__init__(*args, directory=project_root, **kwargs)
        server = HTTPServer(("", port), RootHandler)
        thread = threading.Thread(target=server.serve_forever, daemon=True)
        thread.start()
        webbrowser.open(url)
        self._print_and_log(f"Dashboard: {url}")
        input("Press Enter to stop the dashboard server and exit.")
        server.shutdown()
    except OSError as e:
        # Bug fix: the original compared e.errno == 48, which is EADDRINUSE
        # only on macOS/BSD (Linux uses 98, Windows 10048). errno.EADDRINUSE
        # is portable across platforms.
        if e.errno == errno.EADDRINUSE:
            self._print_and_log(f"Port {port} in use. Open {url} in your browser.")
        else:
            self._print_and_log(f"Could not start dashboard server: {e}")
def main():
    """CLI entry point: parse arguments, gather config, run the analyzer."""
    parser = argparse.ArgumentParser(description="Salesforce Data Usage Analyzer")
    parser.add_argument("--silent", action="store_true", help="Headless: use config as-is (requires --config)")
    parser.add_argument("--config", metavar="PATH", help="Path to config file (e.g. configs/myorg.json)")
    options = parser.parse_args()
    analyzer = DataUsageAnalyzer()
    settings = analyzer.get_user_input(silent=options.silent, config_path=options.config)
    if not settings:
        sys.exit(1)
    analyzer.run(settings)


if __name__ == "__main__":
    main()