Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# pylint: disable=wrong-import-position,wrong-import-order,superfluous-parens
2import os
3import sys
4import logging
5import argparse
6from datetime import timedelta
7import tqdm
8import pytimeparse
9import dxf.exceptions
11choices = ['list-repos',
12 'auth',
13 'create-root-key',
14 'create-metadata-keys',
15 'create-metadata',
16 'reset-keys',
17 'push-target',
18 'del-target',
19 'push-metadata',
20 'list-master-targets',
21 'get-master-expirations',
22 'pull-metadata',
23 'pull-target',
24 'blob-sizes',
25 'check-target',
26 'list-copy-targets',
27 'get-copy-expirations']
29parser = argparse.ArgumentParser()
30subparsers = parser.add_subparsers(dest='op')
31for c in choices:
32 sp = subparsers.add_parser(c)
33 if c != 'list-repos':
34 sp.add_argument('repo')
35 sp.add_argument('args', nargs='*')
37def access_denied():
38 import traceback
39 traceback.print_exc()
40 import errno
41 return errno.EACCES
43def get_lifetime(environ, role):
44 lifetime = environ.get('DTUF_' + role + '_LIFETIME')
45 if lifetime is None:
46 return lifetime
47 return timedelta(seconds=pytimeparse.parse(lifetime))
49# pylint: disable=too-many-statements,too-many-locals
50def doit(args, environ):
51 import tuf.settings
52 log_file = environ.get('DTUF_LOG_FILE', 'dtuf.log')
53 if log_file: 53 ↛ 57line 53 didn't jump to line 57, because the condition on line 53 was never false
54 tuf.settings.LOG_FILENAME = log_file
55 tuf.settings.ENABLE_FILE_LOGGING = True
56 else:
57 tuf.settings.ENABLE_FILE_LOGGING = False
59 import tuf.log
60 if log_file: 60 ↛ 63line 60 didn't jump to line 63, because the condition on line 60 was never false
61 log_level = environ.get('DTUF_FILE_LOG_LEVEL', 'WARNING')
62 tuf.log.set_filehandler_log_level(getattr(logging, log_level))
63 log_level = environ.get('DTUF_CONSOLE_LOG_LEVEL', 'WARNING')
64 # tuf.repository_tool calls tuf.log.add_console_handler
65 import tuf.repository_tool
66 tuf.log.set_console_log_level(getattr(logging, log_level))
68 import dtuf
70 dtuf_progress = environ.get('DTUF_PROGRESS')
71 if dtuf_progress == '1' or (dtuf_progress != '0' and sys.stderr.isatty()):
72 bars = {}
73 def progress(dgst, chunk, size):
74 if dgst not in bars:
75 bars[dgst] = tqdm.tqdm(desc=dgst[0:8],
76 total=size,
77 leave=True)
78 if len(chunk) > 0:
79 bars[dgst].update(len(chunk))
80 if bars[dgst].n >= bars[dgst].total:
81 bars[dgst].close()
82 del bars[dgst]
83 else:
84 progress = None
86 def auth(dtuf_obj, response):
87 # pylint: disable=redefined-outer-name
88 username = environ.get('DTUF_USERNAME')
89 password = environ.get('DTUF_PASSWORD')
90 dtuf_obj.authenticate(username, password, response=response)
92 args = parser.parse_args(args)
93 if args.op == 'list-repos':
94 dtuf_base = dtuf.DTufBase(environ['DTUF_HOST'],
95 auth,
96 environ.get('DTUF_INSECURE') == '1',
97 environ.get('DTUF_AUTH_HOST'))
98 dtuf_obj = dtuf_base
99 elif args.op in ['auth',
100 'create-root-key',
101 'create-metadata-keys',
102 'create-metadata',
103 'reset-keys',
104 'push-target',
105 'del-target',
106 'push-metadata',
107 'list-master-targets',
108 'get-master-expirations']:
109 dtuf_master = dtuf.DTufMaster(environ['DTUF_HOST'],
110 args.repo,
111 environ.get('DTUF_REPOSITORIES_ROOT'),
112 auth,
113 environ.get('DTUF_INSECURE') == '1',
114 environ.get('DTUF_AUTH_HOST'),
115 get_lifetime(environ, 'ROOT'),
116 get_lifetime(environ, 'TARGETS'),
117 get_lifetime(environ, 'SNAPSHOT'),
118 get_lifetime(environ, 'TIMESTAMP'))
119 dtuf_obj = dtuf_master
120 else:
121 dtuf_copy = dtuf.DTufCopy(environ['DTUF_HOST'],
122 args.repo,
123 environ.get('DTUF_REPOSITORIES_ROOT'),
124 auth,
125 environ.get('DTUF_INSECURE') == '1',
126 environ.get('DTUF_AUTH_HOST'))
127 dtuf_obj = dtuf_copy
129 def _doit():
130 # pylint: disable=too-many-branches,too-many-statements
131 if args.op == 'auth':
132 token = dtuf_master.authenticate(environ['DTUF_USERNAME'],
133 environ['DTUF_PASSWORD'],
134 actions=args.args)
135 if token:
136 print(token)
137 return
139 token = environ.get('DTUF_TOKEN')
140 if token:
141 dtuf_obj.token = token
143 if args.op == 'create-root-key':
144 if len(args.args) > 0:
145 parser.error('too many arguments')
146 dtuf_master.create_root_key(environ.get('DTUF_ROOT_KEY_PASSWORD'))
148 elif args.op == 'create-metadata-keys':
149 if len(args.args) > 0:
150 parser.error('too many arguments')
151 dtuf_master.create_metadata_keys(environ.get('DTUF_TARGETS_KEY_PASSWORD'),
152 environ.get('DTUF_SNAPSHOT_KEY_PASSWORD'),
153 environ.get('DTUF_TIMESTAMP_KEY_PASSWORD'))
155 elif args.op == 'create-metadata':
156 if len(args.args) > 0:
157 parser.error('too many arguments')
158 dtuf_master.create_metadata(environ.get('DTUF_ROOT_KEY_PASSWORD'),
159 environ.get('DTUF_TARGETS_KEY_PASSWORD'),
160 environ.get('DTUF_SNAPSHOT_KEY_PASSWORD'),
161 environ.get('DTUF_TIMESTAMP_KEY_PASSWORD'))
163 elif args.op == 'reset-keys':
164 if len(args.args) > 0:
165 parser.error('too many arguments')
166 dtuf_master.reset_keys(environ.get('DTUF_ROOT_KEY_PASSWORD'),
167 environ.get('DTUF_TARGETS_KEY_PASSWORD'),
168 environ.get('DTUF_SNAPSHOT_KEY_PASSWORD'),
169 environ.get('DTUF_TIMESTAMP_KEY_PASSWORD'))
171 elif args.op == 'push-target':
172 if len(args.args) < 2:
173 parser.error('too few arguments')
174 dtuf_master.push_target(args.args[0],
175 *args.args[1:],
176 progress=progress)
178 elif args.op == 'del-target':
179 for name in args.args: 179 ↛ exitline 179 didn't return from function '_doit', because the loop on line 179 didn't complete
180 dtuf_master.del_target(name)
182 elif args.op == 'push-metadata':
183 if len(args.args) > 0:
184 parser.error('too many arguments')
185 dtuf_master.push_metadata(environ.get('DTUF_TARGETS_KEY_PASSWORD'),
186 environ.get('DTUF_SNAPSHOT_KEY_PASSWORD'),
187 environ.get('DTUF_TIMESTAMP_KEY_PASSWORD'),
188 progress)
190 elif args.op == 'list-master-targets':
191 if len(args.args) > 0:
192 parser.error('too many arguments')
193 for name in dtuf_master.list_targets():
194 print(name)
196 elif args.op == 'get-master-expirations':
197 if len(args.args) > 0:
198 parser.error('too many arguments')
199 for role, expiry in dtuf_master.get_expirations().items():
200 print(role + ': ' + expiry.isoformat())
202 elif args.op == 'pull-metadata':
203 if len(args.args) > 1:
204 parser.error('too many arguments')
205 root_public_key = None
206 if len(args.args) == 1:
207 if args.args[0] == '-':
208 root_public_key = sys.stdin.read()
209 else:
210 with open(args.args[0], 'rb') as f:
211 root_public_key = f.read().decode('utf-8')
212 for name in dtuf_copy.pull_metadata(root_public_key, progress):
213 print(name)
215 elif args.op == 'pull-target':
216 _stdout = getattr(sys.stdout, 'buffer', sys.stdout)
217 for name in args.args:
218 for it, dgst, size in dtuf_copy.pull_target(name, True):
219 if environ.get('DTUF_BLOB_INFO') == '1':
220 print(dgst + ' ' + str(size))
221 # pylint: disable=protected-access
222 dtuf._write_with_progress(it, dgst, size, _stdout, progress)
224 elif args.op == 'blob-sizes':
225 for name in args.args:
226 for size in dtuf_copy.blob_sizes(name):
227 print(size)
229 elif args.op == 'check-target':
230 if len(args.args) < 2:
231 parser.error('too few arguments')
232 dtuf_copy.check_target(args.args[0], *args.args[1:])
234 elif args.op == 'list-copy-targets':
235 if len(args.args) > 0:
236 parser.error('too many arguments')
237 for name in dtuf_copy.list_targets():
238 print(name)
240 elif args.op == 'get-copy-expirations':
241 if len(args.args) > 0:
242 parser.error('too many arguments')
243 for role, expiry in dtuf_copy.get_expirations().items():
244 print(role + ': ' + expiry.isoformat())
246 elif args.op == 'list-repos': 246 ↛ exitline 246 didn't return from function '_doit', because the condition on line 246 was never false
247 for name in dtuf_base.list_repos():
248 print(name)
250 try:
251 _doit()
252 return 0
253 except dxf.exceptions.DXFUnauthorizedError: 253 ↛ 254line 253 didn't jump to line 254, because the exception caught by line 253 didn't happen
254 return access_denied()
255 except tuf.exceptions.NoWorkingMirrorError as ex:
256 for ex2 in ex.mirror_errors.values():
257 if isinstance(ex2, dxf.exceptions.DXFUnauthorizedError):
258 return access_denied()
259 raise
261def main():
262 sys.exit(doit(sys.argv[1:], os.environ))