This file is indexed.

/usr/lib/python3/dist-packages/os_brick/remotefs/windows_remotefs.py is in python3-os-brick 2.3.0-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Windows remote filesystem client utilities."""

import os
import re

from oslo_log import log as logging

from os_win import utilsfactory

from os_brick import exception
from os_brick.i18n import _
from os_brick.remotefs import remotefs

LOG = logging.getLogger(__name__)


class WindowsRemoteFsClient(remotefs.RemoteFsClient):
    """Remote filesystem client handling SMB shares on Windows.

    SMB operations are delegated to the os-win ``smbutils`` object and
    symlink handling to the os-win ``pathutils`` object, both obtained
    through ``utilsfactory``. When a mount point base is configured, each
    mounted share is additionally exposed through a symlink created under
    that directory.
    """

    # Patterns used to pull the credentials out of a mount options string,
    # e.g. "username=foo,password=bar" or "user=foo pass=bar".
    _username_regex = re.compile(r'user(?:name)?=([^, ]+)')
    _password_regex = re.compile(r'pass(?:word)?=([^, ]+)')
    _loopback_share_map = {}

    def __init__(self, mount_type, root_helper=None,
                 execute=None, *args, **kwargs):
        """Set up the client for the requested mount type.

        :param mount_type: either 'cifs' or 'smbfs'.
        :param root_helper: accepted for signature compatibility; not used
                            by this initializer.
        :param execute: accepted for signature compatibility; not used by
                        this initializer.
        :param kwargs: the following keys are read:
                       ``local_path_for_loopback`` (defaults to False),
                       ``smbfs_mount_point_base`` and
                       ``smbfs_mount_options``.
        :raises: exception.ProtocolNotSupported if the mount type is not
                 one of the supported ones.
        """
        option_prefixes = {
            'cifs': 'smbfs',
            'smbfs': 'smbfs',
        }

        self._local_path_for_loopback = kwargs.get('local_path_for_loopback',
                                                   False)

        option_prefix = option_prefixes.get(mount_type)
        if option_prefix is None:
            raise exception.ProtocolNotSupported(protocol=mount_type)

        self._mount_type = mount_type

        base_key = option_prefix + '_mount_point_base'
        options_key = option_prefix + '_mount_options'
        self._mount_base = kwargs.get(base_key)
        self._mount_options = kwargs.get(options_key)

        self._smbutils = utilsfactory.get_smbutils()
        self._pathutils = utilsfactory.get_pathutils()

    def get_local_share_path(self, share, expect_existing=True):
        """Look up the local path of a share through smbutils.

        :param share: value handed over to ``get_smb_share_path``.
        :param expect_existing: when true, a missing (falsy) path is
                                treated as an error.
        :returns: whatever ``get_smb_share_path`` returned.
        :raises: exception.VolumePathsNotFound if no path was found and
                 ``expect_existing`` is true.
        """
        found_path = self._smbutils.get_smb_share_path(share)
        if found_path or not expect_existing:
            return found_path

        err_msg = _("Could not find the local "
                    "share path for %(share)s.")
        raise exception.VolumePathsNotFound(err_msg % {'share': share})

    def _get_share_norm_path(self, share):
        """Return the share path with forward slashes turned to backslashes."""
        normalized = share.replace('/', '\\')
        return normalized

    def get_share_name(self, share):
        """Return what follows the first path component of the share.

        The share is normalized, stripped of its leading backslashes and
        split once on a backslash; the second piece is returned. An
        IndexError is raised if there is no backslash left to split on.
        """
        stripped = self._get_share_norm_path(share).lstrip('\\')
        pieces = stripped.split('\\', 1)
        return pieces[1]

    def mount(self, share, flags=None):
        """Make the share available and create its mount point if needed.

        Mapping the share is skipped when local paths are requested for
        loopback shares and smbutils reports the share as local. Otherwise
        the share is mapped unless a mapping already exists.

        :param share: the share path, using either slash flavor.
        :param flags: optional options string, joined with the configured
                      mount options in order to extract the credentials.
        """
        norm_share = self._get_share_norm_path(share)
        use_local_path = (self._local_path_for_loopback and
                          self._smbutils.is_local_share(norm_share))

        if not use_local_path:
            configured_opts = self._mount_options or ''
            extra_opts = flags or ''
            mount_options = " ".join((configured_opts, extra_opts))
            username, password = self._parse_credentials(mount_options)

            already_mapped = self._smbutils.check_smb_mapping(norm_share)
            if not already_mapped:
                self._smbutils.mount_smb_share(norm_share,
                                               username=username,
                                               password=password)
        else:
            LOG.info("Skipping mounting local share %(share_path)s.",
                     {'share_path': norm_share})

        if self._mount_base:
            self._create_mount_point(share, use_local_path)

    def unmount(self, share):
        """Unmount the share, identified by its normalized path."""
        norm_share = self._get_share_norm_path(share)
        self._smbutils.unmount_smb_share(norm_share)

    def _create_mount_point(self, share, use_local_path):
        """Create the symlink acting as mount point for the share.

        The link points to the local share path when ``use_local_path`` is
        true and to the normalized share path otherwise. The mount point
        base directory is created if missing. An already existing symlink
        is left untouched.

        :raises: exception.BrickException if the mount point path exists
                 without being a symlink.
        """
        # The original (not normalized) share path is deliberately used
        # here: the mount point contains a hash of the share, and this is
        # the value the caller is going to expect.
        link_path = self.get_mount_point(share)
        norm_share = self._get_share_norm_path(share)
        share_name = self.get_share_name(share)

        if use_local_path:
            link_target = self.get_local_share_path(share_name)
        else:
            link_target = norm_share

        if not os.path.isdir(self._mount_base):
            os.makedirs(self._mount_base)

        if not os.path.exists(link_path):
            self._pathutils.create_sym_link(link_path, link_target)
        elif not self._pathutils.is_symlink(link_path):
            raise exception.BrickException(_("Link path already exists "
                                             "and it's not a symlink"))

    def _parse_credentials(self, opts_str):
        """Extract the username and password from an options string.

        Only the first occurrence of each option is considered. The
        'guest' username is reported as None.

        :param opts_str: mount options string; may be empty or None.
        :returns: a (username, password) tuple, each item being None when
                  not available.
        """
        if not opts_str:
            return None, None

        usernames = self._username_regex.findall(opts_str)
        passwords = self._password_regex.findall(opts_str)

        username = None
        if usernames and usernames[0] != 'guest':
            username = usernames[0]

        password = None
        if passwords:
            password = passwords[0]

        return username, password