/usr/lib/python3/dist-packages/cookiecutter/vcs.py is in python3-cookiecutter 1.5.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""Helper functions for working with version control systems."""
from __future__ import unicode_literals
import logging
import os
import subprocess
import sys
from whichcraft import which
from .exceptions import (
RepositoryNotFound, RepositoryCloneFailed, UnknownRepoType, VCSNotInstalled
)
from .prompt import read_user_yes_no
from .utils import make_sure_path_exists, rmtree
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Substrings searched for in the VCS command output to recognise a failed
# checkout of the requested branch, tag or commit.
BRANCH_ERRORS = ['error: pathspec', 'unknown revision']
def prompt_and_delete_repo(repo_dir, no_input=False):
    """Delete a previously-cloned repo, asking the user first if allowed.

    If the user declines, the process exits through ``sys.exit()``.

    :param repo_dir: Directory of previously-cloned repo.
    :param no_input: Skip the confirmation prompt and delete right away.
    """
    # Only prompt for interactive use; API callers pass no_input=True.
    if not no_input:
        prompt = (
            "You've cloned {} before. "
            "Is it okay to delete and re-clone it?"
        ).format(repo_dir)
        if not read_user_yes_no(prompt, 'yes'):
            sys.exit()

    rmtree(repo_dir)
def identify_repo(repo_url):
    """Determine if `repo_url` should be treated as a URL to a git or hg repo.

    Repos can be identified by prepending "hg+" or "git+" to the repo URL.
    Without such a prefix the type is guessed from the URL text: a URL
    containing "git" is treated as git, one containing "bitbucket" as hg.

    :param repo_url: Repo URL of unknown type.
    :returns: ('git', repo_url) or ('hg', repo_url), where the returned URL
        has an explicit "git+"/"hg+" prefix removed.
    :raises UnknownRepoType: if the repo type cannot be determined.
    """
    # Only the first '+' can separate the type prefix from the URL; the URL
    # itself may contain further '+' characters (e.g. ".../c++-template").
    prefix, plus, remainder = repo_url.partition('+')
    if plus and prefix in ('git', 'hg'):
        return prefix, remainder

    # A single '+' preceded by anything else is an unsupported type prefix.
    if repo_url.count('+') == 1:
        raise UnknownRepoType

    # No usable prefix: fall back to guessing from the URL text.
    if 'git' in repo_url:
        return 'git', repo_url
    if 'bitbucket' in repo_url:
        return 'hg', repo_url
    raise UnknownRepoType
def is_vcs_installed(repo_type):
    """Check if the version control system for a repo type is installed.

    :param repo_type: Name of the VCS executable to look up with `which`.
    :returns: True when `which(repo_type)` yields a truthy result,
        False otherwise.
    """
    executable_path = which(repo_type)
    return bool(executable_path)
def clone(repo_url, checkout=None, clone_to_dir='.', no_input=False):
    """Clone a repo into `clone_to_dir` and optionally check out a revision.

    :param repo_url: Repo URL of unknown type.
    :param checkout: The branch, tag or commit ID to checkout after clone.
    :param clone_to_dir: The directory to clone to.
        Defaults to the current directory.
    :param no_input: Suppress all user prompts when calling via API.
    :returns: Path of the directory the repo was cloned into.
    :raises VCSNotInstalled: if the VCS executable for the repo type is
        not installed.
    :raises RepositoryNotFound: if the VCS output contains "not found".
    :raises RepositoryCloneFailed: if the VCS output contains one of the
        `BRANCH_ERRORS` markers.
    :raises subprocess.CalledProcessError: for any other VCS failure.
    """
    # Ensure that clone_to_dir exists
    clone_to_dir = os.path.expanduser(clone_to_dir)
    make_sure_path_exists(clone_to_dir)

    # identify the repo_type
    repo_type, repo_url = identify_repo(repo_url)

    # check that the appropriate VCS for the repo_type is installed
    if not is_vcs_installed(repo_type):
        msg = "'{0}' is not installed.".format(repo_type)
        raise VCSNotInstalled(msg)

    repo_url = repo_url.rstrip('/')
    repo_name = os.path.split(repo_url)[1]
    if repo_type == 'git' and repo_name.endswith('.git'):
        # Strip only a trailing ".git". Splitting on every ".git" would
        # truncate names such as "foo.github.io.git" to "foo", which is not
        # the directory the clone ends up in.
        repo_name = repo_name[:-len('.git')]
    repo_dir = os.path.normpath(os.path.join(clone_to_dir, repo_name))
    logger.debug('repo_dir is %s', repo_dir)

    if os.path.isdir(repo_dir):
        prompt_and_delete_repo(repo_dir, no_input=no_input)

    try:
        subprocess.check_output(
            [repo_type, 'clone', repo_url],
            cwd=clone_to_dir,
            stderr=subprocess.STDOUT,
        )
        if checkout is not None:
            subprocess.check_output(
                [repo_type, 'checkout', checkout],
                cwd=repo_dir,
                stderr=subprocess.STDOUT,
            )
    except subprocess.CalledProcessError as clone_error:
        # The captured output is bytes on Python 3; decode it so the text
        # checks below cannot fail with a TypeError and hide the real error.
        output = clone_error.output
        if isinstance(output, bytes):
            output = output.decode('utf-8', 'replace')

        if 'not found' in output.lower():
            raise RepositoryNotFound(
                'The repository {} could not be found, '
                'have you made a typo?'.format(repo_url)
            )
        if any(error in output for error in BRANCH_ERRORS):
            raise RepositoryCloneFailed(
                'The {} branch of repository {} could not found, '
                'have you made a typo?'.format(checkout, repo_url)
            )
        # Anything else is re-raised unchanged for the caller to handle.
        raise

    return repo_dir
|