Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# pylint: disable=wrong-import-position,wrong-import-order,superfluous-parens 

2import os 

3import sys 

4import logging 

5import argparse 

6from datetime import timedelta 

7import tqdm 

8import pytimeparse 

9import dxf.exceptions 

10 

# Every sub-command the command line understands. Each one becomes an
# argparse sub-parser below, and doit() dispatches on the chosen name.
choices = ['list-repos',
           'auth',
           'create-root-key',
           'create-metadata-keys',
           'create-metadata',
           'reset-keys',
           'push-target',
           'del-target',
           'push-metadata',
           'list-master-targets',
           'get-master-expirations',
           'pull-metadata',
           'pull-target',
           'blob-sizes',
           'check-target',
           'list-copy-targets',
           'get-copy-expirations']

parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='op')
# Python 3 treats sub-commands as optional unless told otherwise, which lets
# an empty command line through with op set to None; doit() would then fail
# on a missing attribute or environment variable instead of printing usage.
# The attribute is set directly (rather than passing required=True) because
# the add_subparsers() keyword only exists on newer Python 3 releases.
subparsers.required = True
for c in choices:
    sp = subparsers.add_parser(c)
    if c != 'list-repos':
        # NOTE(review): the source's indentation was lost; list-repos is
        # assumed to take neither a repo nor trailing args, since doit()
        # never reads them for that operation -- confirm.
        sp.add_argument('repo')
        sp.add_argument('args', nargs='*')

36 

def access_denied():
    """Report the exception being handled and return the EACCES error code.

    The traceback of the current exception is printed with
    ``traceback.print_exc()``; the return value is ``errno.EACCES``, which
    ``doit`` hands back as its result.
    """
    # Both modules are only needed on this error path, so load them here.
    import errno
    import traceback

    traceback.print_exc()
    return errno.EACCES

42 

def get_lifetime(environ, role):
    """Return the lifetime configured for ``role``, if any.

    The value is read from the ``DTUF_<role>_LIFETIME`` entry of ``environ``
    and parsed with ``pytimeparse``.

    :param environ: Mapping of environment variable names to values.
    :param role: Role name as it appears in the variable name, e.g. ``'ROOT'``.
    :returns: ``None`` when the variable is not set, otherwise a ``timedelta``.
    :raises ValueError: If the variable is set to something pytimeparse
        cannot interpret as a duration.
    """
    variable = 'DTUF_' + role + '_LIFETIME'
    lifetime = environ.get(variable)
    if lifetime is None:
        return None
    seconds = pytimeparse.parse(lifetime)
    if seconds is None:
        # pytimeparse reports an unparsable string by returning None. Passing
        # that on to timedelta would fail with a TypeError that mentions
        # neither the variable nor its value.
        raise ValueError('invalid duration in ' + variable + ': ' + repr(lifetime))
    return timedelta(seconds=seconds)

48 

