,p=$[$]
# where:
# logN, r, p -- decimal-encoded positive integer, no zero-padding
# logN -- log cost setting
# r -- block size setting (usually 8)
# p -- parallelism setting (usually 1)
# salt, digest -- b64-nopad encoded bytes
#
@classmethod
def _parse_scrypt_string(cls, suffix):
    """
    Parse the part of a ``$scrypt$`` hash string that follows the identifier.

    :param suffix:
        text made of a ``ln=..,r=..,p=..`` settings field, a ``$``, the
        b64-nopad encoded salt, and optionally another ``$`` plus the
        b64-nopad encoded digest.

    :returns:
        dict with the keys ``ident``, ``rounds``, ``block_size``,
        ``parallelism``, ``salt`` and ``checksum``
        (``checksum`` is ``None`` when no digest is present).

    :raises:
        ``uh.exc.MalformedHashError`` if the number of ``$`` sections or
        settings is wrong, or a setting lacks its expected prefix.
        A setting whose value is not an integer makes ``int()`` raise
        :exc:`ValueError`.
    """
    # break params, salt, and digest sections
    sections = suffix.split("$")
    if len(sections) == 3:
        params, salt, digest = sections
    elif len(sections) == 2:
        params, salt = sections
        digest = None
    else:
        raise uh.exc.MalformedHashError(cls, "malformed hash")

    # break params apart
    settings = params.split(",")
    if len(settings) != 3:
        raise uh.exc.MalformedHashError(cls, "malformed settings field")
    nstr, bstr, pstr = settings

    # NOTE: checked with an explicit raise rather than ``assert``, since
    #       asserts are stripped under "python -O" and this is input validation.
    if not (nstr.startswith("ln=") and
            bstr.startswith("r=") and
            pstr.startswith("p=")):
        raise uh.exc.MalformedHashError(cls, "malformed settings field")

    return dict(
        ident=IDENT_SCRYPT,
        rounds=int(nstr[3:]),
        block_size=int(bstr[2:]),
        parallelism=int(pstr[2:]),
        salt=b64s_decode(salt.encode("ascii")),
        checksum=b64s_decode(digest.encode("ascii")) if digest else None,
    )
#
# official format specification defined at
# https://gitlab.com/jas/scrypt-unix-crypt/blob/master/unix-scrypt.txt
# format:
# $7$<N><rrrrr><ppppp><salt>[$<digest>]
#     0  12345  67890  1
# where:
# All bytes use h64-little-endian encoding
# N: 6-bit log cost setting
# r: 30-bit block size setting
# p: 30-bit parallelism setting
# salt: variable length salt bytes
# digest: fixed 32-byte digest
#
@classmethod
def _parse_7_string(cls, suffix):
    """
    Parse the part of a ``$7$`` hash string that follows the identifier.

    :param suffix:
        ASCII text holding the fixed-width params (1 char of log cost,
        5 chars of block size, 5 chars of parallelism), followed by the
        raw salt, and optionally a ``$`` plus the h64-encoded digest.

    :returns:
        dict with the keys ``ident``, ``rounds``, ``block_size``,
        ``parallelism``, ``salt`` and ``checksum``
        (``checksum`` is ``None`` when no digest is present).

    :raises:
        ``uh.exc.MalformedHashError`` if there are too many ``$`` sections,
        or the params field is shorter than 11 characters.
    """
    # XXX: annoyingly, official spec embeds salt *raw*, yet doesn't specify a hash encoding.
    #      so assuming only h64 chars are valid for salt, and are ASCII encoded.

    # split into params & digest
    sections = suffix.encode("ascii").split(b"$")
    if len(sections) == 2:
        params, digest = sections
    elif len(sections) == 1:
        (params,) = sections
        digest = None
    else:
        # pass handler & reason, matching the other malformed-hash errors raised by this class
        raise uh.exc.MalformedHashError(cls, "malformed hash")

    # parse params & return
    if len(params) < 11:
        raise uh.exc.MalformedHashError(cls, "params field too short")
    return dict(
        ident=IDENT_7,
        rounds=h64.decode_int6(params[:1]),
        block_size=h64.decode_int30(params[1:6]),
        parallelism=h64.decode_int30(params[6:11]),
        salt=params[11:],
        checksum=h64.decode_bytes(digest) if digest else None,
    )
