diff --git a/backend/actions/actions.py b/backend/actions/actions.py
deleted file mode 100644
index cf45e86d..00000000
--- a/backend/actions/actions.py
+++ /dev/null
@@ -1,299 +0,0 @@
-import json
-import os
-import traceback
-from datetime import datetime
-from urllib.parse import urljoin
-
-import redis
-import requests
-import yaml
-
-from actions.yaml_validation import Validator
-from deployment.container import Container
-from kubernetes.client import V1EnvVar
-from models.initial_config import InitialConfig
-from models.run_config import RunConfig
-from models.scheduler_run import SchedulerRun
-from models.trigger_run import TriggerModel, TriggerRun
-from packages.vcs.vcs import VCSFactory
-from utils.reporter import Reporter
-from utils.utils import Utils
-
-container = Container()
-reporter = Reporter()
-utils = Utils()
-r = redis.Redis()
-
-SUCCESS = "success"
-FAILURE = "failure"
-ERROR = "error"
-PENDING = "pending"
-LOG_TYPE = "log"
-TESTSUITE_TYPE = "testsuite"
-NEPH_TYPE = "neph"
-
-
-class Actions(Validator):
-    _REPOS_DIR = "/zeroci/code/vcs_repos"
-    _BIN_DIR = "/zeroci/bin/"
-    run_id = None
-    model_obj = None
-
-    def test_run(self, job):
-        """Runs tests and store the result in DB.
-        """
-        for line in job["script"]:
-            if line.get("type") == "neph":
-                finished = self.neph_run(job_name=job["name"], line=line)
-            else:
-                finished = self.normal_run(job_name=job["name"], line=line)
-            if not finished:
-                return False
-        return True
-
-    def normal_run(self, job_name, line):
-        status = SUCCESS
-        response, file_path = container.run_test(id=self.run_id, run_cmd=line["cmd"])
-        result = response.stdout
-        type = LOG_TYPE
-        if response.returncode:
-            status = FAILURE
-        if file_path:
-            try:
-                result = utils.xml_parse(path=file_path, line=line["cmd"])
-                type = TESTSUITE_TYPE
-            except:
-                pass
-            os.remove(file_path)
-
-        name = "{job_name}: {test_name}".format(job_name=job_name, test_name=line["name"])
-        self.model_obj.result.append({"type": type, "status": status, "name": name, "content": result})
-        self.model_obj.save()
-        if response.returncode in [137, 124]:
-            return False
-        return True
-
-    def neph_run(self, job_name, line):
-        status = SUCCESS
-        working_dir = line["working_dir"]
-        yaml_path = line["yaml_path"]
-        neph_id = f"{self.run_id}:{job_name}:{line['name']}"
-        cmd = f"export NEPH_RUN_ID='{neph_id}' \n cd {working_dir} \n /zeroci/bin/neph -y {yaml_path} -m CI"
-        response = container.execute_command(cmd=cmd, id=self.run_id)
-        if response.returncode:
-            status = FAILURE
-
-        name = "{job_name}:{test_name}".format(job_name=job_name, test_name=line["name"])
-        self.model_obj.result.append({"type": LOG_TYPE, "status": status, "name": name, "content": response.stdout})
-        self.model_obj.save()
-
-        for key in r.keys():
-            key = key.decode()
-            if key.startswith(f"neph:{self.run_id}:{job_name}:{line['name']}"):
-                status = SUCCESS
-                logs = r.lrange(key, 0, -1)
-                all_logs = ""
-                for log in logs:
-                    log = json.loads(log.decode())
-                    if log["type"] == "stderr":
-                        status = FAILURE
-                    all_logs += log["content"]
-                name = key.split(f"neph:{self.run_id}:")[-1]
-                self.model_obj.result.append({"type": LOG_TYPE, "status": status, "name": name, "content": all_logs})
-                self.model_obj.save()
-
-        if response.returncode in [137, 124]:
-            return False
-        return True
-
-    def build(self, job, clone_details, job_number):
-        """Create VM with the required prerequisties and run installation steps to get it ready for running tests.
-        """
-        env = self._get_run_env()
-        deployed = container.deploy(env=env, prerequisites=job["prerequisites"], repo_path=clone_details["remote_path"])
-        installed = False
-        if deployed:
-            if job_number != 0:
-                self._set_bin()
-            response = container.ssh_command(cmd=clone_details["cmd"])
-            if response.returncode:
-                name = "{job_name}: Clone Repository".format(job_name=job["name"])
-                result = response.stdout
-                r.rpush(self.run_id, result)
-            else:
-                response = container.execute_command(cmd=job["install"], id=self.run_id)
-                if response.returncode:
-                    name = "{job_name}: Installation".format(job_name=job["name"])
-                    result = response.stdout
-                else:
-                    installed = True
-        else:
-            name = "{job_name}: Deploy".format(job_name=job["name"])
-            result = "Couldn't deploy a container"
-            r.rpush(self.run_id, result)
-
-        if not installed:
-            self.model_obj.result.append({"type": LOG_TYPE, "status": ERROR, "name": name, "content": result})
-            self.model_obj.save()
-
-        return deployed, installed
-
-    def cal_status(self):
-        """Calculate the status of the whole tests result has been stored on the BD's id.
-        """
-        status = SUCCESS
-        for result in self.model_obj.result:
-            if result["status"] != SUCCESS:
-                status = result["status"]
-        self.model_obj.status = status
-        self.model_obj.save()
-
-    def _get_run_env(self):
-        """Get run environment variables.
-        """
-        if isinstance(self.model_obj, TriggerModel):
-            name = self.model_obj.repo
-        else:
-            name = self.model_obj.schedule_name
-        run_config = RunConfig(name=name)
-        run_env = run_config.env
-        env = []
-        for key in run_env.keys():
-            env_var = V1EnvVar(name=key, value=run_env.get(key))
-            env.append(env_var)
-        return env
-
-    def _load_yaml(self):
-        vcs_obj = VCSFactory().get_cvn(repo=self.model_obj.repo)
-        script = vcs_obj.get_content(ref=self.model_obj.commit, file_path="zeroCI.yaml")
-        if script:
-            try:
-                return yaml.safe_load(script)
-            except:
-                msg = traceback.format_exc()
-        else:
-            msg = "zeroCI.yaml is not found on the repository's home"
-
-        r.rpush(self.run_id, msg)
-        self.model_obj.result.append({"type": LOG_TYPE, "status": ERROR, "name": "Yaml File", "content": msg})
-        self.model_obj.save()
-        return False
-
-    def repo_clone_details(self):
-        """Clone repo.
-        """
-        configs = InitialConfig()
-        repo_remote_path = os.path.join(self._REPOS_DIR, self.model_obj.repo)
-        clone_url = urljoin(configs.vcs_host, f"{self.model_obj.repo}.git")
-        cmd = """git clone {clone_url} {repo_remote_path} --branch {branch}
-        cd {repo_remote_path}
-        git reset --hard {commit}
-        """.format(
-            clone_url=clone_url,
-            repo_remote_path=repo_remote_path,
-            branch=self.model_obj.branch,
-            commit=self.model_obj.commit,
-        )
-        clone_details = {"cmd": cmd, "remote_path": repo_remote_path}
-        return clone_details
-
-    def _prepare_bin_dirs(self, bin_remote_path):
-        self.bin_name = bin_remote_path.split(os.path.sep)[-1]
-        if isinstance(self.model_obj, TriggerModel):
-            release = self.model_obj.commit[:7]
-            local_path = os.path.join(self._BIN_DIR, self.model_obj.repo, self.model_obj.branch)
-        else:
-            release = str(datetime.fromtimestamp(self.model_obj.timestamp)).replace(" ", "_")[:16]
-            local_path = os.path.join(self._BIN_DIR, self.model_obj.schedule_name)
-
-        bin_release = f"{self.bin_name}_{release}"
-        bin_local_path = os.path.join(local_path, bin_release)
-        if not os.path.exists(local_path):
-            os.makedirs(local_path)
-
-        return bin_local_path
-
-    def _get_bin(self, bin_remote_path, job_number):
-        if bin_remote_path and job_number == 0:
-            bin_local_path = self._prepare_bin_dirs(bin_remote_path)
-            bin_release = bin_local_path.split(os.path.sep)[-1]
-            bin_tmp_path = os.path.join(self._BIN_DIR, bin_release)
-            cmd = f"cp {bin_remote_path} {bin_tmp_path}"
-            container.execute_command(cmd=cmd, id="", verbose=False)
-            container.ssh_get_remote_file(remote_path=bin_tmp_path, local_path=bin_local_path)
-
-            if os.path.exists(bin_local_path):
-                self.model_obj.bin_release = bin_release
-                self.model_obj.save()
-
-    def _set_bin(self):
-        if self.model_obj.bin_release:
-            bin_local_path = self._prepare_bin_dirs(self.bin_name)
-            bin_remote_path = os.path.join(self._BIN_DIR, self.bin_name)
-            container.ssh_set_remote_file(remote_path=bin_remote_path, local_path=bin_local_path)
-            container.ssh_command(f"chmod +x {bin_remote_path}")
-
-    def build_and_test(self, id, schedule_name=None, script=None):
-        """Builds, runs tests, calculates status and gives report on telegram and your version control system.
-
-        :param id: DB's id of this run details.
-        :type id: str
-        :param schedule_name: it will have a value if the run is scheduled.
-        :param schedule_name: str
-        """
-        self.run_id = id
-        if not schedule_name:
-            self.model_obj = TriggerRun.get(id=self.run_id)
-            script = self._load_yaml()
-        else:
-            self.model_obj = SchedulerRun.get(id=self.run_id)
-        if script:
-            valid = self.validate_yaml(run_id=self.run_id, model_obj=self.model_obj, script=script)
-            if valid:
-                clone_details = self.repo_clone_details()
-                worked = deployed = installed = True
-                for i, job in enumerate(script["jobs"]):
-                    if not (worked and deployed and installed):
-                        break
-                    log = """
-                    ******************************************************
-                    Starting {job_name} job
-                    ******************************************************
-                    """.format(
-                        job_name=job["name"]
-                    ).replace(
-                        " ", ""
-                    )
-                    r.rpush(self.run_id, log)
-                    deployed, installed = self.build(job=job, clone_details=clone_details, job_number=i)
-                    if deployed:
-                        if installed:
-                            worked = self.test_run(job=job)
-                        self._get_bin(bin_remote_path=job.get("bin_path"), job_number=i)
-                    container.delete()
-                r.rpush(self.run_id, "hamada ok")
-                self.cal_status()
-        reporter.report(run_id=self.run_id, model_obj=self.model_obj, schedule_name=schedule_name)
-
-    def schedule_run(self, job):
-        """Builds, runs tests, calculates status and gives report on telegram.
-
-        :param schedule_name: the name of the scheduled run.
-        :type schedule_name: str
-        :param script: the script that should run your schedule.
-        :type script: str
-        """
-        triggered_by = job.get("triggered_by", "ZeroCI Scheduler")
-        data = {
-            "status": PENDING,
-            "timestamp": int(datetime.now().timestamp()),
-            "schedule_name": job["schedule_name"],
-            "triggered_by": triggered_by,
-            "bin_release": None,
-        }
-        scheduler_run = SchedulerRun(**data)
-        scheduler_run.save()
-        id = str(scheduler_run.id)
-        data["id"] = id
-        r.publish("zeroci_status", json.dumps(data))
-        self.build_and_test(id=id, schedule_name=job["schedule_name"], script=job)
diff --git a/backend/actions/reporter.py b/backend/actions/reporter.py
new file mode 100644
index 00000000..ec45c5df
--- /dev/null
+++ b/backend/actions/reporter.py
@@ -0,0 +1,87 @@
+import json
+from urllib.parse import urljoin
+
+from models.initial_config import InitialConfig
+from packages.telegram.telegram import Telegram
+from packages.vcs.vcs import VCSFactory
+from redis import Redis
+from utils.constants import FAILURE, SUCCESS
+
+
+class Reporter:
+    def __init__(self):
+        self._redis = None
+        self._telegram = None
+        self._vcs = None
+
+    @property
+    def redis(self):
+        if not self._redis:
+            self._redis = Redis()
+        return self._redis
+
+    @property
+    def telegram(self):
+        if not self._telegram:
+            self._telegram = Telegram()
+        return self._telegram
+
+    @property
+    def vcs(self):
+        if not self._vcs:
+            self._vcs = VCSFactory().get_cvn()
+        return self._vcs
+
+    def report(self, run_id, run_obj):
+        """Report the result to the commit status and the Telegram chat.
+
+        :param run_id: DB id of this run's details.
+        :type run_id: str
+        :param run_obj: the run object holding the result to be reported.
+        :type run_obj: Run
+        """
+        configs = InitialConfig()
+        bin_release = run_obj.bin_release
+        triggered_by = run_obj.triggered_by
+        msg = self.report_msg(status=run_obj.status)
+        url = f"/repos/{run_obj.repo}/{run_obj.branch}/{run_obj.run_id}"
+        link = urljoin(configs.domain, url)
+        if bin_release:
+            bin_url = f"/bin/{run_obj.repo}/{run_obj.branch}/{bin_release}"
+            bin_link = urljoin(configs.domain, bin_url)
+        else:
+            bin_link = None
+        data = {
+            "timestamp": run_obj.timestamp,
+            "commit": run_obj.commit,
+            "committer": run_obj.committer,
+            "status": run_obj.status,
+            "repo": run_obj.repo,
+            "branch": run_obj.branch,
+            "bin_release": bin_release,
+            "triggered_by": triggered_by,
+            "run_id": run_id,
+        }
+        self.redis.publish("zeroci_status", json.dumps(data))
+        self.vcs._set_repo_obj(repo=run_obj.repo)
+        self.vcs.status_send(status=run_obj.status, link=link, commit=run_obj.commit)
+        self.telegram.send_msg(
+            msg=msg,
+            link=link,
+            repo=run_obj.repo,
+            branch=run_obj.branch,
+            commit=run_obj.commit,
+            committer=run_obj.committer,
+            bin_link=bin_link,
+            triggered_by=triggered_by,
+        )
+
+    def report_msg(self, status):
+        if status == SUCCESS:
+            msg = "✅ Run passed"
+        elif status == FAILURE:
+            msg = "❌ Run failed"
+        else:
+            msg = "⛔️ Run errored"
+
+        return msg
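Reviewer note: `Reporter` (like the `Runner` and `Trigger` classes below) constructs its heavy clients lazily through properties, so importing the module or instantiating the class inside an RQ worker stays cheap and side-effect free. A self-contained sketch of the pattern — `ExpensiveClient` is a stand-in for `Redis()`, `Telegram()`, or `VCSFactory().get_cvn()`, not a real class in this codebase:

```python
class ExpensiveClient:
    """Stand-in for Redis()/Telegram()/VCSFactory().get_cvn()."""

    def __init__(self):
        print("connecting...")  # the expensive work happens here, and only once


class Service:
    def __init__(self):
        self._client = None  # nothing heavy at construction time

    @property
    def client(self):
        if not self._client:  # first access builds the client, later accesses reuse it
            self._client = ExpensiveClient()
        return self._client


svc = Service()  # cheap: no connection yet
svc.client       # "connecting..." printed once
svc.client       # cached instance reused
```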
diff --git a/backend/actions/runner.py b/backend/actions/runner.py
new file mode 100644
index 00000000..9f08f68b
--- /dev/null
+++ b/backend/actions/runner.py
@@ -0,0 +1,250 @@
+import json
+import os
+from urllib.parse import urljoin
+
+from deployment.container import Container
+from kubernetes.client import V1EnvVar
+from models.initial_config import InitialConfig
+from models.run import Run
+from models.run_config import RunConfig
+from redis import Redis
+from utils.constants import BIN_DIR, ERROR, FAILURE, LOG_TYPE, REPOS_DIR, SUCCESS, TESTSUITE_TYPE
+from utils.utils import Utils
+
+from actions.reporter import Reporter
+
+
+class Runner:
+    def __init__(self, run=None, run_id=None):
+        self.run_id = run_id
+        self.run_obj = run
+        self._redis = None
+        self._container = None
+        self._reporter = None
+        self._utils = None
+
+    @property
+    def redis(self):
+        if not self._redis:
+            self._redis = Redis()
+        return self._redis
+
+    @property
+    def container(self):
+        if not self._container:
+            self._container = Container()
+        return self._container
+
+    @property
+    def reporter(self):
+        if not self._reporter:
+            self._reporter = Reporter()
+        return self._reporter
+
+    @property
+    def utils(self):
+        if not self._utils:
+            self._utils = Utils()
+        return self._utils
+
+    def _test_run(self, job):
+        """Run the tests and store the results in the DB."""
+        for line in job["script"]:
+            if line.get("type") == "neph":
+                finished = self._neph_run(job_name=job["name"], line=line)
+            else:
+                finished = self._normal_run(job_name=job["name"], line=line)
+            if not finished:
+                return False
+        return True
+
+    def _normal_run(self, job_name, line):
+        status = SUCCESS
+        response, file_path = self.container.run_test(run_id=self.run_id, run_cmd=line["cmd"])
+        result = response.stdout
+        type = LOG_TYPE
+        if response.returncode:
+            status = FAILURE
+        if file_path:
+            try:
+                result = self.utils.xml_parse(path=file_path, line=line["cmd"])
+                type = TESTSUITE_TYPE
+            except:
+                pass
+            os.remove(file_path)
+
+        name = "{job_name}: {test_name}".format(job_name=job_name, test_name=line["name"])
+        self.run_obj.result.append({"type": type, "status": status, "name": name, "content": result})
+        self.run_obj.save()
+        if response.returncode in [137, 124]:
+            return False
+        return True
+
+    def _neph_run(self, job_name, line):
+        status = SUCCESS
+        working_dir = line["working_dir"]
+        yaml_path = line["yaml_path"]
+        neph_id = f"{self.run_id}:{job_name}:{line['name']}"
+        cmd = f"export NEPH_RUN_ID='{neph_id}' \n cd {working_dir} \n {BIN_DIR}neph -y {yaml_path} -m CI"
+        response = self.container.execute_on_test_container(cmd=cmd, run_id=self.run_id)
+        if response.returncode:
+            status = FAILURE
+
+        name = "{job_name}:{test_name}".format(job_name=job_name, test_name=line["name"])
+        self.run_obj.result.append({"type": LOG_TYPE, "status": status, "name": name, "content": response.stdout})
+        self.run_obj.save()
+
+        for key in self.redis.keys():
+            key = key.decode()
+            if key.startswith(f"neph:{self.run_id}:{job_name}:{line['name']}"):
+                status = SUCCESS
+                logs = self.redis.lrange(key, 0, -1)
+                all_logs = ""
+                for log in logs:
+                    log = json.loads(log.decode())
+                    if log["type"] == "stderr":
+                        status = FAILURE
+                    all_logs += log["content"]
+                name = key.split(f"neph:{self.run_id}:")[-1]
+                self.run_obj.result.append({"type": LOG_TYPE, "status": status, "name": name, "content": all_logs})
+                self.run_obj.save()
+
+        if response.returncode in [137, 124]:
+            return False
+        return True
+
+    def _build(self, job, clone_details, job_number):
+        """Deploy a container with the required prerequisites and run the installation steps to get it ready for running tests."""
+        env = self._get_run_env()
+        deployed = self.container.deploy(
+            env=env, prerequisites=job["prerequisites"], repo_path=clone_details["remote_path"]
+        )
+        installed = False
+        if deployed:
+            if job_number != 0:
+                self._set_bin()
+            response = self.container.execute_on_helper(cmd=clone_details["cmd"])
+            if response.returncode:
+                name = "{job_name}: Clone Repository".format(job_name=job["name"])
+                result = response.stdout
+                self.redis.rpush(self.run_id, result)
+            else:
+                response = self.container.execute_on_test_container(cmd=job["install"], run_id=self.run_id)
+                if response.returncode:
+                    name = "{job_name}: Installation".format(job_name=job["name"])
+                    result = response.stdout
+                else:
+                    installed = True
+        else:
+            name = "{job_name}: Deploy".format(job_name=job["name"])
+            result = "Couldn't deploy a container"
+            self.redis.rpush(self.run_id, result)
+
+        if not installed:
+            self.run_obj.result.append({"type": LOG_TYPE, "status": ERROR, "name": name, "content": result})
+            self.run_obj.save()
+
+        return deployed, installed
+
+    def _cal_status(self):
+        """Calculate the overall run status from the test results stored in the DB."""
+        status = SUCCESS
+        for result in self.run_obj.result:
+            if result["status"] != SUCCESS:
+                status = result["status"]
+        self.run_obj.status = status
+        self.run_obj.save()
+
+    def _get_run_env(self):
+        """Get run environment variables."""
+        name = self.run_obj.repo
+        run_config = RunConfig(name=name)
+        run_env = run_config.env
+        env = []
+        for key in run_env.keys():
+            env_var = V1EnvVar(name=key, value=run_env.get(key))
+            env.append(env_var)
+        return env
+
+    def _repo_clone_details(self):
+        """Build the clone command and the remote path used to fetch the repo."""
+        configs = InitialConfig()
+        repo_remote_path = os.path.join(REPOS_DIR, self.run_obj.repo)
+        clone_url = urljoin(configs.vcs_host, f"{self.run_obj.repo}.git")
+        cmd = """git clone {clone_url} {repo_remote_path} --branch {branch}
+        cd {repo_remote_path}
+        git reset --hard {commit}
+        """.format(
+            clone_url=clone_url,
+            repo_remote_path=repo_remote_path,
+            branch=self.run_obj.branch,
+            commit=self.run_obj.commit,
+        )
+        clone_details = {"cmd": cmd, "remote_path": repo_remote_path}
+        return clone_details
+
+    def _prepare_bin_dirs(self, bin_remote_path):
+        self.bin_name = bin_remote_path.split(os.path.sep)[-1]
+        release = self.run_obj.commit[:7]
+        local_path = os.path.join(BIN_DIR, self.run_obj.repo, self.run_obj.branch)
+        bin_release = f"{self.bin_name}_{release}"
+        bin_local_path = os.path.join(local_path, bin_release)
+        if not os.path.exists(local_path):
+            os.makedirs(local_path)
+
+        return bin_local_path
+
+    def _get_bin(self, bin_remote_path, job_number):
+        if bin_remote_path and not job_number:
+            bin_local_path = self._prepare_bin_dirs(bin_remote_path)
+            bin_release = bin_local_path.split(os.path.sep)[-1]
+            bin_tmp_path = os.path.join(BIN_DIR, bin_release)
+            cmd = f"cp {bin_remote_path} {bin_tmp_path}"
+            self.container.execute_on_test_container(cmd=cmd, run_id="", verbose=False)
+            self.container.get_remote_file_from_helper(remote_path=bin_tmp_path, local_path=bin_local_path)
+
+            if os.path.exists(bin_local_path):
+                self.run_obj.bin_release = bin_release
+                self.run_obj.save()
+
+    def _set_bin(self):
+        if self.run_obj.bin_release:
+            bin_local_path = self._prepare_bin_dirs(self.bin_name)
+            bin_remote_path = os.path.join(BIN_DIR, self.bin_name)
+            self.container.set_remote_file_on_helper(remote_path=bin_remote_path, local_path=bin_local_path)
+            self.container.execute_on_helper(f"chmod +x {bin_remote_path}")
+
+    def build_and_test(self, run_id, repo_config):
+        """Builds, runs tests, calculates the status and reports on Telegram and your version control system.
+
+        :param run_id: DB id of this run's details.
+        :type run_id: str
+        :param repo_config: the parsed and validated zeroCI.yaml of the repo.
+        :type repo_config: dict
+        """
+        self.run_id = run_id
+        self.run_obj = Run.get(run_id=self.run_id)
+        clone_details = self._repo_clone_details()
+        worked = deployed = installed = True
+        for i, job in enumerate(repo_config["jobs"]):
+            if not (worked and deployed and installed):
+                break
+            log = """
+            ******************************************************
+            Starting {job_name} job
+            ******************************************************
+            """.format(
+                job_name=job["name"]
+            ).replace(
+                " ", ""
+            )
+            self.redis.rpush(self.run_id, log)
+            deployed, installed = self._build(job=job, clone_details=clone_details, job_number=i)
+            if deployed:
+                if installed:
+                    worked = self._test_run(job=job)
+                self._get_bin(bin_remote_path=job.get("bin_path"), job_number=i)
+            self.container.delete()
+        self.redis.rpush(self.run_id, "hamada ok")
+        self._cal_status()
+        self.reporter.report(run_id=self.run_id, run_obj=self.run_obj)
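Reviewer note: both `_normal_run` and `_neph_run` abort the whole job when the container's return code is 137 or 124. Judging from `Container.execute_on_test_container` further down in this diff, 137 means the test pod was killed or expired and 124 means the command timed out — in either case the container is gone, so running the remaining script lines would be pointless. A minimal sketch of that rule, with illustrative names:

```python
# Sketch of the abort rule used by Runner._normal_run/_neph_run (names are mine, not the codebase's).
# 137 = process killed (pod expired), 124 = command timed out: the container is unusable afterwards.
FATAL_RETURN_CODES = {137, 124}


def should_continue(returncode: int) -> bool:
    return returncode not in FATAL_RETURN_CODES


assert should_continue(0)        # passing test: keep going
assert should_continue(1)        # ordinary failure: record it, keep going
assert not should_continue(137)  # pod killed/expired: stop the job
assert not should_continue(124)  # timeout: stop the job
```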
diff --git a/backend/actions/trigger.py b/backend/actions/trigger.py
new file mode 100644
index 00000000..ad6777c5
--- /dev/null
+++ b/backend/actions/trigger.py
@@ -0,0 +1,263 @@
+import json
+import os
+import traceback
+from datetime import datetime
+
+import yaml
+from bottle import request
+from models.initial_config import InitialConfig
+from models.run import Run
+from packages.vcs.vcs import VCSFactory
+from redis import Redis
+from rq import Queue
+from rq_scheduler import Scheduler
+from utils.constants import BIN_DIR, ERROR, LOG_TYPE, PENDING
+
+from actions.reporter import Reporter
+from actions.runner import Runner
+from actions.validator import validate_yaml
+
+
+class Trigger:
+    def __init__(self):
+        self._redis = None
+        self._runner = None
+        self._reporter = None
+        self._queue = None
+        self._scheduler = None
+        self._vcs = None
+
+    @property
+    def redis(self):
+        if not self._redis:
+            self._redis = Redis()
+        return self._redis
+
+    @property
+    def runner(self):
+        if not self._runner:
+            self._runner = Runner()
+        return self._runner
+
+    @property
+    def reporter(self):
+        if not self._reporter:
+            self._reporter = Reporter()
+        return self._reporter
+
+    @property
+    def queue(self):
+        if not self._queue:
+            self._queue = Queue(connection=self.redis, name="default")
+        return self._queue
+
+    @property
+    def scheduler(self):
+        if not self._scheduler:
+            self._scheduler = Scheduler(connection=self.redis)
+        return self._scheduler
+
+    @property
+    def vcs(self):
+        if not self._vcs:
+            self._vcs = VCSFactory().get_cvn()
+        return self._vcs
+
+    def _load_config(self, repo, commit):
+        self.vcs._set_repo_obj(repo=repo)
+        script = self.vcs.get_content(ref=commit, file_path="zeroCI.yaml")
+        if script:
+            try:
+                config = yaml.safe_load(script)
+                return True, config, ""
+            except:
+                msg = traceback.format_exc()
+        else:
+            msg = "zeroCI.yaml is not found on the repository's home"
+
+        return False, "", msg
+
+    def _load_validate_config(
+        self, repo="", branch="", commit="", committer="", run_id=None, triggered=False, triggered_by=None
+    ):
+        if run_id:
+            run = Run.get(run_id=run_id)
+            repo = run.repo
+            commit = run.commit
+        status, config, msg = self._load_config(repo, commit)
+        if not status:
+            run, run_id = self._prepare_run_object(
+                repo=repo,
+                branch=branch,
+                commit=commit,
+                committer=committer,
+                run_id=run_id,
+                triggered=triggered,
+                triggered_by=triggered_by,
+            )
+            self._report(msg, run, run_id)
+            return False
+        valid, msg = validate_yaml(config)
+        if not valid:
+            run, run_id = self._prepare_run_object(
+                repo=repo,
+                branch=branch,
+                commit=commit,
+                committer=committer,
+                run_id=run_id,
+                triggered=triggered,
+                triggered_by=triggered_by,
+            )
+            self._report(msg, run, run_id)
+            return False
+        return config
+
+    def _prepare_run_object(
+        self, repo="", branch="", commit="", committer="", run_id=None, triggered=False, triggered_by=None
+    ):
+        configs = InitialConfig()
+        timestamp = int(datetime.now().timestamp())
+        run = None
+        if run_id:
+            # Triggered from id.
+            run = Run.get(run_id=run_id)
+            triggered_by = triggered_by or request.environ.get("beaker.session").get("username").strip(".3bot")
+            data = {
+                "timestamp": timestamp,
+                "commit": run.commit,
+                "committer": run.committer,
+                "status": PENDING,
+                "repo": run.repo,
+                "branch": run.branch,
+                "triggered_by": triggered_by,
+                "bin_release": None,
+                "run_id": run_id,
+            }
+            run.timestamp = timestamp
+            run.status = PENDING
+            run.result = []
+            run.triggered_by = triggered_by
+            if run.bin_release:
+                bin_path = os.path.join(BIN_DIR, run.repo, run.branch, run.bin_release)
+                if os.path.exists(bin_path):
+                    os.remove(bin_path)
+                run.bin_release = None
+            run.save()
+            for key in self.redis.keys():
+                if run_id in key.decode():
+                    self.redis.delete(key)
+            self.redis.publish("zeroci_status", json.dumps(data))
+        else:
+            # Triggered from vcs webhook or rebuild using the button.
+            if repo in configs.repos:
+                triggered_by = triggered_by or "VCS Hook"
+                if triggered:
+                    triggered_by = triggered_by or request.environ.get("beaker.session").get("username").strip(".3bot")
+                data = {
+                    "timestamp": timestamp,
+                    "commit": commit,
+                    "committer": committer,
+                    "status": PENDING,
+                    "repo": repo,
+                    "branch": branch,
+                    "triggered_by": triggered_by,
+                    "bin_release": None,
+                }
+                run = Run(**data)
+                run.save()
+                run_id = str(run.run_id)
+                data["run_id"] = run_id
+                self.redis.publish("zeroci_status", json.dumps(data))
+        if run and run_id:
+            return run, run_id
+        return None, None
+
+    def enqueue(self, repo="", branch="", commit="", committer="", target_branch="", run_id=None, triggered=False):
+        config = self._load_validate_config(
+            repo=repo, branch=branch, commit=commit, committer=committer, run_id=run_id, triggered=triggered
+        )
+        if not config:
+            return
+
+        if run_id:
+            run, run_id = self._prepare_run_object(run_id=run_id, triggered=triggered)
+            return self._trigger(repo_config=config, run=run, run_id=run_id)
+
+        push = config["run_on"].get("push")
+        pull_request = config["run_on"].get("pull_request")
+        manual = config["run_on"].get("manual")
+        schedule = config["run_on"].get("schedule")
+
+        if repo and branch and not schedule:
+            schedule_name = f"{repo}_{branch}"
+            self.scheduler.cancel(schedule_name)
+        if push:
+            trigger_branches = push["branches"]
+            if branch and branch in trigger_branches:
+                run, run_id = self._prepare_run_object(
+                    repo=repo, branch=branch, commit=commit, committer=committer, triggered=triggered
+                )
+                return self._trigger(repo_config=config, run=run, run_id=run_id)
+        if pull_request:
+            target_branches = pull_request["branches"]
+            if target_branch and target_branch in target_branches:
+                run, run_id = self._prepare_run_object(
+                    repo=repo, branch=branch, commit=commit, committer=committer, triggered=triggered
+                )
+                return self._trigger(repo_config=config, run=run, run_id=run_id)
+        if manual and triggered:
+            trigger_branches = manual["branches"]
+            if branch and branch in trigger_branches:
+                run, run_id = self._prepare_run_object(
+                    repo=repo, branch=branch, commit=commit, committer=committer, triggered=triggered
+                )
+                return self._trigger(repo_config=config, run=run, run_id=run_id)
+        if schedule:
+            schedule_branch = schedule["branch"]
+            cron = schedule["cron"]
+            schedule_name = f"{repo}_{branch}"
+            if branch == schedule_branch:
+                self.scheduler.cron(
+                    cron_string=cron,
+                    func=self._trigger_schedule,
+                    args=[repo, branch],
+                    id=schedule_name,
+                    timeout=-1,
+                )
+
+    def _trigger(self, repo_config, run, run_id):
+        if run and run_id:
+            configs = InitialConfig()
+            link = f"{configs.domain}/repos/{run.repo}/{run.branch}/{str(run.run_id)}"
+            self.vcs._set_repo_obj(repo=run.repo)
+            self.vcs.status_send(status=PENDING, link=link, commit=run.commit)
+            # TODO: before triggering, check that there is not a run with same commit and in state pending.
+            job = self.queue.enqueue_call(
+                func=self.runner.build_and_test, args=(run_id, repo_config), result_ttl=5000, timeout=20000
+            )
+            return job
+
+    def _trigger_schedule(self, repo, branch):
+        triggered_by = "ZeroCI Scheduler"
+        self.vcs._set_repo_obj(repo=repo)
+        last_commit = self.vcs.get_last_commit(branch=branch)
+        committer = self.vcs.get_committer(commit=last_commit)
+        where = {"repo": repo, "branch": branch, "commit": last_commit, "status": PENDING}
+        exist_run = Run.get_objects(fields=["status"], **where)
+        run, run_id = self._prepare_run_object(
+            repo=repo, branch=branch, commit=last_commit, committer=committer, triggered_by=triggered_by
+        )
+        if exist_run:
+            msg = f"There is a running job from this commit {last_commit}"
+            return self._report(msg, run, run_id)
+        config = self._load_validate_config(run_id=run_id, triggered_by=triggered_by)
+        if config:
+            self.runner.build_and_test(run_id, config)
+
+    def _report(self, msg, run, run_id):
+        msg = f"{msg} (see examples: https://github.com/threefoldtech/zeroCI/tree/development/docs/config)"
+        self.redis.rpush(run_id, msg)
+        run.result.append({"type": LOG_TYPE, "status": ERROR, "name": "Yaml File", "content": msg})
+        run.status = ERROR
+        run.save()
+        self.reporter.report(run_id=run_id, run_obj=run)
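Reviewer note: the schedule path in `Trigger.enqueue` leans on rq-scheduler's cron API, using the `f"{repo}_{branch}"` string as the job id so a later push can `cancel()` and re-register the schedule. A standalone sketch of the same calls (requires a reachable Redis; `notify` is a placeholder for the real `_trigger_schedule`):

```python
from redis import Redis
from rq_scheduler import Scheduler


def notify(repo, branch):  # placeholder for Runner-style work
    print(f"scheduled run for {repo}@{branch}")


scheduler = Scheduler(connection=Redis())
schedule_name = "threefoldtech/zeroCI_development"  # same f"{repo}_{branch}" convention as Trigger

# Re-registering: cancel any previous cron job with this id (a no-op if none), then install the new one.
scheduler.cancel(schedule_name)
scheduler.cron(
    cron_string="0 * * * *",  # hourly
    func=notify,
    args=["threefoldtech/zeroCI", "development"],
    id=schedule_name,         # deterministic id makes the job cancellable later
    timeout=-1,               # no RQ-level timeout, matching Trigger.enqueue
)
```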
diff --git a/backend/actions/validator.py b/backend/actions/validator.py
new file mode 100644
index 00000000..14694332
--- /dev/null
+++ b/backend/actions/validator.py
@@ -0,0 +1,239 @@
+import traceback
+
+import requests
+from croniter import croniter
+
+
+def _validate_test_script(test_script):
+    msg = ""
+    if not test_script:
+        msg = "script should be in job file and shouldn't be empty"
+    else:
+        if not isinstance(test_script, list):
+            msg = "script should be list"
+        else:
+            for item in test_script:
+                if not isinstance(item, dict):
+                    msg = "Every element in script should be dict"
+                else:
+                    name = item.get("name")
+                    if not name:
+                        msg = "Every element in script should contain a name"
+                    else:
+                        if not isinstance(name, str):
+                            msg = "Every name in script should be str"
+                    cmd = item.get("cmd")
+                    if not cmd:
+                        type = item.get("type")
+                        if not type:
+                            msg = "Every element in script should contain a cmd or type"
+                        else:
+                            if type == "neph":
+                                working_dir = item.get("working_dir")
+                                if not working_dir:
+                                    msg = "working_dir should be added for neph type"
+                                yaml_path = item.get("yaml_path")
+                                if not yaml_path:
+                                    msg = "yaml_path should be added for neph type"
+                            else:
+                                msg = f"{type} is not supported"
+                    else:
+                        if not isinstance(cmd, str):
+                            msg = "Every cmd in script should be str"
+    return msg
+
+
+def _validate_install_script(install_script):
+    msg = ""
+    if not install_script:
+        msg = "install should be in job file and shouldn't be empty"
+    else:
+        if not isinstance(install_script, str):
+            msg = "install should be str"
+
+    return msg
+
+
+def _validate_prerequisites(prerequisites):
+    msg = ""
+    if not prerequisites:
+        msg = "prerequisites should be in job file and shouldn't be empty"
+    else:
+        if not isinstance(prerequisites, dict):
+            msg = "prerequisites should be dict"
+        else:
+            image_name = prerequisites.get("image_name")
+            if not image_name:
+                msg = "prerequisites should contain image_name and shouldn't be empty"
+            else:
+                if not isinstance(image_name, str):
+                    msg = "image_name should be str"
+                else:
+                    if ":" in image_name:
+                        repository, tag = image_name.split(":")
+                    else:
+                        repository = image_name
+                        tag = "latest"
+                    response = requests.get(f"https://index.docker.io/v1/repositories/{repository}/tags/{tag}")
+                    if response.status_code != requests.codes.ok:
+                        msg = "Invalid docker image name"
+            shell_bin = prerequisites.get("shell_bin")
+            if shell_bin:
+                if not isinstance(shell_bin, str):
+                    msg = "shell_bin should be str"
+    return msg
+
+
+def _validate_bin_path(bin_path):
+    msg = ""
+    if bin_path:
+        if not isinstance(bin_path, str):
+            msg = "bin_path should be str"
+
+    return msg
+
+
+def _validate_job_name(name):
+    msg = ""
+    if not name:
+        msg = "name should be in job file and shouldn't be empty"
+    else:
+        if not isinstance(name, str):
+            msg = "name of the job should be str"
+
+    return msg
+
+
+def _validate_job(job):
+    job_name = job.get("name")
+    msg = _validate_job_name(job_name)
+    if msg:
+        return msg
+
+    bin_path = job.get("bin_path")
+    msg = _validate_bin_path(bin_path)
+    if msg:
+        return msg
+
+    test_script = job.get("script")
+    msg = _validate_test_script(test_script)
+    if msg:
+        return msg
+
+    install_script = job.get("install")
+    msg = _validate_install_script(install_script)
+    if msg:
+        return msg
+
+    prerequisites = job.get("prerequisites")
+    msg = _validate_prerequisites(prerequisites)
+    return msg
+
+
+def _validate_run_on(run_on):
+    msg = ""
+    if not run_on:
+        msg = "run_on should be in yaml and shouldn't be empty"
+    else:
+        if not isinstance(run_on, dict):
+            msg = "run_on should have push or pull_request as keys"
+        else:
+            push = run_on.get("push")
+            pull_request = run_on.get("pull_request")
+            schedule = run_on.get("schedule")
+            manual = run_on.get("manual")
+            if not any([push, pull_request, schedule, manual]):
+                msg = "run_on should have push, pull_request, schedule or manual as keys and at least one of them should be filled"
+            else:
+                if push:
+                    if not isinstance(push, dict):
+                        msg = "push should have branches as a key"
+                    else:
+                        branches = push.get("branches")
+                        if not branches:
+                            msg = "branches on push shouldn't be empty"
+                        else:
+                            if not isinstance(branches, list):
+                                msg = "branches should be a list of branch names"
+                            else:
+                                for branch in branches:
+                                    if not isinstance(branch, str):
+                                        msg = "branches should be a list of str"
+                if pull_request:
+                    if not isinstance(pull_request, dict):
+                        msg = "pull_request should have branches as a key"
+                    else:
+                        branches = pull_request.get("branches")
+                        if not branches:
+                            msg = "branches on pull_request shouldn't be empty"
+                        else:
+                            if not isinstance(branches, list):
+                                msg = "branches should be a list of branch names"
+                            else:
+                                for branch in branches:
+                                    if not isinstance(branch, str):
+                                        msg = "branches should be a list of str"
+
+                if manual:
+                    if not isinstance(manual, dict):
+                        msg = "manual should have branches as a key"
+                    else:
+                        branches = manual.get("branches")
+                        if not branches:
+                            msg = "branches on manual shouldn't be empty"
+                        else:
+                            if not isinstance(branches, list):
+                                msg = "branches should be a list of branch names"
+                            else:
+                                for branch in branches:
+                                    if not isinstance(branch, str):
+                                        msg = "branches should be a list of str"
+
+                if schedule:
+                    if not isinstance(schedule, dict):
+                        msg = "schedule should have branch and cron as keys"
+                    else:
+                        branch = schedule.get("branch")
+                        cron = schedule.get("cron")
+                        if not branch:
+                            msg = "branch on schedule shouldn't be empty"
+                        else:
+                            if not isinstance(branch, str):
+                                msg = "branch should be str"
+                        if not cron:
+                            msg = "cron on schedule shouldn't be empty"
+                        else:
+                            if not isinstance(cron, str):
+                                msg = "cron should be str"
+                            else:
+                                try:
+                                    croniter(cron)
+                                except Exception:
+                                    msg = traceback.format_exc()
+
+    return msg
+
+
+def validate_yaml(config):
+    jobs = config.get("jobs")
+    if not jobs:
+        msg = "jobs should be in yaml and shouldn't be empty"
+    else:
+        if not isinstance(jobs, list):
+            msg = "jobs should be list"
+        else:
+            if len(jobs) > 3:
+                msg = "jobs shouldn't be more than 3"
+            else:
+                for job in jobs:
+                    msg = _validate_job(job)
+                    if msg:
+                        break
+
+    run_on = config.get("run_on")
+    if not msg:
+        msg = _validate_run_on(run_on)
+
+    if msg:
+        return False, msg
+    return True, ""
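For reference, a config that the new module-level `validate_yaml` should accept — the repo and commands are made-up examples, and note that `_validate_prerequisites` does a live Docker Hub lookup for `image_name`, so this sketch needs network access and assumes that endpoint still answers:

```python
import yaml

from actions.validator import validate_yaml  # module path as introduced by this PR

sample = yaml.safe_load("""
run_on:
  push:
    branches:
      - development
jobs:
  - name: tests
    prerequisites:
      image_name: ubuntu:18.04
    install: pip3 install -r requirements.txt
    script:
      - name: unit tests
        cmd: pytest tests
""")

valid, msg = validate_yaml(sample)
print(valid, msg)  # expected: True '' (if the Docker Hub image lookup succeeds)
```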
LOG_TYPE, "status": ERROR, "name": "Yaml File", "content": msg}) - model_obj.save() - - def _validate_job(self, job): - job_name = job.get("name") - msg = self._validate_job_name(job_name) - if msg: - return msg - - bin_path = job.get("bin_path") - msg = self._validate_bin_path(bin_path) - if msg: - return msg - - test_script = job.get("script") - msg = self._validate_test_script(test_script) - if msg: - return msg - - install_script = job.get("install") - msg = self._validate_install_script(install_script) - if msg: - return msg - - prerequisites = job.get("prerequisites") - msg = self._validate_prerequisites(prerequisites) - return msg - - def validate_yaml(self, run_id, model_obj, script): - jobs = script.get("jobs") - if not jobs: - msg = "jobs should be in yaml and shouldn't be empty" - else: - if not isinstance(jobs, list): - msg = "jobs should be list" - else: - if len(jobs) > 3: - msg = "jobs shouldn't be more than 3" - else: - for job in jobs: - msg = self._validate_job(job) - if msg: - break - if msg: - self._report(run_id=run_id, model_obj=model_obj, msg=msg) - return False - return True diff --git a/backend/apis/base.py b/backend/apis/base.py index 5717de4e..d484be7f 100644 --- a/backend/apis/base.py +++ b/backend/apis/base.py @@ -1,6 +1,7 @@ +from functools import wraps + from bottle import Bottle, abort, redirect, request, response from models.initial_config import InitialConfig -from functools import wraps app = Bottle() LOGIN_URL = "/auth/login?provider=3bot" diff --git a/backend/apis/config.py b/backend/apis/config.py index c9781ff0..45fdcfda 100644 --- a/backend/apis/config.py +++ b/backend/apis/config.py @@ -2,21 +2,20 @@ import sys import requests -from telegram import Bot -from telegram.error import BadRequest, InvalidToken, Unauthorized - -from apis.base import admin, app, check_configs, user from bottle import HTTPResponse, abort, request from models.initial_config import InitialConfig from models.run_config import RunConfig from packages.vcs.vcs import VCSFactory +from telegram import Bot +from telegram.error import BadRequest, InvalidToken, Unauthorized + +from apis.base import admin, app, check_configs, user @app.route("/api/telegram_config", method=["GET", "POST"]) @admin def validate_telegam(): - """Validate telegram token and chat ID - """ + """Validate telegram token and chat ID""" configs = InitialConfig() confs = ["chat_id", "bot_token"] conf_dict = {} @@ -51,8 +50,7 @@ def validate_telegam(): @app.route("/api/vcs_config", method=["GET", "POST"]) @admin def vcs_config(): - """Initial configuration for the ci before start working. 
- """ + """Initial configuration for the ci before start working.""" configs = InitialConfig() confs = ["domain", "vcs_host", "vcs_token"] conf_dict = {} diff --git a/backend/apis/default.py b/backend/apis/default.py index ee59cf26..18c1c25f 100644 --- a/backend/apis/default.py +++ b/backend/apis/default.py @@ -1,7 +1,8 @@ -from apis.base import app, check_configs, admin from bottle import static_file +from utils.constants import BIN_DIR + +from apis.base import admin, app, check_configs -BIN_DIR = "/zeroci/bin/" STATIC_DIR = "../dist/static" INDEX_DIR = "../dist" diff --git a/backend/apis/login.py b/backend/apis/login.py index 10fc0735..608327c2 100644 --- a/backend/apis/login.py +++ b/backend/apis/login.py @@ -4,19 +4,16 @@ import nacl import requests +from bottle import abort, redirect, request +from models.initial_config import InitialConfig from nacl.public import Box from nacl.signing import VerifyKey +from utils.utils import Utils from apis.base import app -from bottle import abort, redirect, request -from models.initial_config import InitialConfig -from utils.utils import Utils CALLBACK_URL = "/auth/3bot_callback" REDIRECT_URL = "https://login.threefold.me" - - -utils = Utils() PRIV_KEY = nacl.signing.SigningKey.generate() @@ -28,7 +25,7 @@ def login(): public_key = PRIV_KEY.verify_key if provider and provider == "3bot": - state = utils.random_string() + state = Utils.random_string() session["next_url"] = next_url session["state"] = state app_id = request.get_header("host") diff --git a/backend/apis/results.py b/backend/apis/results.py index 3b551196..55293df4 100644 --- a/backend/apis/results.py +++ b/backend/apis/results.py @@ -1,36 +1,26 @@ import json -from apis.base import app, check_configs from bottle import abort, redirect, request, static_file from models.initial_config import InitialConfig -from models.schedule_info import ScheduleInfo -from models.scheduler_run import SchedulerRun -from models.trigger_run import TriggerRun +from models.run import Run from packages.vcs.vcs import VCSFactory +from utils.constants import ERROR, FAILURE, PENDING, SUCCESS - -SUCCESS = "success" -FAILURE = "failure" -ERROR = "error" -PENDING = "pending" +from apis.base import app, check_configs @app.route("/api/") @check_configs def home(): - """Return repos and schedules which are running on the server. - """ + """Return repos and schedules which are running on the server.""" configs = InitialConfig() - result = {"repos": [], "schedules": []} - result["repos"] = configs.repos - result["schedules"] = ScheduleInfo.distinct("name") - result_json = json.dumps(result) - return result_json + result = {"repos": configs.repos} + return json.dumps(result) @app.route("/api/repos/") @check_configs -def branch(repo): +def result(repo): """Returns tests ran on this repo with specific branch or test details if id is sent. :param repo: repo's name @@ -38,89 +28,49 @@ def branch(repo): :param id: DB id of test details. 
""" branch = request.query.get("branch") - id = request.query.get("id") + run_id = request.query.get("id") + + if run_id: + run = Run.get(run_id=run_id) + live = True if run.status == PENDING else False + return json.dumps({"live": live, "result": run.result}) - if id: - trigger_run = TriggerRun.get(id=id) - live = True if trigger_run.status == PENDING else False - result = json.dumps({"live": live, "result": trigger_run.result}) - return result if branch: fields = ["status", "commit", "committer", "timestamp", "bin_release", "triggered_by"] where = {"repo": repo, "branch": branch} - trigger_runs = TriggerRun.get_objects(fields=fields, order_by="timestamp", asc=False, **where) - result = json.dumps(trigger_runs) - return result + runs = Run.get_objects(fields=fields, order_by="timestamp", asc=False, **where) + return json.dumps(runs) vcs_obj = VCSFactory().get_cvn(repo=repo) exist_branches = vcs_obj.get_branches() - all_branches = TriggerRun.distinct(field="branch", repo=repo) + all_branches = Run.distinct(field="branch", repo=repo) deleted_branches = list(set(all_branches) - set(exist_branches)) branches = {"exist": exist_branches, "deleted": deleted_branches} - result = json.dumps(branches) - return result - - -@app.route("/api/schedules/") -@check_configs -def schedules(schedule): - """Returns tests ran on this schedule or test details if id is sent. - - :param schedule: schedule's name - :param id: DB id of test details. - """ - id = request.query.get("id") - if id: - scheduler_run = SchedulerRun.get(id=id) - live = True if scheduler_run.status == PENDING else False - result = json.dumps({"live": live, "result": scheduler_run.result}) - return result - - fields = ["status", "timestamp", "bin_release", "triggered_by"] - where = {"schedule_name": schedule} - scheduler_runs = SchedulerRun.get_objects(fields=fields, order_by="timestamp", asc=False, **where) - result = json.dumps(scheduler_runs) - return result + return json.dumps(branches) @app.route("/status") @check_configs def status(): - """Returns repo's branch or schedule status for your version control system. 
- """ - schedule = request.query.get("schedule") + """Returns repo's branch or schedule status for your version control system.""" repo = request.query.get("repo") branch = request.query.get("branch") result = request.query.get("result") # to return the run result fields = ["status"] configs = InitialConfig() - if schedule: - where = {"schedule_name": schedule, "status": f"{ERROR} OR {FAILURE} OR {SUCCESS}"} - scheduler_run = SchedulerRun.get_objects(fields=fields, order_by="timestamp", asc=False, **where) - if len(scheduler_run) == 0: - return abort(404) - - if result: - link = f"{configs.domain}/schedules/{schedule}?id={str(scheduler_run[0]['id'])}" - return redirect(link) - if scheduler_run[0]["status"] == SUCCESS: - return static_file("svgs/build_passing.svg", mimetype="image/svg+xml", root=".") - else: - return static_file("svgs/build_failing.svg", mimetype="image/svg+xml", root=".") - - elif repo: - if not branch: - branch = "master" - where = {"repo": repo, "branch": branch, "status": f"{ERROR} OR {FAILURE} OR {SUCCESS}"} - trigger_run = TriggerRun.get_objects(fields=fields, order_by="timestamp", asc=False, **where) - if len(trigger_run) == 0: - return abort(404) - if result: - link = f"{configs.domain}/repos/{repo.replace('/', '%2F')}/{branch}/{str(trigger_run[0]['id'])}" - return redirect(link) - if trigger_run[0]["status"] == SUCCESS: - return static_file("svgs/build_passing.svg", mimetype="image/svg+xml", root=".") - else: - return static_file("svgs/build_failing.svg", mimetype="image/svg+xml", root=".") - return abort(404) + if not repo: + return abort(400, "repo is missing") + if not branch: + return abort(400, "branch is missing") + where = {"repo": repo, "branch": branch, "status": f"{ERROR} OR {FAILURE} OR {SUCCESS}"} + run = Run.get_objects(fields=fields, order_by="timestamp", asc=False, **where) + if len(run) == 0: + return abort(404) + if result: + link = f"{configs.domain}/repos/{repo.replace('/', '%2F')}/{branch}/{str(run[0]['run_id'])}" + return redirect(link) + if run[0]["status"] == SUCCESS: + return static_file("svgs/build_passing.svg", mimetype="image/svg+xml", root=".") + else: + return static_file("svgs/build_failing.svg", mimetype="image/svg+xml", root=".") diff --git a/backend/apis/schedule.py b/backend/apis/schedule.py deleted file mode 100644 index b7174ae6..00000000 --- a/backend/apis/schedule.py +++ /dev/null @@ -1,115 +0,0 @@ -import json - -from redis import Redis -from rq import Queue -from rq_scheduler import Scheduler - -from actions.actions import Actions -from apis.base import app, check_configs, user -from bottle import HTTPResponse, abort, redirect, request -from models.schedule_info import ScheduleInfo -from models.scheduler_run import SchedulerRun - -actions = Actions() -q = Queue(connection=Redis()) -scheduler = Scheduler(connection=Redis()) -PENDING = "pending" - - -@app.route("/api/schedule", method=["GET", "POST", "DELETE"]) -@user -@check_configs -def schedule(): - if request.method == "GET": - schedule_name = request.query.get("schedule_name") - if schedule_name: - schedule_info = ScheduleInfo.get_by_name(name=schedule_name) - info = { - "schedule_name": schedule_name, - "install": schedule_info.install, - "script": schedule_info.script, - "prerequisites": schedule_info.prerequisites, - "run_time": schedule_info.run_time, - "created_by": schedule_info.created_by, - } - return json.dumps(info) - - schedules_names = ScheduleInfo.list_all() - return json.dumps(schedules_names) - - if request.headers.get("Content-Type") == "application/json": 
diff --git a/backend/apis/schedule.py b/backend/apis/schedule.py
deleted file mode 100644
index b7174ae6..00000000
--- a/backend/apis/schedule.py
+++ /dev/null
@@ -1,115 +0,0 @@
-import json
-
-from redis import Redis
-from rq import Queue
-from rq_scheduler import Scheduler
-
-from actions.actions import Actions
-from apis.base import app, check_configs, user
-from bottle import HTTPResponse, abort, redirect, request
-from models.schedule_info import ScheduleInfo
-from models.scheduler_run import SchedulerRun
-
-actions = Actions()
-q = Queue(connection=Redis())
-scheduler = Scheduler(connection=Redis())
-PENDING = "pending"
-
-
-@app.route("/api/schedule", method=["GET", "POST", "DELETE"])
-@user
-@check_configs
-def schedule():
-    if request.method == "GET":
-        schedule_name = request.query.get("schedule_name")
-        if schedule_name:
-            schedule_info = ScheduleInfo.get_by_name(name=schedule_name)
-            info = {
-                "schedule_name": schedule_name,
-                "install": schedule_info.install,
-                "script": schedule_info.script,
-                "prerequisites": schedule_info.prerequisites,
-                "run_time": schedule_info.run_time,
-                "created_by": schedule_info.created_by,
-            }
-            return json.dumps(info)
-
-        schedules_names = ScheduleInfo.list_all()
-        return json.dumps(schedules_names)
-
-    if request.headers.get("Content-Type") == "application/json":
-        if request.method == "POST":
-            data = ["schedule_name", "run_time", "prerequisites", "install", "script", "bin_path"]
-            job = {}
-            for item in data:
-                value = request.json.get(item)
-                if not value:
-                    if item == "bin_path":
-                        continue
-                    return HTTPResponse(f"{item} should have a value", 400)
-                elif item is "script" and not isinstance(value, list):
-                    return HTTPResponse(f"{item} should be str or list", 400)
-                else:
-                    job[item] = value
-
-            created_by = request.environ.get("beaker.session").get("username").strip(".3bot")
-            job["created_by"] = created_by
-
-            if job["schedule_name"] in ScheduleInfo.list_all():
-                return HTTPResponse(f"Schedule name {job['schedule_name']} is already used", 400)
-
-            schedule_info = ScheduleInfo(**job)
-            schedule_info.save()
-            try:
-                scheduler.cron(
-                    cron_string=job["run_time"],
-                    func=actions.schedule_run,
-                    args=[job,],
-                    id=job["schedule_name"],
-                    timeout=-1,
-                )
-            except:
-                return HTTPResponse("Wrong time format should be like (0 * * * *)", 400)
-            return HTTPResponse("Added", 201)
-        else:
-            schedule_name = request.json.get("schedule_name")
-            schedule_info = ScheduleInfo.get_by_name(name=schedule_name)
-            schedule_info.delete()
-            scheduler.cancel(schedule_name)
-            return HTTPResponse("Removed", 200)
-    return abort(400)
-
-
-@app.route("/api/schedule_trigger", method=["POST", "GET"])
-@user
-@check_configs
-def schedule_trigger():
-    if request.method == "GET":
-        redirect("/")
-
-    if request.headers.get("Content-Type") == "application/json":
-        schedule_name = request.json.get("schedule_name")
-
-        where = {"schedule_name": schedule_name}
-        runs = SchedulerRun.get_objects(fields=["status"], order_by="timestamp", asc=False, **where)
-        if runs and runs[0]["status"] == PENDING:
-            return HTTPResponse(
-                f"There is a running job from this schedule {schedule_name}, please try again after this run finishes",
-                503,
-            )
-        if schedule_name not in ScheduleInfo.list_all():
-            return HTTPResponse(f"Schedule name {schedule_name} is not found", 400)
-
-        schedule_info = ScheduleInfo.get_by_name(name=schedule_name)
-        job = {
-            "schedule_name": schedule_name,
-            "prerequisites": schedule_info.prerequisites,
-            "install": schedule_info.install,
-            "script": schedule_info.script,
-            "triggered_by": request.environ.get("beaker.session").get("username").strip(".3bot"),
-            "bin_path": schedule_info.bin_path,
-        }
-        job = q.enqueue_call(func=actions.schedule_run, args=(job,), result_ttl=5000, timeout=20000,)
-        if job:
-            return HTTPResponse(job.get_id(), 200)
-    return HTTPResponse("Wrong data", 400)
diff --git a/backend/apis/trigger.py b/backend/apis/trigger.py
index 4aa46cfb..274bc916 100644
--- a/backend/apis/trigger.py
+++ b/backend/apis/trigger.py
@@ -1,95 +1,28 @@
 import json
-import os
-from datetime import datetime
 
-from redis import Redis
-from rq import Queue
-
-from actions.actions import Actions
-from apis.base import app, check_configs, user
+from actions.trigger import Trigger
 from bottle import HTTPResponse, redirect, request
 from models.initial_config import InitialConfig
-from models.trigger_run import TriggerRun
+from models.run import Run
 from packages.vcs.vcs import VCSFactory
+from redis import Redis
+from rq import Queue
+from utils.constants import PENDING
 
-BIN_DIR = "/zeroci/bin/"
-
-redis = Redis()
-actions = Actions()
-q = Queue(connection=redis)
-PENDING = "pending"
-
-
-def trigger(repo="", branch="", commit="", committer="", id=None, triggered=True):
-    configs = InitialConfig()
-    status = PENDING
-    timestamp = datetime.now().timestamp()
-    if id:
-        # Triggered from id.
-        trigger_run = TriggerRun.get(id=id)
-        triggered_by = request.environ.get("beaker.session").get("username").strip(".3bot")
-        data = {
-            "timestamp": timestamp,
-            "commit": trigger_run.commit,
-            "committer": trigger_run.committer,
-            "status": status,
-            "repo": trigger_run.repo,
-            "branch": trigger_run.branch,
-            "triggered_by": triggered_by,
-            "bin_release": None,
-            "id": id,
-        }
-        trigger_run.timestamp = int(timestamp)
-        trigger_run.status = status
-        trigger_run.result = []
-        trigger_run.triggered_by = triggered_by
-        if trigger_run.bin_release:
-            bin_path = os.path.join(BIN_DIR, trigger_run.repo, trigger_run.branch, trigger_run.bin_release)
-            if os.path.exists(bin_path):
-                os.remove(bin_path)
-            trigger_run.bin_release = None
-        trigger_run.save()
-        for key in redis.keys():
-            if id in key.decode():
-                redis.delete(key)
-        redis.publish("zeroci_status", json.dumps(data))
-    else:
-        # Triggered from vcs webhook or rebuild using the button.
-        if repo in configs.repos:
-            triggered_by = "VCS Hook"
-            if triggered:
-                triggered_by = request.environ.get("beaker.session").get("username").strip(".3bot")
-            data = {
-                "timestamp": timestamp,
-                "commit": commit,
-                "committer": committer,
-                "status": status,
-                "repo": repo,
-                "branch": branch,
-                "triggered_by": triggered_by,
-                "bin_release": None,
-            }
-            trigger_run = TriggerRun(**data)
-            trigger_run.save()
-            id = str(trigger_run.id)
-            data["id"] = id
-            redis.publish("zeroci_status", json.dumps(data))
-    if id:
-        link = f"{configs.domain}/repos/{trigger_run.repo}/{trigger_run.branch}/{str(trigger_run.id)}"
-        vcs_obj = VCSFactory().get_cvn(repo=trigger_run.repo)
-        vcs_obj.status_send(status=status, link=link, commit=trigger_run.commit)
-        job = q.enqueue_call(func=actions.build_and_test, args=(id,), result_ttl=5000, timeout=20000)
-        return job
-    return None
+from apis.base import app, check_configs, user
+
+trigger = Trigger()
+queue = Queue(connection=Redis(), name="zeroci")
 
 
 @app.route("/git_trigger", method=["POST"])
 @check_configs
 def git_trigger():
-    """Trigger the test when a post request is sent from a repo's webhook.
-    """
+    """Trigger the test when a post request is sent from a repo's webhook."""
+    # TODO: validate the payload before working on it.
     configs = InitialConfig()
     if request.headers.get("Content-Type") == "application/json":
+        job = ""
         # push case
         reference = request.json.get("ref")
         if reference:
@@ -102,10 +35,31 @@ def git_trigger():
             committer = request.json["pusher"]["login"]
             branch_exist = not commit.startswith("000000")
             if branch_exist:
-                job = trigger(repo=repo, branch=branch, commit=commit, committer=committer, triggered=False)
-                if job:
-                    return HTTPResponse(job.get_id(), 200)
-            return HTTPResponse("Done", 200)
+                job = queue.enqueue_call(
+                    trigger.enqueue,
+                    args=(repo, branch, commit, committer, "", None, False),
+                    result_ttl=5000,
+                    timeout=20000,
+                )
+
+        # pull case
+        # TODO: Handle the request for gitea.
+        elif request.json.get("pull_request"):
+            if request.json.get("action") in ["opened", "synchronize"]:
+                repo = request.json["pull_request"]["head"]["repo"]["full_name"]
+                branch = request.json["pull_request"]["head"]["ref"]
+                target_branch = request.json["pull_request"]["base"]["ref"]
+                commit = request.json["pull_request"]["head"]["sha"]
+                committer = request.json["sender"]["login"]
+                job = queue.enqueue_call(
+                    trigger.enqueue,
+                    args=(repo, branch, commit, committer, target_branch, None, False),
+                    result_ttl=5000,
+                    timeout=20000,
+                )
+        if job:
+            return HTTPResponse(job.get_id(), 201)
+        return HTTPResponse("Nothing to be done", 200)
     return HTTPResponse("Wrong content type", 400)
 
 
@@ -117,15 +71,18 @@ def run_trigger():
         redirect("/")
 
     if request.headers.get("Content-Type") == "application/json":
-        id = request.json.get("id")
-        if id:
-            run = TriggerRun.get(id=id)
+        run_id = request.json.get("id")
+        if run_id:
+            run = Run.get(run_id=run_id)
             if run.status == PENDING:
                 return HTTPResponse(
-                    f"There is a running job for this id {id}, please try again after this run finishes", 503
+                    f"There is a running job for this run_id {run_id}, please try again after this run finishes", 503
                 )
-            job = trigger(id=id)
-            return HTTPResponse(job.get_id(), 200)
+            job = queue.enqueue_call(
+                trigger.enqueue, args=("", "", "", "", "", run_id, True), result_ttl=5000, timeout=20000
+            )
+            if job:
+                return HTTPResponse(job.get_id(), 200)
 
         repo = request.json.get("repo")
         branch = request.json.get("branch")
@@ -133,13 +90,18 @@ def run_trigger():
         last_commit = vcs_obj.get_last_commit(branch=branch)
         committer = vcs_obj.get_committer(commit=last_commit)
         where = {"repo": repo, "branch": branch, "commit": last_commit, "status": PENDING}
-        run = TriggerRun.get_objects(fields=["status"], **where)
+        run = Run.get_objects(fields=["status"], **where)
         if run:
             return HTTPResponse(
                 f"There is a running job from this commit {last_commit}, please try again after this run finishes", 503
             )
         if last_commit:
-            job = trigger(repo=repo, branch=branch, commit=last_commit, committer=committer)
+            job = queue.enqueue_call(
+                trigger.enqueue,
+                args=(repo, branch, last_commit, committer, "", None, True),
+                result_ttl=5000,
+                timeout=20000,
+            )
         else:
            return HTTPResponse(f"Couldn't get last commit from this branch {branch}, please try again", 503)
         if job:
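Reviewer note: `git_trigger` now handles two GitHub-style payload shapes and enqueues `trigger.enqueue` for both. A condensed sketch of just the dispatch, using the same field paths as the handler (the push branch omits the repo/branch/commit fields that the elided hunk context extracts):

```python
def classify_webhook(payload: dict):
    """Condensed sketch of git_trigger's dispatch; field paths match the handler above."""
    if payload.get("ref"):
        # push event; the handler also derives repo/branch/commit from fields not shown in this hunk
        return "push", payload["pusher"]["login"]
    pr = payload.get("pull_request")
    if pr and payload.get("action") in ["opened", "synchronize"]:
        return "pull_request", {
            "repo": pr["head"]["repo"]["full_name"],
            "branch": pr["head"]["ref"],
            "target_branch": pr["base"]["ref"],
            "commit": pr["head"]["sha"],
            "committer": payload["sender"]["login"],
        }
    return None  # handler answers "Nothing to be done"
```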
"Expected WebSocket request.") @@ -78,7 +79,7 @@ def neph_jobs(id): new_jobs = [] for key in redis.keys(): key = key.decode() - if key.startswith(f"neph:{id}"): + if key.startswith(f"neph:{job_id}"): if key not in jobs: jobs.append(key) key = key.replace(" ", "%20") diff --git a/backend/deployment/container.py b/backend/deployment/container.py index 72fdc559..4a64eaa5 100644 --- a/backend/deployment/container.py +++ b/backend/deployment/container.py @@ -3,11 +3,11 @@ import time import paramiko -import redis import yaml - from kubernetes import client, config from kubernetes.stream import stream +from redis import Redis +from utils.constants import BIN_DIR from utils.utils import Utils TIMEOUT = 120 @@ -24,11 +24,30 @@ def __init__(self, rc, out): class Container(Utils): - def __init__(self): + def __init__(self, name=None, test_container_name=None, helper_container_name=None, namespace=None): super().__init__() + config.load_incluster_config() self.shell_bin = "/bin/sh" + self.name = name + self.test_container_name = test_container_name + self.helper_container_name = helper_container_name + self.namespace = namespace + self._redis = None + self._client = None + + @property + def redis(self): + if not self._redis: + self._redis = Redis() + return self._redis - def ssh_command(self, cmd, ip=None, port=22): + @property + def client(self): + if not self._client: + self._client = client.CoreV1Api() + return self._client + + def execute_on_helper(self, cmd): """Execute a command on a remote machine using ssh. :param cmd: command to be executed on a remote machine. :type cmd: str @@ -38,13 +57,11 @@ def ssh_command(self, cmd, ip=None, port=22): :type port: int :return: Execution object containing (returncode, stdout) """ - if not ip: - ip = self.name out = "" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy()) try: - client.connect(hostname=ip, port=port, timeout=30) + client.connect(hostname=self.name, port=22, timeout=30) except: out = "Couldn't ssh on the helper container, maybe the test broke the ssh or the helper container become unreachable" rc = 1 @@ -62,12 +79,10 @@ def ssh_command(self, cmd, ip=None, port=22): return Complete_Execution(rc, out) - def ssh_get_remote_file(self, remote_path, local_path, ip=None, port=22): - if not ip: - ip = self.name + def get_remote_file_from_helper(self, remote_path, local_path): client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy()) - client.connect(hostname=ip, port=port, timeout=30) + client.connect(hostname=self.name, port=22, timeout=30) ftp = client.open_sftp() try: ftp.get(remote_path, local_path) @@ -76,12 +91,10 @@ def ssh_get_remote_file(self, remote_path, local_path, ip=None, port=22): except: return False - def ssh_set_remote_file(self, remote_path, local_path, ip=None, port=22): - if not ip: - ip = self.name + def set_remote_file_on_helper(self, remote_path, local_path): client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy()) - client.connect(hostname=ip, port=port, timeout=30) + client.connect(hostname=self.name, port=22, timeout=30) ftp = client.open_sftp() try: ftp.put(local_path, remote_path) @@ -90,12 +103,11 @@ def ssh_set_remote_file(self, remote_path, local_path, ip=None, port=22): except: return False - def redis_push(self, id, content, verbose=True): + def _redis_push(self, run_id, content, verbose=True): if verbose: - r = redis.Redis() - r.rpush(id, content) + self.redis.rpush(run_id, content) - def 
@@ -90,12 +103,11 @@ def ssh_set_remote_file(self, remote_path, local_path, ip=None, port=22):
         except:
             return False
 
-    def redis_push(self, id, content, verbose=True):
+    def _redis_push(self, run_id, content, verbose=True):
         if verbose:
-            r = redis.Redis()
-            r.rpush(id, content)
+            self.redis.rpush(run_id, content)
 
-    def execute_command(self, cmd, id, verbose=True):
+    def execute_on_test_container(self, cmd, run_id, verbose=True):
         """Execute a command on a remote machine using ssh.
 
         :param cmd: command to be executed on a remote machine.
@@ -123,7 +135,7 @@ def execute_command(self, cmd, id, verbose=True):
             )
         except:
             out += "Couldn't run on the testing container, container become unreachable"
-            self.redis_push(id, out, verbose=verbose)
+            self._redis_push(run_id, out, verbose=verbose)
             rc = 137
             return Complete_Execution(rc, out)
 
@@ -133,18 +145,18 @@ def execute_command(self, cmd, id, verbose=True):
                 content = response.read_stdout(timeout=600)
             except:
                 msg = "\nConnectionError: Couldn't execute cmd on the runner"
-                self.redis_push(id, msg, verbose=verbose)
+                self._redis_push(run_id, msg, verbose=verbose)
                 out += msg
                 rc = 124
                 break
             end = time.time()
             time_taken = end - start
             if content:
-                self.redis_push(id, content, verbose=verbose)
+                self._redis_push(run_id, content, verbose=verbose)
                 out += content
             elif time_taken > 590:
                 msg = "\nTimeout exceeded 10 mins with no output"
-                self.redis_push(id, msg, verbose=verbose)
+                self._redis_push(run_id, msg, verbose=verbose)
                 out += msg
                 rc = 124
                 response.close()
@@ -154,21 +166,21 @@ def execute_command(self, cmd, id, verbose=True):
             rc = response.returncode
         if rc == 137:
             msg = "Runner expired (job takes more than 1 hour)"
-            self.redis_push(id, msg, verbose=verbose)
+            self._redis_push(run_id, msg, verbose=verbose)
             out += msg
         return Complete_Execution(rc, out)
 
-    def get_remote_file(self, remote_path, local_path):
-        response = self.execute_command(f"cat {remote_path}", id="", verbose=False)
+    def get_remote_file_from_test_container(self, remote_path, local_path):
+        response = self.execute_on_test_container(f"cat {remote_path}", run_id="", verbose=False)
         if not response.returncode:
             self.write_file(text=response.stdout, file_path=local_path)
             return True
         return False
 
-    def create_pod(self, env, prerequisites, repo_path):
+    def _create_pod(self, env, prerequisites, repo_path):
         # zeroci vol
-        bin_mount_path = "/zeroci/bin"
+        bin_mount_path = BIN_DIR
         bin_vol_name = "bin-path"
         bin_vol = client.V1Volume(name=bin_vol_name, empty_dir={})
         bin_vol_mount = client.V1VolumeMount(mount_path=bin_mount_path, name=bin_vol_name)
@@ -211,7 +223,7 @@ def create_pod(self, env, prerequisites, repo_path):
             command=[
                 "/bin/sh",
                 "-ce",
-                f"echo {ssh_key} > /root/.ssh/authorized_keys && cp /usr/local/bin/* /zeroci/bin/ \
+                f"echo {ssh_key} > /root/.ssh/authorized_keys && cp /usr/local/bin/* {BIN_DIR} \
                 && service ssh start && sleep 3600",
             ],
             env=[non_interactive],
@@ -220,13 +232,16 @@ def create_pod(self, env, prerequisites, repo_path):
             resources=resources,
         )
         spec = client.V1PodSpec(
-            volumes=vols, containers=[test_container, helper_container], hostname=self.name, restart_policy="Never",
+            volumes=vols,
+            containers=[test_container, helper_container],
+            hostname=self.name,
+            restart_policy="Never",
         )
         meta = client.V1ObjectMeta(name=self.name, namespace=self.namespace, labels={"app": self.name})
         pod = client.V1Pod(api_version="v1", kind="Pod", metadata=meta, spec=spec)
         self.client.create_namespaced_pod(body=pod, namespace=self.namespace)
 
-    def create_service(self):
+    def _create_service(self):
         port = client.V1ServicePort(name="ssh", port=22)
         spec = client.V1ServiceSpec(ports=[port], selector={"app": self.name})
         meta = client.V1ObjectMeta(name=self.name, namespace=self.namespace, labels={"app": self.name})
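The streaming read in `execute_on_test_container` is the standard non-preloaded exec pattern from the kubernetes client; stripped of ZeroCI's Redis relaying and timeout bookkeeping it reduces to roughly this sketch:

```python
from kubernetes import client, config
from kubernetes.stream import stream

def exec_in_pod(pod, namespace, container_name, cmd, shell_bin="/bin/sh"):
    # Sketch: _preload_content=False returns a WSClient that can be polled
    # while the command runs, instead of a single preloaded string.
    config.load_incluster_config()
    api = client.CoreV1Api()
    resp = stream(
        api.connect_get_namespaced_pod_exec,
        pod,
        namespace,
        container=container_name,
        command=[shell_bin, "-ce", cmd],
        stderr=True, stdin=False, stdout=True, tty=False,
        _preload_content=False,
    )
    out = ""
    while resp.is_open():
        content = resp.read_stdout(timeout=600)  # blocks until output or timeout
        if content:
            out += content
    resp.close()
    return resp.returncode, out
```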
@@ -240,8 +255,6 @@ def deploy(self, env, prerequisites, repo_path):
         :type prerequisites: list
         :return: bool (True: if virtual machine is created).
         """
-        config.load_incluster_config()
-        self.client = client.CoreV1Api()
         self.name = self.random_string()
         self.test_container_name = f"test-{self.name}"
         self.helper_container_name = f"helper-{self.name}"
@@ -250,9 +263,9 @@ def deploy(self, env, prerequisites, repo_path):
             self.shell_bin = prerequisites["shell_bin"]
         for _ in range(RETRIES):
             try:
-                self.create_pod(env=env, prerequisites=prerequisites, repo_path=repo_path)
-                self.create_service()
-                self.wait_for_container()
+                self._create_pod(env=env, prerequisites=prerequisites, repo_path=repo_path)
+                self._create_service()
+                self._wait_for_container()
                 break
             except:
                 self.delete()
@@ -260,7 +273,7 @@ def deploy(self, env, prerequisites, repo_path):
             return False
         return True
 
-    def wait_for_container(self):
+    def _wait_for_container(self):
         for _ in range(TIMEOUT):
             time.sleep(1)
             container_status = self.client.read_namespaced_pod_status(namespace=self.namespace, name=self.name)
@@ -270,15 +283,14 @@ def wait_for_container(self):
                 break
 
     def delete(self):
-        """Delete the container after finishing test.
-        """
+        """Delete the container after finishing test."""
         try:
             self.client.delete_namespaced_pod(name=self.name, namespace=self.namespace)
             self.client.delete_namespaced_service(name=self.name, namespace=self.namespace)
         except:
             pass
 
-    def run_test(self, run_cmd, id):
+    def run_test(self, run_cmd, run_id):
         """Run test command and get the result as xml file if the running command is following junit otherwise result will be log.
 
         :param run_cmd: test command to be run.
@@ -289,14 +301,14 @@ def run_test(self, run_cmd, id):
         :type env: dict
         :return: path to xml file if exist and subprocess object containing (returncode, stdout, stderr)
         """
-        response = self.execute_command(run_cmd, id=id)
+        response = self.execute_on_test_container(run_cmd, run_id=run_id)
         file_path = "/zeroci/xml/{}.xml".format(self.random_string())
         remote_path = "/test.xml"
-        copied = self.get_remote_file(remote_path=remote_path, local_path=file_path)
+        copied = self.get_remote_file_from_test_container(remote_path=remote_path, local_path=file_path)
         if copied:
             file_path = file_path
             delete_cmd = f"rm -f {remote_path}"
-            self.execute_command(delete_cmd, id=id)
+            self.execute_on_test_container(delete_cmd, run_id=run_id)
         else:
             if os.path.exists(file_path):
                 os.remove(file_path)
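With the pod/service plumbing made private, deploy, run_test, and delete form the whole public lifecycle; a hypothetical caller (the `image_name` key and the return shape of `run_test` are assumptions, not shown in this diff):

```python
container = Container(namespace="zeroci")  # namespace value is illustrative

prerequisites = {"image_name": "ubuntu:18.04", "shell_bin": "/bin/bash"}  # image_name assumed
if container.deploy(env=[], prerequisites=prerequisites, repo_path="/zeroci/code/vcs_repos/org/repo"):
    response, xml_path = container.run_test(run_cmd="pytest tests", run_id="s1234")  # return shape assumed
    container.delete()
```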
diff --git a/backend/health/cleanup.py b/backend/health/cleanup.py
index bb59023b..5bb47504 100644
--- a/backend/health/cleanup.py
+++ b/backend/health/cleanup.py
@@ -5,19 +5,17 @@
 from datetime import datetime
 from pathlib import Path
 
-from redis import Redis
-
 from models.base import StoredFactory
 from models.scheduler_run import SchedulerRun
 from models.trigger_run import TriggerRun
-
+from redis import Redis
 
 REDIS_PATH = "/var/lib/redis"
 WHOOSH_PATH = "/root/.config/jumpscale/whoosh_indexes/"
 
 
 def remove(factory, days=30):
-    r = Redis()
+    redis = Redis()
     names = factory.list_all()
     for name in names:
         obj = factory.get(name)
@@ -26,7 +24,7 @@ def remove(factory, days=30):
         time_diff = now_time - run_time
         if time_diff.days > days:
             factory.delete(name)
-            r.delete(obj.id)
+            redis.delete(obj.run_id)
 
 
 def get_size_in_giga_bytes(path):
diff --git a/backend/health/health_check.py b/backend/health/health_check.py
index 556f0832..e60efa25 100644
--- a/backend/health/health_check.py
+++ b/backend/health/health_check.py
@@ -3,9 +3,9 @@
 sys.path.append("/sandbox/code/github/threefoldtech/zeroCI/backend")
 
 from redis import Redis
+from utils.utils import Utils
 
 from health_recover import Recover
-from utils.utils import Utils
 
 recover = Recover()
@@ -18,15 +18,13 @@ def get_process_pid(self, name):
         return pids
 
     def test_zeroci_server(self):
-        """Check zeroci server is still running
-        """
+        """Check zeroci server is still running"""
         pid = self.get_process_pid("python3 zeroci")
         if not pid:
             recover.zeroci()
 
     def test_redis(self):
-        """Check redis is still running.
-        """
+        """Check redis is still running."""
         pid = self.get_process_pid("redis")
         if not pid:
             recover.redis()
@@ -39,8 +37,7 @@ def test_redis(self):
             recover.redis()
 
     def test_workers(self):
-        """Check rq workers are up.
-        """
+        """Check rq workers are up."""
        pids = self.get_process_pid("python3 worker")
         workers = len(pids)
         if workers < 5:
@@ -49,9 +46,18 @@ def test_workers(self):
             if not pid:
                 recover.worker(i)
 
+    def test_zeroci_workers(self):
+        """Check zeroci rq workers are up."""
+        pids = self.get_process_pid("python3 zeroci_worker")
+        zeroci_workers = len(pids)
+        if zeroci_workers < 2:
+            for i in range(1, 3):
+                pid = self.get_process_pid(f"python3 zeroci_worker{i}")
+                if not pid:
+                    recover.zeroci_worker(i)
+
     def test_schedule(self):
-        """Check rq schedule is up.
-        """
+        """Check rq schedule is up."""
         pid = self.get_process_pid("rqscheduler")
         if not pid:
             recover.scheduler()
diff --git a/backend/health/health_recover.py b/backend/health/health_recover.py
index 3da16df3..db252256 100644
--- a/backend/health/health_recover.py
+++ b/backend/health/health_recover.py
@@ -13,8 +13,12 @@ def redis(self):
         cmd = "redis-server /etc/redis/redis.conf"
         self.execute_cmd(cmd=cmd, timeout=TIMEOUT)
 
-    def worker(self, id):
-        cmd = f"/bin/bash -c 'cd {PATH}; python3 worker{id}.py &>> worker_{id}.log &'"
+    def worker(self, wid):
+        cmd = f"/bin/bash -c 'cd {PATH}; python3 worker{wid}.py &>> worker_{wid}.log &'"
+        self.execute_cmd(cmd=cmd, timeout=TIMEOUT)
+
+    def zeroci_worker(self, wid):
+        cmd = f"/bin/bash -c 'cd {PATH}; python3 zeroci_worker{wid}.py &>> zeroci_worker_{wid}.log &'"
         self.execute_cmd(cmd=cmd, timeout=TIMEOUT)
 
     def scheduler(self):
diff --git a/backend/models/base.py b/backend/models/base.py
index 1b89566d..6effaca5 100644
--- a/backend/models/base.py
+++ b/backend/models/base.py
@@ -3,8 +3,8 @@
 
 class Document(Base):
     @property
-    def id(self):
-        return self.instance_name.strip("model")
+    def run_id(self):
+        return self.instance_name
 
     @property
     def name(self):
@@ -15,13 +15,8 @@ class ModelFactory:
     _model = None
 
     @classmethod
-    def get(cls, id):
-        name = "model" + str(id)
-        return cls._model.find(name)
-
-    @classmethod
-    def get_by_name(cls, name):
-        return cls._model.find(name)
+    def get(cls, run_id):
+        return cls._model.find(run_id)
 
     @classmethod
     def list_all(cls):
@@ -67,7 +62,7 @@ def get_objects(cls, fields, order_by=None, asc=True, **kwargs):
             obj_dict = {}
             for field in fields:
                 obj_dict[field] = getattr(obj, field)
-            obj_dict["id"] = obj.instance_name.strip("model")
+            obj_dict["run_id"] = obj.instance_name
             results.append(obj_dict)
 
         if order_by:
diff --git a/backend/models/initial_config.py b/backend/models/initial_config.py
index c6ea3626..26116036 100644
--- a/backend/models/initial_config.py
+++ b/backend/models/initial_config.py
@@ -27,6 +27,6 @@ def _get_vcs_type(self, vcs_host):
 class InitialConfig(ModelFactory):
     _model = StoredFactory(InitialConfigModel)
 
-    def __new__(self, **kwargs):
+    def __new__(cls, **kwargs):
         name = "Initial_config"
-        return self._model.get(name=name, **kwargs)
+        return cls._model.get(name=name, **kwargs)
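`get_process_pid` is untouched here; the new check only needs it to return a (possibly empty) list of pids for a command-line pattern, which a `pgrep -f` wrapper would provide. A sketch under that assumption:

```python
import subprocess

def get_process_pid(name):
    # Assumed helper: pgrep -f matches the full command line, so a pattern
    # like "python3 zeroci_worker1" finds the process startup.sh launched.
    result = subprocess.run(["pgrep", "-f", name], capture_output=True, text=True)
    return [int(pid) for pid in result.stdout.split()]
```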
backend/models/trigger_run.py
rename to backend/models/run.py
index aa42ed80..194e59e0 100644
--- a/backend/models/trigger_run.py
+++ b/backend/models/run.py
@@ -1,7 +1,11 @@
-from .base import Document, ModelFactory, fields, StoredFactory
+import uuid
+
+from utils.utils import Utils
 
+from .base import Document, ModelFactory, StoredFactory, fields
 
 
-class TriggerModel(Document):
+class RunModel(Document):
     timestamp = fields.Integer(required=True, indexed=True)
     repo = fields.String(required=True)
     branch = fields.String(required=True)
@@ -13,10 +17,10 @@ class TriggerModel(Document):
     result = fields.List(field=fields.Typed(dict))
 
 
-class TriggerRun(ModelFactory):
-    _model = StoredFactory(TriggerModel)
+class Run(ModelFactory):
+    _model = StoredFactory(RunModel)
 
-    def __new__(self, **kwargs):
-        name = "model" + str(int(kwargs["timestamp"] * 10 ** 6))
+    def __new__(cls, **kwargs):
+        name = Utils.random_string()
         kwargs["timestamp"] = int(kwargs["timestamp"])
-        return self._model.new(name=name, **kwargs)
+        return cls._model.new(name=name, **kwargs)
diff --git a/backend/models/run_config.py b/backend/models/run_config.py
index e3c8f3c9..c40830dd 100644
--- a/backend/models/run_config.py
+++ b/backend/models/run_config.py
@@ -8,6 +8,6 @@ class RunConfigModel(Document):
 class RunConfig(ModelFactory):
     _model = StoredFactory(RunConfigModel)
 
-    def __new__(self, **kwargs):
+    def __new__(cls, **kwargs):
         kwargs["name"] = kwargs["name"].replace("/", "_")
-        return self._model.get(**kwargs)
+        return cls._model.get(**kwargs)
diff --git a/backend/models/schedule_info.py b/backend/models/schedule_info.py
deleted file mode 100644
index 54499146..00000000
--- a/backend/models/schedule_info.py
+++ /dev/null
@@ -1,18 +0,0 @@
-from .base import fields, ModelFactory, Document, StoredFactory
-
-
-class ScheduleInfoModel(Document):
-    prerequisites = fields.Typed(dict)
-    install = fields.String(required=True)
-    script = fields.List(field=fields.Typed(dict))
-    run_time = fields.String(required=True)
-    bin_path = fields.String()
-    created_by = fields.String(required=True)
-
-
-class ScheduleInfo(ModelFactory):
-    _model = StoredFactory(ScheduleInfoModel)
-
-    def __new__(self, **kwargs):
-        name = kwargs["schedule_name"]
-        return self._model.new(name=name, **kwargs)
diff --git a/backend/models/scheduler_run.py b/backend/models/scheduler_run.py
deleted file mode 100644
index 00ad8302..00000000
--- a/backend/models/scheduler_run.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from .base import Document, ModelFactory, fields, StoredFactory
-
-
-class ScheduleModel(Document):
-    timestamp = fields.Integer(required=True, indexed=True)
-    schedule_name = fields.String(required=True)
-    status = fields.String(required=True)
-    bin_release = fields.String()
-    triggered_by = fields.String(default="ZeroCI Scheduler")
-    result = fields.List(field=fields.Typed(dict))
-
-
-class SchedulerRun(ModelFactory):
-    _model = StoredFactory(ScheduleModel)
-
-    def __new__(self, **kwargs):
-        name = "model" + str(int(kwargs["timestamp"] * 10 ** 6))
-        kwargs["timestamp"] = int(kwargs["timestamp"])
-        return self._model.new(name=name, **kwargs)
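Instance names no longer encode the creation timestamp, so two runs created in the same microsecond cannot collide on their name. Creating a run now looks like this (values are illustrative; the fields mirror RunModel and the data dict in the deleted trigger function):

```python
from datetime import datetime

from models.run import Run
from utils.constants import PENDING

run = Run(
    timestamp=datetime.now().timestamp(),  # __new__ truncates this to int
    repo="org/repo",
    branch="development",
    commit="abc1234",
    committer="someone",
    status=PENDING,
    triggered_by="VCS Hook",
)
run.save()
print(run.run_id)  # the random instance name, e.g. "s3f2c..."
```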
InlineKeyboardButton, InlineKeyboardMarkup, ParseMode
+
 
 RETRIES = 5
diff --git a/backend/packages/vcs/vcs.py b/backend/packages/vcs/vcs.py
index 4fbc302c..93f4447d 100644
--- a/backend/packages/vcs/vcs.py
+++ b/backend/packages/vcs/vcs.py
@@ -3,11 +3,10 @@
 from abc import ABCMeta, abstractmethod
 from urllib.parse import urljoin
 
+import giteapy
 from github import Github as GH
 from github import UnknownObjectException
 from github.GithubException import GithubException
-
-import giteapy
 from models.initial_config import InitialConfig
 
 
@@ -153,10 +152,13 @@ def __init__(self, repo=None):
         configs = InitialConfig()
         self.HOOK_URL = urljoin(configs.domain, "git_trigger")
         if configs.vcs_token:
-            self.repo = repo
             self.github_cl = GH(configs.vcs_token)
             if repo:
-                self.repo_obj = self.github_cl.get_repo(self.repo)
+                self._set_repo_obj(repo)
+
+    def _set_repo_obj(self, repo):
+        self.repo = repo
+        self.repo_obj = self.github_cl.get_repo(self.repo)
 
     @VCSInterface.call_trial
     def status_send(
@@ -218,7 +220,7 @@ def create_hook(self, repo):
         repo = self.github_cl.get_repo(repo)
         hook_config = {"url": self.HOOK_URL, "content_type": "json"}
         try:
-            repo.create_hook(name="web", config=hook_config, events=["push"], active=True)
+            repo.create_hook(name="web", config=hook_config, events=["push", "pull_request"], active=True)
         except (UnknownObjectException, GithubException) as e:
             if e.status in [404, 403]:
                 return False
@@ -265,14 +267,23 @@ def _get_gitea_cl():
         self.repo_obj = giteapy.RepositoryApi(_get_gitea_cl())
         self.user_obj = giteapy.UserApi(_get_gitea_cl())
         self.org_obj = giteapy.OrganizationApi(_get_gitea_cl())
+
         if repo:
-            self.repo = repo
-            self.owner = repo.split("/")[0]  # org name
-            self.repo_name = self.repo.split("/")[-1]
+            self._set_repo_obj(repo)
+
+    def _set_repo_obj(self, repo):
+        self.repo = repo
+        self.owner = repo.split("/")[0]  # org name
+        self.repo_name = self.repo.split("/")[-1]
 
     @VCSInterface.call_trial
     def status_send(
-        self, status, link, commit, description="ZeroCI for testing", context="continuous-integration/ZeroCI",
+        self,
+        status,
+        link,
+        commit,
+        description="ZeroCI for testing",
+        context="continuous-integration/ZeroCI",
     ):
         body = {"context": context, "description": description, "state": status, "target_url": link}
         self.repo_obj.repo_create_status(self.owner, self.repo_name, commit, body=body)
@@ -324,7 +335,10 @@ def create_hook(self, repo):
             return True
 
         config = giteapy.CreateHookOption(
-            active=True, config={"url": self.HOOK_URL, "content_type": "json"}, events=["push"], type="gitea"
+            active=True,
+            config={"url": self.HOOK_URL, "content_type": "json"},
+            events=["push", "pull_request"],
+            type="gitea",
         )
         try:
             self.repo_obj.repo_create_hook(owner, repo_name, body=config)
diff --git a/backend/utils/constants.py b/backend/utils/constants.py
new file mode 100644
index 00000000..3ec5ecc0
--- /dev/null
+++ b/backend/utils/constants.py
@@ -0,0 +1,14 @@
+# Status
+PENDING = "pending"
+SUCCESS = "success"
+FAILURE = "failure"
+ERROR = "error"
+
+# Result types
+LOG_TYPE = "log"
+TESTSUITE_TYPE = "testsuite"
+NEPH_TYPE = "neph"
+
+# Dirs
+BIN_DIR = "/zeroci/bin/"
+REPOS_DIR = "/zeroci/code/vcs_repos"
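Both backends now expose `status_send` with the same keyword signature, which is what keeps callers backend-agnostic behind the factory; a hypothetical call:

```python
from packages.vcs.vcs import VCSFactory

vcs_obj = VCSFactory().get_cvn(repo="org/repo")  # repo value is illustrative
vcs_obj.status_send(
    status="pending",  # the status constants: pending/success/failure/error
    link="https://zeroci.example.com/repos/org/repo/development/s1234",  # illustrative link
    commit="abc1234",
)
```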
VCSFactory
-
-r = Redis()
-SUCCESS = "success"
-FAILURE = "failure"
-
-
-class Reporter:
-    def report(self, run_id, model_obj, schedule_name=None):
-        """Report the result to the commit status and Telegram chat.
-
-        :param run_id: DB's run_id of this run details.
-        :type run_id: str
-        :param parent_model: the class that the passed id is belonging to.
-        :type parent_model: class
-        :param schedule_name: it will have a value if the run is scheduled.
-        :param schedule_name: str
-        """
-        configs = InitialConfig()
-        telegram = Telegram()
-        bin_release = model_obj.bin_release
-        triggered_by = model_obj.triggered_by
-        msg = self.report_msg(status=model_obj.status, schedule_name=schedule_name)
-        if not schedule_name:
-            url = f"/repos/{model_obj.repo}/{model_obj.branch}/{model_obj.id}"
-            link = urljoin(configs.domain, url)
-            if bin_release:
-                bin_url = f"/bin/{model_obj.repo}/{model_obj.branch}/{bin_release}"
-                bin_link = urljoin(configs.domain, bin_url)
-            else:
-                bin_link = None
-            data = {
-                "timestamp": model_obj.timestamp,
-                "commit": model_obj.commit,
-                "committer": model_obj.committer,
-                "status": model_obj.status,
-                "repo": model_obj.repo,
-                "branch": model_obj.branch,
-                "bin_release": bin_release,
-                "triggered_by": triggered_by,
-                "id": run_id,
-            }
-            r.publish("zeroci_status", json.dumps(data))
-            vcs_obj = VCSFactory().get_cvn(repo=model_obj.repo)
-            vcs_obj.status_send(status=model_obj.status, link=link, commit=model_obj.commit)
-            telegram.send_msg(
-                msg=msg,
-                link=link,
-                repo=model_obj.repo,
-                branch=model_obj.branch,
-                commit=model_obj.commit,
-                committer=model_obj.committer,
-                bin_link=bin_link,
-                triggered_by=triggered_by,
-            )
-        else:
-            unspaced_schedule = model_obj.schedule_name.replace(" ", "%20")
-            url = f"/schedules/{unspaced_schedule}/{model_obj.id}"
-            link = urljoin(configs.domain, url)
-            if bin_release:
-                bin_url = f"/bin/{unspaced_schedule}/{bin_release}"
-                bin_link = urljoin(configs.domain, bin_url)
-            else:
-                bin_link = None
-            data = {
-                "status": model_obj.status,
-                "timestamp": model_obj.timestamp,
-                "schedule_name": schedule_name,
-                "bin_release": bin_release,
-                "triggered_by": triggered_by,
-                "id": run_id,
-            }
-            r.publish("zeroci_status", json.dumps(data))
-            telegram.send_msg(msg=msg, link=link, bin_link=bin_link, triggered_by=triggered_by)
-
-    def report_msg(self, status, schedule_name=None):
-        if schedule_name:
-            name = f"{schedule_name} tests"
-        else:
-            name = "Tests"
-        if status == SUCCESS:
-            msg = f"✅ {name} passed "
-        elif status == FAILURE:
-            msg = f"❌ {name} failed "
-        else:
-            msg = f"⛔️ {name} errored "
-
-        return msg
diff --git a/backend/utils/utils.py b/backend/utils/utils.py
index a952b248..81dda37d 100644
--- a/backend/utils/utils.py
+++ b/backend/utils/utils.py
@@ -10,6 +10,10 @@
 
 
 class Utils:
+    @staticmethod
+    def random_string():
+        return "s" + uuid4().hex
+
     def execute_cmd(self, cmd, timeout=3600):
         with Popen(cmd, shell=True, universal_newlines=True, stdout=PIPE, stderr=PIPE, encoding="utf-8") as process:
             try:
@@ -23,9 +27,6 @@ def execute_cmd(self, cmd, timeout=3600):
 
         return CompletedProcess(process.args, returncode=retruncode, stdout=stdout, stderr=stderr)
 
-    def random_string(self):
-        return "s" + str(uuid4())[:10].replace("-", "")
-
     def write_file(self, text, file_path, append=False, binary=False):
         """Write result file.
 
@@ -89,7 +90,7 @@ def xml_parse(self, path, line):
 
     def load_file(self, path):
         """Load file content.
-        
+
         :param path: path to file.
         :type path: str
         :return: file content
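The `@staticmethod` change is load-bearing: `Run.__new__` calls `Utils.random_string()` on the class before any instance exists, and `uuid4().hex` also lengthens names from roughly 10 to 32 hex characters after the leading "s":

```python
from utils.utils import Utils

# Callable without an instance now that it is a staticmethod:
name = Utils.random_string()
print(name)  # e.g. "s9f86d081884c7d659a2feaa0c55ad015"
```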
diff --git a/backend/zeroci.py b/backend/zeroci.py
index 9bf96905..4ddcb05e 100644
--- a/backend/zeroci.py
+++ b/backend/zeroci.py
@@ -8,7 +8,6 @@
 import apis.config
 import apis.login
 import apis.results
-import apis.schedule
 import apis.trigger
 import apis.websockets
 import apis.default
diff --git a/backend/zeroci_worker.py b/backend/zeroci_worker.py
new file mode 100644
index 00000000..2c1d17de
--- /dev/null
+++ b/backend/zeroci_worker.py
@@ -0,0 +1,11 @@
+import os
+
+from redis import Redis
+from rq import Worker, Queue, Connection
+
+listen = ["zeroci"]
+
+if __name__ == "__main__":
+    with Connection(Redis()):
+        worker = Worker(list(map(Queue, listen)))
+        worker.work()
diff --git a/docs/config/nosetests.yaml b/docs/config/nosetests.yaml
index ff0a1e0d..985aca08 100644
--- a/docs/config/nosetests.yaml
+++ b/docs/config/nosetests.yaml
@@ -1,4 +1,12 @@
 # For a repository named AhmedHanafy725/test_zeroci
+run_on:
+  push:
+    branches:
+      - development
+  pull_request:
+    branches:
+      - development
+
 jobs:
 - name: Generate Bin
   prerequisites:
diff --git a/install/startup.sh b/install/startup.sh
index ad1c4fc1..11b66c3e 100644
--- a/install/startup.sh
+++ b/install/startup.sh
@@ -14,6 +14,7 @@ cp jsng_config.toml /root/.config/jumpscale/config.toml
 cd /sandbox/code/github/threefoldtech/zeroCI/backend
 redis-server /etc/redis/redis.conf
 for i in {1..5}; do cp worker.py worker$i.py; python3 worker$i.py &> worker_$i.log & done
+for i in {1..2}; do cp zeroci_worker.py zeroci_worker$i.py; python3 zeroci_worker$i.py &> zeroci_worker_$i.log & done
 rqscheduler &> schedule.log &
 service cron start
 python3 zeroci.py
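For completeness: anything another process pushes onto the "zeroci" queue is consumed by these two workers. A minimal producer sketch (the task function must live in a module the worker can import; `some_module.some_task` is hypothetical):

```python
from redis import Redis
from rq import Queue

from some_module import some_task  # hypothetical importable callable

queue = Queue(connection=Redis(), name="zeroci")
job = queue.enqueue_call(some_task, args=("payload",), result_ttl=5000, timeout=20000)
print(job.get_id())
```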