This file is indexed.

/usr/share/pyshared/logsparser/tests/test_lognormalizer.py is in python-logsparser 0.4-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# -*- python -*-

# pylogsparser - Logs parsers python library
#
# Copyright (C) 2011 Wallix Inc.
#
# This library is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation; either version 2.1 of the License, or (at your
# option) any later version.
#
# This library is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#

import os
import unittest
import tempfile
import shutil
from logsparser.lognormalizer import LogNormalizer
from lxml.etree import parse, fromstring as XMLfromstring

class Test(unittest.TestCase):
    """Unit tests for logsparser.lognormalizer.

    Every test builds a LogNormalizer from the directory named by the
    NORMALIZERS_PATH environment variable.
    """
    # Looked up when the class body executes, so a missing variable raises
    # KeyError as soon as this module is imported.
    normalizer_path = os.environ['NORMALIZERS_PATH']

    @staticmethod
    def _count_active(ln):
        """Return how many entries of ln.get_active_normalizers() are truthy.

        The mapping is walked through items() so that the activation flag
        (the value) is tested, not a character of the normalizer name.
        """
        return len([name
                    for name, active in ln.get_active_normalizers().items()
                    if active])

    def test_000_invalid_paths(self):
        """Verify that we cannot instantiate LogNormalizer on invalid paths."""
        self.assertRaises(ValueError, LogNormalizer,
                          [self.normalizer_path, "/path/to/nowhere"])
        self.assertRaises(ValueError, LogNormalizer, ["/path/to/nowhere"])
        # NOTE(review): StandardError only exists on Python 2; it is kept
        # as-is so the set of accepted exceptions does not change.
        self.assertRaises(StandardError, LogNormalizer, ["/usr/bin/"])

    def test_001_all_normalizers_activated(self):
        """Verify that all normalizers are activated when LogNormalizer
        is instantiated without an explicit activation dict.
        """
        ln = LogNormalizer(self.normalizer_path)
        self.assertTrue(len(ln))
        self.assertEqual(self._count_active(ln), len(ln))
        self.assertEqual(len(ln._cache), len(ln))

    def test_002_deactivate_normalizer(self):
        """Verify that normalizer deactivation is working."""
        ln = LogNormalizer(self.normalizer_path)
        active_n = ln.get_active_normalizers()
        # Snapshot the first two names before mutating the mapping.
        for to_d in list(active_n)[:2]:
            del active_n[to_d]
        ln.set_active_normalizers(active_n)
        ln.reload()
        self.assertEqual(self._count_active(ln), len(ln) - 2)
        self.assertEqual(len(ln._cache), len(ln) - 2)

    def test_003_activate_normalizer(self):
        """Verify that normalizer activation is working."""
        ln = LogNormalizer(self.normalizer_path)
        active_n = ln.get_active_normalizers()
        target = list(active_n)[0]
        del active_n[target]
        ln.set_active_normalizers(active_n)
        ln.reload()
        # now deactivation should be done so reactivate
        active_n[target] = True
        ln.set_active_normalizers(active_n)
        ln.reload()
        self.assertEqual(self._count_active(ln), len(ln))
        self.assertEqual(len(ln._cache), len(ln))

    def test_004_normalizer_uuid(self):
        """Verify that we get at least the uuid tag."""
        testlog = {'raw': 'a minimal log line'}
        ln = LogNormalizer(self.normalizer_path)
        # The tags are checked on the dict that was passed in.
        ln.lognormalize(testlog)
        self.assertTrue('uuid' in testlog)

    def test_005_normalizer_test_a_syslog_log(self):
        """Verify that lognormalizer extracts the syslog header as tags."""
        testlog = {'raw': 'Jul 18 08:55:35 naruto app[3245]: body message'}
        ln = LogNormalizer(self.normalizer_path)
        ln.lognormalize(testlog)
        self.assertTrue('uuid' in testlog)
        self.assertTrue('date' in testlog)
        self.assertEqual(testlog['body'], 'body message')
        self.assertEqual(testlog['program'], 'app')
        self.assertEqual(testlog['pid'], '3245')

    def test_006_normalizer_test_a_syslog_log_with_syslog_deactivate(self):
        """Verify that lognormalizer does not extract the syslog header
        as tags when the syslog normalizer is deactivated.
        """
        testlog = {'raw': 'Jul 18 08:55:35 naruto app[3245]: body message'}
        ln = LogNormalizer(self.normalizer_path)
        active_n = ln.get_active_normalizers()
        # Collect the names first; the mapping is modified right after.
        to_deactivate = [n for n in list(active_n) if 'syslog' in n]
        for n in to_deactivate:
            del active_n[n]
        ln.set_active_normalizers(active_n)
        ln.reload()
        ln.lognormalize(testlog)
        self.assertTrue('uuid' in testlog)
        self.assertFalse('date' in testlog)
        self.assertFalse('program' in testlog)

    def test_007_normalizer_getsource(self):
        """Verify we can retrieve the XML source of a normalizer."""
        ln = LogNormalizer(self.normalizer_path)
        source = ln.get_normalizer_source('syslog-0.99')
        root = XMLfromstring(source).getroottree().getroot()
        self.assertEqual(root.get('name'), 'syslog')

    def test_008_normalizer_multiple_paths(self):
        """Verify we can deal with multiple normalizer paths."""
        fdir = tempfile.mkdtemp()
        sdir = tempfile.mkdtemp()
        try:
            # Copy every regular file of the reference directory into fdir,
            # then move postfix.xml alone into sdir.
            for f in os.listdir(self.normalizer_path):
                path_f = os.path.join(self.normalizer_path, f)
                if os.path.isfile(path_f):
                    shutil.copyfile(path_f, os.path.join(fdir, f))
            shutil.move(os.path.join(fdir, 'postfix.xml'),
                        os.path.join(sdir, 'postfix.xml'))
            ln = LogNormalizer([fdir, sdir])
            source = ln.get_normalizer_source('postfix-0.99')
            root = XMLfromstring(source).getroottree().getroot()
            self.assertEqual(root.get('name'), 'postfix')
            self.assertTrue(ln.get_normalizer_path('postfix-0.99').startswith(sdir))
            self.assertTrue(ln.get_normalizer_path('syslog-0.99').startswith(fdir))
            # Keep the syslog source, delete its file, and check that the
            # normalizer is gone after a reload.
            xml_src = ln.get_normalizer_source('syslog-0.99')
            os.unlink(os.path.join(fdir, 'syslog.xml'))
            ln.reload()
            self.assertRaises(ValueError, ln.get_normalizer_path, 'syslog-0.99')
            # Re-adding it with dir_path=sdir must make it resolve under sdir.
            ln.update_normalizer(xml_src, dir_path=sdir)
            self.assertTrue(ln.get_normalizer_path('syslog-0.99').startswith(sdir))
        finally:
            # Always remove the scratch directories, even when an assertion
            # above fails; cleanup errors must not mask the test failure.
            shutil.rmtree(fdir, ignore_errors=True)
            shutil.rmtree(sdir, ignore_errors=True)

    def test_009_normalizer_multiple_version(self):
        """Verify we can deal with a normalizer with more than one version."""
        fdir = tempfile.mkdtemp()
        try:
            copied = os.path.join(fdir, 'postfix.xml')
            shutil.copyfile(os.path.join(self.normalizer_path, 'postfix.xml'),
                            copied)
            # Change normalizer version in fdir path
            xml = parse(copied)
            xml.getroot().set('version', '1.0')
            xml.write(copied)
            ln = LogNormalizer([self.normalizer_path, fdir])
            for version in ('0.99', '1.0'):
                source = ln.get_normalizer_source('postfix-' + version)
                root = XMLfromstring(source).getroottree().getroot()
                self.assertEqual(root.get('version'), version)
        finally:
            # Always remove the scratch directory, even on failure.
            shutil.rmtree(fdir, ignore_errors=True)

# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()