From 4b49246774a867e558e99bf43ffe266d746aed11 Mon Sep 17 00:00:00 2001 From: nlscc <66028747+nlscc@users.noreply.github.com> Date: Wed, 7 Oct 2020 17:56:36 +0100 Subject: improve CLI and cleanup code - switch to argparse, improve argument parsing - fix error when resuming a file that was already downloaded - cleanup code - bump version number --- samloader/main.py | 141 +++++++++++++++++++--------------------------- samloader/request.py | 2 +- samloader/versionfetch.py | 2 +- 3 files changed, 60 insertions(+), 85 deletions(-) (limited to 'samloader') diff --git a/samloader/main.py b/samloader/main.py index d1c37f5..baa9d48 100644 --- a/samloader/main.py +++ b/samloader/main.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: GPL-3.0+ # Copyright (C) 2020 nlscc -import click +import argparse import os import base64 import xml.etree.ElementTree as ET @@ -12,92 +12,67 @@ from . import crypt from . import fusclient from . import versionfetch -def getbinaryfile(client, fw, region, model): - req = request.binaryinform(fw, region, model, client.nonce) +def main(): + parser = argparse.ArgumentParser(description="Download and query firmware for Samsung devices.") + parser.add_argument("-m", "--dev-model", help="device region code", required=True) + parser.add_argument("-r", "--dev-region", help="device model", required=True) + subparsers = parser.add_subparsers(dest="command") + dload = subparsers.add_parser("download", help="download a firmware") + dload.add_argument("-v", "--fw-ver", help="firmware version to download", required=True) + dload.add_argument("-R", "--resume", help="resume an unfinished download", action="store_true") + dload.add_argument("-M", "--show-md5", help="print the expected MD5 hash of the downloaded file", action="store_true") + dload_out = dload.add_mutually_exclusive_group(required=True) + dload_out.add_argument("-O", "--out-dir", help="output the server filename to the specified directory") + dload_out.add_argument("-o", "--out-file", help="output to the specified file") + chkupd = subparsers.add_parser("checkupdate", help="check for the latest available firmware version") + decrypt = subparsers.add_parser("decrypt", help="decrypt an encrypted firmware") + decrypt.add_argument("-v", "--fw-ver", help="encrypted firmware version", required=True) + decrypt.add_argument("-V", "--enc-ver", type=int, choices=[2, 4], default=4, help="encryption version (default 4)") + decrypt.add_argument("-i", "--in-file", help="encrypted firmware file input", required=True) + decrypt.add_argument("-o", "--out-file", help="decrypted firmware file output", required=True) + args = parser.parse_args() + if args.command == "download": + client = fusclient.FUSClient() + path, filename, size = getbinaryfile(client, args.fw_ver, args.dev_model, args.dev_region) + print("resuming" if args.resume else "downloading", filename) + out = args.out_file if args.out_file else os.path.join(args.out_dir, filename) + dloffset = os.stat(out).st_size if args.resume else 0 + if dloffset == size: + print("already downloaded!") + return + fd = open(out, "ab" if args.resume else "wb") + initdownload(client, filename) + r = client.downloadfile(path+filename, dloffset) + if args.show_md5 and "Content-MD5" in r.headers: + print("MD5:", base64.b64decode(r.headers["Content-MD5"]).hex()) + # TODO: use own progress bar instead of clint + for chunk in progress.bar(r.iter_content(chunk_size=0x10000), expected_size=(size/0x10000)+1): + if chunk: + fd.write(chunk) + fd.flush() + fd.close() + elif args.command == "checkupdate": + print(versionfetch.getlatestver(args.dev_model, args.dev_region)) + elif args.command == "decrypt": + getkey = crypt.getv4key if args.enc_ver == 4 else crypt.getv2key + key = getkey(args.fw_ver, args.dev_model, args.dev_region) + length = os.stat(args.in_file).st_size + with open(args.in_file, "rb") as inf: + with open(args.out_file, "wb") as outf: + crypt.decrypt_progress(inf, outf, key, length) + +def initdownload(client, filename): + req = request.binaryinit(filename, client.nonce) + resp = client.makereq("NF_DownloadBinaryInitForMass.do", req) + +def getbinaryfile(client, fw, model, region): + req = request.binaryinform(fw, model, region, client.nonce) resp = client.makereq("NF_DownloadBinaryInform.do", req) root = ET.fromstring(resp) status = int(root.find("./FUSBody/Results/Status").text) if status != 200: raise Exception("DownloadBinaryInform returned {}, firmware could not be found?".format(status)) + size = int(root.find("./FUSBody/Put/BINARY_BYTE_SIZE/Data").text) filename = root.find("./FUSBody/Put/BINARY_NAME/Data").text path = root.find("./FUSBody/Put/MODEL_PATH/Data").text - return path, filename - -def initdownload(client, filename): - req = request.binaryinit(filename, client.nonce) - resp = client.makereq("NF_DownloadBinaryInitForMass.do", req) - -@click.group() -def cli(): - pass - -@cli.command(help="Check the update server for the latest available firmware.") -@click.argument("model") -@click.argument("region") -def checkupdate(model, region): - fw = versionfetch.getlatestver(region, model) - print(fw) - -@cli.command(help="Download the specified firmware version.") -@click.argument("version") -@click.argument("model") -@click.argument("region") -@click.argument("out") -def download(version, model, region, out): - client = fusclient.FUSClient() - path, filename = getbinaryfile(client, version, region, model) - initdownload(client, filename) - if os.path.isdir(out): - out = os.path.join(out, filename) - if os.path.exists(out): - f = open(out, "ab") - start = os.stat(out).st_size - print("Resuming {} at {}".format(path+filename, start)) - else: - f = open(out, "wb") - start = 0 - print("Downloading {}".format(path+filename)) - r = client.downloadfile(path+filename, start) - length = int(r.headers["Content-Length"]) - if "Content-MD5" in r.headers: - md5 = base64.b64decode(r.headers["Content-MD5"]).hex() - print("MD5: {}".format(md5)) - for chunk in progress.bar(r.iter_content(chunk_size=0x10000), expected_size=(length/0x10000)+1): - if chunk: - f.write(chunk) - f.flush() - f.close() - print("Done!") - -@cli.command(help="Decrypt enc4 files.") -@click.argument("version") -@click.argument("model") -@click.argument("region") -@click.argument("infile") -@click.argument("outfile") -def decrypt4(version, model, region, infile, outfile): - key = crypt.getv4key(version, model, region) - print("Decrypting with key {}...".format(key.hex())) - length = os.stat(infile).st_size - with open(infile, "rb") as inf: - with open(outfile, "wb") as outf: - crypt.decrypt_progress(inf, outf, key, length) - print("Done!") - -@cli.command(help="Decrypt enc2 files.") -@click.argument("version") -@click.argument("model") -@click.argument("region") -@click.argument("infile") -@click.argument("outfile") -def decrypt2(version, model, region, infile, outfile): - key = crypt.getv2key(version, model, region) - print("Decrypting with key {}...".format(key.hex())) - length = os.stat(infile).st_size - with open(infile, "rb") as inf: - with open(outfile, "wb") as outf: - crypt.decrypt_progress(inf, outf, key, length) - print("Done!") - -if __name__ == "__main__": - cli() + return path, filename, size diff --git a/samloader/request.py b/samloader/request.py index bb95e78..e19d072 100644 --- a/samloader/request.py +++ b/samloader/request.py @@ -11,7 +11,7 @@ def getlogiccheck(inp, nonce): out += inp[ord(c) & 0xf] return out -def binaryinform(fw, region, model, nonce): +def binaryinform(fw, model, region, nonce): fusmsg = ET.Element("FUSMsg") fushdr = ET.SubElement(fusmsg, "FUSHdr") ET.SubElement(fushdr, "ProtoVer").text = "1.0" diff --git a/samloader/versionfetch.py b/samloader/versionfetch.py index d173486..ff14075 100644 --- a/samloader/versionfetch.py +++ b/samloader/versionfetch.py @@ -6,7 +6,7 @@ import xml.etree.ElementTree as ET import requests -def getlatestver(region, model): +def getlatestver(model, region): r = requests.get("https://fota-cloud-dn.ospserver.net/firmware/" + region + "/" + model + "/version.xml") r.raise_for_status() root = ET.fromstring(r.text) -- cgit v1.2.3