#!/usr/bin/env python3 # Copyright (C) 2018 Freie Universitat Berlin # Copyright (C) 2018 Inria # # This file is subject to the terms and conditions of the GNU Lesser # General Public License v2.1. See the file LICENSE in the top level # directory for more details. # # @author Koen Zandberg """Script used to backport PRs.""" import os import os.path import sys import shutil import argparse import git from agithub.GitHub import GitHub ORG = "RIOT-OS" REPO = "RIOT" GITHUBTOKEN_FILE = ".riotgithubtoken" WORKTREE_SUBDIR = "backport_temp" RELEASE_PREFIX = "" RELEASE_SUFFIX = "-branch" LABELS_REMOVE = ["Process: needs backport", "Reviewed: "] LABELS_ADD = ["Process: release backport"] BACKPORT_BRANCH = "backport/{release}/{origbranch}" def _get_labels(pull_request): """ >>> _get_labels({'labels': [{'name': 'test'}, {'name': 'abcd'}]}) ['Process: release backport', 'abcd', 'test'] >>> _get_labels({'labels': [{'name': 'Reviewed: what'}, \ {'name': 'Reviewed: 3-testing'}]}) ['Process: release backport'] >>> _get_labels({'labels': [{'name': 'Process: release backport'}]}) ['Process: release backport'] >>> _get_labels({'labels': [{'name': 'Process: needs backport'}]}) ['Process: release backport'] """ labels = set( label["name"] for label in pull_request["labels"] if all(not label["name"].startswith(remove) for remove in LABELS_REMOVE) ) labels.update(LABELS_ADD) return sorted(list(labels)) def _branch_name_strip(branch_name, prefix=RELEASE_PREFIX, suffix=RELEASE_SUFFIX): """Strip suffix and prefix. >>> _branch_name_strip('2018.10-branch') '2018.10' """ if branch_name.startswith(prefix) and branch_name.endswith(suffix): if prefix: branch_name = branch_name.split(prefix, maxsplit=1)[0] if suffix: branch_name = branch_name.rsplit(suffix, maxsplit=1)[0] return branch_name def _get_latest_release(branches): """Get latest release from a list of branches. >>> _get_latest_release([{'name': '2018.10-branch'}, \ {'name': '2020.10-branch'}]) ('2020.10', '2020.10-branch') >>> _get_latest_release([{'name': '2020.01-branch'}, \ {'name': '2020.04-branch'}]) ('2020.04', '2020.04-branch') >>> _get_latest_release([{'name': 'non-release-branch'}, \ {'name': '2020.04-branch'}]) ('2020.04', '2020.04-branch') """ version_latest = 0 release_fullname = "" release_short = "" for branch in branches: branch_name = _branch_name_strip(branch["name"]) branch_num = 0 try: branch_num = int("".join(branch_name.split("."))) except ValueError: pass if branch_num > version_latest: version_latest = branch_num release_short = branch_name release_fullname = branch["name"] return (release_short, release_fullname) def _find_remote(repo, user, repo_name): for remote in repo.remotes: if remote.url.endswith(f"{user}/{repo_name}.git") or remote.url.endswith( f"{user}/{repo_name}" ): return remote raise ValueError(f"Could not find remote with URL ending in {user}/{repo_name}.git") def _get_upstream(repo): return _find_remote(repo, ORG, REPO) def _delete_worktree(repo, workdir): shutil.rmtree(workdir) repo.git.worktree("prune") def main(): # pylint:disable=too-many-locals,too-many-branches,too-many-statements """Main function of this script.""" keyfile = os.path.join(os.environ["HOME"], GITHUBTOKEN_FILE) parser = argparse.ArgumentParser() parser.add_argument( "-k", "--keyfile", type=argparse.FileType("r"), default=keyfile, help="File containing github token", ) parser.add_argument( "-c", "--comment", action="store_true", help="Put a comment with a reference under" "the original PR", ) parser.add_argument( "-n", "--noop", action="store_true", help="Limited noop mode, creates branch, but doesn't" "push and create the PR", ) parser.add_argument( "-r", "--release-branch", type=str, help="Base the backport on this branch, " "default is the latest", ) parser.add_argument( "--backport-branch-fmt", type=str, default=BACKPORT_BRANCH, help="Backport branch format. " "Fields '{release}' and '{origbranch} will be " "replaced by the release name and remote branch " "name.", ) parser.add_argument( "-d", "--gitdir", type=str, default=os.getcwd(), help="Base git repo to work from", ) parser.add_argument("PR", type=int, help="Pull request number to backport") args = parser.parse_args() gittoken = args.keyfile.read().strip() github_api = GitHub(token=gittoken) # TODO: exception handling status, user = github_api.user.get() if status != 200: print(f'Could not retrieve user: {user["message"]}') sys.exit(1) # Token-scope-check: Is the token is powerful enough to complete # the Backport? response_headers = dict(github_api.getheaders()) # agithub documentation says it's lower case header field-names but # at this moment it's not if "X-OAuth-Scopes" in response_headers: scopes = response_headers["X-OAuth-Scopes"] else: scopes = response_headers["x-oauth-scopes"] scopes_list = [x.strip() for x in scopes.split(",")] if not ("public_repo" in scopes_list or "repo" in scopes_list): print( "missing public_repo scope from token settings." " Please add it on the GitHub webinterface" ) sys.exit(1) username = user["login"] status, pulldata = github_api.repos[ORG][REPO].pulls[args.PR].get() if status != 200: print(f'Commit #{args.PR} not found: {pulldata["message"]}') sys.exit(2) if not pulldata["merged"]: print("Original PR not yet merged") sys.exit(0) print(f'Fetching for commit: #{args.PR}: {pulldata["title"]}') orig_branch = pulldata["head"]["ref"] status, commits = github_api.repos[ORG][REPO].pulls[args.PR].commits.get() if status != 200: print(f'No commits found for #{args.PR}: {commits["message"]}') sys.exit(3) for commit in commits: print(f'found {commit["sha"]} : {commit["commit"]["message"]}') # Find latest release branch if args.release_branch: release_fullname = args.release_branch release_shortname = _branch_name_strip(args.release_branch) else: status, branches = github_api.repos[ORG][REPO].branches.get() if status != 200: print( f"Could not retrieve branches for {ORG}/{REPO}: " f'{branches["message"]}' ) sys.exit(4) release_shortname, release_fullname = _get_latest_release(branches) if not release_fullname: print("No release branch found, exiting") sys.exit(5) print(f"Backport based on branch {release_fullname}") repo = git.Repo(args.gitdir, search_parent_directories=True) # Fetch current upstream upstream_remote = _get_upstream(repo) if not upstream_remote: print("No upstream remote found, can't fetch") sys.exit(6) print(f"Fetching {upstream_remote} remote") upstream_remote.fetch() # Build topic branch in temp dir new_branch = args.backport_branch_fmt.format( release=release_shortname, origbranch=orig_branch ) if new_branch in repo.branches: print(f"ERROR: Branch {new_branch} already exists") sys.exit(1) worktree_dir = os.path.join(repo.working_dir, WORKTREE_SUBDIR) repo.git.worktree( "add", "-b", new_branch, WORKTREE_SUBDIR, f"{upstream_remote}/{release_fullname}", ) # transform branch name into Head object for later configuring new_branch = repo.branches[new_branch] try: bp_repo = git.Repo(worktree_dir) # Apply commits for commit in commits: bp_repo.git.cherry_pick("-x", commit["sha"]) # Push to github origin = _find_remote(repo, username, REPO) print(f"Pushing branch {new_branch} to {origin}") if not args.noop: push_info = origin.push(f"{new_branch}:{new_branch}") new_branch.set_tracking_branch(push_info[0].remote_ref) except Exception as exc: # Delete worktree print(f"Pruning temporary workdir at {worktree_dir}") _delete_worktree(repo, worktree_dir) # also delete branch created by worktree; this is only possible after # the worktree was deleted repo.delete_head(new_branch) raise exc else: # Delete worktree print(f"Pruning temporary workdir at {worktree_dir}") _delete_worktree(repo, worktree_dir) labels = _get_labels(pulldata) merger = pulldata["merged_by"]["login"] if not args.noop: # Open new PR on github pull_request = { "title": f'{pulldata["title"]} [backport {release_shortname}]', "head": f"{username}:{new_branch}", "base": release_fullname, "body": f'# Backport of #{args.PR}\n\n{pulldata["body"]}', "maintainer_can_modify": True, } status, new_pr = github_api.repos[ORG][REPO].pulls.post(body=pull_request) if status != 201: print( f'Error creating the new pr: "{new_pr["message"]}". ' 'Is "Public Repo" access enabled for the token?' ) pr_number = new_pr["number"] print(f"Create PR number #{pr_number} for backport") github_api.repos[ORG][REPO].issues[pr_number].labels.post(body=labels) review_request = {"reviewers": [merger]} github_api.repos[ORG][REPO].pulls[pr_number].requested_reviewers.post( body=review_request ) # Put commit under old PR if args.comment and not args.noop: comment = {"body": f"Backport provided in #{pr_number}"} status, res = ( github_api.repos[ORG][REPO].issues[args.PR].comments.post(body=comment) ) if status != 201: print(f'Something went wrong adding the comment: {res["message"]}') print(f"Added comment to #{args.PR}") if __name__ == "__main__": main()