# SPDX-License-Identifier: GPL-3.0+
# Copyright (C) 2020 nlscc
""" FUS request helper (automatically sign requests and update tokens) """
import requests
from . import auth
class FUSClient:
""" FUS API client. """
def __init__(self):
self.auth = ""
self.sessid = ""
self.makereq("NF_DownloadGenerateNonce.do") # initialize nonce
def makereq(self, path: str, data: str = "") -> str:
""" Make a FUS request to a given endpoint. """
authv = 'FUS nonce="", signature="' + self.auth + '", nc="", type="", realm="", newauth="1"'
req = requests.post("https://neofussvr.sslcs.cdngc.net/" + path, data=data,
headers={"Authorization": authv, "User-Agent": "Kies2.0_FUS"},
cookies={"JSESSIONID": self.sessid})
# If a new NONCE is present, decrypt it and update our auth token.
if "NONCE" in req.headers:
self.encnonce = req.headers["NONCE"]
self.nonce = auth.decryptnonce(self.encnonce)
self.auth = auth.getauth(self.nonce)
# Update the session cookie if needed.
if "JSESSIONID" in req.cookies:
self.sessid = req.cookies["JSESSIONID"]
req.raise_for_status()
return req.text
def downloadfile(self, filename: str, start: int = 0) -> requests.Response:
""" Make a FUS cloud request to download a given file. """
# In a cloud request, we also need to pass the server nonce.
authv = 'FUS nonce="' + self.encnonce + '", signature="' + self.auth \
+ '", nc="", type="", realm="", newauth="1"'
headers = {"Authorization": authv, "User-Agent": "Kies2.0_FUS"}
if start > 0:
headers["Range"] = "bytes={}-".format(start)
req = requests.get("http://cloud-neofussvr.sslcs.cdngc.net/NF_DownloadBinaryForMass.do",
params="file=" + filename, headers=headers, stream=True)
req.raise_for_status()
return req