This file is indexed.

/usr/share/perl5/Mango.pm is in libmango-perl 0.22-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
package Mango;
use Mojo::Base 'Mojo::EventEmitter';

use Carp 'croak';
use Mango::BSON qw(bson_doc bson_false bson_true);
use Mango::Database;
use Mango::Protocol;
use Mojo::IOLoop;
use Mojo::URL;
use Mojo::Util qw(dumper md5_sum monkey_patch);
use Scalar::Util 'weaken';

# Diagnostics are printed with "warn" if the MANGO_DEBUG environment variable
# is set to a true value
use constant DEBUG => $ENV{MANGO_DEBUG} || 0;
# Port used by "_connect" for hosts that do not specify one
use constant DEFAULT_PORT => 27017;

# Credentials are array references with database, username and password
has credentials => sub { [] };
# Database used by "db" if no name has been provided
has default_db  => 'admin';
# Servers are array references with hostname and an optional port
has hosts       => sub { [['localhost']] };
# Private event loop, used by "_loop" unless in non-blocking mode
has ioloop      => sub { Mojo::IOLoop->new };
# Sent as boolean "j" with every "getLastError" command
has j           => 0;
# Limit for the number of connections "_next" is allowed to open
has max_connections => 5;
# Used to build operations and to parse replies
has protocol        => sub { Mango::Protocol->new };
# Sent as "w" and "wtimeout" with every "getLastError" command
has w               => 1;
has wtimeout        => 1000;

our $VERSION = '0.22';

# Generate the low level operations that get a reply of their own ("get_more"
# and "query"), a trailing code reference is used as callback
for my $operation (qw(get_more query)) {
  my $method = sub {
    my ($self, @args) = @_;

    # Optional callback is always the last argument
    my $cb;
    $cb = pop @args if ref $args[-1] eq 'CODE';

    my ($id, $bytes) = $self->_build($operation, @args);
    return $self->_start({id => $id, safe => 1, msg => $bytes, cb => $cb});
  };
  monkey_patch __PACKAGE__, $operation, $method;
}

# Generate the low level write operations ("delete", "insert" and "update"),
# each of them is followed by a "getLastError" command
for my $operation (qw(delete insert update)) {
  my $method = sub {
    my ($self, $namespace, @args) = @_;

    # Optional callback is always the last argument
    my $cb;
    $cb = pop @args if ref $args[-1] eq 'CODE';

    # Write operation itself
    my ($write_id, $write) = $self->_build($operation, $namespace, @args);

    # "getLastError" is a query on the "$cmd" collection of the same database
    (my $commands = $namespace) =~ s/\..+$/\.\$cmd/;
    my $command = bson_doc(
      getLastError => 1,
      j            => $self->j ? bson_true : bson_false,
      w            => $self->w,
      wtimeout     => $self->wtimeout
    );
    my ($id, $gle)
      = $self->_build('query', $commands, {}, 0, -1, $command, {});

    # Both messages are written together, the id is the one of the query
    return $self->_start(
      {id => $id, safe => 1, msg => "$write$gle", cb => $cb});
  };
  monkey_patch __PACKAGE__, $operation, $method;
}

# Run "_cleanup" when the object is destroyed
sub DESTROY {
  my $self = shift;
  $self->_cleanup;
}

# Construct object and pass all arguments on to "from_string"
sub new {
  my ($class, @args) = @_;
  my $self = $class->SUPER::new;
  return $self->from_string(@args);
}

# Number of operations in the queue, a missing queue counts as empty
sub backlog {
  my $self  = shift;
  my $queue = $self->{queue} || [];
  return scalar @$queue;
}

# Build a Mango::Database object for the given (or the default) database
sub db {
  my ($self, $name) = @_;
  $name = $self->default_db unless defined $name;

  # Database only gets a weak reference, the caller has to keep us alive
  my $db = Mango::Database->new(mango => $self, name => $name);
  weaken $db->{mango};

  return $db;
}

