This file is indexed.

/usr/share/perl5/Lire/DlfConverterProcess.pm is in lire 2:2.1.1-2.1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
package Lire::DlfConverterProcess;

use strict;

use Carp;
use POSIX qw/ strftime /;

use Lire::PluginManager;
use Lire::Config;
use Lire::DlfQuery;
use Lire::Utils qw/ check_param check_object_param /;
use Lire::I18N qw/ set_fh_encoding /;

=pod

=head1 NAME

Lire::DlfConverterProcess - Object that controls the DLF conversion process

=head1 SYNOPSIS

  use Lire::ImportJob;
  use Lire::DlfStore;
  use Lire::DlfConverterProcess;

  my $src = new Lire::ImportJob( "file", 'pattern' => "/var/log/messages" );
  my $store = Lire::DlfStore->open( "store" );

  my $process = new Lire::DlfConverterProcess( $src, $store );

  $process->run_import_job();

  print "Log lines read: ",      $process->line_count(), "\n";
  print "DLF records created: ", $process->dlf_count,    "\n";
  print "Errors encountered: ",  $process->error_count, "\n";
  print "Ignored records: ",     $process->ignored_count,"\n";

=head1 DESCRIPTION

This object encapsulates the Lire DLF conversion process. It takes as
parameters a Lire::ImportJob and a Lire::DlfStore. It will set up the
converter and will convert the content of the Lire::ImportJob to DLF
which will be saved in the Lire::DlfStore.

The object provides the API to the converter. Methods are also
available to query information on the conversion process.

=head1 new( $job, $store );

Create a Lire::DlfConverterProcess that will be used to import the
log specified in Lire::ImportJob into DLF records which will be stored into
Lire::DlfStore.

=cut

# Constructor. Checks that $job is a Lire::ImportJob and $store a
# Lire::DlfStore, then returns a blessed hash holding both, with the
# '_convert_called' flag (consulted by run_import_job()) set to 0.
sub new {
    my $proto = shift;
    my ( $job, $store ) = @_;

    check_object_param( $job, 'job', 'Lire::ImportJob' );
    check_object_param( $store, 'store', 'Lire::DlfStore' );

    # The invocant may be a class name or an existing instance.
    my $class = ref( $proto ) || $proto;
    my $self  = {
        '_job'            => $job,
        '_store'          => $store,
        '_convert_called' => 0,
    };

    return bless $self, $class;
}

=pod

=head2 run_import_job( [$time] )

Import the log data from ImportJob as DLF. This method will throw an
exception if it is called more than once. The $time parameter will be
used to determine the time window covered by period. It defaults to the
current time.

=cut

# Runs the conversion of the import job. Croaks when the
# '_convert_called' flag is already set. Otherwise it assigns a job id
# (unless one is already present), resets the counters, initializes the
# converter and the DLF streams, obtains the log file handle from the
# job (passing $time through), feeds the log to the converter, closes
# every stream, saves the import statistics and finally sets the
# '_convert_called' flag.
sub run_import_job {
    my ( $self, $time ) = @_;

    my $job = $self->{'_job'};
    if ( $self->{'_convert_called'} ) {
        croak $job->name(), " ImportJob was already processed";
    }

    # The job id is "<job name>-<local timestamp>"; an id that already
    # exists on the object is left alone.
    if ( !exists $self->{'_job_id'} ) {
        my $job_name = $job->name();
        $self->{'_job_id'} =
          $job_name . '-' . strftime( "%Y%m%d_%H%M%S", localtime );
    }

    $self->{'_time_start'} = time;
    $self->_init_counters();
    $self->_init_converter();
    $self->_init_streams();
    $self->{'_fh'} = $job->log_fh( $time );

    # Converters either consume the log line by line (after replaying
    # the lines saved for continuation) or take the whole file handle.
    my $converter = $self->{'_converter'};
    if ( $converter->handle_log_lines() ) {
        $self->_handle_continuation();
        $self->_process_log_lines( $self->{'_fh'} );
    } else {
        $converter->process_log_file( $self, $self->{'_fh'} );
    }
    close $self->{'_fh'};

    $converter->finish_conversion( $self );

    # Close the per-schema DLF streams, then the import log stream.
    foreach my $stream ( values %{ $self->{'_streams'} } ) {
        $stream->close();
    }
    $self->{'_log_stream'}->close();

    $self->_save_import_stats();

    $self->{'_convert_called'} = 1;

    return;
}

