|
1 | 1 | from cumulusci.tasks.salesforce import BaseSalesforceApiTask |
2 | 2 |
|
3 | 3 |
|
4 | | -class GetAvailableLicenses(BaseSalesforceApiTask): |
| 4 | +class BaseUserLicenseAwareTask(BaseSalesforceApiTask): |
| 5 | + def get_available_user_licenses(self): |
| 6 | + """Fetch active user licenses with availability.""" |
| 7 | + query = """ |
| 8 | + SELECT Id, LicenseDefinitionKey, TotalLicenses, UsedLicenses |
| 9 | + FROM UserLicense |
| 10 | + WHERE Status = 'Active' |
| 11 | + """ |
| 12 | + return { |
| 13 | + lic["Id"]: lic |
| 14 | + for lic in self.sf.query(query)["records"] |
| 15 | + if lic["TotalLicenses"] > lic["UsedLicenses"] |
| 16 | + } |
| 17 | + |
| 18 | + def _log_list(self, title, items): |
| 19 | + self.logger.info(f"{title} ({len(items)}):\n" + "\n".join(f"- {item}" for item in items)) |
| 20 | + |
| 21 | +class GetAvailableLicenses(BaseUserLicenseAwareTask): |
5 | 22 | def _run_task(self): |
6 | 23 | self.return_values = [ |
7 | 24 | result["LicenseDefinitionKey"] |
8 | | - for result in self.sf.query("SELECT LicenseDefinitionKey FROM UserLicense")[ |
9 | | - "records" |
10 | | - ] |
| 25 | + for result in self.get_available_user_licenses().values() |
11 | 26 | ] |
12 | 27 | licenses = "\n".join(self.return_values) |
13 | 28 | self.logger.info(f"Found licenses:\n{licenses}") |
@@ -35,11 +50,16 @@ def _run_task(self): |
35 | 50 | self.logger.info(f"Found permission licenses sets assigned:\n{permsets}") |
36 | 51 |
|
37 | 52 |
|
38 | | -class GetAvailablePermissionSets(BaseSalesforceApiTask): |
| 53 | +class GetAvailablePermissionSets(BaseUserLicenseAwareTask): |
39 | 54 | def _run_task(self): |
40 | | - self.return_values = [ |
41 | | - result["Name"] |
42 | | - for result in self.sf.query_all("SELECT Name FROM PermissionSet")["records"] |
| 55 | + license_data = self.get_available_user_licenses() |
| 56 | + permsets = self.sf.query_all("SELECT LicenseId, Name FROM PermissionSet")["records"] |
| 57 | + available_permsets = [ |
| 58 | + ps["Name"] |
| 59 | + for ps in permsets |
| 60 | + if not ps["LicenseId"] or ps["LicenseId"] in license_data |
43 | 61 | ] |
| 62 | + |
| 63 | + self.return_values = available_permsets |
44 | 64 | permsets = "\n".join(self.return_values) |
45 | 65 | self.logger.info(f"Found Permission Sets:\n{permsets}") |
0 commit comments