#===================================================================
# formatting
#===================================================================
def to_string(self):
    """
    Render this hash as a native string, in the format selected by ``self.ident``.

    :raises NotImplementedError:
        for the ``$7$`` format, if the salt bytes are not ASCII-decodable.
    """
    ident = self.ident

    if ident != IDENT_SCRYPT:
        # $7$ format -- fixed-width h64 params, then the salt embedded as-is
        assert ident == IDENT_7
        raw_salt = self.salt
        try:
            raw_salt.decode("ascii")
        except UnicodeDecodeError:
            raise suppress_cause(NotImplementedError("scrypt $7$ hashes dont support non-ascii salts"))
        pieces = (
            b"$7$",
            h64.encode_int6(self.rounds),
            h64.encode_int30(self.block_size),
            h64.encode_int30(self.parallelism),
            raw_salt,
            b"$",
            h64.encode_bytes(self.checksum),
        )
        return bascii_to_str(b"".join(pieces))

    # $scrypt$ format -- decimal params, b64-nopad salt & digest
    template = "$scrypt$ln=%d,r=%d,p=%d$%s$%s"
    return template % (
        self.rounds,
        self.block_size,
        self.parallelism,
        bascii_to_str(b64s_encode(self.salt)),
        bascii_to_str(b64s_encode(self.checksum)),
    )
#===================================================================
# init
#===================================================================
def __init__(self, block_size=None, **kwds):
    """
    Initialize the handler instance.

    :param block_size:
        optional block size; when given, it is normalized through
        :meth:`_norm_block_size` and stored on the instance.
    :param kwds:
        all remaining keywords are forwarded to the parent ``__init__``.
    """
    super(scrypt, self).__init__(**kwds)

    # init block size
    if block_size is not None:
        self.block_size = self._norm_block_size(block_size)
    else:
        # nothing supplied: keep the existing ``self.block_size``,
        # sanity-checking it (assert only, so skipped under "python -O")
        assert uh.validate_default_value(self, self.block_size, self._norm_block_size,
                                         param="block_size")

    # NOTE: if hash contains invalid complex constraint, relying on error
    #       being raised by scrypt call in _calc_checksum()
@classmethod
def _norm_block_size(cls, block_size, relaxed=False):
    """
    Normalize *block_size* through ``uh.norm_integer``, using a minimum of 1.

    *relaxed* is passed straight through to ``uh.norm_integer``.
    """
    return uh.norm_integer(
        cls,
        block_size,
        min=1,
        param="block_size",
        relaxed=relaxed,
    )
def _generate_salt(self):
    """
    Generate a new salt, base64-encoding it when the ``$7$`` format is in use.
    """
    raw = super(scrypt, self)._generate_salt()
    if self.ident != IDENT_7:
        return raw
    # this format doesn't support non-ascii salts.
    # as workaround, we take raw bytes, encoded to base64
    return b64s_encode(raw)
#===================================================================
# backend configuration
# NOTE: this follows HasManyBackends' API, but provides its own implementation,
# which actually switches the backend that 'passlib.crypto.scrypt.scrypt()' uses.
#===================================================================
@classproperty
def backends(cls):
    """Expose ``_scrypt.backend_values`` as a class-level property."""
    values = _scrypt.backend_values
    return values
@classmethod
def get_backend(cls):
    """Return the current value of ``_scrypt.backend``."""
    current = _scrypt.backend
    return current
@classmethod
def has_backend(cls, name="any"):
    """
    Report whether backend *name* can be selected.

    Performs a dry-run :meth:`set_backend` call; returns ``False`` if that
    raises ``uh.exc.MissingBackendError``, ``True`` otherwise.
    """
    try:
        cls.set_backend(name, dryrun=True)
    except uh.exc.MissingBackendError:
        return False
    else:
        return True
@classmethod
def set_backend(cls, name="any", dryrun=False):
    """
    Delegate backend selection to ``_scrypt._set_backend``.

    *name* and *dryrun* are passed through unchanged.
    """
    _scrypt._set_backend(
        name,
        dryrun=dryrun,
    )
#===================================================================
# digest calculation
#===================================================================
def _calc_checksum(self, secret):
    """
    Compute the scrypt digest of *secret* using this instance's salt and settings.

    The digest length requested is ``self.checksum_size``.
    """
    secret_bytes = to_bytes(secret, param="secret")
    # ``rounds`` stores the log cost setting, so expand it to the actual N
    cost = 1 << self.rounds
    return _scrypt.scrypt(
        secret_bytes,
        self.salt,
        n=cost,
        r=self.block_size,
        p=self.parallelism,
        keylen=self.checksum_size,
    )
#===================================================================
# hash migration
#===================================================================
def _calc_needs_update(self, **kwds):
    """
    Decide whether this hash should be flagged for re-hashing.

    Returns ``True`` when the instance's block size differs from the
    class-level ``block_size``; otherwise defers to the parent implementation.
    """
    # XXX: for now, marking all hashes which don't have matching block_size setting
    configured_block_size = type(self).block_size
    block_size_differs = self.block_size != configured_block_size
    if block_size_differs:
        return True
    return super(scrypt, self)._calc_needs_update(**kwds)
#===================================================================
# eoc
#===================================================================
#=============================================================================
# eof
#=============================================================================