# Looks up the DLF converter named by the import job in the plugin
# manager, stores it in '_converter' and calls its init_dlf_converter()
# method with this process and the job's converter configuration.
# Croaks when the job has no converter name or when no 'dlf_converter'
# plugin of that name is registered.
sub _init_converter {
    my ( $self ) = @_;

    my $job            = $self->{'_job'};
    my $converter_name = $job->converter();
    if ( !defined $converter_name ) {
        croak "ImportJob ", $job->name(), " doesn't have a converter defined";
    }

    if ( !Lire::PluginManager->has_plugin( 'dlf_converter', $converter_name ) ) {
        croak "DLF converter $converter_name isn't available";
    }

    my $converter =
      Lire::PluginManager->get_plugin( 'dlf_converter', $converter_name );
    $self->{'_converter'} = $converter;

    $converter->init_dlf_converter( $self, $job->converter_config() );

    return;
}

# Opens, in "w" mode, one DLF stream per schema returned by the
# converter's schemas() method (kept in '_streams', keyed by schema)
# plus the 'lire_import_log' stream (kept in '_log_stream').
sub _init_streams {
    my ( $self ) = @_;

    my $store = $self->{'_store'};

    for my $schema ( $self->{'_converter'}->schemas() ) {
        $self->{'_streams'}{$schema} = $store->open_dlf_stream( $schema, "w" );
    }

    $self->{'_log_stream'} = $store->open_dlf_stream( 'lire_import_log', "w" );

    return;
}

# Resets the five conversion counters (lines, DLF records, ignored
# lines, errors and saved lines) to 0.
sub _init_counters {
    my ( $self ) = @_;

    foreach my $counter ( qw/ _line_count _dlf_count _ignored_count
                              _error_count _saved_count / )
    {
        $self->{$counter} = 0;
    }

    return;
}

# Replays the log lines that a previous run saved for later processing.
#
# Queries the 'lire_import_log' schema for the records of type
# 'continuation' belonging to this job, sorted by time and line number,
# and passes each saved line to the converter's process_log_line()
# method, incrementing the line counter for each one. Afterwards the
# continuation records of this job whose time is older than the start of
# the current run ('_time_start') are deleted from the store.
sub _handle_continuation {
    my $self = $_[0];

    my $job_name = $self->{'_job'}->name();

    # Direct method call instead of the indirect object syntax
    # ("new Lire::DlfQuery(...)"), which perlobj discourages.
    my $query = Lire::DlfQuery->new( 'lire_import_log' );
    $query->add_field( 'line' );
    $query->add_field( 'time' );
    $query->add_field( 'line_no' );
    $query->set_filter_clause( "type = 'continuation' AND job_name = ?",
                               $job_name );
    $query->set_sort_spec( 'time line_no' );
    my $result = $query->execute( $self->{'_store'} );
    while ( defined( my $row = $result->next_row_aref() ) ) {
        $self->{'_line_count'}++;
        # 'line' was the first field added, so it is the first column.
        $self->{'_converter'}->process_log_line( $self, $row->[0] );
    }

    # Only records older than this run are removed; the bind values are
    # the job name and the start time of the current run.
    $self->{'_store'}->_dbh()->do( <<EOSQL, {}, $job_name, $self->{'_time_start'} );
DELETE FROM dlf_lire_import_log 
      WHERE type='continuation' AND job_name = ? AND time < ?
EOSQL
    return;
}

#------------------------------------------------------------------------
# Method _process_log_lines( $fh )
#
# Reads $fh line by line until end of file. For each line, the trailing
# DOS or UNIX line ending is stripped, the line counter is incremented
# and the line is then passed to the converter's process_log_line()
# method.
sub _process_log_lines {
    my ( $self, $fh ) = @_;

    my $converter = $self->{'_converter'};

    while ( defined( my $line = <$fh> ) ) {
        # Remove DOS and/or UNIX line ending
        $line =~ s/\r?\n?$//;

        # Count first, so the counter already designates this line while
        # the converter is working on it.
        $self->{'_line_count'}++;
        $converter->process_log_line( $self, $line );
    }

    return;
}

# Writes one record describing this run (start time, elapsed seconds,
# job name, job id and the five counters) to the 'lire_import_stats'
# DLF stream of the store, then closes that stream.
sub _save_import_stats {
    my ( $self ) = @_;

    my $elapsed      = time - $self->{'_time_start'};
    my $stats_stream =
      $self->{'_store'}->open_dlf_stream( 'lire_import_stats', "w" );

    my %stats = (
        'elapsed'  => $elapsed,
        'job_name' => $self->{'_job'}->name(),
    );

    # The remaining fields are copied from the object attribute of the
    # same name prefixed with an underscore.
    foreach my $field ( qw/ time_start job_id line_count dlf_count
                            error_count ignored_count saved_count / )
    {
        $stats{$field} = $self->{"_$field"};
    }

    $stats_stream->write_dlf( \%stats );
    $stats_stream->close();

    return;
}

=pod

=head2 job_id()

Returns the job identifier associated to this process.

=cut

# Accessor for the '_job_id' attribute.
sub job_id {
    my ( $self ) = @_;

    return $self->{'_job_id'};
}

=pod

=head2 dlf_store()

Returns the Lire::DlfStore in which this conversion process is storing
the DLF records.

=cut

# Accessor for the '_store' attribute.
sub dlf_store {
    my ( $self ) = @_;

    return $self->{'_store'};
}

=pod

=head2 import_job()

Returns the Lire::ImportJob upon which this conversion process is operating.

=cut

# Accessor for the '_job' attribute.
sub import_job {
    my ( $self ) = @_;

    return $self->{'_job'};
}

=pod

=head2 line_count()

Returns the number of lines processed. This will be 0 when the DLF
converter processes the log file as a whole instead of log lines.
During processing, this is always equal to the number of the line that
is currently being converted.

=cut

# Accessor for the '_line_count' counter.
sub line_count {
    my ( $self ) = @_;

    return $self->{'_line_count'};
}

=pod

=head2 dlf_count()

Returns the number of DLF records created.

=cut

# Accessor for the '_dlf_count' counter.
sub dlf_count {
    my ( $self ) = @_;

    return $self->{'_dlf_count'};
}

=pod

=head2 error_count()

Returns the number of errors encountered in the conversion process.

=cut

# Accessor for the '_error_count' counter.
sub error_count {
    my ( $self ) = @_;

    return $self->{'_error_count'};
}

=pod

=head2 ignored_count()

Returns the number of records which were ignored in the conversion
process.

=cut

# Accessor for the '_ignored_count' counter.
sub ignored_count {
    my ( $self ) = @_;

    return $self->{'_ignored_count'};
}

=pod

=head2 saved_count()

Returns the number of lines which were saved for later processing.

=cut

# Accessor for the '_saved_count' counter.
sub saved_count {
    my ( $self ) = @_;

    return $self->{'_saved_count'};
}

=pod

=head1 API FOR THE DLF CONVERTERS

This is the object that encapsulates the DLF implementation and hides
the complexity of the storage framework from the DLF converter. It
offers the following methods to the DLF converter.

=head2 write_dlf( $schema, $dlf )

This writes the $dlf DLF record conforming to the $schema's schema in the
Lire::DlfStore. The schema is the schema's name (e.g. 'www'). $dlf is
a hash reference. Keys are the schema's field names. An undefined value
means that this field isn't available in that record.

=cut

