This file is indexed.

/usr/lib/python2.7/dist-packages/rekall/plugins/windows/malware/impscan.py is in python-rekall-core 1.6.0+dfsg-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# Rekall Memory Forensics
#
# Copyright (c) 2010 - 2012 Michael Ligh <michael.ligh@mnin.org>
# Copyright 2013 Google Inc. All Rights Reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#

from rekall import plugin
from rekall import obj
from rekall import testlib

from rekall.plugins.overlays.windows import pe_vtypes
from rekall.plugins.windows import common


class ImpScan(common.WinProcessFilter):
    """Scan for calls to imported functions."""

    __name = "impscan"

    # Maps an exported function name to the "module!function" pair that
    # _original_import() reports in its place.
    FORWARDED_IMPORTS = {
        "RtlGetLastWin32Error" : "kernel32.dll!GetLastError",
        "RtlSetLastWin32Error" : "kernel32.dll!SetLastError",
        "RtlRestoreLastWin32Error" : "kernel32.dll!SetLastError",
        "RtlAllocateHeap" : "kernel32.dll!HeapAlloc",
        "RtlReAllocateHeap" : "kernel32.dll!HeapReAlloc",
        "RtlFreeHeap" : "kernel32.dll!HeapFree",
        "RtlEnterCriticalSection" : "kernel32.dll!EnterCriticalSection",
        "RtlLeaveCriticalSection" : "kernel32.dll!LeaveCriticalSection",
        "RtlDeleteCriticalSection" : "kernel32.dll!DeleteCriticalSection",
        "RtlZeroMemory" : "kernel32.dll!ZeroMemory",
        "RtlSizeHeap" : "kernel32.dll!HeapSize",
        "RtlUnwind" : "kernel32.dll!RtlUnwind",
        }

    @classmethod
    def args(cls, parser):
        """Declare the command line args we need."""
        super(ImpScan, cls).args(parser)
        parser.add_argument("-b", "--base", default=None, type="IntParser",
                            help="Base address in process memory if --pid is "
                            "supplied, otherwise an address in kernel space")

        parser.add_argument("-s", "--size", default=None, type="IntParser",
                            help="Size of memory to scan")

        parser.add_argument("-k", "--kernel", default=None, type="Boolean",
                            help="Scan in kernel space.")

    def __init__(self, base=None, size=None, kernel=None, **kwargs):
        """Scans the imports from a module.

        Often when dumping a PE executable from memory the import address tables
        are over written. This makes it hard to resolve function names when
        disassembling the binary.

        This plugin enumerates all dlls in the process address space and
        examines their export address tables. It then disassembles the
        executable code for calls to external functions. We attempt to resolve
        the names of the calls using the known exported functions we gathered in
        step 1.

        This technique can be used for a process, or the kernel itself. In the
        former case, we examine dlls, while in the later case we examine kernel
        modules using the modules plugin.

        Args:

          base: Start disassembling at this address - this is normally the base
            address of the dll or module we care about. If omitted we use the
            kernel base (if in kernel mode) or the main executable (if in
            process mode).

          size: Disassemble this many bytes from the address space. If omitted
            we use the module which starts at base.

          kernel: The mode to use. If set, we operate in kernel mode.
        """
        super(ImpScan, self).__init__(**kwargs)
        self.base = base
        self.size = size
        self.kernel = kernel

    def _enum_apis(self, all_mods):
        """Enumerate all exported functions from kernel or process space.

        To enum kernel APIs, all_mods is a list of drivers.
        To enum process APIs, all_mods is a list of DLLs.

        Args:
          all_mods: list of _LDR_DATA_TABLE_ENTRY (must support len()).

        Returns:
          A dict keyed by the exported function's address. Each value is a
          tuple (module, func_pointer, function_name). The function name is
          used if available, otherwise we take the ordinal value (or '' when
          neither is set).
        """
        exports = {}

        for i, mod in enumerate(all_mods):
            self.session.report_progress("Scanning imports %s/%s" % (
                i, len(all_mods)))

            pe = pe_vtypes.PE(address_space=mod.obj_vm,
                              session=self.session, image_base=mod.DllBase)

            for _, func_pointer, func_name, ordinal in pe.ExportDirectory():
                function_name = func_name or ordinal or ''

                # Later modules overwrite earlier ones exporting the same
                # address.
                exports[func_pointer.v()] = (mod, func_pointer, function_name)

        return exports

    def _iat_scan(self, addr_space, calls_imported, apis, base_address,
                  end_address):
        """Scan forward from the lowest IAT entry found for new import entries.

        New entries are added to calls_imported in place; nothing is returned.

        Args:
          addr_space: an AS
          calls_imported: Import database - a dict keyed by IAT slot address.
          apis: dictionary of exported functions in the AS.
          base_address: memory base address for this module.
          end_address: end of valid address range.
        """
        if not calls_imported:
            return

        # Search the iat from the earliest function address to the latest
        # address for references to other functions.
        start_addr = min(calls_imported)

        # NOTE(review): the address span is used directly as the number of
        # pointer-sized entries to read (capped at 2000).
        iat_size = min(max(calls_imported) - start_addr, 2000)

        # The IAT is a table of pointers to functions.
        iat = self.profile.Array(
            offset=start_addr, vm=addr_space, target="Pointer",
            count=iat_size, target_args=dict(target="Function"))

        for func_pointer in iat:
            func = func_pointer.dereference()

            if not func:
                continue

            # Skip pointers back into the module being scanned (call to self).
            if base_address < func.obj_offset < end_address:
                continue

            # Add the export to our database of imported calls.
            iat_addr = func_pointer.obj_offset
            if func.obj_offset in apis and iat_addr not in calls_imported:
                calls_imported[iat_addr] = (iat_addr, func)

    def _original_import(self, mod_name, func_name):
        """Revert a forwarded import to the original module and function name.

        Args:
          mod_name: current module name
          func_name: current function name

        Returns:
          A two element sequence (module name, function name). When func_name
          is listed in FORWARDED_IMPORTS this is the replacement pair,
          otherwise the arguments are returned unchanged.
        """
        forwarded = self.FORWARDED_IMPORTS.get(func_name)
        if forwarded is None:
            return mod_name, func_name

        return forwarded.split("!")

    CALL_RULE = {'mnemonic': 'CALL', 'operands': [
        {'type': 'MEM', 'target': "$target", 'address': '$address'}]}
    JMP_RULE = {'mnemonic': 'JMP', 'operands': [
        {'type': 'MEM', 'target': "$target", 'address': '$address'}]}

    def call_scan(self, addr_space, base_address, size_to_read):
        """Locate calls in a block of code.

        Disassemble a block of data and yield possible calls to imported
        functions.  We're looking for instructions such as these:

        x86:
        CALL DWORD [0x1000400]
        JMP  DWORD [0x1000400]

        x64:
        CALL QWORD [RIP+0x989d]

        On x86, the 0x1000400 address is an entry in the IAT or call table. It
        stores a DWORD which is the location of the API function being called.

        On x64, the 0x989d is a relative offset from the current instruction
        (RIP).

        So we simply disassemble the entire code section of the executable
        looking for calls, then we collect all the targets of the calls.

        Args:
          addr_space: an AS to scan with
          base_address: memory base address to start disassembling at
          size_to_read: number of bytes, starting at base_address, to cover

        Yields:
          Tuples of (instruction address, address of the memory operand,
          Function object located at the operand's target).
        """
        func_obj = self.profile.Function(vm=addr_space, offset=base_address)
        end_address = base_address + size_to_read

        for instruction in func_obj.disassemble(2**32):
            # The scanned range is [base_address, end_address): an instruction
            # starting exactly at end_address is already outside of it.
            if instruction.address >= end_address:
                break

            context = {}
            if (instruction.match_rule(self.CALL_RULE, context) or
                    instruction.match_rule(self.JMP_RULE, context)):
                target = context.get("$target")
                if target:
                    yield (instruction.address,
                           context.get("$address"),
                           self.profile.Function(vm=addr_space, offset=target))

    def find_process_imports(self, task):
        """Resolve the imports called by the main executable of a process.

        NOTE(review): self.base and self.size are not consulted here; the
        first loaded module is always the one scanned.

        Args:
          task: the process to examine.

        Yields:
          Tuples of (iat address, exported function pointer, exporting
          module, function name) for every call target which matches a known
          export. Unresolved targets are dropped.
        """
        task_space = task.get_process_address_space()
        all_mods = list(task.get_load_modules())

        # PEB is paged or no DLLs loaded. Check this before doing any export
        # enumeration since there is nothing to enumerate.
        if not all_mods:
            self.session.logging.error("Cannot load DLLs in process AS")
            return

        # Exported function of all other modules in the address space.
        apis = self._enum_apis(all_mods)

        # Its OK to blindly take the 0th element because the executable is
        # always the first module to load.
        base_address = int(all_mods[0].DllBase)
        size_to_read = int(all_mods[0].SizeOfImage)

        calls_imported = {}
        for address, iat, destination in self.call_scan(
                task_space, base_address, size_to_read):
            self.session.report_progress("Resolving import %s->%s" % (
                address, iat))
            calls_imported[iat] = (address, destination)

        # Scan the IAT for additional functions.
        self._iat_scan(task_space, calls_imported, apis,
                       base_address, base_address + size_to_read)

        # items() rather than the Python 2 only iteritems(), matching
        # find_kernel_import().
        for iat, (_, func_pointer) in sorted(calls_imported.items()):
            tmp = apis.get(func_pointer.obj_offset)
            if tmp:
                module, func_pointer, func_name = tmp
                yield iat, func_pointer, module, func_name

    def find_kernel_import(self):
        """Resolve the imports called by a kernel module.

        Yields:
          Tuples of (iat address, exported function pointer, exporting
          module, function name). Targets which do not match a known export
          are reported with obj.NoneObject("Unknown") placeholders.

        Raises:
          plugin.PluginError: if no size was given and no loaded module starts
            at the base address.
        """
        # If the user has not specified the base, we just use the kernel's
        # image.
        base_address = self.base
        if base_address is None:
            base_address = self.session.GetParameter("kernel_base")

        # The module list is needed for the export database below regardless
        # of whether the caller supplied a size, so always build it here.
        modlist = self.session.plugins.modules()
        all_mods = list(modlist.lsmod())

        # Get the size from the module list if its not supplied
        size_to_read = self.size
        if not size_to_read:
            for module in all_mods:
                if module.DllBase == base_address:
                    size_to_read = module.SizeOfImage
                    break

        if not size_to_read:
            raise plugin.PluginError("You must specify a size to read.")

        apis = self._enum_apis(all_mods)

        calls_imported = {}
        for address, iat, destination in self.call_scan(
                self.kernel_address_space, base_address, size_to_read):
            calls_imported[iat] = (address, destination)
            self.session.report_progress(
                "Found %s imports" % len(calls_imported))

        # Scan the IAT for additional functions. _iat_scan() expects the end
        # of the address range, not the size.
        self._iat_scan(self.kernel_address_space, calls_imported, apis,
                       base_address, base_address + size_to_read)

        unknown = (obj.NoneObject("Unknown"),
                   obj.NoneObject("Unknown"),
                   obj.NoneObject("Unknown"))

        for iat, (_, func_pointer) in sorted(calls_imported.items()):
            # Key on obj_offset, like _iat_scan() and find_process_imports().
            module, func_pointer, func_name = apis.get(
                func_pointer.obj_offset, unknown)

            yield iat, func_pointer, module, func_name

    def render(self, renderer):
        """Write the resolved imports as a table.

        In kernel mode a single table is produced, otherwise one table per
        process selected by filter_processes().

        Args:
          renderer: the renderer to write to.
        """
        # NOTE(review): 'moduole' looks like a typo for 'module'. It is left
        # untouched because it is the column's runtime name.
        table_header = [("IAT", 'iat', "[addrpad]"),
                        ("Call", 'call', "[addrpad]"),
                        ("Module", 'moduole', "20"),
                        ("Function", 'function', ""),
                       ]

        if self.kernel:
            renderer.format("Kernel Imports\n")

            renderer.table_header(table_header)
            for iat, func, mod, func_name in self.find_kernel_import():
                mod_name, func_name = self._original_import(
                    mod.BaseDllName, func_name)

                renderer.table_row(iat, func, mod_name, func_name)
        else:
            for task in self.filter_processes():
                renderer.section()
                renderer.format("Process {0} PID {1}\n", task.ImageFileName,
                                task.UniqueProcessId)
                renderer.table_header(table_header)

                for iat, func, mod, func_name in self.find_process_imports(
                        task):
                    mod_name, func_name = self._original_import(
                        mod.BaseDllName, func_name)
                    renderer.table_row(iat, func, mod_name, func_name)


class TestImpScan(testlib.SimpleTestCase):
    """Exercise the impscan plugin through its command line."""

    # Command line used by the test. The %(pids)s placeholder is presumably
    # expanded by the testlib harness - confirm against testlib.
    PARAMETERS = {"commandline": "impscan %(pids)s"}