# pylint: disable=too-many-statements,too-many-locals
def doit(args, environ):
    """Run a single dtuf command line operation.

    :param args: Command line arguments without the program name; they are
        parsed with the module-level ``parser``.
    :param environ: Mapping that supplies the ``DTUF_*`` settings read below
        (``main`` passes ``os.environ``). ``DTUF_HOST`` must be present.
    :returns: ``0`` when the operation completes, or the value of
        ``access_denied()`` when a ``DXFUnauthorizedError`` is raised, either
        directly or as one of the mirror errors of a ``NoWorkingMirrorError``.
    :raises: A ``NoWorkingMirrorError`` with no unauthorized mirror error is
        re-raised; any other exception propagates unchanged.
    """
    # File logging is configured on tuf.settings before tuf.log is imported.
    # NOTE(review): presumably tuf.log reads these settings at import time --
    # keep this ordering.
    import tuf.settings
    log_file = environ.get('DTUF_LOG_FILE', 'dtuf.log')
    if log_file:
        tuf.settings.LOG_FILENAME = log_file
        tuf.settings.ENABLE_FILE_LOGGING = True
    else:
        # An empty DTUF_LOG_FILE switches file logging off.
        tuf.settings.ENABLE_FILE_LOGGING = False

    import tuf.log
    if log_file:
        log_level = environ.get('DTUF_FILE_LOG_LEVEL', 'WARNING')
        tuf.log.set_filehandler_log_level(getattr(logging, log_level))
    log_level = environ.get('DTUF_CONSOLE_LOG_LEVEL', 'WARNING')
    # tuf.repository_tool calls tuf.log.add_console_handler
    import tuf.repository_tool
    tuf.log.set_console_log_level(getattr(logging, log_level))
    # Needed by the except clause at the end of this function; imported
    # explicitly so it does not depend on another tuf module loading it.
    import tuf.exceptions

    import dtuf

    # DTUF_PROGRESS=1 forces progress bars on, 0 forces them off; otherwise
    # they are shown only when stderr is a terminal.
    dtuf_progress = environ.get('DTUF_PROGRESS')
    if dtuf_progress == '1' or (dtuf_progress != '0' and sys.stderr.isatty()):
        bars = {}
        def progress(dgst, chunk, size):
            # One bar per digest, created on the first callback for it.
            if dgst not in bars:
                bars[dgst] = tqdm.tqdm(desc=dgst[0:8],
                                       total=size,
                                       leave=True)
            if len(chunk) > 0:
                bars[dgst].update(len(chunk))
            # Close and forget the bar once it has reached its total.
            if bars[dgst].n >= bars[dgst].total:
                bars[dgst].close()
                del bars[dgst]
    else:
        progress = None

    def auth(dtuf_obj, response):
        # pylint: disable=redefined-outer-name
        # Authentication callback handed to the dtuf objects below.
        username = environ.get('DTUF_USERNAME')
        password = environ.get('DTUF_PASSWORD')
        dtuf_obj.authenticate(username, password, response=response)

    args = parser.parse_args(args)
    # The operation decides which kind of dtuf object is needed. Only one of
    # dtuf_base / dtuf_master / dtuf_copy gets bound.
    if args.op == 'list-repos':
        dtuf_base = dtuf.DTufBase(environ['DTUF_HOST'],
                                  auth,
                                  environ.get('DTUF_INSECURE') == '1',
                                  environ.get('DTUF_AUTH_HOST'))
        dtuf_obj = dtuf_base
    elif args.op in ['auth',
                     'create-root-key',
                     'create-metadata-keys',
                     'create-metadata',
                     'reset-keys',
                     'push-target',
                     'del-target',
                     'push-metadata',
                     'list-master-targets',
                     'get-master-expirations']:
        dtuf_master = dtuf.DTufMaster(environ['DTUF_HOST'],
                                      args.repo,
                                      environ.get('DTUF_REPOSITORIES_ROOT'),
                                      auth,
                                      environ.get('DTUF_INSECURE') == '1',
                                      environ.get('DTUF_AUTH_HOST'),
                                      get_lifetime(environ, 'ROOT'),
                                      get_lifetime(environ, 'TARGETS'),
                                      get_lifetime(environ, 'SNAPSHOT'),
                                      get_lifetime(environ, 'TIMESTAMP'))
        dtuf_obj = dtuf_master
    else:
        dtuf_copy = dtuf.DTufCopy(environ['DTUF_HOST'],
                                  args.repo,
                                  environ.get('DTUF_REPOSITORIES_ROOT'),
                                  auth,
                                  environ.get('DTUF_INSECURE') == '1',
                                  environ.get('DTUF_AUTH_HOST'))
        dtuf_obj = dtuf_copy

    def reject_extra_args(allowed=0):
        # Exit through the parser's usage error when more than `allowed`
        # trailing arguments were supplied.
        if len(args.args) > allowed:
            parser.error('too many arguments')

    def _doit():
        # pylint: disable=too-many-branches,too-many-statements
        if args.op == 'auth':
            # Unlike the auth callback above, explicit authentication
            # requires both credentials to be set.
            token = dtuf_master.authenticate(environ['DTUF_USERNAME'],
                                             environ['DTUF_PASSWORD'],
                                             actions=args.args)
            if token:
                print(token)
            return

        # A token supplied through the environment is set on the dtuf object.
        token = environ.get('DTUF_TOKEN')
        if token:
            dtuf_obj.token = token

        if args.op == 'create-root-key':
            reject_extra_args()
            dtuf_master.create_root_key(environ.get('DTUF_ROOT_KEY_PASSWORD'))

        elif args.op == 'create-metadata-keys':
            reject_extra_args()
            dtuf_master.create_metadata_keys(environ.get('DTUF_TARGETS_KEY_PASSWORD'),
                                             environ.get('DTUF_SNAPSHOT_KEY_PASSWORD'),
                                             environ.get('DTUF_TIMESTAMP_KEY_PASSWORD'))

        elif args.op == 'create-metadata':
            reject_extra_args()
            dtuf_master.create_metadata(environ.get('DTUF_ROOT_KEY_PASSWORD'),
                                        environ.get('DTUF_TARGETS_KEY_PASSWORD'),
                                        environ.get('DTUF_SNAPSHOT_KEY_PASSWORD'),
                                        environ.get('DTUF_TIMESTAMP_KEY_PASSWORD'))

        elif args.op == 'reset-keys':
            reject_extra_args()
            dtuf_master.reset_keys(environ.get('DTUF_ROOT_KEY_PASSWORD'),
                                   environ.get('DTUF_TARGETS_KEY_PASSWORD'),
                                   environ.get('DTUF_SNAPSHOT_KEY_PASSWORD'),
                                   environ.get('DTUF_TIMESTAMP_KEY_PASSWORD'))

        elif args.op == 'push-target':
            # First argument is the target name, the rest is passed through.
            if len(args.args) < 2:
                parser.error('too few arguments')
            dtuf_master.push_target(args.args[0],
                                    *args.args[1:],
                                    progress=progress)

        elif args.op == 'del-target':
            for name in args.args:
                dtuf_master.del_target(name)

        elif args.op == 'push-metadata':
            reject_extra_args()
            dtuf_master.push_metadata(environ.get('DTUF_TARGETS_KEY_PASSWORD'),
                                      environ.get('DTUF_SNAPSHOT_KEY_PASSWORD'),
                                      environ.get('DTUF_TIMESTAMP_KEY_PASSWORD'),
                                      progress)

        elif args.op == 'list-master-targets':
            reject_extra_args()
            for name in dtuf_master.list_targets():
                print(name)

        elif args.op == 'get-master-expirations':
            reject_extra_args()
            for role, expiry in dtuf_master.get_expirations().items():
                print(role + ': ' + expiry.isoformat())

        elif args.op == 'pull-metadata':
            # Optional single argument: a file holding the root public key,
            # or '-' to read the key from standard input.
            reject_extra_args(1)
            root_public_key = None
            if len(args.args) == 1:
                if args.args[0] == '-':
                    root_public_key = sys.stdin.read()
                else:
                    with open(args.args[0], 'rb') as f:
                        root_public_key = f.read().decode('utf-8')
            for name in dtuf_copy.pull_metadata(root_public_key, progress):
                print(name)

        elif args.op == 'pull-target':
            # Blob data is binary: use the byte stream behind sys.stdout when
            # there is one, otherwise sys.stdout itself.
            _stdout = getattr(sys.stdout, 'buffer', sys.stdout)
            for name in args.args:
                for it, dgst, size in dtuf_copy.pull_target(name, True):
                    if environ.get('DTUF_BLOB_INFO') == '1':
                        print(dgst + ' ' + str(size))
                        # print() goes through the text layer of sys.stdout
                        # while the blob is written to the byte stream
                        # underneath it. Flush so the info line cannot come
                        # out after the blob data it describes.
                        sys.stdout.flush()
                    # pylint: disable=protected-access
                    dtuf._write_with_progress(it, dgst, size, _stdout, progress)

        elif args.op == 'blob-sizes':
            for name in args.args:
                for size in dtuf_copy.blob_sizes(name):
                    print(size)

        elif args.op == 'check-target':
            if len(args.args) < 2:
                parser.error('too few arguments')
            dtuf_copy.check_target(args.args[0], *args.args[1:])

        elif args.op == 'list-copy-targets':
            reject_extra_args()
            for name in dtuf_copy.list_targets():
                print(name)

        elif args.op == 'get-copy-expirations':
            reject_extra_args()
            for role, expiry in dtuf_copy.get_expirations().items():
                print(role + ': ' + expiry.isoformat())

        elif args.op == 'list-repos':
            for name in dtuf_base.list_repos():
                print(name)

    try:
        _doit()
        return 0
    except dxf.exceptions.DXFUnauthorizedError:
        return access_denied()
    except tuf.exceptions.NoWorkingMirrorError as ex:
        # An authorization failure can also arrive as one of the mirror
        # errors carried by this exception.
        for ex2 in ex.mirror_errors.values():
            if isinstance(ex2, dxf.exceptions.DXFUnauthorizedError):
                return access_denied()
        raise

260 

def main():
    """Command line entry point.

    Runs ``doit`` on the process arguments (program name removed) and the
    process environment, then exits with the value it returns.
    """
    exit_code = doit(sys.argv[1:], os.environ)
    sys.exit(exit_code)