/[cvs]/nfo/perl/libs/POE/Component/LookupClient.pm
ViewVC logotype

Contents of /nfo/perl/libs/POE/Component/LookupClient.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.6 - (show annotations)
Wed Dec 3 04:01:05 2003 UTC (20 years, 5 months ago) by joko
Branch: MAIN
CVS Tags: HEAD
Changes since 1.5: +25 -11 lines
somehow got this to re-connect transparently if server-side goes down

1 ## ------------------------------------------------------------------------
2 ## $Id: LookupClient.pm,v 1.5 2003/07/01 18:13:15 joko Exp $
3 ## ------------------------------------------------------------------------
4 ## $Log: LookupClient.pm,v $
5 ## Revision 1.5 2003/07/01 18:13:15 joko
6 ## fixed: shutdown and session-unaliasing seems to be done by gc now...?
7 ##
8 ## Revision 1.3 2003/07/01 13:13:44 joko
9 ## made "port" and "host" configurable from script
10 ##
11 ## Revision 1.2 2003/07/01 13:05:01 joko
12 ## major changes, tried to clean up shutdown phase - the watchdog-mech didn't work out well..... - what's about IKC's monitor? does it work on Linux?
13 ##
14 ## Revision 1.1 2003/06/29 01:35:29 joko
15 ## initial commit
16 ##
17 ## ------------------------------------------------------------------------
18
19
20 package POE::Component::LookupClient;
21
22 use strict;
23 use warnings;
24
25 use POE qw( Session Component::IKC::Client );
26 use Data::Dumper;
27
28
# Constructor.
#
# Takes a flat list of key/value pairs, stores them as a hash under
# $self->{options}, and creates a POE session whose states are dispatched to
# the methods of the freshly blessed object.
#
# Returns whatever POE::Session->create() returns (it is the final expression
# of the sub), NOT the blessed object itself.
sub new {
    my ( $class, @options ) = @_;

    my $self = bless {}, $class;
    $self->{options} = {@options};

    # Events handled by this object, each mapped to the method of the same name.
    my @handled_states = qw(
        _start _stop boot_intercom start_session waste_time watchdog
    );

    return POE::Session->create(
        object_states => [ $self => \@handled_states ],
    );
}
46
47 # This is not a POE method. It's a plain OO one.
# Plain OO helper (not a POE event handler): writes one diagnostic line to
# STDERR, prefixed with the package name.
#
# Arguments:
#   $self - invocant (not used by the body)
#   $msg  - text to print; an undefined message is printed as an empty string
#
# Only an *undefined* message is defaulted. False-but-defined values such as
# 0 or "0" are printed verbatim instead of being replaced by ''.
sub debug {
    my ( $self, $msg ) = @_;
    $msg = '' unless defined $msg;
    print {*STDERR} __PACKAGE__ . ": " . $msg, "\n";
}
54
55 # Controller's event handlers
56
# POE "_start" handler: logs the event and queues 'boot_intercom' on this
# same session, which sets up the IKC client connection.
sub _start {
    my $self   = $_[OBJECT];
    my $kernel = $_[KERNEL];

    $self->debug("_start");
    $kernel->yield('boot_intercom');
}
70
# POE "_stop" handler: only logs that the session is stopping.
sub _stop {
    my $self = $_[OBJECT];
    $self->debug("_stop");
}
75
# Event handler: creates the IKC client connection and arms the watchdog.
#
# Fills in defaults for the "host" ("localhost") and "port" (30) options when
# they are unset or false, then calls create_ikc_client() with an on_connect
# callback that invokes $self->build(). Finally schedules the 'watchdog'
# event to fire in 2 seconds.
sub boot_intercom {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    $self->debug("boot_intercom");

    # Apply defaults in place so later calls see the same values.
    $self->{options}{host} ||= "localhost";
    $self->{options}{port} ||= 30;
    my ( $host, $port ) = @{ $self->{options} }{qw( host port )};

    # Sessions that depend on the foreign kernel are built once connected.
    my $when_connected = sub { $self->build(); };

    create_ikc_client(
        ip         => $host,
        port       => $port,
        on_connect => $when_connected,
    );

    # Start the watchdog which monitors the required IKC intercom session.
    $kernel->delay( 'watchdog', 2 );
}
106
# Event handler: merely announces on STDERR that it was invoked.
sub start_session {
    print {*STDERR} "start_session", "\n";
}
111
# Plain method used as the IKC on_connect callback: creates the session(s)
# that depend on the foreign kernel by instantiating
# POE::Component::LookupClient::Session with no arguments.
# Returns whatever that constructor returns.
sub build {
    my ($self) = @_;
    return POE::Component::LookupClient::Session->new();
}
118
# Event handler: periodic check of the 'DeviceClient' session alias.
#
# While the alias resolves, the watchdog simply re-arms itself for another
# 2 seconds. When it no longer resolves, a notice is printed to STDERR and
# 'boot_intercom' is queued to set the connection up again; the watchdog is
# not re-armed here (boot_intercom schedules it).
sub watchdog {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    $self->debug("watchdog");

    # Alias still resolves: check again later.
    if ( defined $kernel->alias_resolve('DeviceClient') ) {
        return $kernel->delay( 'watchdog', 2 );
    }

    print {*STDERR} "Session died, trying to restart!", "\n";
    $kernel->yield('boot_intercom');
    return;
}
134
135 #------------------------------------------------------------------------------
136 # This event keeps this POE kernel alive.
137 # (stolen from POE::Component::IKC::Server, but not used 'til now...)
# Event handler: keep-alive loop driven by flags stored in the session heap.
#
#   'is a child'              - when true, the handler does nothing.
#   'been told we are parent' - set on first run, after warning and sending
#                               the '__parent' signal to the kernel.
#   'die'                     - when true, the loop is not rescheduled.
#
# Otherwise queues 'watchdog' and reschedules itself in 60 seconds.
sub waste_time {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];

    return if $heap->{'is a child'};

    if ( !$heap->{'been told we are parent'} ) {
        warn "$$: Telling everyone we are the parent\n";
        $heap->{'been told we are parent'} = 1;
        $kernel->signal( $kernel, '__parent' );
    }

    # Orderly shutdown requested: let the loop end.
    return if $heap->{'die'};

    $kernel->yield('watchdog');
    $kernel->delay( 'waste_time', 60 );
    return;
}
155
156
157
158
159 package POE::Component::LookupClient::Session;
160
161 use strict;
162 use warnings;
163
164 use POE qw( Session );
165 use Data::Dumper;
166
167
# Constructor.
#
# Stores the given key/value pairs as a hash under $self->{options} and
# creates a POE session whose states are dispatched to the methods of the
# freshly blessed object.
#
# Returns whatever POE::Session->create() returns (it is the final expression
# of the sub), NOT the blessed object itself.
sub new {
    my ( $class, @options ) = @_;

    my $self = bless {}, $class;
    $self->{options} = {@options};

    # Events handled by this object, each mapped to the method of the same name.
    my @handled_states = qw(
        _start _stop
        on_response on_subscribe
        register_lease renew_lease
        remote_shutdown remote_timeout
    );

    return POE::Session->create(
        object_states => [ $self => \@handled_states ],
    );
}
184
185
186 # This is not a POE method. It's a plain OO one.
# Plain OO helper (not a POE event handler): writes one diagnostic line to
# STDERR, prefixed with the package name.
#
# Arguments:
#   $self - invocant (not used by the body)
#   $msg  - text to print; an undefined message is printed as an empty string
#
# Only an *undefined* message is defaulted. False-but-defined values such as
# 0 or "0" are printed verbatim instead of being replaced by ''.
sub debug {
    my ( $self, $msg ) = @_;
    $msg = '' unless defined $msg;
    print {*STDERR} __PACKAGE__ . ": " . $msg, "\n";
}
193
194 # Controller's event handlers
195
# POE "_start" handler: sets up the channel for asynchronous responses.
#
# Registers the alias 'DeviceClient' for this session, asks IKC to publish
# its 'on_response' state under that name, and subscribes to the remote
# poe://LookupService/ServiceRegistrar session with 'on_subscribe' as the
# subscription receipt callback.
sub _start {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    $self->debug("_start");

    $kernel->alias_set('DeviceClient');
    $kernel->post( IKC => 'publish', 'DeviceClient', ['on_response'] );

    $kernel->post(
        IKC => 'subscribe',
        ['poe://LookupService/ServiceRegistrar'],
        'on_subscribe',
    );
}
218
# POE "_stop" handler: only logs that the session is stopping. No alias
# removal or re-registration is performed here.
sub _stop {
    my $self = $_[OBJECT];
    $self->debug("_stop");
}
230
231
232 # Subscription receipt callback, see "perldoc POE::Component::IKC::Responder".
# Subscription receipt callback: once invoked, queues 'register_lease' on
# this session so the lease is registered upon subscription.
sub on_subscribe {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];

    $self->debug("on_subscribe");
    $kernel->yield('register_lease');
}
239
240 # Main response dispatcher, this should dispatch to local states programmatically.
# Main response dispatcher for payloads delivered in ARG0.
#
# Every response sets the heap flag 'cancel timeout', which remote_timeout
# checks before treating the remote side as gone. Then, by payload:
#   'REG_OK'   - registers 'remote_shutdown' as IKC monitor callback for the
#                'unregister' and 'shutdown' events of any kernel ('*') and
#                starts the renewal loop via 'renew_lease'.
#   'LEASE_OK' - logged only.
#   other      - sets the heap flag 'destroy lease' (including an empty or
#                missing payload, which is normalised to '').
sub on_response {
    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];

    $heap->{'cancel timeout'} = 1;

    my $payload = $_[ARG0] || '';

    if ( $payload eq 'REG_OK' ) {
        $self->debug("Starting lease renewal loop.");
        for my $ikc_event (qw( unregister shutdown )) {
            $kernel->post(
                IKC => 'monitor',
                '*' => { $ikc_event => 'remote_shutdown' },
            );
        }
        $kernel->yield('renew_lease');
    }
    elsif ( $payload eq 'LEASE_OK' ) {
        $self->debug("Received 'LEASE_OK'.");
    }
    else {
        # Unknown payload: destroy the lease in any case.
        $heap->{'destroy lease'} = 1;
    }
}
271
# Event handler: posts 'register_lease' with a single fixed string argument
# to the subscribed remote session poe://LookupService/ServiceRegistrar.
sub register_lease {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    $self->debug("register_lease");

    my $greeting = "Hello World!";
    $kernel->post(
        "poe://LookupService/ServiceRegistrar",
        "register_lease",
        $greeting,
    );
}
283
# Event handler: one iteration of the lease renewal loop.
#
# If the heap flag 'destroy lease' is set, logs that and returns without
# rescheduling anything. Otherwise it:
#   1. asks IKC to call the remote renew_lease state with an empty-string
#      argument, routing the reply to this session's 'on_response';
#   2. reschedules itself in 5 seconds;
#   3. adds a further 'remote_timeout' timer 7 seconds out (delay_add, so
#      one timer is added per iteration).
sub renew_lease {
    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];

    if ( $heap->{'destroy lease'} ) {
        $self->debug("destroyed lease");
        return;
    }

    $self->debug("renew_lease");

    my $remote_state = "poe://LookupService/ServiceRegistrar/renew_lease";
    my $empty_arg    = '';
    $kernel->post( IKC => 'call', $remote_state, $empty_arg, 'poe:on_response' );

    # And again...
    $kernel->delay( 'renew_lease', 5 );

    # Timeout guard; remote_timeout ignores it if a response arrived meanwhile.
    $kernel->delay_add( 'remote_timeout', 7 );
}
334
# IKC monitor callback (registered in on_response): only logs the event.
sub remote_shutdown {
    my $self = $_[OBJECT];
    $self->debug("remote_shutdown");
}
339
# Event handler: fires when a 'remote_timeout' timer set by renew_lease
# elapses.
#
# If a response arrived since the last check (heap flag 'cancel timeout'),
# the flag is cleared and nothing else happens. Otherwise the remote kernel
# is considered gone: the lease is marked for destruction, pending timers
# are cleared, the IKC publication/subscription/registration are withdrawn
# and the session aliases are removed. The statement order below is kept
# exactly as originally written.
sub remote_timeout {
    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];

    if ( $heap->{'cancel timeout'} ) {
        $heap->{'cancel timeout'} = 0;
        return;
    }

    $self->debug("remote_timeout");

    $heap->{'destroy lease'} = 1;

    # YOW! Remote kernel timed out. RUN AROUND SCREAMING!
    print {*STDERR} "# YOW! Remote kernel timed out. RUN AROUND SCREAMING!", "\n";

    # Free all resources that keep this session running: calling delay()
    # without a time clears the pending timers for that event.
    $kernel->delay($_) for qw( renew_lease remote_timeout );

    $kernel->post( IKC => 'retract',     'DeviceClient' => ['on_response'] );
    $kernel->post( IKC => 'unsubscribe', ['poe://LookupService/ServiceRegistrar'] );
    $kernel->post( IKC => 'unregister',  ['poe://LookupService'] );

    # NOTE(review): this session registers '_stop' but no 'stop' state, so
    # this post presumably has no handler - confirm whether it is intended.
    $kernel->post( 'DeviceClient', 'stop' );

    $kernel->alias_remove('DeviceClient');

    # NOTE(review): the 'IKC' alias is not set anywhere in this file;
    # presumably it belongs to the IKC component's own session - verify
    # that removing it from here has the desired effect.
    $kernel->alias_remove('IKC');
}
404
405 1;
406 __END__

MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed