1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00
RIOT/dist/tools/suit/suit-manifest-generator/suit_tool/manifest.py

881 lines
31 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------
# Copyright 2019-2020 ARM Limited or its affiliates
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
import collections
import binascii
import cbor2 as cbor
import json
import copy
import uuid
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from collections import OrderedDict
import logging
LOG = logging.getLogger(__name__)
TreeBranch = []
ManifestKey = collections.namedtuple(
'ManifestKey',
[
'json_key',
'suit_key',
'obj'
]
)
def to_bytes(s):
if isinstance(s,bytes):
return s
try:
return binascii.a2b_hex(s)
except:
try:
return binascii.a2b_base64(s)
except:
if isinstance(s,str):
return s.encode('utf-8')
else:
return str(s).encode('utf-8')
class SUITException(Exception):
def __init__(self, m, data, tree_branch):
super().__init__(m)
self.data = data
self.tree_branch = tree_branch
class SUITCommonInformation:
def __init__(self):
self.component_ids = []
self.dependencies = []
self.current_index = 0
self.indent_size = 4
def component_id_to_index(self, cid):
id = -1
for i, c in enumerate(self.component_ids):
if c == cid and i >= 0:
id = componentIndex(i)
for i, d in enumerate(self.dependencies):
if d.digest == cid and i >= 0:
id = dependencyIndex(i)
return id
suitCommonInfo = SUITCommonInformation()
one_indent = ' '
class SUITInt:
def from_json(self, v):
self.v = int(v)
return self
def to_json(self):
return self.v
def from_suit(self, v):
TreeBranch.append(type(self))
self.v = int(v)
TreeBranch.pop()
return self
def to_suit(self):
return self.v
def to_debug(self, indent):
return str(self.v)
class SUITPosInt(SUITInt):
def from_json(self, v):
TreeBranch.append(type(self))
_v = int(v)
# print (_v)
if _v < 0:
raise Exception('Positive Integers must be >= 0')
self.v = _v
TreeBranch.pop()
return self
def from_suit(self, v):
return self.from_json(v)
class SUITManifestDict:
def mkfields(d):
# rd = OderedDict()
return {k: ManifestKey(*v) for k,v in d.items()}
def __init__(self):
pass
def __eq__(self, rhs):
if not isinstance(rhs, type(self)):
return False
for f, info in self.fields:
if hasattr(self, f) != hasattr(rhs, f):
return False
if hasattr(self, f) and hasattr(rhs, f) and getattr(self, f) != getattr(rhs, f):
return False
for a,b in zip(self.items, rhs.items):
if not a == b:
return False
return True
def from_json(self, data):
for k, f in self.fields.items():
v = data.get(f.json_key, None)
setattr(self, k, f.obj().from_json(v) if v is not None else None)
return self
def to_json(self):
j = OrderedDict()
for k, f in self.fields.items():
v = getattr(self, k)
if v:
j[f.json_key] = v.to_json()
return j
def from_suit(self, data):
TreeBranch.append(type(self))
for k, f in self.fields.items():
TreeBranch.append(k)
v = data.get(f.suit_key, None)
d = f.obj().from_suit(v) if v is not None else None
setattr(self, k, d)
TreeBranch.pop()
TreeBranch.pop()
return self
def to_suit(self):
sd = OrderedDict()
for k, f in self.fields.items():
v = getattr(self, k)
if v:
sd[f.suit_key] = v.to_suit()
return sd
def to_debug(self, indent):
s = '{'
newindent = indent + one_indent
for k, f in self.fields.items():
v = getattr(self, k)
if v:
s += '\n{ind}/ {jk} / {sk}:'.format(ind=newindent, jk=f.json_key, sk=f.suit_key)
s += v.to_debug(newindent) + ','
s += '\n' + indent + '}'
return s
class SUITManifestNamedList(SUITManifestDict):
def from_suit(self, data):
TreeBranch.append(type(self))
for k, f in self.fields.items():
TreeBranch.append(k)
setattr(self, k, f.obj().from_suit(data[f.suit_key]))
TreeBranch.pop()
TreeBranch.pop()
return self
def to_suit(self):
sd = [None] * (max([f.suit_key for k, f in self.fields.items()]) + 1)
for k, f in self.fields.items():
v = getattr(self, k)
if v:
sd[f.suit_key] = v.to_suit()
return sd
def to_debug(self, indent):
newindent = indent + one_indent
items = []
for k, f in self.fields.items():
v = getattr(self, k)
if v:
items.append('/ ' + f.json_key + ' / ' + v.to_debug(newindent))
s = '[\n{newindent}{items}\n{indent}]'.format(
newindent=newindent,
indent=indent,
items=',\n{newindent}'.format(newindent=newindent).join(items)
)
return s
class SUITKeyMap:
def mkKeyMaps(m):
return {v:k for k,v in m.items()}, m
def to_json(self):
return self.rkeymap[self.v]
def from_json(self, d):
self.v = self.keymap[d]
return self
def to_suit(self):
return self.v
def from_suit(self, d):
TreeBranch.append(type(self))
self.v = self.keymap[self.rkeymap[d]]
TreeBranch.pop()
return self
def to_debug(self, indent):
s = str(self.v) + ' / ' + json.dumps(self.to_json(),sort_keys = True) + ' /'
return s
def SUITBWrapField(c):
class SUITBWrapper:
def to_suit(self):
return cbor.dumps(self.v.to_suit(), canonical=True)
def from_suit(self, d):
TreeBranch.append(type(self))
try:
self.v = c().from_suit(cbor.loads(d))
except SUITException as e:
raise e
except Exception as e:
LOG.debug('At {}: failed to load "{}" as CBOR'.format(type(self),binascii.b2a_hex(d).decode('utf-8')))
LOG.debug('Path: {}'.format(TreeBranch))
# LOG.debug('At {}: failed to load "{}" as CBOR'.format(type(self),binascii.b2a_hex(d).decode('utf-8')))
raise SUITException(
m = 'At {}: failed to load "{}" as CBOR'.format(type(self),binascii.b2a_hex(d).decode('utf-8')),
data = d,
tree_branch = TreeBranch
)
TreeBranch.pop()
return self
def to_json(self):
return self.v.to_json()
def from_json(self, d):
self.v = c().from_json(d)
return self
def to_debug(self, indent):
s = 'h\''
s += binascii.b2a_hex(self.to_suit()).decode('utf-8')
s += '\' / '
s += self.v.to_debug(indent)
s += ' /'
return s
return SUITBWrapper
class SUITManifestArray:
def __init__(self):
self.items=[]
def __eq__(self, rhs):
if not isinstance(rhs, type(self)):
return False
if len(self.items) != len(rhs.items):
return False
for a,b in zip(self.items, rhs.items):
if not a == b:
return False
return True
def from_json(self, data):
self.items = []
for d in data:
self.items.append(self.field.obj().from_json(d))
return self
def to_json(self):
j = []
for i in self.items:
j.append(i.to_json())
return j
def from_suit(self, data):
self.items = []
TreeBranch.append(type(self))
for d in data:
TreeBranch.append(len(self.items))
self.items.append(self.field.obj().from_suit(d))
TreeBranch.pop()
TreeBranch.pop()
return self
def to_suit(self):
l = []
for i in self.items:
l.append(i.to_suit())
return l
def append(self, element):
if not isinstance(element, self.field.obj):
raise Exception('element {} is not a {}'.format(element, self.field.obj))
self.items.append(element)
def to_debug(self, indent):
newindent = indent + one_indent
s = '[\n'
s += ' ,\n'.join([newindent + v.to_debug(newindent) for v in self.items])
s += '\n' + indent + ']'
return s
class SUITBytes:
def to_json(self):
return binascii.b2a_hex(self.v).decode('utf-8')
def from_json(self, d):
self.v = to_bytes(d)
return self
def from_suit(self, d):
self.v = bytes(d)
return self
def to_suit(self):
return self.v
def to_debug(self, indent):
return 'h\'' + self.to_json() + '\''
def __eq__(self, rhs):
return self.v == rhs.v
class SUITUUID(SUITBytes):
def from_json(self, d):
self.v = uuid.UUID(d).bytes
return self
def from_suit(self, d):
self.v = uuid.UUID(bytes=d).bytes
return self
def to_debug(self, indent):
return 'h\'' + json.dumps(self.to_json(), sort_keys=True) + '\' / ' + str(uuid.UUID(bytes=self.v)) + ' /'
class SUITRaw:
def to_json(self):
return self.v
def from_json(self, d):
self.v = d
return self
def to_suit(self):
return self.v
def from_suit(self, d):
self.v = d
return self
def to_debug(self, indent):
return str(self.v)
class SUITNil:
def to_json(self):
return None
def from_json(self, d):
if d is not None:
raise Exception('Expected Nil')
return self
def to_suit(self):
return None
def from_suit(self, d):
if d is not None:
raise Exception('Expected Nil')
return self
def to_debug(self, indent):
return 'F6 / nil /'
class SUITTStr(SUITRaw):
def from_json(self, d):
self.v = str(d)
return self
def from_suit(self, d):
self.v = str(d)
return self
def to_debug(self, indent):
return '\''+ str(self.v) + '\''
class SUITComponentId(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITBytes)
def to_suit(self):
return tuple(super(SUITComponentId, self).to_suit())
def to_debug(self, indent):
newindent = indent + one_indent
s = '[' + ''.join([v.to_debug(newindent) for v in self.items]) + ']'
return s
def __hash__(self):
return hash(tuple([i.v for i in self.items]))
class SUITComponentIndex(SUITComponentId):
def to_suit(self):
return suitCommonInfo.component_id_to_index(self)
def from_suit(self, d):
return super(SUITComponentIndex, self).from_suit(
suitCommonInfo.component_ids[d].to_suit()
)
def to_debug(self, indent):
newindent = indent + one_indent
s = '{suit} / [{dbg}] /'.format(
suit=self.to_suit(),
dbg=''.join([v.to_debug(newindent) for v in self.items])
)
return s
class SUITComponents(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITComponentId)
def from_suit(self, data):
super(SUITComponents, self).from_suit(data)
suitCommonInfo.component_ids = self.items
return self
def from_json(self, j):
super(SUITComponents, self).from_json(j)
suitCommonInfo.component_ids = self.items
return self
class SUITDigestAlgo(SUITKeyMap):
rkeymap, keymap = SUITKeyMap.mkKeyMaps({
'sha224' : 1,
'sha256' : 2,
'sha384' : 3,
'sha512' : 4
})
class SUITDigest(SUITManifestNamedList):
fields = SUITManifestNamedList.mkfields({
'algo' : ('algorithm-id', 0, SUITDigestAlgo),
'digest' : ('digest-bytes', 1, SUITBytes)
})
def __hash__(self):
return hash(tuple([getattr(self, k) for k in self.fields.keys() if hasattr(self, k)]))
class SUITCompressionInfo(SUITKeyMap):
rkeymap, keymap = SUITKeyMap.mkKeyMaps({
'gzip' : 1,
'bzip2' : 2,
'deflate' : 3,
'lz4' : 4,
'lzma' : 7
})
class SUITParameters(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'vendor-id' : ('vendor-id', 1, SUITUUID),
'class-id' : ('class-id', 2, SUITUUID),
'digest' : ('image-digest', 3, SUITBWrapField(SUITDigest)),
'size' : ('image-size', 14, SUITPosInt),
'uri' : ('uri', 21, SUITTStr),
'src' : ('source-component', 22, SUITComponentIndex),
'compress' : ('compression-info', 19, SUITCompressionInfo),
'offset' : ('offset', 5, SUITPosInt)
})
def from_json(self, j):
return super(SUITParameters, self).from_json(j)
class SUITTryEach(SUITManifestArray):
pass
class dependencyIndex(int):
def __new__(cls, value):
return super(cls, cls).__new__(cls, value)
class componentIndex(int):
def __new__(cls, value):
return super(cls, cls).__new__(cls, value)
def SUITCommandContainer(jkey, skey, argtype, dp=[]):
class SUITCmd(SUITCommand):
json_key = jkey
suit_key = skey
dep_params = dp
def __init__(self):
pass
def to_suit(self):
return [self.suit_key, self.arg.to_suit()]
def to_json(self):
if self.json_key == 'directive-set-component-index':
return {}
else:
return {
'command-id' : self.json_key,
'command-arg' : self.arg.to_json(),
'component-id' : self.cid.to_json()
}
def from_json(self, j):
if j['command-id'] != self.json_key:
raise Except('JSON Key mismatch error')
if self.json_key != 'directive-set-component-index' and self.json_key != 'directive-set-dependency-index':
try:
self.cid = SUITComponentId().from_json(j['component-id'])
except:
self.cid = SUITDigest().from_json(j['component-id'])
self.arg = argtype().from_json(j['command-arg'])
return self
def from_suit(self, s):
if s[0] != self.suit_key:
raise Except('SUIT Key mismatch error')
if self.json_key == 'directive-set-component-index':
suitCommonInfo.current_index = componentIndex(s[1])
elif self.json_key == 'directive-set-dependency-index':
suitCommonInfo.current_index = dependencyIndex(s[1])
else:
if isinstance(suitCommonInfo.current_index, dependencyIndex):
self.cid = suitCommonInfo.dependencies[suitCommonInfo.current_index]
else:
self.cid = suitCommonInfo.component_ids[suitCommonInfo.current_index]
self.arg = argtype().from_suit(s[1])
return self
def to_debug(self, indent):
s = '/ {} / {},'.format(self.json_key, self.suit_key)
s += self.arg.to_debug(indent)
return s
return SUITCmd
def mkPolicy(policy):
class SUITReportingPolicy(SUITPosInt):
default_policy = policy
def from_json(self, j):
if j is None:
j = self.default_policy
return super(SUITReportingPolicy, self).from_json(j)
return SUITReportingPolicy
class SUITCommand:
def from_json(self, j):
return self.jcommands[j['command-id']]().from_json(j)
def from_suit(self, s):
return self.scommands[s[0]]().from_suit(s)
SUITCommand.commands = [
SUITCommandContainer('condition-vendor-identifier', 1, mkPolicy(policy=0xF), dp=['vendor-id']),
SUITCommandContainer('condition-class-identifier', 2, mkPolicy(policy=0xF), dp=['class-id']),
SUITCommandContainer('condition-image-match', 3, mkPolicy(policy=0xF), dp=['digest']),
SUITCommandContainer('condition-use-before', 4, mkPolicy(policy=0xA)),
SUITCommandContainer('condition-component-offset', 5, mkPolicy(policy=0x5), dp=['offset']),
SUITCommandContainer('condition-device-identifier', 24, mkPolicy(policy=0xF)),
SUITCommandContainer('condition-image-not-match', 25, mkPolicy(policy=0xF)),
SUITCommandContainer('condition-minimum-battery', 26, mkPolicy(policy=0xA)),
SUITCommandContainer('condition-update-authorised', 27, mkPolicy(policy=0x3)),
SUITCommandContainer('condition-version', 28, mkPolicy(policy=0xF)),
SUITCommandContainer('directive-set-component-index', 12, SUITPosInt),
SUITCommandContainer('directive-set-dependency-index', 13, SUITPosInt),
SUITCommandContainer('directive-abort', 14, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-try-each', 15, SUITTryEach),
SUITCommandContainer('directive-process-dependency', 18, mkPolicy(policy=0)),
SUITCommandContainer('directive-set-parameters', 19, SUITParameters),
SUITCommandContainer('directive-override-parameters', 20, SUITParameters),
SUITCommandContainer('directive-fetch', 21, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-copy', 22, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-run', 23, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-wait', 29, mkPolicy(policy=0x2)),
SUITCommandContainer('directive-run-sequence', 30, SUITRaw),
SUITCommandContainer('directive-run-with-arguments', 31, SUITRaw),
SUITCommandContainer('directive-swap', 32, mkPolicy(policy=0x2)),
]
SUITCommand.jcommands = { c.json_key : c for c in SUITCommand.commands}
SUITCommand.scommands = { c.suit_key : c for c in SUITCommand.commands}
class SUITSequence(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITCommand)
def to_suit(self):
suit_l = []
suitCommonInfo.current_index = 0 if len(suitCommonInfo.component_ids) == 1 else None
for i in self.items:
if i.json_key == 'directive-set-component-index':
suitCommonInfo.current_index = componentIndex(i.arg.v)
elif i.json_key == 'directive-set-dependency-index':
suitCommonInfo.current_index = dependencyIndex(i.arg.v)
else:
# Option 1: current & command index same class, same number,
# Do nothing
# Option 2: current & command not equal, command is component
# set component index
# Option 3: current & command not equal, command is dependency
# set dependency index
cidx = suitCommonInfo.component_id_to_index(i.cid)
if cidx != suitCommonInfo.current_index:
op = 'directive-set-component-index'
if isinstance(cidx, dependencyIndex):
op = 'directive-set-dependency-index'
# Change component/dependency
suitCommonInfo.current_index = cidx
suit_l += SUITCommand().from_json({
'command-id' : op,
'command-arg' : int(cidx)
}).to_suit()
suit_l += i.to_suit()
return suit_l
def to_debug(self, indent):
return super(SUITSequence, SUITSequence().from_suit(self.to_suit())).to_debug(indent)
def from_suit(self, s):
self.items = [SUITCommand().from_suit(i) for i in zip(*[iter(s)]*2)]
return self
SUITTryEach.field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITBWrapField(SUITSequence))
class SUITSequenceComponentReset(SUITSequence):
def to_suit(self):
suitCommonInfo.current_index = None
return super(SUITSequenceComponentReset, self).to_suit()
def SUITMakeSeverableField(c):
class SUITSeverableField:
objtype = SUITBWrapField(c)
def from_json(self, data):
if 'algorithm-id' in data:
self.v = SUITDigest().from_json(data)
else:
self.v = self.objtype().from_json(data)
return self
def from_suit(self, data):
if isinstance(data, list):
self.v = SUITDigest().from_suit(data)
else:
self.v = self.objtype().from_suit(data)
return self
def to_json(self):
return self.v.to_json()
def to_suit(self):
return self.v.to_suit()
def to_debug(self, indent):
return self.v.to_debug(indent)
return SUITSeverableField
class SUITDependency(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'digest' : ('dependency-digest', 1, SUITDigest),
'prefix' : ('dependency-prefix', 2, SUITComponentId),
})
class SUITDependencies(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITDependency)
def from_suit(self, data):
super(SUITDependencies, self).from_suit(data)
suitCommonInfo.dependencies = self.items
return self
def from_json(self, j):
super(SUITDependencies, self).from_json(j)
suitCommonInfo.dependencies = self.items
return self
class SUITCommon(SUITManifestDict):
fields = SUITManifestNamedList.mkfields({
'dependencies' : ('dependencies', 1, SUITBWrapField(SUITDependencies)),
'components' : ('components', 2, SUITComponents),
'common_sequence' : ('common-sequence', 4, SUITBWrapField(SUITSequenceComponentReset)),
})
class SUITComponentText(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'vendorname' : ('vendor-name', 1, SUITTStr),
'modelname' : ('model-name', 2, SUITTStr),
'vendordomain' : ('vendor-domain', 3, SUITTStr),
'modelinfo' : ('json-source', 4, SUITTStr),
'cdesc' : ('component-description', 5, SUITTStr),
'version' : ('version', 6, SUITTStr),
'reqversion' : ('required-version', 7, SUITTStr),
})
class SUITText(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'mdesc' : ('manifest-description', 1, SUITTStr),
'udesc' : ('update-description', 2, SUITTStr),
'json' : ('json-source', 3, SUITTStr),
'yaml' : ('yaml-source', 4, SUITTStr),
})
components={}
def to_json(self):
d = super(SUITText, self).to_json()
d.update({k.to_json() : v.to_json() for k,v in self.components.items()})
return d
def from_json(self, data):
# Handle components
for k,v in data.items():
if not isinstance(v, str):
self.components[SUITComponentId().from_json(k)] = SUITComponentText().from_json(v)
# Treat everything else as a normal manifestDict
return super(SUITText, self).from_json(data)
def to_suit(self):
d = super(SUITText, self).to_suit()
d.update({k.to_suit() : v.to_suit() for k,v in self.components.items()})
return d
def from_suit(self, data):
# Handle components
for k,v in data.items():
if not isinstance(v, str):
self.components[SUITComponentId().from_suit(k)] = SUITComponentText().from_suit(v)
# Treat everything else as a normal manifestDict
return super(SUITText, self).from_json(data)
def to_debug(self, indent):
s = '{'
newindent = indent + one_indent
for k, f in self.fields.items():
v = getattr(self, k)
if v:
s += '\n{ind}/ {jk} / {sk}:'.format(ind=newindent, jk=f.json_key, sk=f.suit_key)
s += v.to_debug(newindent) + ','
for k, f in self.components.items():
s += '\n' + newindent + '{}:'.format(k.to_debug(newindent + one_indent))
s += f.to_debug(newindent + one_indent)
s += '\n' + indent + '}'
return s
class SUITManifest(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'version' : ('manifest-version', 1, SUITPosInt),
'sequence' : ('manifest-sequence-number', 2, SUITPosInt),
'common' : ('common', 3, SUITBWrapField(SUITCommon)),
'refuri' : ('reference-uri', 4, SUITTStr),
'deres' : ('dependency-resolution', 7, SUITMakeSeverableField(SUITSequenceComponentReset)),
'fetch' : ('payload-fetch', 8, SUITMakeSeverableField(SUITSequenceComponentReset)),
'install' : ('install', 9, SUITMakeSeverableField(SUITSequenceComponentReset)),
'validate' : ('validate', 10, SUITBWrapField(SUITSequenceComponentReset)),
'load' : ('load', 11, SUITBWrapField(SUITSequenceComponentReset)),
'run' : ('run', 12, SUITBWrapField(SUITSequenceComponentReset)),
'text' : ('text', 13, SUITMakeSeverableField(SUITText)),
'coswid' : ('coswid', 14, SUITBytes),
})
class COSE_Algorithms(SUITKeyMap):
rkeymap, keymap = SUITKeyMap.mkKeyMaps({
'ES256' : -7,
'ES384' : -35,
'ES512' : -36,
'EdDSA' : -8,
'HSS-LMS' : -46,
})
class COSE_CritList(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITInt)
class COSE_header_map(SUITManifestDict):
fields = SUITManifestDict.mkfields({
# 1: algorithm Identifier
'alg' : ('alg', 1, COSE_Algorithms),
# 2: list of critical headers (criticality)
# 3: content type
# 4: key id
'kid' : ('kid', 4, SUITBytes),
# 5: IV
# 6: partial IV
# 7: counter signature(s)
})
class COSE_Sign:
pass
class COSE_Sign1(SUITManifestNamedList):
fields = SUITManifestDict.mkfields({
'protected' : ('protected', 0, SUITBWrapField(COSE_header_map)),
'unprotected' : ('unprotected', 1, COSE_header_map),
'payload' : ('payload', 2, SUITBWrapField(SUITDigest)),
'signature' : ('signature', 3, SUITBytes)
})
class COSE_Mac:
pass
class COSE_Mac0:
pass
class COSETagChoice(SUITManifestDict):
def to_suit(self):
for k, f in self.fields.items():
v = getattr(self, k, None)
if v:
return cbor.CBORTag(tag=f.suit_key, value=v.to_suit())
return None
def from_suit(self, data):
for k, f in self.fields.items():
if data.tag == f.suit_key:
v = data.value
d = f.obj().from_suit(v) if v is not None else None
setattr(self, k, d)
return self
def to_debug(self, indent):
s = ''
for k, f in self.fields.items():
if hasattr(self, k):
v = getattr(self, k)
newindent = indent + one_indent
s = '{tag}({value})'.format(tag=f.suit_key, value=v.to_debug(newindent))
return s
class COSETaggedAuth(COSETagChoice):
fields = SUITManifestDict.mkfields({
'cose_sign' : ('COSE_Sign_Tagged', 98, COSE_Sign),
'cose_sign1' : ('COSE_Sign1_Tagged', 18, COSE_Sign1),
'cose_mac' : ('COSE_Mac_Tagged', 97, COSE_Mac),
'cose_mac0' : ('COSE_Mac0_Tagged', 17, COSE_Mac0)
})
class COSEList(SUITManifestArray):
field = collections.namedtuple('ArrayElement', 'obj')(obj=SUITBWrapField(COSETaggedAuth))
def from_suit(self, data):
return super(COSEList, self).from_suit(data)
class SUITEnvelope(SUITManifestDict):
fields = SUITManifestDict.mkfields({
'auth' : ('authentication-wrapper', 2, SUITBWrapField(COSEList)),
'manifest' : ('manifest', 3, SUITBWrapField(SUITManifest)),
'deres': ('dependency-resolution', 7, SUITBWrapField(SUITSequence)),
'fetch': ('payload-fetch', 8, SUITBWrapField(SUITSequence)),
'install': ('install', 9, SUITBWrapField(SUITSequence)),
'validate': ('validate', 10, SUITBWrapField(SUITSequence)),
'load': ('load', 11, SUITBWrapField(SUITSequence)),
'run': ('run', 12, SUITBWrapField(SUITSequence)),
'text': ('text', 13, SUITBWrapField(SUITText)),
'coswid': ('coswid', 14, SUITBytes),
})
severable_fields = {'deres', 'fetch', 'install', 'text', 'coswid'}
digest_algorithms = {
'sha224' : hashes.SHA224,
'sha256' : hashes.SHA256,
'sha384' : hashes.SHA384,
'sha512' : hashes.SHA512
}
def to_severable(self, digest_alg):
sev = copy.deepcopy(self)
for k in sev.severable_fields:
f = sev.manifest.v.fields[k]
if not hasattr(sev.manifest.v, k):
continue
v = getattr(sev.manifest.v, k)
if v is None:
continue
cbor_field = cbor.dumps(v.to_suit(), canonical=True)
digest = hashes.Hash(self.digest_algorithms.get(digest_alg)(), backend=default_backend())
digest.update(cbor_field)
field_digest = SUITDigest().from_json({
'algorithm-id' : digest_alg,
'digest-bytes' : digest.finalize()
})
cbor_digest = cbor.dumps(field_digest.to_suit(), canonical=True)
if len(cbor_digest) < len(cbor_field):
setattr(sev.manifest.v, k, field_digest)
setattr(sev,k,v)
return sev
def from_severable(self):
raise Exception('From Severable unimplemented')
nsev = copy.deepcopy(self)
for k in nsev.severable_fields:
f = nsev.fields[k]
if not hasattr(nsev, k):
continue
v = getattr(nsev, k)
if v is None:
continue
# Verify digest
cbor_field = cbor.dumps(v.to_suit(), canonical=True)
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(cbor_field)
actual_digest = digest.finalize()
field_digest = getattr(nsev.v, k)
expected_digest = field_digest.to_suit()[1]
if digest != expected_digest:
raise Exception('Field Digest mismatch: For {}, expected: {}, got {}'.format(
f.json_key, expected_digest, actual_digest
))
setattr(nsev.manifest.v, k, v)
delattr(nsev, k)
return nsev