# Writes the $dlf record to the DLF stream opened for $schema and
# increments the DLF record counter. The record's 'dlf_source' field is
# set to this process's job id before writing. Croaks when no stream
# exists for $schema.
sub write_dlf {
    my ( $self, $schema, $dlf ) = @_;

    check_param( $schema, 'schema' );
    check_param( $dlf, 'dlf' );

    my $stream = $self->{'_streams'}{$schema};
    if ( !defined $stream ) {
        croak "schema $schema wasn't defined by ",
          $self->{'_converter'}->name, " converter";
    }

    # Note that this modifies the hash passed in by the caller.
    $dlf->{'dlf_source'} = $self->{'_job_id'};
    $stream->write_dlf( $dlf );

    $self->{'_dlf_count'}++;

    return;
}

=pod

=head2 save_log_line( $line )

Method that should be used to save $line for a future processing run
of the converter on the same Lire::ImportJob.

=cut

# Records $line in the import log stream as a 'continuation' entry and
# increments the saved-lines counter. Croaks when the converter's
# handle_log_lines() method returns false.
sub save_log_line {
    my ( $self, $line ) = @_;

    check_param( $line, 'line' );

    if ( !$self->{'_converter'}->handle_log_lines() ) {
        croak "only DLF converter handling log lines can save log line";
    }

    $self->{'_saved_count'}++;

    my %entry = (
        'time'     => time(),
        'job_name' => $self->{'_job'}->name(),
        'job_id'   => $self->{'_job_id'},
        'line_no'  => $self->{'_line_count'},
        'type'     => 'continuation',
        'line'     => $line,
    );
    $self->{'_log_stream'}->write_dlf( \%entry );

    return;
}

=pod

=head2 ignore_log_line( $line, [ $reason ] )

Method that can be used by the Lire::DlfConverter to report that the
'$line' log line was ignored during that processing. The reason why
the line was ignored can be given in $reason. For example,
syslog-based converter should use that method to report lines that are
for another 'service' than theirs.

=cut

# Records $line in the import log stream as an 'ignored' entry, with
# $reason as its message, and increments the ignored-lines counter.
# A false $reason is replaced by "Unknown reason".
sub ignore_log_line {
    my ( $self, $line, $reason ) = @_;

    $reason = "Unknown reason" unless $reason;

    check_param( $line, 'line' );

    $self->{'_ignored_count'}++;

    my %entry = (
        'time'     => time(),
        'job_name' => $self->{'_job'}->name(),
        'job_id'   => $self->{'_job_id'},
        'line_no'  => $self->{'_line_count'},
        'type'     => 'ignored',
        'line'     => $line,
        'msg'      => $reason,
    );
    $self->{'_log_stream'}->write_dlf( \%entry );

    return;
}

=pod

=head2 error( $error_msg, [ $line ] );

Method that should be used by the Lire::DlfConverter to report that an
error was encountered when processing the Lire::ImportJob. $error_msg
should be used to report the nature of the error. The $line parameter
should be used by converters operating on lines to associate the error
message with a particular line.

=cut

# Records $error_msg in the import log stream as an 'error' entry and
# increments the error counter. $line is optional; an undefined $line is
# stored as the empty string.
sub error {
    my ( $self, $error_msg, $line ) = @_;

    check_param( $error_msg, 'error_msg' );

    $self->{'_error_count'}++;

    my $logged_line = '';
    $logged_line = $line if defined $line;

    my %entry = (
        'time'     => time(),
        'job_name' => $self->{'_job'}->name(),
        'job_id'   => $self->{'_job_id'},
        'line_no'  => $self->{'_line_count'},
        'type'     => 'error',
        'line'     => $logged_line,
        'msg'      => $error_msg,
    );
    $self->{'_log_stream'}->write_dlf( \%entry );

    return;
}

# keep perl happy
1;

__END__

=pod

=head1 SEE ALSO

Lire::DlfStore(3pm) Lire::DlfConverter(3pm)

=head1 AUTHOR

Francis J. Lacoste <flacoste@logreport.org>

=head1 VERSION

$Id: DlfConverterProcess.pm,v 1.18 2006/07/23 13:16:28 vanbaal Exp $

=head1 COPYRIGHT

Copyright (C) 2002-2004 Stichting LogReport Foundation LogReport@LogReport.org

This file is part of Lire.

Lire is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program (see COPYING); if not, check with
http://www.gnu.org/copyleft/gpl.html.

=cut