3 changes: 3 additions & 0 deletions backend/.gitignore
@@ -180,3 +180,6 @@ runtime/secrets/postgres_password_secret
# This is where the sqlite cache will be stored by default if not running on Docker.
volume_data/*.sqlite3
volume_data/*.yaml

# Temporary submissions directory (for development/testing purposes)
kernelCI_app/management/commands/tmp_submissions/*
13 changes: 12 additions & 1 deletion backend/kernelCI_app/helpers/trees.py
@@ -57,6 +57,7 @@ def sanitize_tree(
"""Sanitizes a checkout that was returned by a 'treelisting-like' query

Returns a Checkout object"""

build_status = StatusCount(
PASS=checkout["pass_builds"],
FAIL=checkout["fail_builds"],
@@ -87,13 +88,23 @@ def sanitize_tree(
"skip": checkout["skip_boots"],
}

if isinstance(checkout.get("git_commit_tags"), str):
# We have to check whether it's a string because SQLite doesn't support
# ArrayFields, so if the query came from SQLite this field will be a string.
git_commit_tags = checkout.get("git_commit_tags")
if isinstance(git_commit_tags, str):
try:
checkout["git_commit_tags"] = json.loads(checkout["git_commit_tags"])
if not isinstance(checkout["git_commit_tags"], list):
checkout["git_commit_tags"] = []
except json.JSONDecodeError:
checkout["git_commit_tags"] = []
elif git_commit_tags and isinstance(git_commit_tags, list):
first_tag = git_commit_tags[0]
if isinstance(first_tag, str):
# git_commit_tags comes as list[str] from a normal query, but `Checkout`
# expects list[list[str]]. This is a workaround; the queries should *always*
# return a simple list[str].
checkout["git_commit_tags"] = [git_commit_tags]

return Checkout(
**checkout,
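For illustration, here is the new normalization logic as a standalone sketch (the helper name is hypothetical; in the real code this runs inline in `sanitize_tree` and mutates the checkout dict):

import json

def normalize_git_commit_tags(checkout: dict) -> None:
    # Illustrative mirror of the hunk above.
    tags = checkout.get("git_commit_tags")
    if isinstance(tags, str):
        # SQLite path: the field arrives as a JSON-encoded string.
        try:
            parsed = json.loads(tags)
            checkout["git_commit_tags"] = parsed if isinstance(parsed, list) else []
        except json.JSONDecodeError:
            checkout["git_commit_tags"] = []
    elif tags and isinstance(tags, list) and isinstance(tags[0], str):
        # Postgres path: wrap list[str] into the list[list[str]] shape
        # that `Checkout` currently expects.
        checkout["git_commit_tags"] = [tags]

# SQLite row:   {"git_commit_tags": '["v6.12-rc1"]'} -> ["v6.12-rc1"]
# Postgres row: {"git_commit_tags": ["v6.12-rc1"]}   -> [["v6.12-rc1"]]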
127 changes: 127 additions & 0 deletions backend/kernelCI_app/management/commands/helpers/denormal.py
@@ -0,0 +1,127 @@
from datetime import datetime
from django.db import connections
from kernelCI_app.models import Checkouts, TreeListing


def handle_checkout_denormalization(*, buffer: list[Checkouts]) -> None:
"""Deals with the operations related to the extra tables for denormalization.

In the case of checkouts, it will update TreeListing table, and consume from PendingCheckouts.
"""

if not buffer:
return

tuple_params = [
(c.origin, c.tree_name, c.git_repository_branch, c.git_repository_url)
for c in buffer
]
# Flatten the (origin, tree_name, branch, url) tuples for the query params;
# a comprehension avoids shadowing the built-in `tuple`.
flattened_list = [value for params in tuple_params for value in params]

# Check which of the incoming trees already exist in TreeListing
query = f"""
SELECT
checkout_id,
start_time
FROM
tree_listing t
JOIN
(VALUES {','.join(["(%s, %s, %s, %s)"] * len(tuple_params))})
AS v(origin, tree_name, git_repository_branch, git_repository_url)
ON (
t.origin = v.origin
AND t.tree_name = v.tree_name
AND t.git_repository_branch = v.git_repository_branch
AND t.git_repository_url = v.git_repository_url
)
"""

with connections["default"].cursor() as cursor:
cursor.execute(query, flattened_list)
results = cursor.fetchall()

existing_checkouts_map = {r[0]: r[1] for r in results}

checkouts_for_update: list[Checkouts] = []

# `results` now holds the checkout_ids that *are* in TreeListing
for checkout in buffer:
# if the checkout is already in TreeListing, check its start_time
if checkout.id in existing_checkouts_map:
# if newer than existing, update
checkout_start_time = datetime.fromisoformat(checkout.start_time)
if checkout_start_time >= existing_checkouts_map[checkout.id]:
checkouts_for_update.append(checkout)
# if older than existing, ignore (no action)
# if it's not in TreeListing, add it
else:
checkouts_for_update.append(checkout)

if checkouts_for_update:
tree_listing_objects = [
TreeListing(
field_timestamp=checkout.field_timestamp,
checkout_id=checkout.id,
origin=checkout.origin,
tree_name=checkout.tree_name,
git_repository_url=checkout.git_repository_url,
git_repository_branch=checkout.git_repository_branch,
git_commit_hash=checkout.git_commit_hash,
git_commit_name=checkout.git_commit_name,
git_commit_tags=checkout.git_commit_tags,
start_time=checkout.start_time,
origin_builds_finish_time=checkout.origin_builds_finish_time,
origin_tests_finish_time=checkout.origin_tests_finish_time,
# Counts default to 0 when not provided
)
for checkout in checkouts_for_update
]

TreeListing.objects.bulk_create(
tree_listing_objects,
update_conflicts=True,
unique_fields=[
"origin",
"tree_name",
"git_repository_branch",
"git_repository_url",
],
update_fields=[
"field_timestamp",
"checkout_id",
"origin",
"tree_name",
"git_repository_url",
"git_repository_branch",
"git_commit_hash",
"git_commit_name",
"git_commit_tags",
"start_time",
"origin_builds_finish_time",
"origin_tests_finish_time",
"pass_builds",
"fail_builds",
"done_builds",
"miss_builds",
"skip_builds",
"error_builds",
"null_builds",
"pass_boots",
"fail_boots",
"done_boots",
"miss_boots",
"skip_boots",
"error_boots",
"null_boots",
"pass_tests",
"fail_tests",
"done_tests",
"miss_tests",
"skip_tests",
"error_tests",
"null_tests",
],
)
print(f"Updated {len(checkouts_for_update)} trees in TreeListing", flush=True)
@@ -14,11 +14,17 @@
import yaml
import kcidb_io
from django.db import transaction
from kernelCI_app.management.commands.helpers.denormal import (
handle_checkout_denormalization,
)
from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents

from kernelCI_app.management.commands.helpers.process_submissions import (
ProcessedSubmission,
TableNames,
build_instances_from_submission,
)
from kernelCI_app.typeModels.modelTypes import MODEL_MAP, TableModels

VERBOSE = 0
LOGEXCERPT_THRESHOLD = 256 # 256 bytes threshold for logexcerpt
@@ -293,6 +299,29 @@ def prepare_file_data(filename, trees_name, spool_dir):
}


def consume_buffer(buffer: list[TableModels], item_type: TableNames) -> None:
"""
Consume a buffer of items and insert them into the database.
This function is called by the db_worker thread.
"""
if not buffer:
return

if item_type == "checkouts":
handle_checkout_denormalization(buffer=buffer)

model = MODEL_MAP[item_type]

t0 = time.time()
model.objects.bulk_create(
buffer,
batch_size=INGEST_BATCH_SIZE,
ignore_conflicts=True,
)
_out("bulk_create %s: n=%d in %.3fs" % (item_type, len(buffer), time.time() - t0))


# TODO: lower the complexity of this function
def db_worker(stop_event: threading.Event): # noqa: C901
"""
Worker thread that processes the database queue.
@@ -303,11 +332,11 @@ def db_worker(stop_event: threading.Event): # noqa: C901
"""

# Local buffers for batching
issues_buf = []
checkouts_buf = []
builds_buf = []
tests_buf = []
incidents_buf = []
issues_buf: list[Issues] = []
checkouts_buf: list[Checkouts] = []
builds_buf: list[Builds] = []
tests_buf: list[Tests] = []
incidents_buf: list[Incidents] = []

last_flush_ts = time.time()

@@ -331,55 +360,11 @@ def flush_buffers():
try:
# Single transaction for all tables in the flush
with transaction.atomic():
if issues_buf:
t0 = time.time()
Issues.objects.bulk_create(
issues_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
)
_out(
"bulk_create issues: n=%d in %.3fs"
% (len(issues_buf), time.time() - t0)
)
if checkouts_buf:
t0 = time.time()
Checkouts.objects.bulk_create(
checkouts_buf,
batch_size=INGEST_BATCH_SIZE,
ignore_conflicts=True,
)
_out(
"bulk_create checkouts: n=%d in %.3fs"
% (len(checkouts_buf), time.time() - t0)
)
if builds_buf:
t0 = time.time()
Builds.objects.bulk_create(
builds_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
)
_out(
"bulk_create builds: n=%d in %.3fs"
% (len(builds_buf), time.time() - t0)
)
if tests_buf:
t0 = time.time()
Tests.objects.bulk_create(
tests_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
)
_out(
"bulk_create tests: n=%d in %.3fs"
% (len(tests_buf), time.time() - t0)
)
if incidents_buf:
t0 = time.time()
Incidents.objects.bulk_create(
incidents_buf,
batch_size=INGEST_BATCH_SIZE,
ignore_conflicts=True,
)
_out(
"bulk_create incidents: n=%d in %.3fs"
% (len(incidents_buf), time.time() - t0)
)
consume_buffer(issues_buf, "issues")
consume_buffer(checkouts_buf, "checkouts")
consume_buffer(builds_buf, "builds")
consume_buffer(tests_buf, "tests")
consume_buffer(incidents_buf, "incidents")
except Exception as e:
logger.error("Error during bulk_create flush: %s", e)
finally:
@@ -415,7 +400,7 @@ def flush_buffers():
try:
data, metadata = item
if data is not None:
inst = build_instances_from_submission(data)
inst: ProcessedSubmission = build_instances_from_submission(data)
issues_buf.extend(inst["issues"])
checkouts_buf.extend(inst["checkouts"])
builds_buf.extend(inst["builds"])
backend/kernelCI_app/management/commands/helpers/process_submissions.py
@@ -1,14 +1,23 @@
import logging
from django.utils import timezone
from typing import Any, Literal
from typing import Any, TypedDict

from django.db import IntegrityError
from pydantic import ValidationError

from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests
from kernelCI_app.typeModels.modelTypes import TableNames


TableNames = Literal["issues", "checkouts", "builds", "tests", "incidents"]
class ProcessedSubmission(TypedDict):
"""Stores the list of items in a single submission.
Lists can't be None but can be empty."""

issues: list[Issues]
checkouts: list[Checkouts]
builds: list[Builds]
tests: list[Tests]
incidents: list[Incidents]


logger = logging.getLogger(__name__)
@@ -128,12 +137,12 @@ def make_incident_instance(incident) -> Incidents:
return obj


def build_instances_from_submission(data: dict[str, Any]) -> dict[TableNames, list]:
def build_instances_from_submission(data: dict[str, Any]) -> ProcessedSubmission:
"""
Convert raw submission dicts into unsaved Django model instances, grouped by type.
Per-item errors are logged and the item is skipped, matching the previous behavior.
"""
out: dict[TableNames, list] = {
out: ProcessedSubmission = {
"issues": [],
"checkouts": [],
"builds": [],