package Net::Flowdock::Stream;
use Moose;
# ABSTRACT: Streaming API for Flowdock
use JSON;
use MIME::Base64;
use Net::HTTPS::NB;
=head1 SYNOPSIS
my $stream = Net::Flowdock::Stream->new(
token => '...',
flows => ['myorg/testing'],
);
while (1) {
if (my $event = $stream->get_next_event) {
process_event($event);
}
}
=head1 DESCRIPTION
This module implements the streaming api for
L. It provides a non-blocking method which
you can call to get the next available event in the stream. You can then
integrate this method into your existing event-driven app.
=cut
=attr token
Your account's API token, for authentication. Required unless C and
C are provided.
=cut
has token => (
is => 'ro',
isa => 'Str',
);
=attr email
Your account's email address, for authentication. Required unless C is
provided.
=cut
has email => (
is => 'ro',
isa => 'Str',
);
=attr password
Your account's password, for authentication. Required unless C is
provided.
=cut
has password => (
is => 'ro',
isa => 'Str',
);
=attr flows
An arrayref of flows that should be listened to for events. Note that the flow
names must include the organization, so C, not just C.
=cut
has flows => (
traits => ['Array'],
isa => 'ArrayRef[Str]',
required => 1,
handles => {
flows => 'elements',
},
);
has socket_timeout => (
is => 'ro',
isa => 'Num',
default => 0.01,
);
has debug => (
is => 'rw',
isa => 'Bool',
default => 0,
);
has _socket => (
is => 'rw',
isa => 'Net::HTTPS::NB',
);
has _readbuf => (
traits => ['String'],
is => 'rw',
isa => 'Str',
default => '',
handles => {
_append_readbuf => 'append',
},
);
has _events => (
is => 'rw',
isa => 'ArrayRef[HashRef]', # XXX make these into objects
default => sub { [] },
);
sub BUILD {
my $self = shift;
my $auth;
if (my $token = $self->token) {
$auth = $token;
}
elsif (my ($email, $pass) = ($self->email, $self->password)) {
$auth = "$email:$pass";
}
else {
die "You must supply either your token or your email and password";
}
my $s = Net::HTTPS::NB->new(Host => 'stream.flowdock.com');
my $flows = join(',', $self->flows);
$s->write_request(
GET => "/flows?filter=$flows" =>
Authorization => 'Basic ' . MIME::Base64::encode($auth),
Accept => 'application/json',
);
my ($code, $message, %headers) = $s->read_response_headers;
die "Unable to connect: $message"
unless $code == 200;
$self->_socket($s);
}
=method get_next_event
Returns the next event that has been received in the stream. This call is
nonblocking, and will return undef if no events are currently available.
=cut
sub get_next_event {
my $self = shift;
return unless $self->_socket_is_readable;
$self->_read_next_chunk;
return $self->_process_readbuf;
}
sub _socket_is_readable {
my $self = shift;
my ($rin, $rout) = ('');
vec($rin, fileno($self->_socket), 1) = 1;
my $res = select($rout = $rin, undef, undef, $self->socket_timeout);
if ($res == -1) {
return if $!{EAGAIN} || $!{EINTR};
die "Error reading from socket: $!";
}
return if $res == 0;
return 1 if $rout;
return;
}
sub _read_next_chunk {
my $self = shift;
my $nbytes = $self->_socket->read_entity_body(my $buf, 4096);
if (!defined $nbytes) {
return if $!{EINTR} || $!{EAGAIN};
die "Error reading from server";
}
die "Disconnected" if $nbytes == 0;
return if $nbytes == -1;
$self->_append_readbuf($buf);
}
sub _process_readbuf {
my $self = shift;
if ((my $buf = $self->_readbuf) =~ s/^([^\x0d]*)\x0d//) {
my $chunk = $1;
$self->_readbuf($buf);
warn "New event:\n$chunk" if $self->debug;
return decode_json($chunk);
}
return;
}
__PACKAGE__->meta->make_immutable;
no Moose;
=head1 BUGS
No known bugs.
Please report any bugs through RT: email
C, or browse to
L.
=head1 SEE ALSO
L
=head1 SUPPORT
You can find this documentation for this module with the perldoc command.
perldoc Net::Flowdock::Stream
You can also look for information at:
=over 4
=item * AnnoCPAN: Annotated CPAN documentation
L
=item * CPAN Ratings
L
=item * RT: CPAN's request tracker
L
=item * Search CPAN
L
=back
=begin Pod::Coverage
BUILD
=end Pod::Coverage
=cut
1;