/usr/lib/python3/dist-packages/subuserlib/resolve.py is in subuser 0.6.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 | # -*- coding: utf-8 -*-
"""
This module is used to parse the image source identifiers used to identify image sources during instalation. For more information see the image-source-identifiers section of the subuser standard.
"""
#external imports
import os
#internal imports
from subuserlib.classes.repository import Repository
def resolveImageSource(user,imageSourcePath,contextRepository=None,allowLocalRepositories=True):
"""
From a image source identifier path return a ImageSource object.
>>> import os
>>> from subuserlib.classes.user import User
>>> user = User()
Usually, the syntax is image-name@repository-name.
>>> print(resolveImageSource(user,"foo@default").getName()) # doctest: +ELLIPSIS
Initial commit.
Cloning repository default from ...
foo
If there is no @, then we assume that the repository is the contextRepository. The default contextRepository is the "default" repository.
>>> print(resolveImageSource(user,"foo").getName())
foo
If the repository identifier is a URI and a repository with the same URI already exists, then the URI is resolved to the name of the existing repository. Otherwise, a temporary repository is created.
>>> print(resolveImageSource(user,"bar@file://"+os.getcwd()+"/test-repos/remote-test-repo").getName()) # doctest: +ELLIPSIS
Cloning repository 0 from file://...
Adding new temporary repository file://...
bar
If the repository identifier is a path to a folder on the local machine and a repository pointing to this folder already exists, then the identifier is resolved to the name of the existing repository. Otherwise, a temporary repository is created.
>>> print(resolveImageSource(user,"bar@"+os.getcwd()+"/test-repos/remote-test-repo").getName()) # doctest: +ELLIPSIS
Adding new temporary repository ...
bar
Throws an Key error:
- If the repository does not exist
- If the image is not in the repository
>>> try:
... resolveImageSource(user,"non-existant@default")
... except KeyError:
... print("KeyError")
KeyError
"""
if not contextRepository:
contextRepository = user.getRegistry().getRepositories()["default"]
splitImageIdentifier = imageSourcePath.split("@",1)
imageName = splitImageIdentifier[0]
# For identifiers of the format:
# "foo"
if len(splitImageIdentifier)==1:
repository = contextRepository
else:
repository = resolveRepository(user,splitImageIdentifier[1],allowLocalRepositories=allowLocalRepositories)
try:
return repository[imageName]
except KeyError:
raise KeyError(imageName + " could not be found in the repository. The following images exist in the repository: \"" + "\" \"".join(repository.keys())+"\"")
def resolveRepository(user,repoIdentifier,allowLocalRepositories=True):
# "./"
if repoIdentifier == "./":
if allowLocalRepositories:
return getRepositoryFromURIOrPath(user,os.environ["PWD"])
else:
raise ResolutionError("Error when resolving repository identifier "+repoIdentifier+". Refering to local repositories is forbidden in this context.")
# "/home/timothy/subuser-repo"
elif repoIdentifier.startswith("/"):
if allowLocalRepositories:
return getRepositoryFromURIOrPath(user,repoIdentifier)
else:
raise ResolutionError("Error when resolving repository identifier "+repoIdentifier+". Refering to local repositories is forbidden in this context.")
# "file:///home/timothy/subuser-repo"
elif repoIdentifier.startswith("file:///"):
if allowLocalRepositories:
return getRepositoryFromURIOrPath(user,repoIdentifier)
else:
raise ResolutionError("Error when resolving repository identifier "+repoIdentifier+". Refering to local repositories is forbidden in this context.")
# "https://github.com/subuser-security/some-repo.git"
elif repoIdentifier.startswith("https://") or repoIdentifier.startswith("http://"):
return getRepositoryFromURIOrPath(user,repoIdentifier)
# "bar"
elif not ":" in repoIdentifier and not "/" in repoIdentifier:
if allowLocalRepositories or repoIdentifier == "default":
return user.getRegistry().getRepositories()[repoIdentifier]
else:
raise ResolutionError("Error when resolving repository identifier "+repoIdentifier+". Refering to repositories by name is forbidden in this context.")
else:
raise ResolutionError("Error when resolving repository identifier "+repoIdentifier+".")
def lookupRepositoryByURI(user,uri):
"""
If a repository with this URI exists, return that repository. Otherwise, return None.
"""
if uri == "./":
uri = os.environ["PWD"]
for _,repository in user.getRegistry().getRepositories().items():
if uri == repository.getURI():
return repository
return None
def lookupRepositoryByPath(user,path):
"""
If a repository with this path exists, return that repository. Otherwise, return None.
"""
for _,repository in user.getRegistry().getRepositories().items():
if repository.isLocal() and path == repository.getRepoPath():
return repository
return None
def lookupRepositoryByURIOrPath(user,uriOrPath):
if uriOrPath.startswith("/"):
return lookupRepositoryByPath(user,uriOrPath)
else:
return lookupRepositoryByURI(user,uriOrPath)
def getRepositoryFromURI(user,uri):
"""
Either return the repository who's URI is equal to the given URI or return a new temporary repository with that URI.
"""
#First check if a repository with this URI already exists
repository = lookupRepositoryByURI(user,uri)
if repository:
return repository
# If it doesn't, create a new repo and return it.
newTempRepo = Repository(user=user,name=user.getRegistry().getRepositories().getNewUniqueTempRepoId(),gitOriginURI=uri,gitCommitHash="master",temporary=True)
if newTempRepo.isPresent():
user.getRegistry().getRepositories().addRepository(newTempRepo)
return newTempRepo
else:
raise ResolutionError("Repo at "+uri+" does not exist.")
def getRepositoryFromPath(user,path):
repository = lookupRepositoryByPath(user,path)
if repository:
return repository
else:
# If it doesn't, create a new repo and return it.
newTempRepo = Repository(user=user,name=user.getRegistry().getRepositories().getNewUniqueTempRepoId(),temporary=True,sourceDir=path)
if newTempRepo.isPresent():
user.getRegistry().getRepositories().addRepository(newTempRepo)
return newTempRepo
else:
raise ResolutionError("Repo at "+path+" does not exist.")
def getRepositoryFromURIOrPath(user,uriOrPath):
if uriOrPath.startswith("/"):
return getRepositoryFromPath(user,uriOrPath)
else:
return getRepositoryFromURI(user,uriOrPath)
class ResolutionError(Exception):
pass
|