From 2dffa310a144eebe579032e213469d7595277432 Mon Sep 17 00:00:00 2001 From: nlscc <66028747+nlscc@users.noreply.github.com> Date: Fri, 21 May 2021 23:24:56 +0100 Subject: partial code refactor / cleanup --- samloader/auth.py | 49 +++++++++++++++++++++++++++------------------ samloader/crypt.py | 10 +++++++--- samloader/fusclient.py | 45 +++++++++++++++++++++++------------------ samloader/request.py | 51 ++++++++++++++++++++++++++++++----------------- samloader/versionfetch.py | 25 +++++++++++++---------- 5 files changed, 110 insertions(+), 70 deletions(-) diff --git a/samloader/auth.py b/samloader/auth.py index 5b6a1d5..3758bf8 100644 --- a/samloader/auth.py +++ b/samloader/auth.py @@ -1,38 +1,49 @@ # SPDX-License-Identifier: GPL-3.0+ # Copyright (C) 2020 nlscc -# FUS authentication functions (decrypting nonce, calculating auth token) +""" FUS authentication functions (decrypting nonce, calculating auth token) """ -from Cryptodome.Cipher import AES import base64 -import requests +from Cryptodome.Cipher import AES +# Constant key input values. KEY_1 = "hqzdurufm2c8mf6bsjezu1qgveouv7c7" KEY_2 = "w13r4cvf4hctaujv" -unpad = lambda d: d[:-d[-1]] -pad = lambda d: d + bytes([16 - (len(d) % 16)]) * (16 - (len(d) % 16)) +# PKCS#7 padding functions. +pkcs_unpad = lambda d: d[:-d[-1]] +pkcs_pad = lambda d: d + bytes([16 - (len(d) % 16)]) * (16 - (len(d) % 16)) -def aes_encrypt(inp, key): - cipher = AES.new(key, AES.MODE_CBC, key[:16]) - return cipher.encrypt(pad(inp)) +def aes_encrypt(inp: bytes, key: bytes) -> bytes: + """ Perform an AES-CBC encryption. Encrypts /inp/ with key /key/. """ + enc_iv = key[:16] # IV is first 16 bytes of key + cipher = AES.new(key, AES.MODE_CBC, enc_iv) + return cipher.encrypt(pkcs_pad(inp)) -def aes_decrypt(inp, key): - cipher = AES.new(key, AES.MODE_CBC, key[:16]) - return unpad(cipher.decrypt(inp)) +def aes_decrypt(inp: bytes, key: bytes) -> bytes: + """ Perform an AES-CBC decryption. Decrypts /inp/ with key /key/. """ + enc_iv = key[:16] + cipher = AES.new(key, AES.MODE_CBC, enc_iv) + return pkcs_unpad(cipher.decrypt(inp)) -def getfkey(inp): +def derive_key(nonce: str) -> bytes: + """ Calculate the AES key from the FUS input nonce. """ key = "" + # First 16 bytes are offsets into KEY_1 for i in range(16): - key += KEY_1[inp[i]] + key += KEY_1[ord(nonce[i]) % 16] + # Last 16 bytes are static key += KEY_2 return key.encode() -def getauth(nonce): - keydata = [ord(c) % 16 for c in nonce] - fkey = getfkey(keydata) - return base64.b64encode(aes_encrypt(nonce.encode(), fkey)).decode() +def getauth(nonce: str) -> str: + """ Calculate the response token from a given nonce. """ + nkey = derive_key(nonce) + auth_data = aes_encrypt(nonce.encode(), nkey) + return base64.b64encode(auth_data).decode() -def decryptnonce(inp): - nonce = aes_decrypt(base64.b64decode(inp), KEY_1.encode()).decode() +def decryptnonce(inp: str) -> str: + """ Decrypt the nonce returned by the server. """ + inp_data = base64.b64decode(inp) + nonce = aes_decrypt(inp_data, KEY_1.encode()).decode() return nonce diff --git a/samloader/crypt.py b/samloader/crypt.py index ba7f91c..0b57c5a 100644 --- a/samloader/crypt.py +++ b/samloader/crypt.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: GPL-3.0+ # Copyright (C) 2020 nlscc -# Calculate keys and decrypt encrypted firmware packages. +""" Calculate keys and decrypt encrypted firmware packages. """ import hashlib import xml.etree.ElementTree as ET @@ -10,11 +10,12 @@ from clint.textui import progress from . import request from . import fusclient -from . import versionfetch +# PKCS#7 unpad unpad = lambda d: d[:-d[-1]] def getv4key(version, model, region): + """ Retrieve the AES key for V4 encryption. """ client = fusclient.FUSClient() req = request.binaryinform(version, model, region, client.nonce) resp = client.makereq("NF_DownloadBinaryInform.do", req) @@ -25,12 +26,15 @@ def getv4key(version, model, region): return hashlib.md5(deckey.encode()).digest() def getv2key(version, model, region): + """ Calculate the AES key for V2 (legacy) encryption. """ deckey = region + ":" + model + ":" + version return hashlib.md5(deckey.encode()).digest() def decrypt_progress(inf, outf, key, length): + """ Decrypt a stream of data while showing a progress bar. """ cipher = AES.new(key, AES.MODE_ECB) - assert length % 16 == 0 + if length % 16 != 0: + raise Exception("invalid input block size") chunks = length//4096+1 for i in progress.bar(range(chunks)): block = inf.read(4096) diff --git a/samloader/fusclient.py b/samloader/fusclient.py index fdb9f28..a203d17 100644 --- a/samloader/fusclient.py +++ b/samloader/fusclient.py @@ -1,36 +1,43 @@ # SPDX-License-Identifier: GPL-3.0+ # Copyright (C) 2020 nlscc -# FUS request helper (automatically sign requests and update tokens) +""" FUS request helper (automatically sign requests and update tokens) """ import requests from . import auth -class FUSClient(object): +class FUSClient: + """ FUS API client. """ def __init__(self): self.auth = "" self.sessid = "" - self.makereq("NF_DownloadGenerateNonce.do") - def makereq(self, path, data=""): + self.makereq("NF_DownloadGenerateNonce.do") # initialize nonce + def makereq(self, path: str, data: str = "") -> str: + """ Make a FUS request to a given endpoint. """ authv = 'FUS nonce="", signature="' + self.auth + '", nc="", type="", realm="", newauth="1"' - r = requests.post("https://neofussvr.sslcs.cdngc.net/" + path, data=data, - headers={"Authorization": authv, "User-Agent": "Kies2.0_FUS"}, - cookies={"JSESSIONID": self.sessid}) - if "NONCE" in r.headers: - self.encnonce = r.headers["NONCE"] + req = requests.post("https://neofussvr.sslcs.cdngc.net/" + path, data=data, + headers={"Authorization": authv, "User-Agent": "Kies2.0_FUS"}, + cookies={"JSESSIONID": self.sessid}) + # If a new NONCE is present, decrypt it and update our auth token. + if "NONCE" in req.headers: + self.encnonce = req.headers["NONCE"] self.nonce = auth.decryptnonce(self.encnonce) self.auth = auth.getauth(self.nonce) - if "JSESSIONID" in r.cookies: - self.sessid = r.cookies["JSESSIONID"] - r.raise_for_status() - return r.text - def downloadfile(self, filename, start=0): - authv = 'FUS nonce="' + self.encnonce + '", signature="' + self.auth + '", nc="", type="", realm="", newauth="1"' + # Update the session cookie if needed. + if "JSESSIONID" in req.cookies: + self.sessid = req.cookies["JSESSIONID"] + req.raise_for_status() + return req.text + def downloadfile(self, filename: str, start: int = 0) -> requests.Response: + """ Make a FUS cloud request to download a given file. """ + # In a cloud request, we also need to pass the server nonce. + authv = 'FUS nonce="' + self.encnonce + '", signature="' + self.auth \ + + '", nc="", type="", realm="", newauth="1"' headers = {"Authorization": authv, "User-Agent": "Kies2.0_FUS"} if start > 0: headers["Range"] = "bytes={}-".format(start) - r = requests.get("http://cloud-neofussvr.sslcs.cdngc.net/NF_DownloadBinaryForMass.do", - params="file=" + filename, headers=headers, stream=True) - r.raise_for_status() - return r + req = requests.get("http://cloud-neofussvr.sslcs.cdngc.net/NF_DownloadBinaryForMass.do", + params="file=" + filename, headers=headers, stream=True) + req.raise_for_status() + return req diff --git a/samloader/request.py b/samloader/request.py index 47619d8..eb1db3e 100644 --- a/samloader/request.py +++ b/samloader/request.py @@ -1,11 +1,12 @@ # SPDX-License-Identifier: GPL-3.0+ # Copyright (C) 2020 nlscc -# Build FUS XML requests. +""" Build FUS XML requests. """ import xml.etree.ElementTree as ET -def getlogiccheck(inp, nonce): +def getlogiccheck(inp: str, nonce: str) -> str: + """ Calculate the request checksum for a given input and nonce. """ if len(inp) < 16: raise Exception("getlogiccheck() input too short") out = "" @@ -13,28 +14,42 @@ def getlogiccheck(inp, nonce): out += inp[ord(c) & 0xf] return out -def binaryinform(fw, model, region, nonce): - fusmsg = ET.Element("FUSMsg") +def build_reqhdr(fusmsg: ET.Element): + """ Build the FUSHdr of an XML message. """ fushdr = ET.SubElement(fusmsg, "FUSHdr") ET.SubElement(fushdr, "ProtoVer").text = "1.0" + +def build_reqbody(fusmsg: ET.Element, params: dict): + """ Build the FUSBody of an XML message. """ fusbody = ET.SubElement(fusmsg, "FUSBody") fput = ET.SubElement(fusbody, "Put") - ET.SubElement(ET.SubElement(fput, "ACCESS_MODE"), "Data").text = "2" - ET.SubElement(ET.SubElement(fput, "BINARY_NATURE"), "Data").text = "1" - ET.SubElement(ET.SubElement(fput, "CLIENT_PRODUCT"), "Data").text = "Smart Switch" - ET.SubElement(ET.SubElement(fput, "DEVICE_FW_VERSION"), "Data").text = fw - ET.SubElement(ET.SubElement(fput, "DEVICE_LOCAL_CODE"), "Data").text = region - ET.SubElement(ET.SubElement(fput, "DEVICE_MODEL_NAME"), "Data").text = model - ET.SubElement(ET.SubElement(fput, "LOGIC_CHECK"), "Data").text = getlogiccheck(fw, nonce) + for tag, value in params.items(): + setag = ET.SubElement(fput, tag) + sedata = ET.SubElement(setag, "Data") + sedata.text = str(value) + +def binaryinform(fwv: str, model: str, region: str, nonce: str) -> str: + """ Build a BinaryInform request. """ + fusmsg = ET.Element("FUSMsg") + build_reqhdr(fusmsg) + build_reqbody(fusmsg, { + "ACCESS_MODE": 2, + "BINARY_NATURE": 1, + "CLIENT_PRODUCT": "Smart Switch", + "DEVICE_FW_VERSION": fwv, + "DEVICE_LOCAL_CODE": region, + "DEVICE_MODEL_NAME": model, + "LOGIC_CHECK": getlogiccheck(fwv, nonce) + }) return ET.tostring(fusmsg) -def binaryinit(filename, nonce): +def binaryinit(filename: str, nonce: str) -> str: + """ Build a BinaryInit request. """ fusmsg = ET.Element("FUSMsg") - fushdr = ET.SubElement(fusmsg, "FUSHdr") - ET.SubElement(fushdr, "ProtoVer").text = "1.0" - fusbody = ET.SubElement(fusmsg, "FUSBody") - fput = ET.SubElement(fusbody, "Put") - ET.SubElement(ET.SubElement(fput, "BINARY_FILE_NAME"), "Data").text = filename + build_reqhdr(fusmsg) checkinp = filename.split(".")[0][-16:] - ET.SubElement(ET.SubElement(fput, "LOGIC_CHECK"), "Data").text = getlogiccheck(checkinp, nonce) + build_reqbody(fusmsg, { + "BINARY_FILE_NAME": filename, + "LOGIC_CHECK": getlogiccheck(checkinp, nonce) + }) return ET.tostring(fusmsg) diff --git a/samloader/versionfetch.py b/samloader/versionfetch.py index ff14075..b8f150c 100644 --- a/samloader/versionfetch.py +++ b/samloader/versionfetch.py @@ -1,21 +1,24 @@ # SPDX-License-Identifier: GPL-3.0+ # Copyright (C) 2020 nlscc -# Get the latest firmware version for a device. +""" Get the latest firmware version for a device. """ import xml.etree.ElementTree as ET import requests -def getlatestver(model, region): - r = requests.get("https://fota-cloud-dn.ospserver.net/firmware/" + region + "/" + model + "/version.xml") - r.raise_for_status() - root = ET.fromstring(r.text) +def getlatestver(model: str, region: str) -> str: + """ Get the latest firmware version code for a model and region. """ + req = requests.get("https://fota-cloud-dn.ospserver.net/firmware/" \ + + region + "/" + model + "/version.xml") + req.raise_for_status() + root = ET.fromstring(req.text) vercode = root.find("./firmware/version/latest").text if vercode is None: raise Exception("No latest firmware found") - vc = vercode.split("/") - if len(vc) == 3: - vc.append(vc[0]) - if vc[2] == "": - vc[2] = vc[0] - return "/".join(vc) + # Normalize retrieved version + ver = vercode.split("/") + if len(ver) == 3: + ver.append(ver[0]) + if ver[2] == "": + ver[2] = ver[0] + return "/".join(ver) -- cgit v1.2.3