This file is indexed.

/usr/share/pyshared/landscape/user/management.py is in landscape-common 12.04.3-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# XXX: There is the potential for some sort of "unixadmin" package
# which wraps up the commands which we use in this module in a Python
# API, with thorough usage of exceptions and such, instead of pipes to
# subprocesses. liboobs (i.e. System Tools) is a possibility, and has
# documentation now in the 2.17 series, but is not wrapped to Python.

import os
import logging
import subprocess

from landscape.lib import md5crypt
from landscape.user.provider import UserManagementError, UserProvider


class UserManagement(object):
    """Manage system users and groups."""

    def __init__(self, provider=None):
        self._provider = provider or UserProvider()

    def add_user(self, username, name, password, require_password_reset,
                 primary_group_name, location, work_phone, home_phone):
        """Add C{username} to the computer.

        @raises UserManagementError: Raised when C{adduser} fails.
        @raises UserManagementError: Raised when C{passwd} fails.
        """
        logging.info("Adding user %s.", username)
        gecos = "%s,%s,%s,%s" % (name, location or "", work_phone or "",
                                 home_phone or "")
        command = ["adduser", username, "--disabled-password", "--gecos",
                   gecos]
        if primary_group_name:
            command.extend(["--gid", str(self._provider.get_gid(
                primary_group_name))])
        result, output = self.call_popen(command)
        if result != 0:
            raise UserManagementError("Error adding user %s.\n%s" %
                                      (username, output))

        self._set_password(username, password)
        if require_password_reset:
            result, new_output = self.call_popen(["passwd", username, "-e"])
            if result !=0:
                raise UserManagementError("Error resetting password for user "
                                          "%s.\n%s" % (username, new_output))
            else:
                output += new_output
        return output

    def _set_password(self, username, password):
        """Set the password of C{username} with C{usermod -p}.

        The password is hashed with C{md5crypt} and a freshly generated
        random salt before being handed to C{usermod}.

        @return: The output of the C{usermod} command.
        @raises UserManagementError: Raised when C{usermod} fails.
        """
        # XXX temporary workaround? We're getting unicode here.
        ascii_username = username.encode("ascii")
        ascii_password = password.encode("ascii")
        # Six random bytes encode to eight base64 characters; the slice
        # drops the final character of the encoded text (the newline the
        # Python 2 base64 codec appends).
        salt = os.urandom(6).encode("base64")[:-1]
        hashed = md5crypt.md5crypt(ascii_password, salt)
        status, output = self.call_popen(
            ["usermod", "-p", hashed, ascii_username])
        if status == 0:
            return output
        raise UserManagementError("Error setting password for user "
                                  "%s.\n%s" % (ascii_username, output))

    def _set_primary_group(self, username, groupname):
        primary_gid = self._provider.get_gid(groupname)
        command = ["usermod", "-g", str(primary_gid), username]
        result, output = self.call_popen(command)
        if result != 0:
            raise UserManagementError("Error setting primary group to %d for"
                                      "%s.\n%s" % (primary_gid, username,
                                                   output))
        return output

    def set_user_details(self, username, password=None, name=None,
                         location=None, work_number=None, home_number=None,
                         primary_group_name=None):
        """Update details for the account matching C{uid}."""
        uid = self._provider.get_uid(username)
        logging.info("Updating metadata for user %s (UID %d).", username, uid)
        if password:
            self._set_password(username, password)

        if primary_group_name:
            self._set_primary_group(username, primary_group_name)

        command = ["chfn"]
        for option, value in [("-r", location), ("-f", name),
                              ("-w", work_number), ("-h", home_number)]:
            if value is not None:
                command += [option, value]

        if len(command) > 1:
            result, output = self.call_popen(command + [username])
            if result != 0:
                raise UserManagementError("Error setting details for user "
                                          "%s.\n%s" % (username, output))
            return output

    def lock_user(self, username):
        """
        Lock the account matching C{username} to prevent them from logging in.
        """
        uid = self._provider.get_uid(username)
        logging.info("Locking out user %s (UID %d).", username, uid)
        result, output = self.call_popen(["usermod", "-L", username])
        if result != 0:
            raise UserManagementError("Error locking user %s.\n%s"
                                     % (username, output))

    def unlock_user(self, username):
        """Unlock the account matching C{username}."""
        uid = self._provider.get_uid(username)
        logging.info("Unlocking user %s (UID %d).", username, uid)

        result, output = self.call_popen(["usermod", "-U", username])
        if result != 0:
            raise UserManagementError("Error unlocking user %s.\n%s"
                                      % (username, output))
        return output

    def remove_user(self, username, delete_home=False):
        """Remove the account matching C{username} from the computer."""
        uid = self._provider.get_uid(username)
        command = ["deluser", username]
        if delete_home:
            logging.info("Removing user %s (UID %d) and deleting their home "
                         "directory.", username, uid)
            command.append("--remove-home")
        else:
            logging.info("Removing user %s (UID %d) without deleting their "
                         "home directory.", username, uid)

        result, output = self.call_popen(command)
        if result != 0:
            raise UserManagementError("Error removing user %s (UID %d).\n%s"
                                      % (username, uid, output))
        return output

    def add_group(self, groupname):
        """Add C{group} with the C{addgroup} system command."""
        logging.info("Adding group %s.", groupname)
        result, output = self.call_popen(["addgroup", groupname])
        if result != 0:
            raise UserManagementError("Error adding group %s.\n%s" %
                                      (groupname, output))
        return output

    def set_group_details(self, groupname, new_name):
        """Update details for the group matching C{gid}."""
        gid = self._provider.get_gid(groupname)
        logging.info("Renaming group %s (GID %d) to %s.",
                     groupname, gid, new_name)
        command = ["groupmod", "-n", new_name, groupname]
        result, output = self.call_popen(command)
        if result != 0:
            raise UserManagementError("Error renaming group %s (GID %d) to "
                                       "%s.\n%s" % (groupname, gid, new_name,
                                                    output))
        return output

    def add_group_member(self, username, groupname):
        """
        Add the user matching C{username} to the group matching C{groupname}
        with the C{gpasswd} system command.
        """
        uid = self._provider.get_uid(username)
        gid = self._provider.get_gid(groupname)
        logging.info("Adding user %s (UID %d) to group %s (GID %d).",
                     username, uid, groupname, gid)
        result, output = self.call_popen(["gpasswd", "-a", username, groupname])
        if result != 0:
            raise UserManagementError("Error adding user %s (UID %d) to "
                                      "group %s (GID %d).\n%s" %
                                      (username, uid, groupname, gid, output))
        return output

    def remove_group_member(self, username, groupname):
        """
        Remove the user matching C{username} from the group matching
        C{groupname} with the C{gpasswd} system command.
        """
        uid = self._provider.get_uid(username)
        gid = self._provider.get_gid(groupname)
        logging.info("Removing user %s (UID %d) from group %s (GID %d).",
                     username, uid, groupname, gid)
        result, output = self.call_popen(["gpasswd", "-d", username, groupname])
        if result != 0:
            raise UserManagementError("Error removing user %s (UID %d) "
                                      "from group %s (GID (%d).\n%s"
                                      % (username, uid, groupname,
                                         gid, output))
        return output

    def remove_group(self, groupname):
        """Remove the account matching C{groupname} from the computer."""
        gid = self._provider.get_gid(groupname)
        logging.info("Removing group %s (GID %d).", groupname, gid)
        result, output = self.call_popen(["groupdel", groupname])
        if result != 0:
            raise UserManagementError("Error removing group %s (GID %d).\n%s"
                                   % (groupname, gid, output))
        return output

    def call_popen(self, args):
        popen = self._provider.popen(args, stdout=subprocess.PIPE,
                                           stderr=subprocess.STDOUT)
        output = popen.stdout.read()
        result = popen.wait()
        return result, output