1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-18 12:52:44 +01:00
RIOT/dist/tools/pkg_version_check/version_check.py
2024-05-25 09:37:09 +07:00

197 lines
7.9 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (C) 2024 Nam Nguyen
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import os
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import shutil
def _parse_makefile_assignment(line, name):
    """Return the value assigned to the make variable *name* on *line*.

    Returns None when the line does not assign exactly *name* (for example
    it assigns ``PKG_VERSION_MAJOR`` while *name* is ``PKG_VERSION``) or
    when the assigned value is empty.
    """
    stripped = line.strip()
    if not stripped.startswith(name):
        return None
    # Split on the first "=" only, so values containing "=" stay intact.
    lhs, separator, rhs = stripped.partition("=")
    if not separator:
        return None
    # Accept the "=", ":=", "::=", "?=", "+=" and "!=" assignment operators,
    # but reject longer variable names that merely share the prefix.
    if lhs.rstrip(" \t:?+!") != name:
        return None
    # The value is the first whitespace-separated token; anything after it
    # (typically a trailing comment) is ignored.
    tokens = rhs.split()
    if not tokens or tokens[0].startswith("#"):
        return None
    return tokens[0]


def get_makefiles(pkg_dir=None):
    """Collect PKG_URL and PKG_VERSION from every package Makefile.

    Each immediate sub-directory of *pkg_dir* that contains a ``Makefile``
    is scanned line by line. When a variable is assigned more than once the
    last assignment wins.

    Args:
        pkg_dir: Directory holding one sub-directory per package. Defaults
            to the ``pkg`` directory three levels above this file.

    Returns:
        A dict mapping the absolute path of each package directory to
        ``[pkg_url, pkg_version]``. Packages without a PKG_VERSION are
        omitted. ``pkg_url`` is an empty string when the Makefile defines no
        PKG_URL, so callers can always treat it as a string.
    """
    if pkg_dir is None:
        pkg_dir = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'pkg')
    pkg_dir = os.path.abspath(pkg_dir)

    file_contents = {}
    # Sorted so the resulting dict order does not depend on the filesystem.
    for item in sorted(os.listdir(pkg_dir)):
        item_path = os.path.join(pkg_dir, item)
        makefile_path = os.path.join(item_path, 'Makefile')
        # isfile() is False when item_path is not a directory, so plain
        # files directly inside pkg_dir are skipped here as well.
        if not os.path.isfile(makefile_path):
            continue

        pkg_url = None
        pkg_version = None
        # Undecodable bytes are replaced rather than aborting the whole scan.
        with open(makefile_path, 'r', encoding='utf-8', errors='replace') as file:
            for line in file:
                url = _parse_makefile_assignment(line, "PKG_URL")
                if url is not None:
                    pkg_url = url
                version = _parse_makefile_assignment(line, "PKG_VERSION")
                if version is not None:
                    pkg_version = version

        # Only packages that pin a version can be checked.
        if pkg_version:
            file_contents[item_path] = [pkg_url or "", pkg_version]
    return file_contents
def process_url(repo_url):
    """Return *repo_url* with one trailing ``.git`` suffix removed, if present."""
    suffix = ".git"
    return repo_url[:-len(suffix)] if repo_url.endswith(suffix) else repo_url
def get_hash_from_branch(repo_url, branch, headers, timeout=10):
    """Return the SHA of the latest commit on *branch*.

    Args:
        repo_url: API URL of the repository; ``/commits/<branch>`` is
            appended to it.
        branch: Name of the branch to query.
        headers: HTTP headers sent with the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the calling thread
            indefinitely.

    Returns:
        The value of the ``sha`` field of the JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within *timeout*.
    """
    commit_url = f"{repo_url}/commits/{branch}"
    commit_response = requests.get(commit_url, headers=headers, timeout=timeout)
    commit_response.raise_for_status()  # Raise an exception for HTTP errors
    return commit_response.json()["sha"]
def get_latest_hash(repo_url, headers):
    """Return the newest commit SHA, probing ``main``, ``master`` and ``dev``.

    The branches are tried in that order. An HTTP error on ``main`` or
    ``master`` moves on to the next candidate; any other exception from
    those two lookups propagates to the caller. If the final ``dev`` lookup
    fails for any reason, a message is printed and None is returned.
    """
    normalized_url = process_url(repo_url)

    # First two candidates: only an HTTP error means "try the next branch".
    for candidate in ("main", "master"):
        try:
            return get_hash_from_branch(normalized_url, candidate, headers)
        except requests.HTTPError:
            continue

    # Last resort: every failure here is reported and turned into None.
    try:
        return get_hash_from_branch(normalized_url, "dev", headers)
    except Exception:
        print("Failed to fetch the latest commit hash")
        return None
