1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00

dist/tools/suit: Update suit tooling to IETF-v7 compliance

This commit is contained in:
Koen Zandberg 2020-06-29 15:42:47 +02:00
parent c8ecc9c3ca
commit 401f8eb9bd
No known key found for this signature in database
GPG Key ID: 0895A893E6D2985B
23 changed files with 782 additions and 228 deletions

View File

@ -28,7 +28,7 @@ EXCLUDE="^(.+/vendor/\
|dist/tools/mcuboot\
|dist/tools/uhcpd\
|dist/tools/stm32loader\
|dist/tools/suit_v3/suit-manifest-generator)\
|dist/tools/suit/suit-manifest-generator)\
|dist/tools/esptool"
FILEREGEX='(\.py$|pyterm$)'
FILES=$(FILEREGEX=${FILEREGEX} EXCLUDE=${EXCLUDE} changed_files)

View File

@ -144,6 +144,8 @@ The `suit-tool` supports three sub-commands:
* `create` generates a new manifest.
* `sign` signs a manifest.
* `parse` parses an existing manifest into cbor-debug or a json representation.
* `keygen` Create a signing key. Not for production use.
* `pubkey` Get the public key for a supplied private key in uECC-compatible C definition.
The `suit-tool` has a configurable log level, specified with `-l`:
@ -178,7 +180,7 @@ To add a component to the manifest from the command-line, use the following synt
The supported fields are:
* `file` the path to a file to use as a payload file.
* `file` the path fo a file to use as a payload file.
* `inst` the `install-id`.
* `uri` the URI where the file will be found.
@ -207,3 +209,28 @@ suit-tool parse -m MANIFEST
```
If a json-representation is needed, add the '-j' flag.
## Keygen
Create an asymmetric keypair for non-production use. Production systems should use closely guarded keys, such as keys stored in an HSM.
```sh
suit-tool keygen [-t TYPE] -o KEYFILE
```
`suit-tool keygen` defaults to creating SECP256r1 keys. To create another type of key, use `-t`followed by one of:
* `secp256r1`
* `secp384r1`
* `secp521r1`
* `ed25519`
## UECC public key
Derive a public key in the format used by micro ECC. The input is a PEM private key.
```sh
suit-tool pubkey -k FILE
```
The tool will then print the public key in micro ECC format.

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2016-2019 ARM Limited or its affiliates
# Copyright 2016-2020 ARM Limited or its affiliates
#
# SPDX-License-Identifier: Apache-2.0
#

View File

@ -55,7 +55,8 @@ setuptools.setup (
install_requires = [
'cbor>=1.0.0',
'colorama>=0.4.0',
'cryptography>=2.8'
'cryptography>=2.8',
'pyhsslms>=1.0.0',
],
classifiers = [
"Programming Language :: Python :: 3",

View File

@ -1,4 +1,3 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2016-2019 ARM Limited or its affiliates
@ -17,4 +16,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__version__ = '0.0.1'
__version__ = '0.0.2'

View File

@ -1,4 +1,3 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2019-2020 ARM Limited or its affiliates
@ -17,22 +16,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
import sys
import argparse
import sys, argparse, os
from suit_tool import __version__
from suit_tool import keygen
from suit_tool import get_pubkey
import json
import re
def str_to_component(s):
types = {
'file' : ('file', lambda x : str(x.strip('"'))),
# 'desc' : ('component-description', lambda x : str(x.strip('"'))),
'inst' : ('install-id', lambda x : [ str(y) for y in eval(x) ]),
'uri' : ('uri', lambda x : str(x.strip('"')))
}
d = {types[k][0]:types[k][1](v) for k,v in [ re.split(r'=',e, maxsplit=1) for e in re.split(r''',\s*(?=["']?[a-zA-Z0-9_-]+["']?=)''', s)]}
return d
class MainArgumentParser(object):
def __init__(self):
@ -54,7 +54,7 @@ class MainArgumentParser(object):
# create_parser.add_argument('-v', '--manifest-version', choices=['1'], default='1')
create_parser.add_argument('-i', '--input-file', metavar='FILE', type=argparse.FileType('r'),
help='An input file describing the update. The file must be formatted as JSON. The overal structure is described in README.')
help='An input file describing the update. The file must be formated as JSON. The overal structure is described in README.')
create_parser.add_argument('-o', '--output-file', metavar='FILE', type=argparse.FileType('wb'), required=True)
create_parser.add_argument('-f', '--format', metavar='FMT', choices=['suit', 'suit-debug', 'json'], default='suit')
create_parser.add_argument('-s', '--severable', action='store_true', help='Convert large elements to severable fields.')
@ -72,9 +72,25 @@ class MainArgumentParser(object):
parse_parser.add_argument('-m', '--manifest', metavar='FILE', type=argparse.FileType('rb'), required=True)
parse_parser.add_argument('-j', '--json-output', default=False, action='store_true', dest='json')
get_uecc_pubkey_parser = subparsers.add_parser('pubkey', help='Get the public key for a supplied private key in uECC-compatible C definition.')
get_pubkey_parser = subparsers.add_parser('pubkey', help='Get the public key for a supplied private key.')
get_uecc_pubkey_parser.add_argument('-k', '--private-key', metavar='FILE', type=argparse.FileType('rb'), required=True)
get_pubkey_parser.add_argument('-k', '--private-key', metavar='FILE', type=argparse.FileType('rb'), required=True)
get_pubkey_parser.add_argument('-f', '--output-format', choices=get_pubkey.OutputFormaters.keys(), default='pem')
get_pubkey_parser.add_argument('-o', '--output-file', metavar='FILE', type=argparse.FileType('wb'), default=sys.stdout)
keygen_parser = subparsers.add_parser('keygen', help='Create a signing key. Not for production use')
keygen_parser.add_argument('-t', '--type', choices=keygen.KeyGenerators.keys(),
default='secp256r1', help='The type of the key to generate')
keygen_parser.add_argument('-o', '--output-file', metavar='FILE', type=argparse.FileType('wb'), default=sys.stdout)
keygen_parser.add_argument('-f', '--output-format', choices=keygen.OutputFormaters.keys(), default='pem')
keygen_parser.add_argument('-l', '--levels', help='The number of hss-lms levels', type=int, default=2)
sever_parser = subparsers.add_parser('sever', help='Remove one or more severable elements from the manifest, if present.')
sever_parser.add_argument('-m', '--manifest', metavar='FILE', type=argparse.FileType('rb'), required=True)
sever_parser.add_argument('-o', '--output-file', metavar='FILE', type=argparse.FileType('wb'), required=True)
sever_parser.add_argument('-e', '--element', action='append', type=str, dest='elements', default=[])
sever_parser.add_argument('-a', '--all', action='store_true', default=False)
return parser

View File

@ -1,4 +1,4 @@
#!/usr/bin/python3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2018-2020 ARM Limited or its affiliates
@ -17,21 +17,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
import logging
import sys
import logging, sys
from suit_tool.argparser import MainArgumentParser
from suit_tool import create, sign, parse, get_uecc_pubkey
from suit_tool import create, sign, parse, get_pubkey, keygen, sever #, verify, cert, init
LOG = logging.getLogger(__name__)
LOG_FORMAT = '[%(levelname)s] %(asctime)s - %(name)s - %(message)s'
LOG_FORMAT='[%(levelname)s] %(asctime)s - %(name)s - %(message)s'
def main():
driver = CLIDriver()
return driver.main()
class CLIDriver(object):
def __init__(self):
@ -46,6 +44,10 @@ class CLIDriver(object):
logging.basicConfig(level=log_level,
format=LOG_FORMAT,
datefmt='%Y-%m-%d %H:%M:%S')
logging.addLevelName( logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO))
logging.addLevelName( logging.WARNING, "\033[1;93m%s\033[1;0m" % logging.getLevelName(logging.WARNING))
logging.addLevelName( logging.CRITICAL, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.CRITICAL))
LOG.debug('CLIDriver created. Arguments parsed and logging setup.')
def main(self):
@ -56,8 +58,10 @@ class CLIDriver(object):
# "cert": cert.main,
# "init": init.main,
# "update" : update.main,
"pubkey": get_uecc_pubkey.main,
"sign": sign.main
"pubkey": get_pubkey.main,
"sign": sign.main,
"keygen": keygen.main,
"sever" : sever.main,
}[self.options.action](self.options) or 0
sys.exit(rc)

View File

@ -1,7 +1,6 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2019 ARM Limited or its affiliates
# Copyright 2019-2020 ARM Limited or its affiliates
#
# SPDX-License-Identifier: Apache-2.0
#
@ -19,16 +18,28 @@
# ----------------------------------------------------------------------------
import binascii
import copy
import collections
import json
import cbor2 as cbor
import sys
import textwrap
import itertools
import logging
from collections import OrderedDict
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from suit_tool.manifest import SUITComponentId, SUITCommon, SUITSequence, \
SUITCommand, \
SUITWrapper, SUITTryEach
suitCommonInfo, SUITCommand, SUITManifest, \
SUITEnvelope, SUITTryEach, SUITBWrapField, SUITText, \
SUITDigest, SUITDependencies, SUITDependency
import suit_tool.create
import suit_tool.sign
LOG = logging.getLogger(__name__)
@ -92,16 +103,30 @@ def make_sequence(cid, choices, seq, params, cmds, pcid_key=None, param_drctv='d
seq.append(mkCommand(pcid, param_drctv, params))
TryEachCmd = SUITTryEach()
for c in choices:
TECseq = SUITSequence()
for item, cmd in neqcmds.items():
TECseq.append(cmd(cid, c))
TECseq = TryEachCmd.field.obj().from_json([])
params = {}
for param, pcmd in neqparams.items():
k,v = pcmd(cid, c)
params[k] = v
dep_params = {}
TECseq_cmds = []
for item, cmd in neqcmds.items():
ncmd = cmd(cid, c)
for dp in ncmd.dep_params:
if dp in params:
dep_params[dp] = params[dp]
del params[dp]
TECseq_cmds.append(ncmd)
if len(dep_params):
TECseq.v.append(mkCommand(pcid, param_drctv, dep_params))
for cmd in TECseq_cmds:
TECseq.v.append(cmd)
if len(params):
TECseq.append(mkCommand(pcid, param_drctv, params))
if len(TECseq.items):
TECseq.v.append(mkCommand(pcid, param_drctv, params))
if hasattr(TECseq, "v") and len(TECseq.v.items):
TryEachCmd.append(TECseq)
if len(TryEachCmd.items):
seq.append(mkCommand(cid, 'directive-try-each', TryEachCmd))
@ -114,14 +139,15 @@ def compile_manifest(options, m):
m = copy.deepcopy(m)
m['components'] += options.components
# Compile list of All Component IDs
ids = set([
# There is no ordered set, so use ordered dict instead
ids = OrderedDict.fromkeys([
SUITComponentId().from_json(id) for comp_ids in [
[c[f] for f in [
'install-id', 'download-id', 'load-id'
] if f in c] for c in m['components']
] for id in comp_ids
])
cid_data = {}
cid_data = OrderedDict()
for c in m['components']:
if not 'install-id' in c:
LOG.critical('install-id required for all components')
@ -139,7 +165,7 @@ def compile_manifest(options, m):
digest, imgsize = hash_file(c['file'], hashes.SHA256())
c['install-digest'] = {
'algorithm-id' : 'sha256',
'digest-bytes' : binascii.b2a_hex(digest.finalize())
'digest-bytes' : digest.finalize()
}
c['install-size'] = imgsize
@ -153,37 +179,106 @@ def compile_manifest(options, m):
# Construct common sequence
CommonCmds = {
'offset': lambda cid, data: mkCommand(cid, 'condition-component-offset', data['offset'])
'offset': lambda cid, data: mkCommand(cid, 'condition-component-offset', None),
'vendor-id': lambda cid, data: mkCommand(cid, 'condition-vendor-identifier', None),
'class-id': lambda cid, data: mkCommand(cid, 'condition-class-identifier', None),
}
CommonParams = {
'install-digest': lambda cid, data: ('image-digest', data['install-digest']),
'install-size': lambda cid, data: ('image-size', data['install-size']),
'vendor-id' : lambda cid, data: ('vendor-id', data['vendor-id']),
'class-id' : lambda cid, data: ('class-id', data['class-id']),
'offset' : lambda cid, data: ('offset', data['offset'])
}
# print('Common')
CommonSeq = SUITSequence()
for cid, choices in cid_data.items():
if any(['vendor-id' in c for c in choices]):
CommonSeq.append(mkCommand(cid, 'condition-vendor-identifier',
[c['vendor-id'] for c in choices if 'vendor-id' in c][0]))
if any(['vendor-id' in c for c in choices]):
CommonSeq.append(mkCommand(cid, 'condition-class-identifier',
[c['class-id'] for c in choices if 'class-id' in c][0]))
CommonSeq = make_sequence(cid, choices, CommonSeq, CommonParams,
CommonCmds, param_drctv='directive-override-parameters')
# print('Dependencies')
# If there are dependencies
DepSeq = SUITSequence()
Dependencies = SUITDependencies()
DepRequiredSequences = { k:[] for k in ['deres', 'fetch', 'install', 'validate', 'run', 'load']}
if 'dependencies' in m:
for dep in m['dependencies']:
# Prepare dependency if necessary
if "src-file" in dep:
# Create
with open(dep['src-file']) as input_fd:
with open(dep['file']+'.tmp','wb') as output_fd:
create_opts = type('',(object,),{
# 'input_file': open(dep['src-file']),
# 'output_file': open(dep['file']+'.tmp','wb'),
'input_file': input_fd,
'output_file': output_fd,
'format' : 'suit',
'components': [],
'log_level': options.log_level
})()
rc = suit_tool.create.main(create_opts)
if rc:
sys.exit(rc)
# Sign
with open(dep['file']+'.tmp','rb') as manifest_fd:
with open(dep['file'],'wb') as output_fd:
with open(dep['key-file'], 'rb') as private_key_fd:
sign_opts = type('',(object,),{
'manifest': manifest_fd,
'output_file': output_fd,
'private_key': private_key_fd,
'log_level': options.log_level
})()
rc = suit_tool.sign.main(sign_opts)
if rc:
sys.exit(rc)
# Compute the dependency digest
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
mfst = {}
with open(dep['file'],'rb') as dep_fd:
dep_envelope = cbor.loads(dep_fd.read())
cmfst = dep_envelope[SUITEnvelope.fields['manifest'].suit_key]
digest.update(cbor.dumps(cmfst))
mfst = cbor.loads(cmfst)
did = SUITDigest().from_json({
'algorithm-id' : 'sha256',
'digest-bytes' : binascii.b2a_hex(digest.finalize())
})
Dependencies.append(SUITDependency().from_json({
'dependency-digest' : did.to_json()
}))
# Construct dependency resolution step
if 'uri' in dep:
DepSeq.append(mkCommand(did, 'directive-set-parameters', {
'uri' : dep['uri']
}))
DepSeq.append(mkCommand(did, 'directive-fetch', None))
DepSeq.append(mkCommand(did, 'condition-image-match', None))
for k, l in DepRequiredSequences.items():
if SUITManifest.fields[k].suit_key in mfst:
l.append(mkCommand(did, 'directive-process-dependency', None))
InstSeq = SUITSequence()
FetchSeq = SUITSequence()
# print('Install/Fetch')
for cid, choices in cid_data.items():
if any([c.get('install-on-download', True) and 'uri' in c for c in choices]):
InstParams = {
'uri' : lambda cid, data: ('uri', data['uri']),
'offset' : lambda cid, data: ('offset', data['offset']),
}
if any(['compression-info' in c and not c.get('decompress-on-load', False) for c in choices]):
InstParams['compression-info'] = lambda cid, data: data.get('compression-info')
InstCmds = {
'offset': lambda cid, data: mkCommand(
cid, 'condition-component-offset', data['offset'])
cid, 'condition-component-offset', None)
}
InstSeq = make_sequence(cid, choices, InstSeq, InstParams, InstCmds)
for cmd in DepRequiredSequences['install']:
InstSeq.append(cmd)
InstSeq.append(mkCommand(cid, 'directive-fetch', None))
InstSeq.append(mkCommand(cid, 'condition-image-match', None))
@ -191,7 +286,8 @@ def compile_manifest(options, m):
FetchParams = {
'uri' : lambda cid, data: ('uri', data['uri']),
'download-digest' : lambda cid, data : (
'image-digest', data.get('download-digest', data['install-digest']))
'image-digest', data.get('download-digest', data['install-digest'])),
'offset' : lambda cid, data: ('offset', data['offset']),
}
if any(['compression-info' in c and not c.get('decompress-on-load', False) for c in choices]):
FetchParams['compression-info'] = lambda cid, data: data.get('compression-info')
@ -199,12 +295,18 @@ def compile_manifest(options, m):
FetchCmds = {
'offset': lambda cid, data: mkCommand(
cid, 'condition-component-offset', data['offset']),
'fetch' : lambda cid, data: mkCommand(
data.get('download-id', cid.to_json()), 'directive-fetch', None),
'match' : lambda cid, data: mkCommand(
data.get('download-id', cid.to_json()), 'condition-image-match', None)
# 'fetch' : lambda cid, data: mkCommand(
# data.get('download-id', cid.to_json()), 'directive-fetch', None),
# 'match' : lambda cid, data: mkCommand(
# data.get('download-id', cid.to_json()), 'condition-image-match', None)
}
did = SUITComponentId().from_json([c['download-id'] for c in choices if 'download-id' in c][0])
FetchSeq = make_sequence(cid, choices, FetchSeq, FetchParams, FetchCmds, 'download-id')
for cmd in DepRequiredSequences['fetch']:
FetchSeq.append(cmd)
FetchSeq.append(mkCommand(did, 'directive-fetch', None))
FetchSeq.append(mkCommand(did, 'condition-image-match', None))
InstParams = {
'download-id' : lambda cid, data : ('source-component', data['download-id'])
@ -212,31 +314,40 @@ def compile_manifest(options, m):
InstCmds = {
}
InstSeq = make_sequence(cid, choices, InstSeq, InstParams, InstCmds)
for cmd in DepRequiredSequences['install']:
InstSeq.append(cmd)
InstSeq.append(mkCommand(cid, 'directive-copy', None))
InstSeq.append(mkCommand(cid, 'condition-image-match', None))
# TODO: Dependencies
# If there are dependencies
# Construct dependency resolution step
ValidateSeq = SUITSequence()
RunSeq = SUITSequence()
LoadSeq = SUITSequence()
# print('Validate/Load/Run')
# If any component is marked bootable
for cid, choices in cid_data.items():
if any([c.get('bootable', False) for c in choices]):
# TODO: Dependencies
# If there are dependencies
# Verify dependencies
# Process dependencies
ValidateSeq.append(mkCommand(cid, 'condition-image-match', None))
ValidateCmds = {
# 'install-digest' : lambda cid, data : mkCommand(cid, 'condition-image-match', None)
}
ValidateParams = {
}
ValidateSeq = make_sequence(cid, choices, ValidateSeq, ValidateParams, ValidateCmds)
for cmd in DepRequiredSequences['validate']:
ValidateSeq.append(cmd)
ValidateSeq.append(mkCommand(cid, 'condition-image-match', None))
# if any([c.get('bootable', False) for c in choices]):
# TODO: Dependencies
# If there are dependencies
# Verify dependencies
# Process dependencies
if any(['loadable' in c for c in choices]):
# Generate image load section
LoadParams = {
'install-id' : lambda cid, data : ('source-component', c['install-id']),
'load-digest' : ('image-digest', c.get('load-digest', c['install-digest'])),
'load-size' : ('image-size', c.get('load-size', c['install-size']))
'install-id' : lambda cid, data : ('source-component', c['install-id']),
'load-digest' : lambda cid, data : ('image-digest', c.get('load-digest', c['install-digest'])),
'load-size' : lambda cid, data : ('image-size', c.get('load-size', c['install-size'])),
}
if 'compression-info' in c and c.get('decompress-on-load', False):
LoadParams['compression-info'] = lambda cid, data: ('compression-info', c['compression-info'])
@ -244,7 +355,9 @@ def compile_manifest(options, m):
# Move each loadable component
}
load_id = SUITComponentId().from_json(choices[0]['load-id'])
LoadSeq = make_sequence(load_id, choices, ValidateSeq, LoadParams, LoadCmds)
LoadSeq = make_sequence(load_id, choices, LoadSeq, LoadParams, LoadCmds)
for cmd in DepRequiredSequences['load']:
LoadSeq.append(cmd)
LoadSeq.append(mkCommand(load_id, 'directive-copy', None))
LoadSeq.append(mkCommand(load_id, 'condition-image-match', None))
@ -252,13 +365,15 @@ def compile_manifest(options, m):
bootable_components = [x for x in m['components'] if x.get('bootable')]
if len(bootable_components) == 1:
c = bootable_components[0]
for cmd in DepRequiredSequences['run']:
RunSeq.append(cmd)
RunSeq.append(SUITCommand().from_json({
'component-id' : runable_id(c),
'command-id' : 'directive-run',
'command-arg' : None
}))
else:
t = []
te = []
for c in bootable_components:
pass
# TODO: conditions
@ -266,24 +381,75 @@ def compile_manifest(options, m):
#
# )
#TODO: Text
# print('Common')
common = SUITCommon().from_json({
'components': [id.to_json() for id in ids],
'components': [id.to_json() for id in ids.keys()],
'common-sequence': CommonSeq.to_json(),
})
if len(Dependencies.items):
common.dependencies = Dependencies
# print('manifest')
jmanifest = {
'manifest-version' : m['manifest-version'],
'manifest-sequence-number' : m['manifest-sequence-number'],
'common' : common.to_json()
}
# for k,v in {'deres':DepSeq, 'fetch': FetchSeq, 'install':InstSeq, 'validate':ValidateSeq, 'run':RunSeq, 'load':LoadSeq}.items():
# # print('sequence:{}'.format(k))
# v.to_json()
jmanifest.update({k:v for k,v in {
'payload-fetch' : FetchSeq.to_json(),
'deres' : DepSeq.to_json(),
'fetch' : FetchSeq.to_json(),
'install' : InstSeq.to_json(),
'validate' : ValidateSeq.to_json(),
'run' : RunSeq.to_json(),
'load' : LoadSeq.to_json()
}.items() if v})
wrapped_manifest = SUITWrapper().from_json({'manifest' : jmanifest})
mtext = {}
for k in ['manifest-description', 'update-description']:
if k in m:
mtext[k] = m[k]
for c in m['components']:
ctext = {}
cfields = [
'vendor-name',
'model-name',
'vendor-domain',
'model-info',
'component-description',
'component-version',
'version-required',
]
for k in cfields:
if k in c:
ctext[k] = c[k]
if len(ctext):
cid = SUITComponentId().from_json(c['install-id']).to_suit()
mtext[cid] = ctext
jenvelope = {
'authentication-wrapper' : [],
'manifest' : jmanifest
}
if len(mtext):
text = SUITText().from_json(mtext)
digest_alg = m.get('digest-algorithm', 'sha256')
suit_text = cbor.dumps(text.to_suit(), canonical=True)
digest = hashes.Hash(SUITEnvelope.digest_algorithms.get(digest_alg)(), backend=default_backend())
digest.update(suit_text)
jenvelope['manifest'].update({'text' : {
'algorithm-id' : digest_alg,
'digest-bytes' : digest.finalize()
}})
jenvelope.update({'text' : mtext})
# print('building envelope')
wrapped_manifest = SUITEnvelope().from_json(jenvelope)
return wrapped_manifest

View File

@ -1,4 +1,3 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2019 ARM Limited or its affiliates
@ -19,18 +18,20 @@
# ----------------------------------------------------------------------------
from suit_tool.compile import compile_manifest
import json
import cbor
import cbor2 as cbor
import itertools
import textwrap
from collections import OrderedDict
def main(options):
m = json.loads(options.input_file.read())
m = json.loads(options.input_file.read(), object_pairs_hook=OrderedDict)
nm = compile_manifest(options, m)
if hasattr(options, 'severable') and options.severable:
nm = nm.to_severable()
print('create done. Serializing')
if m.get('severable') or (hasattr(options, 'severable') and options.severable):
nm = nm.to_severable('sha256')
output = {
'suit' : lambda x: cbor.dumps(x.to_suit(), sort_keys=True),
'suit' : lambda x: cbor.dumps(x.to_suit(), canonical=True),
'suit-debug' : lambda x: '\n'.join(itertools.chain.from_iterable(
map(textwrap.wrap, x.to_debug('').split('\n'))
)).encode('utf-8'),

View File

@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2020 ARM Limited or its affiliates
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
import textwrap
import binascii
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec, ed25519
from cryptography.hazmat.primitives.asymmetric import utils as asymmetric_utils
from cryptography.hazmat.primitives import serialization as ks
def to_uecc_pubkey(pk):
if not isinstance(pk, ec.EllipticCurvePrivateKey):
raise Exception('Private key of type {} is not supported'.format(type(pk)))
public_numbers = pk.public_key().public_numbers()
x = public_numbers.x
y = public_numbers.y
uecc_bytes = x.to_bytes(
(x.bit_length() + 7) // 8, byteorder='big'
) + y.to_bytes(
(y.bit_length() + 7) // 8, byteorder='big'
)
uecc_c_def = ['const uint8_t public_key[] = {'] + textwrap.wrap(
', '.join(['{:0=#4x}'.format(x) for x in uecc_bytes]),
76
)
return '\n '.join(uecc_c_def) + '\n};\n'
def to_header(pk):
if isinstance(pk, ec.EllipticCurvePrivateKey):
return to_uecc_pubkey(pk)
if isinstance(pk, ed25519.Ed25519PrivateKey):
public_bytes = pk.public_key().public_bytes(ks.Encoding.Raw,
ks.PublicFormat.Raw)
public_c_def = ['const uint8_t public_key[] = {'] + textwrap.wrap(
', '.join(['{:0=#4x}'.format(x) for x in public_bytes]),
76
)
return str.encode('\n '.join(public_c_def) + '\n};\n')
OutputFormaters = {
'uecc' : to_uecc_pubkey,
'header': to_header,
'pem' : lambda pk: pk.public_key().public_bytes(ks.Encoding.PEM, ks.PublicFormat.SubjectPublicKeyInfo),
'der' : lambda pk: pk.public_key().public_bytes(ks.Encoding.DER, ks.PublicFormat.SubjectPublicKeyInfo),
'hsslms' : lambda pk: pk.publicKey().serialize(),
'c-hsslms' : lambda pk: ('\n '.join(['const uint8_t hsslms_public_key[] = {'] + textwrap.wrap(
', '.join(['{:0=#4x}'.format(x) for x in pk.publicKey().serialize()]),
76
)) + '\n};\n').encode('utf-8')
}
def main(options):
private_key = None
# This test is here because the cryptography module doesn't know about hss-lms keys
if options.output_format in ('pem', 'der', 'uecc', 'header'):
private_key = ks.load_pem_private_key(
options.private_key.read(),
password=None,
backend=default_backend()
)
odata = OutputFormaters.get(options.output_format)(private_key)
try:
odata = odata.decode('utf-8')
except:
odata = binascii.b2a_hex(odata).decode('utf-8')
odata = '\n'.join(
[line for lines in [textwrap.wrap(line, 80)
for line in odata.split('\n')] for line in lines]
) + '\n'
options.output_file.write(odata)
return 0

View File

@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2019-2020 ARM Limited or its affiliates
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives.asymmetric import utils as asymmetric_utils
from cryptography.hazmat.primitives import serialization as ks
import logging
import binascii
import textwrap
LOG = logging.getLogger(__name__)
KeyGenerators = {
'secp256r1' : lambda o: ec.generate_private_key(ec.SECP256R1(), default_backend()),
'secp384r1' : lambda o: ec.generate_private_key(ec.SECP384R1(), default_backend()),
'secp521r1' : lambda o: ec.generate_private_key(ec.SECP521R1(), default_backend()),
'ed25519' : lambda o: ed25519.Ed25519PrivateKey.generate(),
}
OutputFormaters = {
'pem' : lambda pk: pk.private_bytes(ks.Encoding.PEM, ks.PrivateFormat.PKCS8, ks.NoEncryption()),
'der' : lambda pk: pk.private_bytes(ks.Encoding.DER, ks.PrivateFormat.PKCS8, ks.NoEncryption()),
'c-hss-lms' : lambda pk: pk.serialize(),
}
def main(options):
if options.type == 'hsslms':
options.output_format = 'c-hss-lms'
# Read the manifest wrapper
private_key = KeyGenerators.get(options.type) (options)
odata = OutputFormaters.get(options.output_format)(private_key)
if options.output_file.isatty():
try:
odata = odata.decode('utf-8')
except:
odata = binascii.b2a_hex(odata).decode('utf-8')
odata = '\n'.join(textwrap.wrap(odata, 64)) + '\n'
options.output_file.write(odata)
return 0

View File

@ -1,7 +1,6 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2019 ARM Limited or its affiliates
# Copyright 2019-2020 ARM Limited or its affiliates
#
# SPDX-License-Identifier: Apache-2.0
#
@ -19,12 +18,20 @@
# ----------------------------------------------------------------------------
import collections
import binascii
import cbor
import cbor2 as cbor
import json
import copy
import uuid
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from collections import OrderedDict
import logging
LOG = logging.getLogger(__name__)
TreeBranch = []
ManifestKey = collections.namedtuple(
'ManifestKey',
[
@ -34,6 +41,8 @@ ManifestKey = collections.namedtuple(
]
)
def to_bytes(s):
if isinstance(s,bytes):
return s
try:
return binascii.a2b_hex(s)
except:
@ -42,21 +51,29 @@ def to_bytes(s):
except:
if isinstance(s,str):
return s.encode('utf-8')
elif isinstance(s,bytes):
return s
else:
return str(s).encode('utf-8')
class SUITException(Exception):
def __init__(self, m, data, tree_branch):
super().__init__(m)
self.data = data
self.tree_branch = tree_branch
class SUITCommonInformation:
def __init__(self):
self.component_ids = []
self.dependencies = []
self.current_index = 0
self.indent_size = 4
def component_id_to_index(self, cid):
id = -1
for i, c in enumerate(self.component_ids):
if c == cid and i >= 0:
id = i
id = componentIndex(i)
for i, d in enumerate(self.dependencies):
if d.digest == cid and i >= 0:
id = dependencyIndex(i)
return id
suitCommonInfo = SUITCommonInformation()
@ -69,7 +86,9 @@ class SUITInt:
def to_json(self):
return self.v
def from_suit(self, v):
TreeBranch.append(type(self))
self.v = int(v)
TreeBranch.pop()
return self
def to_suit(self):
return self.v
@ -78,21 +97,40 @@ class SUITInt:
class SUITPosInt(SUITInt):
def from_json(self, v):
TreeBranch.append(type(self))
_v = int(v)
# print (_v)
if _v < 0:
raise Exception('Positive Integers must be >= 0')
self.v = _v
TreeBranch.pop()
return self
def from_suit(self, v):
return self.from_json(v)
class SUITManifestDict:
def mkfields(d):
# rd = {}
# rd = OderedDict()
return {k: ManifestKey(*v) for k,v in d.items()}
def __init__(self):
pass
def __eq__(self, rhs):
if not isinstance(rhs, type(self)):
return False
for f, info in self.fields:
if hasattr(self, f) != hasattr(rhs, f):
return False
if hasattr(self, f) and hasattr(rhs, f) and getattr(self, f) != getattr(rhs, f):
return False
for a,b in zip(self.items, rhs.items):
if not a == b:
return False
return True
def from_json(self, data):
for k, f in self.fields.items():
v = data.get(f.json_key, None)
@ -100,7 +138,7 @@ class SUITManifestDict:
return self
def to_json(self):
j = {}
j = OrderedDict()
for k, f in self.fields.items():
v = getattr(self, k)
if v:
@ -108,14 +146,18 @@ class SUITManifestDict:
return j
def from_suit(self, data):
TreeBranch.append(type(self))
for k, f in self.fields.items():
TreeBranch.append(k)
v = data.get(f.suit_key, None)
d = f.obj().from_suit(v) if v is not None else None
setattr(self, k, d)
TreeBranch.pop()
TreeBranch.pop()
return self
def to_suit(self):
sd = {}
sd = OrderedDict()
for k, f in self.fields.items():
v = getattr(self, k)
if v:
@ -136,8 +178,12 @@ class SUITManifestDict:
class SUITManifestNamedList(SUITManifestDict):
def from_suit(self, data):
TreeBranch.append(type(self))
for k, f in self.fields.items():
TreeBranch.append(k)
setattr(self, k, f.obj().from_suit(data[f.suit_key]))
TreeBranch.pop()
TreeBranch.pop()
return self
def to_suit(self):
@ -172,18 +218,34 @@ class SUITKeyMap:
def to_suit(self):
return self.v
def from_suit(self, d):
TreeBranch.append(type(self))
self.v = self.keymap[self.rkeymap[d]]
TreeBranch.pop()
return self
def to_debug(self, indent):
s = str(self.v) + ' / ' + self.to_json() + ' /'
s = str(self.v) + ' / ' + json.dumps(self.to_json(),sort_keys = True) + ' /'
return s
def SUITBWrapField(c):
class SUITBWrapper:
def to_suit(self):
return cbor.dumps(self.v.to_suit(), sort_keys=True)
return cbor.dumps(self.v.to_suit(), canonical=True)
def from_suit(self, d):
self.v = c().from_suit(cbor.loads(d))
TreeBranch.append(type(self))
try:
self.v = c().from_suit(cbor.loads(d))
except SUITException as e:
raise e
except Exception as e:
LOG.debug('At {}: failed to load "{}" as CBOR'.format(type(self),binascii.b2a_hex(d).decode('utf-8')))
LOG.debug('Path: {}'.format(TreeBranch))
# LOG.debug('At {}: failed to load "{}" as CBOR'.format(type(self),binascii.b2a_hex(d).decode('utf-8')))
raise SUITException(
m = 'At {}: failed to load "{}" as CBOR'.format(type(self),binascii.b2a_hex(d).decode('utf-8')),
data = d,
tree_branch = TreeBranch
)
TreeBranch.pop()
return self
def to_json(self):
return self.v.to_json()
@ -204,6 +266,8 @@ class SUITManifestArray:
def __init__(self):
self.items=[]
def __eq__(self, rhs):
if not isinstance(rhs, type(self)):
return False
if len(self.items) != len(rhs.items):
return False
for a,b in zip(self.items, rhs.items):
@ -225,8 +289,12 @@ class SUITManifestArray:
def from_suit(self, data):
self.items = []
TreeBranch.append(type(self))
for d in data:
TreeBranch.append(len(self.items))
self.items.append(self.field.obj().from_suit(d))
TreeBranch.pop()
TreeBranch.pop()
return self
def to_suit(self):
@ -270,7 +338,7 @@ class SUITUUID(SUITBytes):
self.v = uuid.UUID(bytes=d).bytes
return self
def to_debug(self, indent):
return 'h\'' + self.to_json() + '\' / ' + str(uuid.UUID(bytes=self.v)) + ' /'
return 'h\'' + json.dumps(self.to_json(), sort_keys=True) + '\' / ' + str(uuid.UUID(bytes=self.v)) + ' /'
class SUITRaw:
@ -315,6 +383,9 @@ class SUITTStr(SUITRaw):
class SUITComponentId(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITBytes)
def to_suit(self):
return tuple(super(SUITComponentId, self).to_suit())
def to_debug(self, indent):
newindent = indent + one_indent
s = '[' + ''.join([v.to_debug(newindent) for v in self.items]) + ']'
@ -337,7 +408,6 @@ class SUITComponentIndex(SUITComponentId):
)
return s
class SUITComponents(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITComponentId)
@ -364,6 +434,8 @@ class SUITDigest(SUITManifestNamedList):
'algo' : ('algorithm-id', 0, SUITDigestAlgo),
'digest' : ('digest-bytes', 1, SUITBytes)
})
def __hash__(self):
return hash(tuple([getattr(self, k) for k in self.fields.keys() if hasattr(self, k)]))
class SUITCompressionInfo(SUITKeyMap):
rkeymap, keymap = SUITKeyMap.mkKeyMaps({
@ -376,11 +448,14 @@ class SUITCompressionInfo(SUITKeyMap):
class SUITParameters(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'digest' : ('image-digest', 11, SUITDigest),
'size' : ('image-size', 12, SUITPosInt),
'uri' : ('uri', 6, SUITTStr),
'src' : ('source-component', 10, SUITComponentIndex),
'compress' : ('compression-info', 8, SUITCompressionInfo)
'vendor-id' : ('vendor-id', 1, SUITUUID),
'class-id' : ('class-id', 2, SUITUUID),
'digest' : ('image-digest', 3, SUITBWrapField(SUITDigest)),
'size' : ('image-size', 14, SUITPosInt),
'uri' : ('uri', 21, SUITTStr),
'src' : ('source-component', 22, SUITComponentIndex),
'compress' : ('compression-info', 19, SUITCompressionInfo),
'offset' : ('offset', 5, SUITPosInt)
})
def from_json(self, j):
return super(SUITParameters, self).from_json(j)
@ -388,10 +463,18 @@ class SUITParameters(SUITManifestDict):
class SUITTryEach(SUITManifestArray):
pass
def SUITCommandContainer(jkey, skey, argtype):
class dependencyIndex(int):
def __new__(cls, value):
return super(cls, cls).__new__(cls, value)
class componentIndex(int):
def __new__(cls, value):
return super(cls, cls).__new__(cls, value)
def SUITCommandContainer(jkey, skey, argtype, dp=[]):
class SUITCmd(SUITCommand):
json_key = jkey
suit_key = skey
dep_params = dp
def __init__(self):
pass
def to_suit(self):
@ -408,17 +491,25 @@ def SUITCommandContainer(jkey, skey, argtype):
def from_json(self, j):
if j['command-id'] != self.json_key:
raise Except('JSON Key mismatch error')
if self.json_key != 'directive-set-component-index':
self.cid = SUITComponentId().from_json(j['component-id'])
if self.json_key != 'directive-set-component-index' and self.json_key != 'directive-set-dependency-index':
try:
self.cid = SUITComponentId().from_json(j['component-id'])
except:
self.cid = SUITDigest().from_json(j['component-id'])
self.arg = argtype().from_json(j['command-arg'])
return self
def from_suit(self, s):
if s[0] != self.suit_key:
raise Except('SUIT Key mismatch error')
if self.json_key == 'directive-set-component-index':
suitCommonInfo.current_index = s[1]
suitCommonInfo.current_index = componentIndex(s[1])
elif self.json_key == 'directive-set-dependency-index':
suitCommonInfo.current_index = dependencyIndex(s[1])
else:
self.cid = suitCommonInfo.component_ids[suitCommonInfo.current_index]
if isinstance(suitCommonInfo.current_index, dependencyIndex):
self.cid = suitCommonInfo.dependencies[suitCommonInfo.current_index]
else:
self.cid = suitCommonInfo.component_ids[suitCommonInfo.current_index]
self.arg = argtype().from_suit(s[1])
return self
def to_debug(self, indent):
@ -427,6 +518,14 @@ def SUITCommandContainer(jkey, skey, argtype):
return s
return SUITCmd
def mkPolicy(policy):
class SUITReportingPolicy(SUITPosInt):
default_policy = policy
def from_json(self, j):
if j is None:
j = self.default_policy
return super(SUITReportingPolicy, self).from_json(j)
return SUITReportingPolicy
class SUITCommand:
def from_json(self, j):
@ -435,31 +534,30 @@ class SUITCommand:
return self.scommands[s[0]]().from_suit(s)
SUITCommand.commands = [
SUITCommandContainer('condition-vendor-identifier', 1, SUITUUID),
SUITCommandContainer('condition-class-identifier', 2, SUITUUID),
SUITCommandContainer('condition-image-match', 3, SUITNil),
SUITCommandContainer('condition-use-before', 4, SUITRaw),
SUITCommandContainer('condition-component-offset', 5, SUITRaw),
SUITCommandContainer('condition-custom', 6, SUITRaw),
SUITCommandContainer('condition-device-identifier', 24, SUITRaw),
SUITCommandContainer('condition-image-not-match', 25, SUITRaw),
SUITCommandContainer('condition-minimum-battery', 26, SUITRaw),
SUITCommandContainer('condition-update-authorised', 27, SUITRaw),
SUITCommandContainer('condition-version', 28, SUITRaw),
SUITCommandContainer('condition-vendor-identifier', 1, mkPolicy(policy=0xF), dp=['vendor-id']),
SUITCommandContainer('condition-class-identifier', 2, mkPolicy(policy=0xF), dp=['class-id']),
SUITCommandContainer('condition-image-match', 3, mkPolicy(policy=0xF), dp=['digest']),
SUITCommandContainer('condition-use-before', 4, mkPolicy(policy=0xA)),
SUITCommandContainer('condition-component-offset', 5, mkPolicy(policy=0x5), dp=['offset']),
SUITCommandContainer('condition-device-identifier', 24, mkPolicy(policy=0xF)),
SUITCommandContainer('condition-image-not-match', 25, mkPolicy(policy=0xF)),
SUITCommandContainer('condition-minimum-battery', 26, mkPolicy(policy=0xA)),
SUITCommandContainer('condition-update-authorised', 27, mkPolicy(policy=0x3)),
SUITCommandContainer('condition-version', 28, mkPolicy(policy=0xF)),
SUITCommandContainer('directive-set-component-index', 12, SUITPosInt),
SUITCommandContainer('directive-set-dependency-index', 13, SUITRaw),
SUITCommandContainer('directive-abort', 14, SUITRaw),
SUITCommandContainer('directive-set-dependency-index', 13, SUITPosInt),
SUITCommandContainer('directive-abort', 14, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-try-each', 15, SUITTryEach),
SUITCommandContainer('directive-process-dependency', 18, SUITRaw),
SUITCommandContainer('directive-process-dependency', 18, mkPolicy(policy=0)),
SUITCommandContainer('directive-set-parameters', 19, SUITParameters),
SUITCommandContainer('directive-override-parameters', 20, SUITParameters),
SUITCommandContainer('directive-fetch', 21, SUITNil),
SUITCommandContainer('directive-copy', 22, SUITRaw),
SUITCommandContainer('directive-run', 23, SUITRaw),
SUITCommandContainer('directive-wait', 29, SUITRaw),
SUITCommandContainer('directive-fetch', 21, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-copy', 22, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-run', 23, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-wait', 29, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-run-sequence', 30, SUITRaw),
SUITCommandContainer('directive-run-with-arguments', 31, SUITRaw),
SUITCommandContainer('directive-swap', 32, SUITRaw),
SUITCommandContainer('directive-swap', 32, mkPolicy(policy=0x2)),
]
SUITCommand.jcommands = { c.json_key : c for c in SUITCommand.commands}
SUITCommand.scommands = { c.suit_key : c for c in SUITCommand.commands}
@ -472,17 +570,27 @@ class SUITSequence(SUITManifestArray):
suitCommonInfo.current_index = 0 if len(suitCommonInfo.component_ids) == 1 else None
for i in self.items:
if i.json_key == 'directive-set-component-index':
suitCommonInfo.current_index = i.arg.v
suitCommonInfo.current_index = componentIndex(i.arg.v)
elif i.json_key == 'directive-set-dependency-index':
suitCommonInfo.current_index = dependencyIndex(i.arg.v)
else:
# Option 1: current & command index same class, same number,
# Do nothing
# Option 2: current & command not equal, command is component
# set component index
# Option 3: current & command not equal, command is dependency
# set dependency index
cidx = suitCommonInfo.component_id_to_index(i.cid)
if cidx != suitCommonInfo.current_index:
# Change component
cswitch = SUITCommand().from_json({
'command-id' : 'directive-set-component-index',
'command-arg' : cidx
})
op = 'directive-set-component-index'
if isinstance(cidx, dependencyIndex):
op = 'directive-set-dependency-index'
# Change component/dependency
suitCommonInfo.current_index = cidx
suit_l += cswitch.to_suit()
suit_l += SUITCommand().from_json({
'command-id' : op,
'command-arg' : int(cidx)
}).to_suit()
suit_l += i.to_suit()
return suit_l
def to_debug(self, indent):
@ -491,7 +599,7 @@ class SUITSequence(SUITManifestArray):
self.items = [SUITCommand().from_suit(i) for i in zip(*[iter(s)]*2)]
return self
SUITTryEach.field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITSequence)
SUITTryEach.field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITBWrapField(SUITSequence))
class SUITSequenceComponentReset(SUITSequence):
def to_suit(self):
@ -520,28 +628,111 @@ def SUITMakeSeverableField(c):
def to_debug(self, indent):
return self.v.to_debug(indent)
return SUITSeverableField
# class SUITSequenceOrDigest()
class SUITDependency(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'digest' : ('dependency-digest', 1, SUITDigest),
'prefix' : ('dependency-prefix', 2, SUITComponentId),
})
class SUITDependencies(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITDependency)
def from_suit(self, data):
super(SUITDependencies, self).from_suit(data)
suitCommonInfo.dependencies = self.items
return self
def from_json(self, j):
super(SUITDependencies, self).from_json(j)
suitCommonInfo.dependencies = self.items
return self
class SUITCommon(SUITManifestDict):
fields = SUITManifestNamedList.mkfields({
# 'dependencies' : ('dependencies', 1, SUITBWrapField(SUITDependencies)),
'components' : ('components', 2, SUITBWrapField(SUITComponents)),
# 'dependency_components' : ('dependency-components', 3, SUITBWrapField(SUITDependencies)),
'dependencies' : ('dependencies', 1, SUITBWrapField(SUITDependencies)),
'components' : ('components', 2, SUITComponents),
'common_sequence' : ('common-sequence', 4, SUITBWrapField(SUITSequenceComponentReset)),
})
class SUITComponentText(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'vendorname' : ('vendor-name', 1, SUITTStr),
'modelname' : ('model-name', 2, SUITTStr),
'vendordomain' : ('vendor-domain', 3, SUITTStr),
'modelinfo' : ('json-source', 4, SUITTStr),
'cdesc' : ('component-description', 5, SUITTStr),
'version' : ('version', 6, SUITTStr),
'reqversion' : ('required-version', 7, SUITTStr),
})
class SUITText(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'mdesc' : ('manifest-description', 1, SUITTStr),
'udesc' : ('update-description', 2, SUITTStr),
'json' : ('json-source', 3, SUITTStr),
'yaml' : ('yaml-source', 4, SUITTStr),
})
components={}
def to_json(self):
d = super(SUITText, self).to_json()
d.update({k.to_json() : v.to_json() for k,v in self.components.items()})
return d
def from_json(self, data):
# Handle components
for k,v in data.items():
if not isinstance(v, str):
self.components[SUITComponentId().from_json(k)] = SUITComponentText().from_json(v)
# Treat everything else as a normal manifestDict
return super(SUITText, self).from_json(data)
def to_suit(self):
d = super(SUITText, self).to_suit()
d.update({k.to_suit() : v.to_suit() for k,v in self.components.items()})
return d
def from_suit(self, data):
# Handle components
for k,v in data.items():
if not isinstance(v, str):
self.components[SUITComponentId().from_suit(k)] = SUITComponentText().from_suit(v)
# Treat everything else as a normal manifestDict
return super(SUITText, self).from_json(data)
def to_debug(self, indent):
s = '{'
newindent = indent + one_indent
for k, f in self.fields.items():
v = getattr(self, k)
if v:
s += '\n{ind}/ {jk} / {sk}:'.format(ind=newindent, jk=f.json_key, sk=f.suit_key)
s += v.to_debug(newindent) + ','
for k, f in self.components.items():
s += '\n' + newindent + '{}:'.format(k.to_debug(newindent + one_indent))
s += f.to_debug(newindent + one_indent)
s += '\n' + indent + '}'
return s
class SUITManifest(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'version' : ('manifest-version', 1, SUITPosInt),
'sequence' : ('manifest-sequence-number', 2, SUITPosInt),
'common' : ('common', 3, SUITBWrapField(SUITCommon)),
'refuri' : ('reference-uri', 4, SUITTStr),
'deres' : ('dependency-resolution', 7, SUITMakeSeverableField(SUITSequenceComponentReset)),
'fetch' : ('payload-fetch', 8, SUITMakeSeverableField(SUITSequenceComponentReset)),
'install' : ('install', 9, SUITMakeSeverableField(SUITSequenceComponentReset)),
'validate' : ('validate', 10, SUITBWrapField(SUITSequenceComponentReset)),
'load' : ('load', 11, SUITBWrapField(SUITSequenceComponentReset)),
'run' : ('run', 12, SUITBWrapField(SUITSequenceComponentReset)),
'text' : ('text', 13, SUITMakeSeverableField(SUITText)),
'coswid' : ('coswid', 14, SUITBytes),
})
class COSE_Algorithms(SUITKeyMap):
@ -588,7 +779,7 @@ class COSETagChoice(SUITManifestDict):
for k, f in self.fields.items():
v = getattr(self, k, None)
if v:
return cbor.Tag(tag=f.suit_key, value=v.to_suit())
return cbor.CBORTag(tag=f.suit_key, value=v.to_suit())
return None
def from_suit(self, data):
@ -618,11 +809,11 @@ class COSETaggedAuth(COSETagChoice):
})
class COSEList(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=COSETaggedAuth)
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITBWrapField(COSETaggedAuth))
def from_suit(self, data):
return super(COSEList, self).from_suit(data)
class SUITWrapper(SUITManifestDict):
class SUITEnvelope(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'auth' : ('authentication-wrapper', 2, SUITBWrapField(COSEList)),
'manifest' : ('manifest', 3, SUITBWrapField(SUITManifest)),
@ -632,9 +823,10 @@ class SUITWrapper(SUITManifestDict):
'validate': ('validate', 10, SUITBWrapField(SUITSequence)),
'load': ('load', 11, SUITBWrapField(SUITSequence)),
'run': ('run', 12, SUITBWrapField(SUITSequence)),
# 'text': ('text', 13, SUITBWrapField(SUITSequence)),
'text': ('text', 13, SUITBWrapField(SUITText)),
'coswid': ('coswid', 14, SUITBytes),
})
severable_fields = {'deres', 'fetch', 'install'} #, 'text'}
severable_fields = {'deres', 'fetch', 'install', 'text', 'coswid'}
digest_algorithms = {
'sha224' : hashes.SHA224,
'sha256' : hashes.SHA256,
@ -650,14 +842,14 @@ class SUITWrapper(SUITManifestDict):
v = getattr(sev.manifest.v, k)
if v is None:
continue
cbor_field = cbor.dumps(v.to_suit(), sort_keys=True)
digest = hashes.Hash(digest_algorithms.get(digest_alg)(), backend=default_backend())
cbor_field = cbor.dumps(v.to_suit(), canonical=True)
digest = hashes.Hash(self.digest_algorithms.get(digest_alg)(), backend=default_backend())
digest.update(cbor_field)
field_digest = SUITDigest().from_json({
'algorithm-id' : digest_alg,
'digest-bytes' : digest.finalize()
})
cbor_digest = cbor.dumps(field_digest.to_suit(), sort_keys=True)
cbor_digest = cbor.dumps(field_digest.to_suit(), canonical=True)
if len(cbor_digest) < len(cbor_field):
setattr(sev.manifest.v, k, field_digest)
setattr(sev,k,v)
@ -673,11 +865,11 @@ class SUITWrapper(SUITManifestDict):
if v is None:
continue
# Verify digest
cbor_field = cbor.dumps(v.to_suit(), sort_keys=True)
cbor_field = cbor.dumps(v.to_suit(), canonical=True)
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(cbor_field)
actual_digest = digest.finalize()
field_digest = getattr(sev.nsev.v, k)
field_digest = getattr(nsev.v, k)
expected_digest = field_digest.to_suit()[1]
if digest != expected_digest:
raise Exception('Field Digest mismatch: For {}, expected: {}, got {}'.format(

View File

@ -17,17 +17,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
import cbor
import cbor2 as cbor
import json
import itertools
import textwrap
from suit_tool.manifest import SUITWrapper
from suit_tool.manifest import SUITEnvelope
def main(options):
# Read the manifest wrapper
decoded_cbor_wrapper = cbor.loads(options.manifest.read())
wrapper = SUITWrapper().from_suit(decoded_cbor_wrapper)
# print(decoded_cbor_wrapper)
wrapper = SUITEnvelope().from_suit(decoded_cbor_wrapper)
if options.json:
print (json.dumps(wrapper.to_json(),indent=2))
else:

View File

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2019 ARM Limited or its affiliates
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
from suit_tool.compile import compile_manifest
import json
import cbor2 as cbor
import itertools
import textwrap
from collections import OrderedDict
from suit_tool.manifest import SUITEnvelope
def main(options):
# Read the manifest wrapper
envelope = cbor.loads(options.manifest.read())
if hasattr(options, 'all'):
options.elements = SUITEnvelope.severable_fields
for e in options.elements:
eid = SUITEnvelope.fields[e].suit_key
if eid in envelope:
del(envelope[eid])
output = cbor.dumps(envelope, canonical=True)
options.output_file.write(output)
return 0

View File

@ -17,7 +17,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
import cbor
import cbor2 as cbor
import json
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
@ -26,31 +27,40 @@ from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives.asymmetric import utils as asymmetric_utils
from cryptography.hazmat.primitives import serialization as ks
from suit_tool.manifest import COSE_Sign1, COSEList, \
SUITWrapper, SUITBytes, SUITBWrapField
from suit_tool.manifest import COSE_Sign1, COSEList, SUITDigest,\
SUITEnvelope, SUITBytes, SUITBWrapField, \
COSETaggedAuth
import logging
import binascii
LOG = logging.getLogger(__name__)
def get_cose_es_bytes(private_key, sig_val):
def get_cose_es_bytes(options, private_key, sig_val):
ASN1_signature = private_key.sign(sig_val, ec.ECDSA(hashes.SHA256()))
r,s = asymmetric_utils.decode_dss_signature(ASN1_signature)
ssize = private_key.key_size
signature_bytes = r.to_bytes(ssize//8, byteorder='big') + s.to_bytes(ssize//8, byteorder='big')
return signature_bytes
def get_cose_ed25519_bytes(private_key, sig_val):
def get_cose_ed25519_bytes(options, private_key, sig_val):
return private_key.sign(sig_val)
def get_hsslms_bytes(options, private_key, sig_val):
sig = private_key.sign(sig_val)
key_file_name = options.private_key.name
options.private_key.close()
with open(key_file_name, 'wb') as fd:
fd.write(private_key.serialize())
return sig
def main(options):
# Read the manifest wrapper
wrapper = cbor.loads(options.manifest.read())
private_key = None
digest = None
private_key_buffer = options.private_key.read()
try:
private_key = ks.load_pem_private_key(options.private_key.read(), password=None, backend=default_backend())
private_key = ks.load_pem_private_key(private_key_buffer, password=None, backend=default_backend())
if isinstance(private_key, ec.EllipticCurvePrivateKey):
options.key_type = 'ES{}'.format(private_key.key_size)
elif isinstance(private_key, ed25519.Ed25519PrivateKey):
@ -65,13 +75,10 @@ def main(options):
'EdDSA' : hashes.Hash(hashes.SHA256(), backend=default_backend()),
}.get(options.key_type)
except:
digest= hashes.Hash(hashes.SHA256(), backend=default_backend())
# private_key = None
# TODO: Implement loading of DSA keys not supported by python cryptography
LOG.critical('Non-library key type not implemented')
# return 1
return 1
digest.update(cbor.dumps(wrapper[SUITWrapper.fields['manifest'].suit_key]))
digest.update(cbor.dumps(wrapper[SUITEnvelope.fields['manifest'].suit_key]))
cose_signature = COSE_Sign1().from_json({
'protected' : {
@ -80,7 +87,7 @@ def main(options):
'unprotected' : {},
'payload' : {
'algorithm-id' : 'sha256',
'digest-bytes' : binascii.b2a_hex(digest.finalize())
'digest-bytes' : digest.finalize()
}
})
@ -89,23 +96,24 @@ def main(options):
cose_signature.protected.to_suit(),
b'',
cose_signature.payload.to_suit(),
], sort_keys = True)
sig_val = Sig_structure
], canonical = True)
LOG.debug('Signing: {}'.format(binascii.b2a_hex(Sig_structure).decode('utf-8')))
signature_bytes = {
'ES256' : get_cose_es_bytes,
'ES384' : get_cose_es_bytes,
'ES512' : get_cose_es_bytes,
'EdDSA' : get_cose_ed25519_bytes,
}.get(options.key_type)(private_key, sig_val)
'HSS-LMS' : get_hsslms_bytes,
}.get(options.key_type)(options, private_key, Sig_structure)
cose_signature.signature = SUITBytes().from_suit(signature_bytes)
auth = SUITBWrapField(COSEList)().from_json([{
auth = SUITBWrapField(COSEList)().from_suit(wrapper[SUITEnvelope.fields['auth'].suit_key])
auth.v.append(auth.v.field.obj().from_json({
'COSE_Sign1_Tagged' : cose_signature.to_json()
}])
}))
wrapper[SUITEnvelope.fields['auth'].suit_key] = auth.to_suit()
wrapper[SUITWrapper.fields['auth'].suit_key] = auth.to_suit()
options.output_file.write(cbor.dumps(wrapper, sort_keys=True))
options.output_file.write(cbor.dumps(wrapper, canonical=True))
return 0

View File

@ -1,5 +0,0 @@
*__pycache__
*.pyc
*.DS_Store
*.hex
examples/*.cbor

View File

@ -1,53 +0,0 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2020 ARM Limited or its affiliates
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
import textwrap
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization as ks
def main(options):
private_key = ks.load_pem_private_key(
options.private_key.read(),
password=None,
backend=default_backend()
)
#public_numbers = private_key.public_key().public_numbers()
#x = public_numbers.x
#y = public_numbers.y
#uecc_bytes = x.to_bytes(
# (x.bit_length() + 7) // 8, byteorder='big'
#) + y.to_bytes(
# (y.bit_length() + 7) // 8, byteorder='big'
#)
#uecc_c_def = ['const uint8_t public_key[] = {'] + textwrap.wrap(
# ', '.join(['{:0=#4x}'.format(x) for x in uecc_bytes]),
# 76
#)
public_bytes = private_key.public_key().public_bytes(
encoding=ks.Encoding.Raw,
format=ks.PublicFormat.Raw
)
c_def = ['const uint8_t public_key[] = {'] + textwrap.wrap(
', '.join(['{:0=#4x}'.format(x) for x in public_bytes]),
76
)
print('\n '.join(c_def) + '\n};')
return 0

View File

@ -1,6 +1,6 @@
#
# path to suit-tool
SUIT_TOOL ?= $(RIOTBASE)/dist/tools/suit_v3/suit-manifest-generator/bin/suit-tool
SUIT_TOOL ?= $(RIOTBASE)/dist/tools/suit/suit-manifest-generator/bin/suit-tool
#
# SUIT encryption keys
@ -27,14 +27,14 @@ BUILDDEPS += $(SUIT_PUB_HDR)
$(SUIT_SEC): $(CLEAN)
@echo suit: generating key in $(SUIT_KEY_DIR)
@mkdir -p $(SUIT_KEY_DIR)
@$(RIOTBASE)/dist/tools/suit_v3/gen_key.py $(SUIT_SEC)
@$(RIOTBASE)/dist/tools/suit/gen_key.py $(SUIT_SEC)
# set FORCE so switching between keys using "SUIT_KEY=foo make ..."
# triggers a rebuild even if the new key would otherwise not (because the other
# key's mtime is too far back).
$(SUIT_PUB_HDR): $(SUIT_SEC) FORCE | $(CLEAN)
@mkdir -p $(SUIT_PUB_HDR_DIR)
@$(SUIT_TOOL) pubkey -k $(SUIT_SEC) \
@$(SUIT_TOOL) pubkey -f header -k $(SUIT_SEC) \
| '$(LAZYSPONGE)' $(LAZYSPONGE_FLAGS) '$@'
suit/genkey: $(SUIT_SEC)

View File

@ -10,13 +10,13 @@ SUIT_COAP_ROOT ?= coap://$(SUIT_COAP_SERVER)/$(SUIT_COAP_BASEPATH)
SUIT_COAP_FSROOT ?= $(RIOTBASE)/coaproot
#
SUIT_MANIFEST ?= $(BINDIR_APP)-riot.suitv3.$(APP_VER).bin
SUIT_MANIFEST_LATEST ?= $(BINDIR_APP)-riot.suitv3.latest.bin
SUIT_MANIFEST_SIGNED ?= $(BINDIR_APP)-riot.suitv3_signed.$(APP_VER).bin
SUIT_MANIFEST_SIGNED_LATEST ?= $(BINDIR_APP)-riot.suitv3_signed.latest.bin
SUIT_MANIFEST ?= $(BINDIR_APP)-riot.suit.$(APP_VER).bin
SUIT_MANIFEST_LATEST ?= $(BINDIR_APP)-riot.suit.latest.bin
SUIT_MANIFEST_SIGNED ?= $(BINDIR_APP)-riot.suit_signed.$(APP_VER).bin
SUIT_MANIFEST_SIGNED_LATEST ?= $(BINDIR_APP)-riot.suit_signed.latest.bin
SUIT_NOTIFY_VERSION ?= latest
SUIT_NOTIFY_MANIFEST ?= $(APPLICATION)-riot.suitv3_signed.$(SUIT_NOTIFY_VERSION).bin
SUIT_NOTIFY_MANIFEST ?= $(APPLICATION)-riot.suit_signed.$(SUIT_NOTIFY_VERSION).bin
# Long manifest names require more buffer space when parsing
export CFLAGS += -DCONFIG_SOCK_URLPATH_MAXLEN=128
@ -27,7 +27,7 @@ SUIT_CLASS ?= $(BOARD)
#
$(SUIT_MANIFEST): $(SLOT0_RIOT_BIN) $(SLOT1_RIOT_BIN)
$(RIOTBASE)/dist/tools/suit_v3/gen_manifest.py \
$(RIOTBASE)/dist/tools/suit/gen_manifest.py \
--urlroot $(SUIT_COAP_ROOT) \
--seqnr $(SUIT_SEQNR) \
--uuid-vendor $(SUIT_VENDOR) \

View File

@ -9,7 +9,7 @@ gen_manifest() {
shift
"${RIOTBASE}/dist/tools/suit_v3/gen_manifest.py" \
"${RIOTBASE}/dist/tools/suit/gen_manifest.py" \
--urlroot "test://test" \
--seqnr "$seqnr" \
--uuid-vendor "riot-os.org" \