/usr/share/perl5/AtomBus.pm is in libatombus-perl 1.0405-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 | package AtomBus;
use Dancer qw(:syntax);
use Dancer::Plugin::DBIC qw(schema);
use Atompub::DateTime qw(datetime);
use UUID::Tiny;
use XML::Atom;
$XML::Atom::DefaultVersion = '1.0';
our $VERSION = '1.0405'; # VERSION
set content_type => 'application/xml';
config->{plugins}{DBIC}{atombus} = config->{atombus}{db};
config->{plugins}{DBIC}{atombus}{schema_class} = 'AtomBus::Schema';
eval { schema->deploy }; # Fails gracefully if tables already exist.
get '/feeds/:feed_title/entries/:entry_id' => sub {
my $entry_id = 'urn:uuid:' . params->{entry_id};
my $db_entry = schema->resultset('AtomBusEntry')->find({id => $entry_id});
return send_error("No such message exists", 404)
unless $db_entry;
my $if_none_match = request->header('If-None-Match');
#TODO: support revised entries (i.e. Atompub PUT)
# my $revision_id = $db_entry->revision_id || $db_entry->id;
my $revision_id = $db_entry->id;
# If ETag matches current revision id of entry
if (my $id = $if_none_match) {
$id =~ s/^"(.*)"$/$1/; # Remove surrounding quotes
if ($revision_id eq $id) {
status 304;
return '';
}
}
my $entry = _entry_from_db($db_entry);
_add_etag($revision_id);
header Vary => 'If-None-Match';
return $entry->as_xml;
};
get '/feeds/:feed_title' => sub {
my $feed_title = lc params->{feed_title};
my $start_after = params->{start_after};
my $start_at = params->{start_at};
my $if_none_match = request->header('If-None-Match');
my $order_id;
if (my $id = $if_none_match) {
$id =~ s/^"(.*)"$/$1/; # Remove surrounding quotes
my $entry = schema->resultset('AtomBusEntry')->find({id => $id});
$order_id = $entry->order_id if $entry;
}
if (my $id = $start_after || $start_at) {
my $entry = schema->resultset('AtomBusEntry')->find({id => $id});
return send_error("No such message exists with id $id", 400)
unless $entry;
$order_id = $entry->order_id;
}
my $db_feed = schema->resultset('AtomBusFeed')->find(
{ title => $feed_title });
return send_error("No such feed exists named $feed_title", 404)
unless $db_feed;
my $feed = XML::Atom::Feed->new;
$feed->title($feed_title);
$feed->id($db_feed->id);
my $person = XML::Atom::Person->new;
$person->name($db_feed->author_name);
$feed->author($person);
$feed->updated($db_feed->updated);
my $self_link = XML::Atom::Link->new;
$self_link->rel('self');
$self_link->type('application/atom+xml');
$self_link->href(request->uri_for(request->path));
$feed->add_link($self_link);
#my $hub_link = XML::Atom::Link->new;
#$hub_link->rel('hub');
#$hub_link->href('http://184.106.189.98:8080/publish');
#$feed->add_link($hub_link);
my %query = (feed_title => $feed_title);
if ($order_id) {
$query{order_id} = { '>' => $order_id } if $if_none_match;
$query{order_id} = { '>' => $order_id } if $start_after;
$query{order_id} = { '>=' => $order_id } if $start_at;
}
my $rset = schema->resultset('AtomBusEntry')->search(
\%query, { order_by => ['order_id'] });
my $count = config->{atombus}{page_size} || 1000;
my $last_id;
while ($count-- && (my $entry = $rset->next)) {
$feed->add_entry(_entry_from_db($entry));
$last_id = $entry->id;
}
# If ETag was provided and there are no new entries
if (not $last_id and $if_none_match) {
status 304;
return '';
}
_add_etag($last_id) if $last_id;
header Vary => 'If-None-Match';
return $feed->as_xml;
};
post '/feeds/:feed_title' => sub {
my $feed_title = lc params->{feed_title};
my $body = request->body;
return send_error("Request body is empty", 400)
unless $body;
my $entry = XML::Atom::Entry->new(\$body);
my $updated = datetime->w3cz;
my $db_feed = schema->resultset('AtomBusFeed')->find_or_create({
title => $feed_title,
id => _gen_id(),
author_name => 'AtomBus',
updated => $updated,
}, { key => 'title_unique' });
my $db_entry = schema->resultset('AtomBusEntry')->create({
feed_title => $feed_title,
id => _gen_id(),
title => $entry->title,
content => $entry->content->body,
updated => $updated,
});
$db_feed->update({updated => $updated});
$entry = _entry_from_db($db_entry);
_add_etag($entry->id);
header Location => $entry->link->href;
content_type 'application/atom+xml;type=entry';
status 'created';
return $entry->as_xml;
};
sub _gen_id { 'urn:uuid:' . create_UUID_as_string() }
sub _id_nss { $_ = shift; s/^urn:uuid://; return $_ }
sub _entry_from_db {
my $row = shift;
my $entry = XML::Atom::Entry->new;
$entry->title($row->title);
$entry->content($row->content);
$entry->id($row->id);
$entry->updated($row->updated);
my $self_link = XML::Atom::Link->new;
$self_link->rel('self');
$self_link->type('application/atom+xml');
$self_link->href( join( '/', uri_for('/feeds'), $row->feed_title->title, 'entries', _id_nss($row->id) ));
$entry->add_link($self_link);
return $entry;
}
sub _add_etag { header ETag => qq("$_[0]") }
# ABSTRACT: An AtomPub server for messaging.
1;
__END__
=pod
=head1 NAME
AtomBus - An AtomPub server for messaging.
=head1 VERSION
version 1.0405
=head1 SYNOPSIS
use Dancer;
use AtomBus;
dance;
=head1 DESCRIPTION
AtomBus is an AtomPub server that can be used for messaging.
The idea is that atom feeds can correspond to conceptual queues or buses.
AtomBus is built on top of the L<Dancer> framework.
It is also pubsubhubbub friendly.
These examples assume that you have configured your web server to point HTTP
requests starting with /atombus to your AtomBus server (see L</DEPLOYMENT>).
To publish an entry, make a HTTP POST request:
$ curl -d '<entry> <title>allo</title> <content type="xhtml">
<div xmlns="http://www.w3.org/1999/xhtml" >an important message</div>
</content> </entry>' http://localhost/atombus/feeds/widgets
That adds a new entry to a feed titled widgets.
If that feed didn't exist before, it will be created for you.
To retrieve the widgets feed, make a HTTP GET request:
$ curl http://localhost/atombus/feeds/widgets
Clients can request only entries that came after the last entry they processed.
They can do this by providing the id of the last message as the start_after
parameter:
$ curl http://localhost/atombus/feeds/widgets?start_after=42
Alternatively, you can provide a start_at param. This will retrieve entries
starting with the given id:
$ curl http://localhost/atombus/feeds/widgets?start_at=42
HTTP ETags are also supported.
The server responds with an ETag header for each request.
The client can provide that ETag as the If-None-Match header.
The following example will work the same as if the client provided a start_after
parameter.
Except that it will return an empty body and a 304 status if there are no new
entries.
This is the behavior that pubsubhubbub recommends
L<http://code.google.com/p/pubsubhubbub/wiki/PublisherEfficiency>.
$ curl -H 'If-None-Match: "42"' http://localhost/atombus/feeds/widgets
Note that the most messages you will get per request is determined by the
page_size setting. If you do not specify a page_size setting, it defaults to
1000. This default may change in the future, so don't count on it.
AtomBus is mostly a proper implementation of the AtomPub protocol and will
validate 100% against L<http://validator.w3.org/feed>.
One point where it diverges from the AtomPub spec is that feed entries are
returned in fifo order.
This is because a message consumer will most likely want to consume messages
in the order that they were published.
In the future, a config setting may be available to reverse the order.
=head1 CONFIGURATION
Configuration can be achieved via a config.yml file or via the set keyword.
To use the config.yml approach, you will need to install L<YAML>.
See the L<Dancer> documentation for more information.
The only required config setting is the dsn.
Example config.yml:
# Dancer specific config settings
logger: file
log: errors
atombus:
page_size: 100
db:
dsn: dbi:mysql:database=atombus
user: joe
pass: momma
You can alternatively configure the server via the 'set' keyword in the source
code. This approach does not require a config file.
use Dancer;
use AtomBus;
# Dancer specific config settings
set logger => 'file';
set log => 'debug';
set show_errors => 1;
set atombus => {
page_size => 100,
db => {
dsn => 'dbi:SQLite:dbname=/var/local/atombus/atombus.db',
}
};
dance;
=head1 DATABASE
AtomBus is backed by a database.
The dsn in the config must point to a database which you have write privileges
to.
The tables will be created automagically for you if they don't already exist.
Of course that requires create table privileges.
All databases supported by L<DBIx::Class> are supported,
which are most major databases including postgresql, sqlite, mysql and oracle.
=head1 DEPLOYMENT
Deployment is very flexible.
It can be run on a web server via CGI or FastCGI.
It can also be run on any L<Plack> web server.
See L<Dancer::Deployment> for more details.
=head2 FastCGI
AtomBus can be run via FastCGI.
This requires that you have the L<FCGI> and L<Plack> modules installed.
Here is an example FastCGI script.
It assumes your AtomBus server is in the file atombus.pl.
#!/usr/bin/env perl
use Dancer ':syntax';
use Plack::Handler::FCGI;
my $app = do "/path/to/atombus.pl";
my $server = Plack::Handler::FCGI->new(nproc => 5, detach => 1);
$server->run($app);
Here is an example lighttpd config.
It assumes you named the above file atombus.fcgi.
fastcgi.server += (
"/atombus" => ((
"socket" => "/tmp/fcgi.sock",
"check-local" => "disable",
"bin-path" => "/path/to/atombus.fcgi",
)),
)
Now AtomBus will be running via FastCGI under /atombus.
=head2 Plack
AtomBus can be run with any L<Plack> web server. Just run:
plackup atombus.pl
You can change the Plack web server via the -s option to plackup.
=head1 MOTIVATION
I like messaging systems because they make it so easy to create scalable
applications.
Existing message brokers are great for creating message queues.
But once a consumer reads a message off of a queue, it is not available for
other consumers.
I needed a system to publish events such that multiple heterogeneous services
could subscribe to them.
So I really needed a message bus, not a message queue.
I could for example have used something called topics in ActiveMQ,
but I have found ActiveMQ to be broken in general.
An instance I manage has to be restarted daily.
AtomBus on the other hand will be extremely stable, because it is so simple.
It is in essence just a simple interface to a database.
As long as your database and web server are up, AtomBus will be there for you.
And there are many ways to add redundancy to databases and web heads.
Another advantage of using AtomBus is that Atom is a well known standard.
Everyone already has a client for it, their browser.
Aren't standards great!
By the way, if you just need message queues, try
L<POE::Component::MessageQueue>.
It rocks. If you need a message bus, give AtomBus a shot.
=head1 AUTHOR
Naveed Massjouni <naveedm9@gmail.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2010 by Naveed Massjouni.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut
|