def get_release_or_commit_hash(repo_url, headers, timeout=10):
    """Return the commit SHA of the latest release, or of the latest commit.

    The latest release is looked up first. If it exists and names a tag,
    the tag reference is resolved to a commit SHA. Otherwise the function
    falls back to ``get_latest_hash``.

    Args:
        repo_url: API URL of the repository (a trailing ``.git`` is removed).
        headers: HTTP headers sent with every request.
        timeout: Seconds to wait for each request before giving up.

    Returns:
        The commit SHA as a string, or whatever ``get_latest_hash`` returns
        when there is no usable release.

    Raises:
        requests.HTTPError: If resolving the release tag fails.
        requests.Timeout: If a request does not complete within *timeout*.
    """
    repo_url = process_url(repo_url)

    release_url = f"{repo_url}/releases/latest"
    release_response = requests.get(release_url, headers=headers, timeout=timeout)

    if release_response.status_code == 200:
        tag_name = release_response.json().get("tag_name")
        if tag_name:
            ref_url = f"{repo_url}/git/refs/tags/{tag_name}"
            ref_response = requests.get(ref_url, headers=headers, timeout=timeout)
            ref_response.raise_for_status()  # Raise an exception for HTTP errors
            git_object = ref_response.json()["object"]

            # An annotated tag's reference points at a tag object, not at the
            # commit. Follow the chain until a non-tag object is reached so
            # the returned SHA is comparable with a commit hash. The hop
            # limit only guards against a malformed, endless chain.
            for _ in range(5):
                if git_object.get("type") != "tag":
                    break
                tag_response = requests.get(git_object["url"], headers=headers,
                                            timeout=timeout)
                tag_response.raise_for_status()
                git_object = tag_response.json()["object"]
            return git_object["sha"]

    # No release (or no tag on it): fall back to the branch heads.
    return get_latest_hash(repo_url, headers)
def check_repository(key, item, api_url, base_url, headers, lock, outdated_packages, failures):
    """Check one package and record it as outdated or as not checkable.

    Args:
        key: Path of the package directory; its basename is what gets
            recorded in the result lists.
        item: Sequence ``(pkg_url, pkg_version)`` for the package.
        api_url: Prefix that replaces *base_url* to form the API URL.
        base_url: Prefix a package URL must have to be checkable.
        headers: HTTP headers passed on to the lookup.
        lock: Lock guarding *outdated_packages* and *failures*.
        outdated_packages: List receiving packages whose pinned version
            differs from the upstream hash.
        failures: List receiving packages that could not be checked: no
            URL, a URL outside *base_url*, a network error, or no upstream
            hash found.
    """
    package = os.path.basename(key)
    pkg_url, pkg_version = item[0], item[1]

    # A missing URL or one hosted elsewhere cannot be mapped to the API.
    if not pkg_url or not pkg_url.startswith(base_url):
        with lock:
            failures.append(package)
        return

    api_repo_url = (api_url + pkg_url[len(base_url):]).rstrip("/")

    try:
        latest_release = get_release_or_commit_hash(api_repo_url, headers)
    except requests.RequestException as error:
        print(f"Failed to check {package}: {error}")
        latest_release = None

    # No hash means the lookup failed; that is not the same as "outdated".
    if latest_release is None:
        with lock:
            failures.append(package)
        return

    if pkg_version != latest_release:
        with lock:
            outdated_packages.append(package)
def print_columns(items, num_columns=3):
    """Print *items* left to right in *num_columns* equally wide columns.

    Each column is as wide as the terminal width divided by *num_columns*.
    Every cell, including the empty filler cells of a short last row, is
    left-justified to that width. Nothing is printed for an empty *items*.
    """
    cell_width = shutil.get_terminal_size().columns // num_columns
    total = len(items)
    entries = list(items)

    # Walk the items one row (num_columns entries) at a time.
    for start in range(0, total, num_columns):
        cells = entries[start:start + num_columns]
        # Pad a short final row so every printed line has the same length.
        cells += [""] * (num_columns - len(cells))
        print("".join(cell.ljust(cell_width) for cell in cells))
def main_func():
    """Check every package against its upstream and print a report.

    Asks for a GitHub API token, reads the package Makefiles, checks all
    packages concurrently and prints the outdated packages and the packages
    that could not be checked.
    """
    import getpass
    import sys

    api_url = "https://api.github.com/repos/"
    base_url = "https://github.com/"

    # Do not echo the token when typed interactively. Keep reading from
    # stdin when input is piped, so non-interactive use still works.
    prompt = "Enter GitHub API token: "
    if sys.stdin.isatty():
        github_token = getpass.getpass(prompt)
    else:
        github_token = input(prompt)
    github_token = github_token.strip()

    # An empty token would produce a malformed Authorization header, so send
    # no header at all in that case.
    headers = {"Authorization": f"token {github_token}"} if github_token else {}

    outdated_packages = []
    failures = []
    # Lock guarding the two result lists shared by the worker threads
    lock = threading.Lock()
    versions = get_makefiles()
    print("Checking for outdated packages...")
    with ThreadPoolExecutor() as executor:
        # Remember which package each future belongs to for error reporting.
        futures = {
            executor.submit(check_repository, key, item,
                            api_url, base_url, headers, lock,
                            outdated_packages, failures): key
            for key, item in versions.items()
        }
        # Handle each future as it completes
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                package = os.path.basename(futures[future])
                print(f"Cannot get result from future for {package}: {e}")
                # A check that raised did not record the package itself, so
                # list it among the packages that could not be checked.
                with lock:
                    failures.append(package)

    # Worker threads finish in arbitrary order; sort for a stable report.
    outdated_packages.sort()
    failures.sort()

    print(f"\nTotal number of outdated packages: {len(outdated_packages)}")
    print(f"Total number of failures: {len(failures)}")
    print("\nOutdated packages: ")
    print_columns(outdated_packages)
    print("\nPackages that couldn't be checked: ")
    for package in failures:
        print(package)
# Run the version check only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main_func()