summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--samloader/auth.py49
-rw-r--r--samloader/crypt.py10
-rw-r--r--samloader/fusclient.py45
-rw-r--r--samloader/request.py51
-rw-r--r--samloader/versionfetch.py25
5 files changed, 110 insertions, 70 deletions
diff --git a/samloader/auth.py b/samloader/auth.py
index 5b6a1d5..3758bf8 100644
--- a/samloader/auth.py
+++ b/samloader/auth.py
@@ -1,38 +1,49 @@
# SPDX-License-Identifier: GPL-3.0+
# Copyright (C) 2020 nlscc
-# FUS authentication functions (decrypting nonce, calculating auth token)
+""" FUS authentication functions (decrypting nonce, calculating auth token) """
-from Cryptodome.Cipher import AES
import base64
-import requests
+from Cryptodome.Cipher import AES
+# Constant key input values.
KEY_1 = "hqzdurufm2c8mf6bsjezu1qgveouv7c7"
KEY_2 = "w13r4cvf4hctaujv"
-unpad = lambda d: d[:-d[-1]]
-pad = lambda d: d + bytes([16 - (len(d) % 16)]) * (16 - (len(d) % 16))
+# PKCS#7 padding functions.
+pkcs_unpad = lambda d: d[:-d[-1]]
+pkcs_pad = lambda d: d + bytes([16 - (len(d) % 16)]) * (16 - (len(d) % 16))
-def aes_encrypt(inp, key):
- cipher = AES.new(key, AES.MODE_CBC, key[:16])
- return cipher.encrypt(pad(inp))
+def aes_encrypt(inp: bytes, key: bytes) -> bytes:
+ """ Perform an AES-CBC encryption. Encrypts /inp/ with key /key/. """
+ enc_iv = key[:16] # IV is first 16 bytes of key
+ cipher = AES.new(key, AES.MODE_CBC, enc_iv)
+ return cipher.encrypt(pkcs_pad(inp))
-def aes_decrypt(inp, key):
- cipher = AES.new(key, AES.MODE_CBC, key[:16])
- return unpad(cipher.decrypt(inp))
+def aes_decrypt(inp: bytes, key: bytes) -> bytes:
+ """ Perform an AES-CBC decryption. Decrypts /inp/ with key /key/. """
+ enc_iv = key[:16]
+ cipher = AES.new(key, AES.MODE_CBC, enc_iv)
+ return pkcs_unpad(cipher.decrypt(inp))
-def getfkey(inp):
+def derive_key(nonce: str) -> bytes:
+ """ Calculate the AES key from the FUS input nonce. """
key = ""
+ # First 16 bytes are offsets into KEY_1
for i in range(16):
- key += KEY_1[inp[i]]
+ key += KEY_1[ord(nonce[i]) % 16]
+ # Last 16 bytes are static
key += KEY_2
return key.encode()
-def getauth(nonce):
- keydata = [ord(c) % 16 for c in nonce]
- fkey = getfkey(keydata)
- return base64.b64encode(aes_encrypt(nonce.encode(), fkey)).decode()
+def getauth(nonce: str) -> str:
+ """ Calculate the response token from a given nonce. """
+ nkey = derive_key(nonce)
+ auth_data = aes_encrypt(nonce.encode(), nkey)
+ return base64.b64encode(auth_data).decode()
-def decryptnonce(inp):
- nonce = aes_decrypt(base64.b64decode(inp), KEY_1.encode()).decode()
+def decryptnonce(inp: str) -> str:
+ """ Decrypt the nonce returned by the server. """
+ inp_data = base64.b64decode(inp)
+ nonce = aes_decrypt(inp_data, KEY_1.encode()).decode()
return nonce
diff --git a/samloader/crypt.py b/samloader/crypt.py
index ba7f91c..0b57c5a 100644
--- a/samloader/crypt.py
+++ b/samloader/crypt.py
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: GPL-3.0+
# Copyright (C) 2020 nlscc
-# Calculate keys and decrypt encrypted firmware packages.
+""" Calculate keys and decrypt encrypted firmware packages. """
import hashlib
import xml.etree.ElementTree as ET
@@ -10,11 +10,12 @@ from clint.textui import progress
from . import request
from . import fusclient
-from . import versionfetch
+# PKCS#7 unpad
unpad = lambda d: d[:-d[-1]]
def getv4key(version, model, region):
+ """ Retrieve the AES key for V4 encryption. """
client = fusclient.FUSClient()
req = request.binaryinform(version, model, region, client.nonce)
resp = client.makereq("NF_DownloadBinaryInform.do", req)
@@ -25,12 +26,15 @@ def getv4key(version, model, region):
return hashlib.md5(deckey.encode()).digest()
def getv2key(version, model, region):
+ """ Calculate the AES key for V2 (legacy) encryption. """
deckey = region + ":" + model + ":" + version
return hashlib.md5(deckey.encode()).digest()
def decrypt_progress(inf, outf, key, length):
+ """ Decrypt a stream of data while showing a progress bar. """
cipher = AES.new(key, AES.MODE_ECB)
- assert length % 16 == 0
+ if length % 16 != 0:
+ raise Exception("invalid input block size")
chunks = length//4096+1
for i in progress.bar(range(chunks)):
block = inf.read(4096)
diff --git a/samloader/fusclient.py b/samloader/fusclient.py
index fdb9f28..a203d17 100644
--- a/samloader/fusclient.py
+++ b/samloader/fusclient.py
@@ -1,36 +1,43 @@
# SPDX-License-Identifier: GPL-3.0+
# Copyright (C) 2020 nlscc
-# FUS request helper (automatically sign requests and update tokens)
+""" FUS request helper (automatically sign requests and update tokens) """
import requests
from . import auth
-class FUSClient(object):
+class FUSClient:
+ """ FUS API client. """
def __init__(self):
self.auth = ""
self.sessid = ""
- self.makereq("NF_DownloadGenerateNonce.do")
- def makereq(self, path, data=""):
+ self.makereq("NF_DownloadGenerateNonce.do") # initialize nonce
+ def makereq(self, path: str, data: str = "") -> str:
+ """ Make a FUS request to a given endpoint. """
authv = 'FUS nonce="", signature="' + self.auth + '", nc="", type="", realm="", newauth="1"'
- r = requests.post("https://neofussvr.sslcs.cdngc.net/" + path, data=data,
- headers={"Authorization": authv, "User-Agent": "Kies2.0_FUS"},
- cookies={"JSESSIONID": self.sessid})
- if "NONCE" in r.headers:
- self.encnonce = r.headers["NONCE"]
+ req = requests.post("https://neofussvr.sslcs.cdngc.net/" + path, data=data,
+ headers={"Authorization": authv, "User-Agent": "Kies2.0_FUS"},
+ cookies={"JSESSIONID": self.sessid})
+ # If a new NONCE is present, decrypt it and update our auth token.
+ if "NONCE" in req.headers:
+ self.encnonce = req.headers["NONCE"]
self.nonce = auth.decryptnonce(self.encnonce)
self.auth = auth.getauth(self.nonce)
- if "JSESSIONID" in r.cookies:
- self.sessid = r.cookies["JSESSIONID"]
- r.raise_for_status()
- return r.text
- def downloadfile(self, filename, start=0):
- authv = 'FUS nonce="' + self.encnonce + '", signature="' + self.auth + '", nc="", type="", realm="", newauth="1"'
+ # Update the session cookie if needed.
+ if "JSESSIONID" in req.cookies:
+ self.sessid = req.cookies["JSESSIONID"]
+ req.raise_for_status()
+ return req.text
+ def downloadfile(self, filename: str, start: int = 0) -> requests.Response:
+ """ Make a FUS cloud request to download a given file. """
+ # In a cloud request, we also need to pass the server nonce.
+ authv = 'FUS nonce="' + self.encnonce + '", signature="' + self.auth \
+ + '", nc="", type="", realm="", newauth="1"'
headers = {"Authorization": authv, "User-Agent": "Kies2.0_FUS"}
if start > 0:
headers["Range"] = "bytes={}-".format(start)
- r = requests.get("http://cloud-neofussvr.sslcs.cdngc.net/NF_DownloadBinaryForMass.do",
- params="file=" + filename, headers=headers, stream=True)
- r.raise_for_status()
- return r
+ req = requests.get("http://cloud-neofussvr.sslcs.cdngc.net/NF_DownloadBinaryForMass.do",
+ params="file=" + filename, headers=headers, stream=True)
+ req.raise_for_status()
+ return req
diff --git a/samloader/request.py b/samloader/request.py
index 47619d8..eb1db3e 100644
--- a/samloader/request.py
+++ b/samloader/request.py
@@ -1,11 +1,12 @@
# SPDX-License-Identifier: GPL-3.0+
# Copyright (C) 2020 nlscc
-# Build FUS XML requests.
+""" Build FUS XML requests. """
import xml.etree.ElementTree as ET
-def getlogiccheck(inp, nonce):
+def getlogiccheck(inp: str, nonce: str) -> str:
+ """ Calculate the request checksum for a given input and nonce. """
if len(inp) < 16:
raise Exception("getlogiccheck() input too short")
out = ""
@@ -13,28 +14,42 @@ def getlogiccheck(inp, nonce):
out += inp[ord(c) & 0xf]
return out
-def binaryinform(fw, model, region, nonce):
- fusmsg = ET.Element("FUSMsg")
+def build_reqhdr(fusmsg: ET.Element):
+ """ Build the FUSHdr of an XML message. """
fushdr = ET.SubElement(fusmsg, "FUSHdr")
ET.SubElement(fushdr, "ProtoVer").text = "1.0"
+
+def build_reqbody(fusmsg: ET.Element, params: dict):
+ """ Build the FUSBody of an XML message. """
fusbody = ET.SubElement(fusmsg, "FUSBody")
fput = ET.SubElement(fusbody, "Put")
- ET.SubElement(ET.SubElement(fput, "ACCESS_MODE"), "Data").text = "2"
- ET.SubElement(ET.SubElement(fput, "BINARY_NATURE"), "Data").text = "1"
- ET.SubElement(ET.SubElement(fput, "CLIENT_PRODUCT"), "Data").text = "Smart Switch"
- ET.SubElement(ET.SubElement(fput, "DEVICE_FW_VERSION"), "Data").text = fw
- ET.SubElement(ET.SubElement(fput, "DEVICE_LOCAL_CODE"), "Data").text = region
- ET.SubElement(ET.SubElement(fput, "DEVICE_MODEL_NAME"), "Data").text = model
- ET.SubElement(ET.SubElement(fput, "LOGIC_CHECK"), "Data").text = getlogiccheck(fw, nonce)
+ for tag, value in params.items():
+ setag = ET.SubElement(fput, tag)
+ sedata = ET.SubElement(setag, "Data")
+ sedata.text = str(value)
+
+def binaryinform(fwv: str, model: str, region: str, nonce: str) -> str:
+ """ Build a BinaryInform request. """
+ fusmsg = ET.Element("FUSMsg")
+ build_reqhdr(fusmsg)
+ build_reqbody(fusmsg, {
+ "ACCESS_MODE": 2,
+ "BINARY_NATURE": 1,
+ "CLIENT_PRODUCT": "Smart Switch",
+ "DEVICE_FW_VERSION": fwv,
+ "DEVICE_LOCAL_CODE": region,
+ "DEVICE_MODEL_NAME": model,
+ "LOGIC_CHECK": getlogiccheck(fwv, nonce)
+ })
return ET.tostring(fusmsg)
-def binaryinit(filename, nonce):
+def binaryinit(filename: str, nonce: str) -> str:
+ """ Build a BinaryInit request. """
fusmsg = ET.Element("FUSMsg")
- fushdr = ET.SubElement(fusmsg, "FUSHdr")
- ET.SubElement(fushdr, "ProtoVer").text = "1.0"
- fusbody = ET.SubElement(fusmsg, "FUSBody")
- fput = ET.SubElement(fusbody, "Put")
- ET.SubElement(ET.SubElement(fput, "BINARY_FILE_NAME"), "Data").text = filename
+ build_reqhdr(fusmsg)
checkinp = filename.split(".")[0][-16:]
- ET.SubElement(ET.SubElement(fput, "LOGIC_CHECK"), "Data").text = getlogiccheck(checkinp, nonce)
+ build_reqbody(fusmsg, {
+ "BINARY_FILE_NAME": filename,
+ "LOGIC_CHECK": getlogiccheck(checkinp, nonce)
+ })
return ET.tostring(fusmsg)
diff --git a/samloader/versionfetch.py b/samloader/versionfetch.py
index ff14075..b8f150c 100644
--- a/samloader/versionfetch.py
+++ b/samloader/versionfetch.py
@@ -1,21 +1,24 @@
# SPDX-License-Identifier: GPL-3.0+
# Copyright (C) 2020 nlscc
-# Get the latest firmware version for a device.
+""" Get the latest firmware version for a device. """
import xml.etree.ElementTree as ET
import requests
-def getlatestver(model, region):
- r = requests.get("https://fota-cloud-dn.ospserver.net/firmware/" + region + "/" + model + "/version.xml")
- r.raise_for_status()
- root = ET.fromstring(r.text)
+def getlatestver(model: str, region: str) -> str:
+ """ Get the latest firmware version code for a model and region. """
+ req = requests.get("https://fota-cloud-dn.ospserver.net/firmware/" \
+ + region + "/" + model + "/version.xml")
+ req.raise_for_status()
+ root = ET.fromstring(req.text)
vercode = root.find("./firmware/version/latest").text
if vercode is None:
raise Exception("No latest firmware found")
- vc = vercode.split("/")
- if len(vc) == 3:
- vc.append(vc[0])
- if vc[2] == "":
- vc[2] = vc[0]
- return "/".join(vc)
+ # Normalize retrieved version
+ ver = vercode.split("/")
+ if len(ver) == 3:
+ ver.append(ver[0])
+ if ver[2] == "":
+ ver[2] = ver[0]
+ return "/".join(ver)