# Parse a MongoDB connection string and apply it to this object. Hosts,
# default database, credentials and the "journal", "w" and "wtimeoutMS"
# options are extracted. An empty or missing string leaves the object
# untouched, a string with a scheme other than "mongodb" is fatal. Returns the
# invocant.
sub from_string {
  my ($self, $str) = @_;

  # Protocol
  return $self unless $str;
  my $url = Mojo::URL->new($str);
  croak qq{Invalid MongoDB connection string "$str"}
    unless $url->protocol eq 'mongodb';

  # Hosts (comma separated, each one with an optional port)
  my @hosts;
  /^([^,:]+)(?::(\d+))?/ and push @hosts, $2 ? [$1, $2] : [$1]
    for split /,/, join(':', map { $_ // '' } $url->host, $url->port);
  $self->hosts(\@hosts) if @hosts;

  # Database
  if (my $db = $url->path->parts->[0]) { $self->default_db($db) }

  # User and password (stored together with the default database)
  push @{$self->credentials}, [$self->default_db, $1, $2]
    if ($url->userinfo // '') =~ /^([^:]+):([^:]+)$/;

  # Options
  my $query = $url->query;

  # "journal" is only ever used as a boolean, so the string "false" has to
  # disable journaling instead of being stored as a true value
  my $j = $query->param('journal');
  if (defined $j) { $self->j($j && lc $j ne 'false' ? $j : 0) }
  if (my $w       = $query->param('w'))          { $self->w($w) }
  if (my $timeout = $query->param('wtimeoutMS')) { $self->wtimeout($timeout) }

  return $self;
}

# Perform "kill_cursors" as an unsafe operation ("safe" is 0), a trailing
# code reference is used as callback
sub kill_cursors {
  my ($self, @ids) = @_;

  my $cb;
  $cb = pop @ids if ref $ids[-1] eq 'CODE';

  my ($id, $bytes) = $self->_build('kill_cursors', @ids);
  return $self->_start({id => $id, safe => 0, msg => $bytes, cb => $cb});
}

# True if operations are queued, or if a connection that is not flagged with
# "start" still has an operation in flight
sub _active {
  my $self = shift;
  return 1 if $self->backlog;

  my $connections = $self->{connections} || {};
  for my $c (values %$connections) {
    return !!1 if $c->{last} && !$c->{start};
  }

  return !!0;
}

# Answer the reply of a "getnonce" command with an "authenticate" command,
# and continue with "_connected" afterwards
sub _auth {
  my ($self, $id, $credentials, $auth, $err, $reply) = @_;
  my ($db, $user, $pass) = @$auth;

  # Key is the digest of nonce, username and the password digest
  my $nonce  = $reply->{docs}[0]{nonce} // '';
  my $digest = md5_sum("$user:mongo:$pass");
  my $key    = md5_sum($nonce . $user . $digest);

  # Run "authenticate" command with "nonce" value
  my $command = bson_doc(
    authenticate => 1,
    user         => $user,
    nonce        => $nonce,
    key          => $key
  );
  return $self->_fast(
    $id, $db, $command,
    sub {
      my $mango = shift;
      $mango->_connected($id, $credentials);
    }
  );
}

# Generate the next id and let the matching "build_*" method of the protocol
# handler build the operation, returns the id followed by the result
sub _build {
  my ($self, $name, @args) = @_;

  my $id = $self->_id;
  warn "-- Operation #$id ($name)\n@{[dumper [@args]]}" if DEBUG;

  my $builder = "build_$name";
  return ($id, $self->protocol->$builder($id, @args));
}

# Drop all connections and finish every unfinished operation with an error
sub _cleanup {
  my $self = shift;
  my $loop = $self->_loop or return;

  # Forget the process id and remove all connections from the event loop
  delete $self->{pid};
  my $connections = delete $self->{connections};
  for my $id (keys %$connections) { $loop->remove($id) }

  # Operations in flight are put in front of the queued ones
  my $pending = delete $self->{queue} || [];
  for my $c (values %$connections) {
    unshift @$pending, $c->{last} if $c->{last};
  }

  # Callbacks get to know that there will be no reply
  for my $op (@$pending) {
    $self->_finish(undef, $op->{cb}, 'Premature connection close');
  }
}

# Open a connection to the first server in the list, and fall back to the
# remaining servers if that fails. Without a list a copy of "hosts" is used.
# The new connection is registered with the "start" flag set.
sub _connect {
  my ($self, $hosts) = @_;
  my ($host, $port) = @{shift @{$hosts ||= [@{$self->hosts}]}};

  # Callbacks below only hold a weak reference to this object
  weaken $self;
  my $id;
  $id = $self->_loop->client(
    {address => $host, port => $port //= DEFAULT_PORT} => sub {
      my ($loop, $err, $stream) = @_;

      # Connection error (try next server)
      if ($err) {
        return $self->_error($id, $err) unless @$hosts;
        delete $self->{connections}{$id};
        return $self->_connect($hosts);
      }

      # Connection established (the weak reference may already be gone when
      # a stream event fires, so every handler has to check it first)
      $stream->timeout(0);
      $stream->on(close => sub { $self && $self->_error($id) });
      $stream->on(error => sub { $self && $self->_error($id, pop) });
      $stream->on(read  => sub { $self && $self->_read($id, pop) });
      $self->emit(connection => $id)->_connected($id, [@{$self->credentials}]);
    }
  );
  $self->{connections}{$id} = {start => 1};

  my $num = scalar keys %{$self->{connections}};
  warn "-- New connection ($host:$port:$num)\n" if DEBUG;
}

# Authenticate with the next set of credentials, or start working on the
# queue once there are none left
sub _connected {
  my ($self, $id, $credentials) = @_;

  # No authentication
  my $auth = shift @$credentials;
  return $self->_next unless $auth;

  # Run "getnonce" command followed by "authenticate"
  return $self->_fast(
    $id,
    $auth->[0],
    {getnonce => 1},
    sub {
      my $mango = shift;
      $mango->_auth($id, $credentials, $auth, @_);
    }
  );
}

# Handle a connection that failed or has been closed. The operation in flight
# (or, for errors, the next queued operation) is finished with an error.
# Errors that cannot be associated with an operation are emitted as "error"
# event.
sub _error {
  my ($self, $id, $err) = @_;

  my $c    = delete $self->{connections}{$id};
  my $last = $c->{last};
  $last //= shift @{$self->{queue}} if $err;

  # Reconnect for the remaining operations ("backlog" copes with a queue that
  # does not exist, dereferencing it directly would be fatal)
  $self->_connect if $self->backlog;
  return $err ? $self->emit(error => $err) : $self unless $last;
  $self->_finish(undef, $last->{cb}, $err || 'Premature connection close');
}

# Run a command on one specific connection, without using the queue
sub _fast {
  my ($self, $id, $db, $command, $cb) = @_;

  # Errors (including command errors) are passed to "_error", the callback
  # is only invoked for successful commands
  my $protocol = $self->protocol;
  my $wrapper  = sub {
    my ($mango, $err, $reply) = @_;
    $err ||= $protocol->command_error($reply);
    return $mango->_error($id, $err) if $err;
    return $mango->$cb($err, $reply);
  };

  # Store command in the "fast" slot of the connection and try to write it
  my $namespace = "$db.\$cmd";
  my ($next, $msg)
    = $self->_build('query', $namespace, {}, 0, -1, $command, {});
  my $op = {id => $next, safe => 1, msg => $msg, cb => $wrapper};
  $self->{connections}{$id}{fast} = $op;
  return $self->_next;
}

# Invoke callback as a method, with the error (or the query failure of the
# reply) and the reply itself
sub _finish {
  my ($self, $reply, $cb, $err) = @_;
  my @result = ($err || $self->protocol->query_failure($reply), $reply);
  return $self->$cb(@result);
}

# Let the protocol handler generate the id following the current one (or 0),
# and remember the result
sub _id {
  my $self    = shift;
  my $current = $self->{id} // 0;
  return $self->{id} = $self->protocol->next_id($current);
}

# Event loop for the current mode, the singleton if "nb" is set and "ioloop"
# otherwise
sub _loop {
  my $self = shift;
  return Mojo::IOLoop->singleton if $self->{nb};
  return $self->ioloop;
}

# Queue an operation (if there is one), let all connections write, and open
# another connection if that is necessary and allowed
sub _next {
  my ($self, $op) = @_;

  # Append new operation to the queue
  if ($op) {
    $self->{queue} ||= [];
    push @{$self->{queue}}, $op;
  }

  # Count connections for which "_write" returns a true value
  my @ids      = keys %{$self->{connections}};
  my $starting = 0;
  for my $id (@ids) {
    $starting++ if $self->_write($id);
  }

  # New operations that are still queued may get a connection of their own
  $self->_connect
    if $op
    && !$starting
    && @{$self->{queue}}
    && @ids < $self->max_connections;
}

# Append chunk to the buffer of a connection, finish the operation in flight
# once its reply has been parsed, and continue with the queue
sub _read {
  my ($self, $id, $chunk) = @_;

  my $c = $self->{connections}{$id};
  $c->{buffer} .= $chunk;
  while (my $reply = $self->protocol->parse_reply(\$c->{buffer})) {
    warn "-- Client <<< Server (#$reply->{to})\n@{[dumper $reply]}" if DEBUG;

    # Skip replies that do not belong to the operation in flight. "last" must
    # not be autovivified here, since "_write" treats every connection with a
    # true "last" value as busy.
    my $last = $c->{last};
    next unless $last && $reply->{to} == $last->{id};

    delete $c->{last};
    $self->_finish($reply, $last->{cb});
  }
  $self->_next;
}

# Start an operation. Operations with a callback are non-blocking and return
# right away, all others block until there is a reply and return it (or die
# with the error).
sub _start {
  my ($self, $op) = @_;

  # Fork safety (clean up if the process id has changed)
  $self->{pid} //= $$;
  $self->_cleanup if $self->{pid} ne $$;

  # Non-blocking
  if ($op->{cb}) {

    # Switching modes is only possible while nothing is in progress
    if (!$self->{nb}) {
      croak 'Blocking operation in progress' if $self->_active;
      warn "-- Switching to non-blocking mode\n" if DEBUG;
      $self->_cleanup;
      $self->{nb}++;
    }

    return $self->_next($op);
  }

  # Blocking (switching modes is only possible while nothing is in progress)
  if ($self->{nb}) {
    croak 'Non-blocking operations in progress' if $self->_active;
    warn "-- Switching to blocking mode\n" if DEBUG;
    $self->_cleanup;
    delete $self->{nb};
  }

  # Callback stores the result and stops the loop started below
  my ($err, $reply);
  $op->{cb} = sub {
    my $mango = shift;
    ($err, $reply) = @_;
    $mango->ioloop->stop;
  };
  $self->_next($op);
  $self->ioloop->start;

  # Throw blocking errors
  croak $err if $err;

  return $reply;
}

# Write the next operation to a connection that has no operation in flight.
# Returns the "start" flag of the connection (which "_next" uses to decide if
# another connection should be opened), or undef if the connection has no
# usable stream.
sub _write {
  my ($self, $id) = @_;

  # Make sure connection has not been corrupted while event loop was stopped
  my $c = $self->{connections}{$id};

  # An operation is still in flight on this connection
  return $c->{start} if $c->{last};
  my $loop = $self->_loop;

  # No stream for this id (presumably the connection is not established yet)
  return undef unless my $stream = $loop->stream($id);

  # Readable while the loop is not running, the connection is given up
  if (!$loop->is_running && $stream->is_readable) {
    $stream->close;
    return undef;
  }

  # Commands from "_fast" are written before anything from the queue, and the
  # "start" flag is only removed once there is no such command left
  delete $c->{start} unless my $last = delete $c->{fast};

  # Nothing to write (otherwise the operation is now in flight as "last")
  return $c->{start} unless $c->{last} = $last ||= shift @{$self->{queue}};
  warn "-- Client >>> Server (#$last->{id})\n" if DEBUG;

  # Message is removed from the operation as it is written
  $stream->write(delete $last->{msg});

  # Unsafe operations are done when they are written
  return $c->{start} if $last->{safe};

  # Callback of the empty write finishes the operation (without a reply), it
  # only holds a weak reference to this object
  weaken $self;
  $stream->write('', sub { $self->_finish(undef, delete($c->{last})->{cb}) });
  return $c->{start};
}

1;

=encoding utf8

=head1 NAME

Mango - Pure-Perl non-blocking I/O MongoDB driver

=head1 SYNOPSIS

  use Mango;

  # Insert document
  my $mango = Mango->new('mongodb://localhost:27017');
  my $oid   = $mango->db('test')->collection('foo')->insert({bar => 'baz'});

  # Find document
  my $doc = $mango->db('test')->collection('foo')->find_one({bar => 'baz'});
  say $doc->{bar};

  # Update document
  $mango->db('test')->collection('foo')
    ->update({bar => 'baz'}, {bar => 'yada'});

  # Remove document
  $mango->db('test')->collection('foo')->remove({bar => 'yada'});

  # Insert document with special BSON types
  use Mango::BSON ':bson';
  my $oid = $mango->db('test')->collection('foo')
    ->insert({data => bson_bin("\x00\x01"), now => bson_time});

  # Blocking parallel find (does not work inside a running event loop)
  my $delay = Mojo::IOLoop->delay;
  for my $name (qw(sri marty)) {
    my $end = $delay->begin(0);
    $mango->db('test')->collection('users')->find({name => $name})->all(sub {
      my ($cursor, $err, $docs) = @_;
      $end->(@$docs);
    });
  }
  my @docs = $delay->wait;

  # Non-blocking parallel find (does work inside a running event loop)
  my $delay = Mojo::IOLoop->delay(sub {
    my ($delay, @docs) = @_;
    ...
  });
  for my $name (qw(sri marty)) {
    my $end = $delay->begin(0);
    $mango->db('test')->collection('users')->find({name => $name})->all(sub {
      my ($cursor, $err, $docs) = @_;
      $end->(@$docs);
    });
  }
  $delay->wait unless Mojo::IOLoop->is_running;

  # Event loops such as AnyEvent are supported through EV
  use EV;
  use AnyEvent;
  my $cv = AE::cv;
  $mango->db('test')->command(buildInfo => sub {
    my ($db, $err, $doc) = @_;
    $cv->send($doc->{version});
  });
  say $cv->recv;

=head1 DESCRIPTION

L<Mango> is a pure-Perl non-blocking I/O MongoDB driver, optimized for use
with the L<Mojolicious> real-time web framework, and with multiple event loop
support. Since MongoDB is still changing rapidly, only the latest stable
version is supported.

To learn more about MongoDB you should take a look at the
L<official documentation|http://docs.mongodb.org>; the documentation included
in this distribution is no replacement for it.

Note that this whole distribution is EXPERIMENTAL and will change without
warning!

Most of the API is not changing much anymore, but you should wait for a stable
1.0 release before using any of the modules in this distribution in a
production environment. Unsafe operations are not supported, so far this is
considered a feature.

Many arguments passed to methods as well as values of attributes get
serialized to BSON with L<Mango::BSON>, which provides many helper functions
you can use to generate data types that are not available natively in Perl.
All connections will be reset automatically if a new process has been forked,
this allows multiple processes to share the same L<Mango> object safely.

For better scalability (epoll, kqueue) and to provide IPv6 as well as TLS
support, the optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.16+) and
L<IO::Socket::SSL> (1.75+) will be used automatically by L<Mojo::IOLoop> if
they are installed. Individual features can also be disabled with the
MOJO_NO_IPV6 and MOJO_NO_TLS environment variables.

=head1 EVENTS

L<Mango> inherits all events from L<Mojo::EventEmitter> and can emit the
following new ones.

=head2 connection

  $mango->on(connection => sub {
    my ($mango, $id) = @_;
    ...
  });

Emitted when a new connection has been established.

=head2 error

  $mango->on(error => sub {
    my ($mango, $err) = @_;
    ...
  });

Emitted if an error occurs that can't be associated with an operation.

  $mango->on(error => sub {
    my ($mango, $err) = @_;
    say "This looks bad: $err";
  });

=head1 ATTRIBUTES

L<Mango> implements the following attributes.

=head2 credentials

  my $credentials = $mango->credentials;
  $mango          = $mango->credentials([['test', 'sri', 's3cret']]);

Authentication credentials that will be used on every reconnect.

=head2 default_db

  my $name = $mango->default_db;
  $mango   = $mango->default_db('test');

Default database, defaults to C<admin>.

=head2 hosts

  my $hosts = $mango->hosts;
  $mango    = $mango->hosts([['localhost', 3000], ['localhost', 4000]]);

Servers to connect to, defaults to C<localhost> and port C<27017>.

=head2 ioloop

  my $loop = $mango->ioloop;
  $mango   = $mango->ioloop(Mojo::IOLoop->new);

Event loop object to use for blocking I/O operations, defaults to a
L<Mojo::IOLoop> object.

=head2 j

  my $j  = $mango->j;
  $mango = $mango->j(1);

Wait for all operations to have reached the journal, defaults to C<0>.

=head2 max_connections

  my $max = $mango->max_connections;
  $mango  = $mango->max_connections(5);

Maximum number of connections to use for non-blocking operations, defaults to
C<5>.

=head2 protocol

  my $protocol = $mango->protocol;
  $mango       = $mango->protocol(Mango::Protocol->new);

Protocol handler, defaults to a L<Mango::Protocol> object.

=head2 w

  my $w  = $mango->w;
  $mango = $mango->w(2);

Wait for all operations to have reached at least this many servers, C<1>
indicates just primary, C<2> indicates primary and at least one secondary,
defaults to C<1>.

=head2 wtimeout

  my $timeout = $mango->wtimeout;
  $mango      = $mango->wtimeout(1);

Timeout for write propagation in milliseconds, defaults to C<1000>.

=head1 METHODS

L<Mango> inherits all methods from L<Mojo::EventEmitter> and implements the
following new ones.

=head2 new

  my $mango = Mango->new;
  my $mango = Mango->new('mongodb://sri:s3cret@localhost:3000/test?w=2');

Construct a new L<Mango> object and parse connection string with
L</"from_string"> if necessary.

=head2 backlog

  my $num = $mango->backlog;

Number of queued operations that have not yet been assigned to a connection.

=head2 db

  my $db = $mango->db;
  my $db = $mango->db('test');

Get L<Mango::Database> object for database, uses L</"default_db"> if no name
is provided. Note that the reference L<Mango::Database/"mango"> is weakened,
so the L<Mango> object needs to be referenced elsewhere as well.

=head2 delete

  my $reply = $mango->delete($namespace, $flags, $query);

Perform low level C<delete> operation followed by C<getLastError> command. You
can also append a callback to perform operation non-blocking.

  $mango->delete(($namespace, $flags, $query) => sub {
    my ($mango, $err, $reply) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 from_string

  $mango
    = $mango->from_string('mongodb://sri:s3cret@localhost:3000/test?w=2');

Parse configuration from connection string.

=head2 get_more

  my $reply = $mango->get_more($namespace, $return, $cursor);

Perform low level C<get_more> operation. You can also append a callback to
perform operation non-blocking.

  $mango->get_more(($namespace, $return, $cursor) => sub {
    my ($mango, $err, $reply) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 insert

  my $reply = $mango->insert($namespace, $flags, @docs);

Perform low level C<insert> operation followed by C<getLastError> command. You
can also append a callback to perform operation non-blocking.

  $mango->insert(($namespace, $flags, @docs) => sub {
    my ($mango, $err, $reply) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 kill_cursors

  $mango->kill_cursors(@ids);

Perform low level C<kill_cursors> operation. You can also append a callback to
perform operation non-blocking.

    $mango->kill_cursors(@ids => sub {
      my ($mango, $err) = @_;
      ...
    });
    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 query

  my $reply
    = $mango->query($namespace, $flags, $skip, $return, $query, $fields);

Perform low level C<query> operation. You can also append a callback to
perform operation non-blocking.

  $mango->query(($namespace, $flags, $skip, $return, $query, $fields) => sub {
    my ($mango, $err, $reply) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 update

  my $reply = $mango->update($namespace, $flags, $query, $update);

Perform low level C<update> operation followed by C<getLastError> command. You
can also append a callback to perform operation non-blocking.

  $mango->update(($namespace, $flags, $query, $update) => sub {
    my ($mango, $err, $reply) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head1 DEBUGGING

You can set the C<MANGO_DEBUG> environment variable to get some advanced
diagnostics information printed to C<STDERR>.

  MANGO_DEBUG=1

=head1 SPONSORS

Some of the work on this distribution has been sponsored by
L<Drip Depot|http://www.dripdepot.com>, thank you!

=head1 AUTHOR

Sebastian Riedel, C<sri@cpan.org>.

=head1 CREDITS

In alphabetical order:

=over 2

Andrey Khozov

=back

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2013, Sebastian Riedel.

This program is free software, you can redistribute it and/or modify it under
the terms of the Artistic License version 2.0.

=head1 SEE ALSO

L<Mojolicious::Guides>, L<http://mojolicio.us>.

=cut