|
1 | 1 | import hashlib |
| 2 | +import os |
2 | 3 | import signal |
3 | 4 | import time |
4 | 5 | from datetime import datetime |
5 | 6 | from typing import Literal, Optional, Sequence, TypedDict, Union |
| 7 | +from django.conf import settings |
6 | 8 | from django.core.management.base import BaseCommand |
7 | 9 | from django.db import connection, transaction |
8 | 10 | from kernelCI_app.constants.general import MAESTRO_DUMMY_BUILD_PREFIX |
9 | 11 | from kernelCI_app.helpers.logger import out |
10 | 12 | from kernelCI_app.management.commands.helpers.aggregation_helpers import simplify_status |
| 13 | +from prometheus_client import start_http_server |
11 | 14 | from kernelCI_app.models import ( |
12 | 15 | Builds, |
13 | 16 | Checkouts, |
|
17 | 20 | SimplifiedStatusChoices, |
18 | 21 | ) |
19 | 22 |
|
| 23 | +from prometheus_client import Counter |
| 24 | + |
| 25 | +AGGREGATION_RECORDS_WRITTEN = Counter( |
| 26 | + "aggregation_records_written_total", |
| 27 | + "Total number of records written to destination tables", |
| 28 | + ["table"], # values: "tree_listing", "hardware_status", "processed_items" |
| 29 | +) |
| 30 | + |
20 | 31 |
|
21 | 32 | class ListingItemCount(TypedDict): |
22 | 33 | build_pass: int |
@@ -516,6 +527,11 @@ def handle(self, *args, **options): |
516 | 527 | loop = options["loop"] |
517 | 528 | interval = options["interval"] |
518 | 529 |
|
| 530 | + metrics_port = int(os.environ.get("PROMETHEUS_METRICS_PORT", 8001)) |
| 531 | + if settings.PROMETHEUS_METRICS_ENABLED: |
| 532 | + start_http_server(metrics_port) |
| 533 | + out(f"Prometheus metrics server started on port {metrics_port}") |
| 534 | + |
519 | 535 | if loop: |
520 | 536 | signal.signal(signal.SIGTERM, self.signal_handler) |
521 | 537 | signal.signal(signal.SIGINT, self.signal_handler) |
@@ -585,6 +601,9 @@ def _process_new_processed_entries( |
585 | 601 | f"bulk_create ProcessedListingItems: n={len(new_processed_entries)} " |
586 | 602 | f"in {time.time() - t0:.3f}s" |
587 | 603 | ) |
| 604 | + AGGREGATION_RECORDS_WRITTEN.labels(table="processed_items").inc( |
| 605 | + len(new_processed_entries) |
| 606 | + ) |
588 | 607 |
|
589 | 608 | def _process_tree_listing( |
590 | 609 | self, |
@@ -640,6 +659,7 @@ def _process_tree_listing( |
640 | 659 | ) |
641 | 660 |
|
642 | 661 | out(f"Inserted {len(values)} tree_listing records in {time.time() - t0:.3f}s") |
| 662 | + AGGREGATION_RECORDS_WRITTEN.labels(table="tree_listing").inc(len(values)) |
643 | 663 |
|
644 | 664 | def _process_hardware_status( |
645 | 665 | self, |
|
0 commit comments