/[scire]/branches/new-fu/server/scireserver.pl
Gentoo

Contents of /branches/new-fu/server/scireserver.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 357 - (show annotations) (download) (as text)
Sun Feb 17 19:40:38 2008 UTC (6 years, 6 months ago) by codeman
File MIME type: text/x-perl
File size: 15502 byte(s)
removing some comments.  small other touchups.

1 #!/usr/bin/perl
2
3 # $Id$
4
5 use strict;
6 use warnings;
7 use DBI;
8 use Data::Dumper;
9 use Digest::MD5 qw(md5 md5_hex );
10 use File::Path;
11 use Schedule::Cron::Events;
12
13
# Unbuffer STDOUT so each reply is written out as soon as it is printed.
$| = 1;
# Ask Data::Dumper for output that can be turned back into data with eval.
$Data::Dumper::Purity = 1;

my $ETC_DIR           = "/etc/scire";
my $SCIRE_CONFIG_FILE = "${ETC_DIR}/scireserver.conf";
my %conf;       # Configuration values, keyed by lower-cased option name.
my $LOGFILE;    # Logfile handle slot that logger() checks.

# %conf is still empty at this point, so the default path is always chosen
# until something fills in $conf{config} before this line.
my $conf_file = (defined($conf{config})) ? $conf{config} : $SCIRE_CONFIG_FILE;
read_config_file($conf_file);
# (A bare Dumper(\%conf) call used to follow; its result was thrown away,
# so it has been dropped.)

my $identified = 0;    # Global variable to determine if already identified or not.
my $client_id  = 0;    # Global variable for the client id.
# Somehow this feels insecure.
29
# Append one timestamped line to the configured logfile.
#
# Parameters:
#   $line - message text, without a trailing newline.
#
# The file named by $conf{logfile} is opened in append mode on the first call
# and the handle is kept in the file-level $LOGFILE, so later calls reuse it
# instead of reopening the file every time.
# Dies if the logfile cannot be opened.
sub logger {
    my $line = shift;
    if (!defined $LOGFILE) {
        # Three-argument open keeps the mode apart from the configured path.
        open($LOGFILE, '>>', $conf{logfile})
            or die "Cannot open logfile $conf{logfile}: $!";
        # Turn on autoflush for the log handle so each line reaches the file
        # right away now that the handle stays open between calls.
        my $previous_handle = select($LOGFILE);
        $| = 1;
        select($previous_handle);
    }
    # The concatenation puts localtime() in scalar context, which yields a
    # human-readable date string.
    print {$LOGFILE} localtime() . " " . $line . "\n";
}
37
# Emit a debug message, but only when the "debug" configuration value is true.
#
# Parameters:
#   $line - message text, without a trailing newline.
#
# When $conf{logfile} is defined the message is handed to logger();
# otherwise it is printed to STDERR.
sub debug {
    my ($message) = @_;
    return unless $conf{debug};

    unless (defined $conf{logfile}) {
        print STDERR "DEBUG: ${message}\n";
        return;
    }
    logger("DEBUG: ${message}");
}
48
# Connect to the database described by the db_* configuration values.
# RaiseError is switched on, so later DBI failures are raised as exceptions.
my $connect_string = "DBI:" . $conf{db_type} . ":" . $conf{db_name} . ";host=" . $conf{db_host};
debug("Connecting to $connect_string");
my %connect_attributes = ( RaiseError => 1 );
my $dbh = DBI->connect($connect_string, $conf{db_user}, $conf{db_passwd}, \%connect_attributes)
    or die "Could not connect to database: $DBI::errstr";
54
# Main command loop: read one command per input line, dispatch it, and print
# an "OK ..." or "ERROR ..." reply.
# SEE http://agaffney.org/mediawiki/index.php/SSH-based_protocol for documentation on the protocol.
while (my $line = <>) {
    my ($command, @args) = parse_command($line);
    # A blank line parses to an empty list; fall back to an empty command so
    # the comparisons below never see undef.
    $command = '' unless defined $command;

    chomp(my $logged_line = $line);
    debug("line is: $logged_line");

    if ($command eq "QUIT") {
        print "OK\n";
        exit;
    }

    # REGISTER and IDENTIFY are the only commands allowed before the client
    # has identified itself.
    if ($command eq "REGISTER") {
        my ($mac, $ip, $hostname) = @args;
        register_client($mac, $ip, $hostname);
        next;    #End switch here. You can go no further.
    }

    if ($command eq "IDENTIFY") {
        my $fingerprint = $args[0];
        identify_client($fingerprint);
        next;    #End switch here. You can go no further.
    }

    unless ($identified == 1) {
        print "ERROR This client has not yet been authorized. Please identify!\n";
        next;
    }

    if ($command eq "GET_JOBS") {
        my @jobs = get_jobs();
        print "OK " . join(",", @jobs) . "\n";
    } elsif ($command eq "GET_JOB") {
        my $job = $args[0];
        my $jobfile = get_job($job);
        print "OK ${jobfile}\n";
    } elsif ($command eq "JOB_FETCHED") {
        my $job = $args[0];
        job_fetched($job) and print "OK\n";
    } elsif ($command eq "SET_JOB_STATUS") {
        my ($jobid, $status) = @args;
        set_job_status($jobid, $client_id, $status) and print "OK\n";
    } elsif ($command eq "RETURN_JOBFILE") {
        my $jobid = $args[0];
        # Tell the client where to put the job's stdout and stderr files.
        my @filenames = ("$conf{job_dir}/$client_id/result/$jobid.stdout", "$conf{job_dir}/$client_id/result/$jobid.stderr");
        print "OK " . join(" ", @filenames) . "\n";
    } elsif ($command eq "JOBFILE_SENT") {
        for my $jobfile (@args) {
            process_jobfile($jobfile);
        }
        print "OK\n";
    } else {
        print "ERROR The command $command is unknown. Please try again.\n";
    }
}
107
108
109
# Load "key = value" settings from a configuration file into the global %conf.
#
# Parameters:
#   $conf_file - path of the configuration file to read.
#
# Blank lines and lines starting with "#" are skipped; a trailing "# ..."
# comment after a value is dropped. Keys are lower-cased. A key that is
# already defined in %conf is left alone.
# Dies if the file cannot be opened or closed.
sub read_config_file {
    my $conf_file = shift;
    # Lexical handle and three-argument open: the path is never parsed for a
    # mode, and the handle cannot collide with another bareword FH.
    open(my $fh, '<', $conf_file)
        or die("Couldn't open the config file ${conf_file}: $!");
    # Read into a named lexical so the caller's $_ is not overwritten.
    while (my $line = <$fh>) {
        chomp $line;
        next if $line =~ /^\s*(?:#|$)/;
        if ($line =~ /^\s*(.+?)\s*=\s*(.+?)\s*(?:#.*)?$/) {
            my ($key, $value) = (lc($1), $2);
            unless (defined($conf{$key})) {    #Don't overwrite anything specified in cmdline
                $conf{$key} = $value;
            }
        }
    }
    close($fh) or die("Couldn't close the config file ${conf_file}: $!");
    debug("Conf file $conf_file read.");
}
125
#New clients must be registered so they can be given a key to use (perhaps for job file transfers?) for authentication. This must be allowed before identifying.
#
# Parameters:
#   $mac      - MAC address of the client (letters, digits and ":").
#   $ip       - IP address of the client (letters, digits, "." and ":").
#   $hostname - hostname of the client.
#
# Prints "OK <digest>" once the client has been stored, or one "ERROR ..."
# line when something goes wrong. Registration stops at the first error, so
# a rejected request never reaches the insert statements and never gets an
# OK reply. Returns 1 on success and nothing on failure.
sub register_client {
    my ($mac, $ip, $hostname) = @_;

    #Validate your inputs!
    unless (defined($mac) and $mac =~ /^[a-zA-Z0-9\:]+$/) {
        print "ERROR invalid mac " . (defined($mac) ? $mac : '') . "!\n";
        return;
    }
    unless (defined($ip) and $ip =~ /^[a-zA-Z0-9\.\:]+$/) {
        print "ERROR invalid ip " . (defined($ip) ? $ip : '') . "!\n";
        return;
    }
    unless (defined($hostname) and length($hostname)) {
        print "ERROR invalid hostname!\n";
        return;
    }

    my ($query, $status_id, $id);

    #Generate the digest
    my $digest = md5_hex(time() . "${mac}${ip}${hostname}");

    # Look up the id of the "Pending" client status.
    eval {
        $query = 'SELECT statusid FROM client_status WHERE statusname = "Pending"';
        my $sth = run_query($query);
        $status_id = $sth->fetchrow_hashref->{'statusid'};
    };
    if ($@) {
        print "ERROR Could not get status id: $DBI::errstr\n";
        return;
    }

    # Take the next id from the gacl_axo_seq table while holding a write lock.
    eval {
        run_query('LOCK TABLES `gacl_axo_seq` WRITE');
        $query = 'SELECT id FROM `gacl_axo_seq`';
        my $sth = run_query($query);
        $id = $sth->fetchrow_hashref->{'id'};
        $id += 1;
        $query = 'UPDATE `gacl_axo_seq` SET id=?';
        run_query($query, $id);
        run_query('UNLOCK TABLES');
    };
    if ($@) {
        print "ERROR during fetching of id sequence: $DBI::errstr\n";
        # The lock may still be held if the failure came after LOCK TABLES.
        eval { run_query('UNLOCK TABLES') };
        return;
    }

    eval {
        $query = 'INSERT INTO `gacl_axo` (id,section_value,value,order_value,name,hidden) VALUES (?,"clients",?,"1",?,"0")';
        run_query($query, $id, $hostname, $hostname);
        #NOTE: not sure if this query is still valid. may be using id instead of hostname for one of those two now.

        $query = 'INSERT INTO clients (clientid,digest,hostname,mac,ip,status) VALUES (?,?,?,?,?,?)';
        run_query($query, $id, $digest, $hostname, $mac, $ip, $status_id);
    };
    if ($@) {
        print "ERROR Could not insert client with $query: $DBI::errstr\n";
        #FIXME look for "duplicate key" and if found fail and notify admin.
        return;
    }

    print "OK $digest\n";
    return 1;
}
170
#Identify the client by looking up the fingerprint in the database, and matching it up.
#
# Parameters:
#   $digest - the fingerprint sent by the client; double quotes are removed
#             before it is checked.
#
# On success the globals $client_id and $identified are set and "OK" is
# printed. On failure an "ERROR ..." line is printed and both globals are
# left in the "not identified" state, so a failed attempt can never leave the
# session marked as identified without a usable client id.
# Returns 1 on success and nothing on failure.
sub identify_client {
    my $digest = shift;

    # Any earlier identification ends as soon as a new attempt starts.
    $identified = 0;
    $client_id  = 0;

    #Validate your inputs!
    $digest = '' unless defined $digest;
    $digest =~ s/"//g;    #Clear the quotes.
    unless ($digest =~ /^[A-Za-z0-9]+$/) {
        print "ERROR invalid digest!\n";
        return;
    }

    my $query = 'SELECT client_status.statusname, clients.clientid FROM clients JOIN client_status on (clients.status = client_status.statusid) WHERE clients.digest=?';
    my $sth = run_query($query, $digest);
    my $hashref = $sth->fetchrow_hashref();
    debug(Dumper($hashref));

    # No matching row means both values stay undefined.
    my ($status_name, $found_id);
    if ($hashref) {
        $status_name = $hashref->{'statusname'};
        $found_id    = $hashref->{'clientid'};
    }
    $status_name = '' unless defined $status_name;

    if (defined($found_id) and ($found_id > 0) and ($status_name eq 'Active')) {
        $client_id  = $found_id;
        $identified = 1;
        print "OK\n";
        return 1;
    }

    print "ERROR Client could not be identified. Status was $status_name\n";
    return;
}
192
# Return the ids of the jobs currently available to the identified client.
#
# Group jobs are expanded first via expand_jobs(). The query then selects the
# jobs mapped to $client_id whose deploy time has passed and whose expiration
# time is either in the future or NULL, ordered by priority and creation.
# Returns a list of job ids (empty when there are none).
sub get_jobs {

    #FIXME expand jobs for $client_id
    expand_jobs();

    #FIXME ADD JOB DEPENDENCIES TO THIS QUERY.
    my $sth = run_query(<<'EndOfQuery', $client_id);
SELECT jobs.jobid
FROM jobs NATURAL JOIN jobs_clients NATURAL JOIN job_conditions
WHERE jobs_clients.clientid = ?
AND jobs.jobid = jobs_clients.jobid
AND (job_conditions.deploy_time < now())
AND ((job_conditions.expiration_time > now()) OR (job_conditions.expiration_time IS NULL))
ORDER BY jobs.priority,jobs.created
EndOfQuery

    # Each row holds a single column, so collect that value row by row.
    my @jobids;
    while (my ($jobid) = $sth->fetchrow_array()) {
        push @jobids, $jobid;
    }
    return @jobids;
}
215
# Write the full description of one job to a jobfile for the current client.
#
# Parameters:
#   $jobid - numeric id of the job to fetch.
#
# The job row (joined with its job_conditions row) and the row of its script
# are dumped with Data::Dumper into /JOBDIR/CLIENT_ID/queue/JOBID.job.
# Returns the path of the jobfile.
# Dies on an invalid or unknown jobid and on any filesystem error.
sub get_job {
    my $jobid = shift;

    #Validate your inputs!
    # The id ends up in a file path, so only plain digits are accepted
    # (the same check set_job_status() applies).
    (defined($jobid) and $jobid =~ /^\d+$/)
        or die("Invalid jobid " . (defined($jobid) ? $jobid : ''));

    # The join needs a real equality condition; "on (jobs.jobid)" only tested
    # whether the id was non-zero and so matched every job_conditions row.
    my $query = 'SELECT * FROM jobs LEFT JOIN job_conditions on (jobs.jobid = job_conditions.jobid) WHERE jobs.jobid = ?';
    my $sth = run_query($query, $jobid);
    my $job = $sth->fetchrow_hashref();
    $job or die("No such job $jobid");
    # Both tables have a jobid column; if no job_conditions row exists the
    # joined copy is NULL, so store the requested id explicitly.
    $job->{'jobid'} = $jobid;
    my $scriptid = $job->{'script'};

    $query = 'SELECT * FROM scripts WHERE scriptid=?';
    $sth = run_query($query, $scriptid);
    $job->{'script'} = $sth->fetchrow_hashref();

    debug(Dumper($job));
    #Write the job w/ all data to a jobfile with the following path /JOBDIR/CLIENT_ID/queue/JOBID.job
    my $path = "$conf{job_dir}/$client_id/queue";
    my $filename = "$path/$jobid.job";
    unless (-d $path) {
        # Reported through debug() rather than STDOUT, because STDOUT carries
        # the OK/ERROR replies read by the client.
        debug("WARNING! $path does not exist...creating");
        # Directories need the search (x) bits, otherwise files inside them
        # cannot be opened; 0660 lacked them.
        mkpath( $path, { mode => 0770 } )
            or die("Couldn't make $path w/ perms 0770: $!");
    }
    open(my $fh, '>', $filename) or die("Couldn't open $filename: $!");
    print {$fh} Dumper($job) . "\n";
    close($fh) or die("Couldn't close $filename : $!");
    debug("OK $filename");
    return $filename;
}
244
# Record that the current client has downloaded a job.
#
# Parameters:
#   $jobid - id of the job that was fetched.
#
# Sets the job status to 'Downloaded', removes the job/client row from
# jobs_clients and deletes the jobfile from the client's queue directory.
# Returns 1 on success. Prints an "ERROR ..." line and returns 0 when the
# status update or the delete fails, so the caller does not also print OK.
# Dies if the jobfile cannot be removed.
sub job_fetched {
    my $jobid = shift;
    unless (set_job_status($jobid, $client_id, 'Downloaded', 'Job downloaded by client.')) {
        print "ERROR could not set job status to downloaded.\n";
        return 0;
    }

    eval {
        my $query = 'DELETE FROM jobs_clients WHERE jobid=? AND clientid=?';
        run_query($query, $jobid, $client_id);
    };
    if ($@) {
        print "ERROR Could not remove job from jobs_clients: $DBI::errstr\n";
        return 0;
    }

    my $filename = "$conf{job_dir}/$client_id/queue/$jobid.job";
    unlink($filename) or die("ERROR Could not unlink the jobfile from the queue. filename: $filename : $!");
    return 1;
}
259
# Record a status change for a job in the job_history table.
#
# Parameters:
#   $jobid        - numeric id of the job.
#   $id_of_client - numeric id of the client; falls back to the global
#                   $client_id when false.
#   $status       - status name, looked up in the jobs_status table.
#   $eventmsg     - optional event message; defaults to
#                   "Server status update.".
#
# For the statuses 'Failed' and 'Completed' the matching mark_job_as_*
# routine is called after the history row has been written.
# Returns 1 on success. Prints an "ERROR ..." line and returns 0 when the
# status is unknown or a query fails. Dies on a non-numeric job or client id.
sub set_job_status {
    my ($jobid, $id_of_client, $status, $eventmsg) = @_;

    #Validate your inputs!
    (defined($jobid) and $jobid =~ /^\d+$/)
        or die("Invalid jobid " . (defined($jobid) ? $jobid : ''));
    $id_of_client ||= $client_id;
    (defined($id_of_client) and $id_of_client =~ /^\d+$/)
        or die("Invalid id of client " . (defined($id_of_client) ? $id_of_client : ''));
    $eventmsg ||= "Server status update.";
    # A missing status is treated as an empty name, which the lookup below
    # then rejects.
    $status = '' unless defined $status;

    #fixme validate status
    my $status_id;
    eval {
        my $query = 'SELECT statusid FROM jobs_status WHERE statusname = ?';
        my $sth = run_query($query, $status);
        my $row = $sth->fetchrow_hashref();
        $status_id = $row->{'statusid'} if $row;
    };
    if ($@) {
        print "ERROR Could not get status id: $DBI::errstr\n";
        return 0;
    }
    unless ($status_id) {
        print "ERROR Invalid status $status\n";
        return 0;
    }

    eval {
        my $query = 'INSERT INTO job_history (jobid,clientid,statusid,eventmsg) VALUES (?,?,?,?)';
        run_query($query, $jobid, $id_of_client, $status_id, $eventmsg);
    };
    if ($@) {
        print "ERROR Could not insert into job_history: $DBI::errstr\n";
        return 0;
    }

    #If we're marking the completion or failure of a job, we have more work to do here.
    if ($status eq 'Failed') {
        mark_job_as_failed($jobid, $id_of_client);
    } elsif ($status eq 'Completed') {
        mark_job_as_completed($jobid, $id_of_client);
    }

    return 1;
}
293
# Split one protocol line into a command word and its arguments.
#
# Parameters:
#   $line - the raw input line; a trailing newline is removed.
#
# The line is split on single spaces, except spaces that the lookahead
# recognises as lying inside a double-quoted argument. A leading or trailing
# double quote is then stripped from every part and \" becomes ".
# Returns the list of parts; the first one is the command.
sub parse_command {
    my ($raw) = @_;
    chomp $raw;
    return map {
        my $token = $_;
        $token =~ s/(^"|"$)//g;
        $token =~ s/\\"/"/g;
        $token;
    } split / (?!(?:[^" ]|[^"] [^"])+")/, $raw;
}
304
# Prepare and execute one SQL statement on the global database handle.
#
# Parameters:
#   $query  - SQL text, optionally containing "?" placeholders.
#   @params - values bound to the placeholders, in order.
#
# The query text is passed to debug() first.
# Returns the executed statement handle.
sub run_query {
    my $query = shift;
    debug("Query is $query");
    my $statement = $dbh->prepare($query);
    # Whatever is left in @_ after the shift are the bind values.
    $statement->execute(@_);
    return $statement;
}
312
313
# Expand pending group jobs into per-client jobs.
#
# For every group the current client belongs to, each job that is assigned to
# the group and currently due is copied to every member of the group: a
# jobs_clients row and a 'Pending' history entry per member. The job's
# last_run_date and pending count are then updated and the group's
# jobs_clients row is removed so the job is not expanded again.
#
# Every group is processed; the return now comes after the loop instead of
# inside it, where it stopped the work after the first group.
# Prints an "ERROR ..." line for a group whose expansion fails.
# Returns nothing.
sub expand_jobs {
    my $due_group_jobs_query = <<'EndOfQuery2';
SELECT DISTINCT(jobs_clients.jobid)
FROM jobs_clients LEFT JOIN job_conditions on (jobs_clients.jobid=job_conditions.jobid)
WHERE jobs_clients.groupid = ?
AND (job_conditions.deploy_time < now())
AND ((job_conditions.expiration_time > now()) OR (job_conditions.expiration_time IS NULL))
AND ((job_conditions.last_run_date < job_conditions.deploy_time) OR (job_conditions.last_run_date IS NULL))
EndOfQuery2

    #Searches for the group jobs that the client must be into and does the expansion.
    my @groups = get_client_groups();
    foreach my $groupid (@groups) {
        debug("Groupid is $groupid");
        my @members = get_group_clients($groupid);
        eval {
            my $sth = run_query($due_group_jobs_query, $groupid);
            run_query('LOCK TABLES `jobs_clients` WRITE, `job_conditions` WRITE, `job_history` WRITE, `jobs_status` WRITE, `jobs` WRITE');
            #FIXME need to lock jobs_clients for READ as well!!!
            while ( my $jobref = $sth->fetchrow_hashref() ) {
                my $jobid = $jobref->{'jobid'};
                foreach my $member (@members) {
                    run_query('INSERT INTO jobs_clients (jobid, clientid) VALUES (?,?)', $jobid, $member);

                    set_job_status($jobid, $member, 'Pending', 'Job expanded.')
                        or print "ERROR could not add expanded jobs to job_history.\n";
                }
                run_query('UPDATE `job_conditions` SET last_run_date = now() WHERE jobid = ?', $jobid);

                # $#members is the member count minus one.
                run_query('UPDATE `jobs` SET pending=pending+? WHERE jobid = ?', $#members, $jobid);    #This works because you want one less b/c of removing the group.

                # One last query to remove the row from jobs_clients so someone else doesn't expand it.
                run_query('DELETE FROM `jobs_clients` WHERE groupid=? AND jobid=?', $groupid, $jobid);
            }

            run_query('UNLOCK TABLES');
        };
        if ($@) {
            print "ERROR Could not expand jobs: $@ $DBI::errstr\n";
            # A failure after LOCK TABLES skips the unlock above, so try to
            # release the locks here.
            eval { run_query('UNLOCK TABLES') };
        }
    }
    return;
}
358
# Update a job's counters after a client reported failure.
#
# Parameters:
#   $jobid        - id of the job that failed.
#   $id_of_client - id of the reporting client (accepted but not used here).
#
# Decrements the job's pending count and increments its failed count.
# Prints an "ERROR ..." line if the update fails.
sub mark_job_as_failed {
    my ($jobid, $id_of_client) = @_;

    #First off, update the pending count and failed count with the result.
    eval {
        my $query = 'UPDATE jobs SET pending=pending-1, failed=failed+1 WHERE jobid=?';
        # The statement has one placeholder, so the job id must be bound.
        run_query($query, $jobid);
    };
    ($@) and print "ERROR Could not update pending count: $@ $DBI::errstr\n";
}
369
# Update a job after a client reported successful completion.
#
# Parameters:
#   $jobid        - id of the job that completed.
#   $id_of_client - id of the reporting client (used in the debug message).
#
# Decrements the job's pending count. If the job has a run_schedule, no
# client is pending any more and the job has not expired, the rows stored in
# recurring_jobs_clients for the job are copied back into jobs_clients.
# Prints an "ERROR ..." line when a query fails.
sub mark_job_as_completed {
    my ($jobid, $id_of_client) = @_;
    debug("Marking $jobid as completed for client $id_of_client");
    #If we succeeded, we need to check this jobid to see if it is a recurring job, and then set the next_run if necessary.
    #This requires looking at the pending count for the job as well as the run_schedule.

    #First off, update the pending count now that we've finished.
    eval {
        run_query('UPDATE jobs SET pending=pending-1 WHERE jobid=?', $jobid);
    };
    ($@) and print "ERROR Could not update pending count: $@ $DBI::errstr\n";

    #Now get the pending count and run_schedule.
    eval {
        # NOTE(review): other queries in this file read deploy_time and
        # expiration_time from job_conditions, not jobs; confirm the schema.
        my $sth = run_query('SELECT pending,run_schedule,expiration_time,deploy_time FROM jobs WHERE jobid=?', $jobid);
        my $rowref = $sth->fetchrow_hashref();

        if ($rowref) {
            # Current time in "YYYY-MM-DD HH:MM:SS" form. Strings in this
            # zero-padded form sort chronologically, so they are compared
            # with the string operator "gt".
            # NOTE(review): assumes expiration_time comes back from the
            # database in the same format; confirm.
            my ($now_sec, $now_min, $now_hour, $now_mday, $now_mon, $now_year) = localtime(time);
            my $datetime = sprintf "%4d-%02d-%02d %02d:%02d:%02d",
                $now_year + 1900, $now_mon + 1, $now_mday, $now_hour, $now_min, $now_sec;

            # A NULL expiration time means "never expires", matching the
            # "expiration_time IS NULL" branch of the SQL in get_jobs().
            my $expiration  = $rowref->{'expiration_time'};
            my $not_expired = (!defined($expiration)) || ($expiration gt $datetime);

            if ($rowref->{'run_schedule'} and ($rowref->{'pending'} == 0) and $not_expired) {
                #Determine the next run time.
                my $cron = Schedule::Cron::Events->new( $rowref->{'run_schedule'}, Date => [ ( localtime(time()) )[0..5] ] );
                my ($sec, $min, $hour, $day, $month, $year) = $cron->nextEvent;
                # Sent to debug() instead of STDOUT, because STDOUT carries
                # the OK/ERROR replies read by the client.
                # NOTE(review): assumes nextEvent returns localtime-style
                # values (0-based month, year minus 1900); confirm.
                debug(sprintf("Event will start next at %2d:%02d:%02d on %04d-%02d-%02d",
                    $hour, $min, $sec, $year + 1900, $month + 1, $day));

                #Get the groups and clients from the recurring_jobs_clients table.
                my $sth2 = run_query('SELECT clientid,groupid FROM recurring_jobs_clients WHERE jobid=?', $jobid);
                while ( my $recrow_ref = $sth2->fetchrow_hashref() ) {
                    run_query('INSERT INTO jobs_clients (jobid,clientid,groupid) VALUES (?,?,?)',
                        $jobid, $recrow_ref->{'clientid'}, $recrow_ref->{'groupid'});
                }
            }
        }
    };
    ($@) and print "ERROR Could not get run_schedule and pending count: $@ $DBI::errstr\n";

}
410
# Placeholder for handling a jobfile sent back by the client.
#
# Parameters:
#   $filename - name of the jobfile reported by the client.
#
# Nothing is processed yet; the filename is simply handed back.
sub process_jobfile {
    my $filename = shift;
    return $filename;
}
415
416 #########################################################
417 # PHPGACL FUNCTIONS
418 #########################################################
419
# Return the ids of the groups the current client ($client_id) belongs to.
#
# The lookup mode is fixed to 'NO RECURSE', which selects only the groups
# mapped directly to the client. The 'RECURSE' branch builds a query that
# also returns ancestor groups.
# Prints an "ERROR ..." line if the query fails.
# Returns a list of group ids (empty when none were found).
sub get_client_groups {
    my $option = 'NO RECURSE';
    # If RECURSE it will get all ancestor groups. defaults to only get direct parents.

    debug("get_object_groups(): Object ID: $client_id, option: $option");
    my $object_type = 'axo';
    my ($group_table, $map_table) = ('gacl_axo_groups', 'gacl_groups_axo_map');

    my $select = ($option eq 'RECURSE')
        ? "SELECT DISTINCT g.id as group_id FROM $map_table gm "
          . "LEFT JOIN $group_table g1 ON g1.id=gm.group_id "
          . "LEFT JOIN $group_table g ON g.lft<=g1.lft AND g.rgt>=g1.rgt"
        : "SELECT gm.group_id FROM $map_table gm ";
    my $query = $select . " WHERE gm.axo_id=?";
    debug("Query is $query");

    my @groups;
    eval {
        my $sth = $dbh->prepare($query);
        $sth->execute($client_id);
        # Every row has exactly one column: the group id.
        @groups = map { $_->[0] } @{ $sth->fetchall_arrayref() };
    };
    print "ERROR Could not get client groups: $DBI::errstr\n" if $@;
    return @groups;
}
450
# Return the ids of the clients that are members of a group.
#
# Parameters:
#   $groupid - id of the group to look up.
#
# Prints an "ERROR ..." line if the query fails.
# Returns a list of client ids (empty when the group has no members or the
# query failed).
sub get_group_clients {
    my ($groupid) = @_;
    my $query = 'SELECT axo_id FROM gacl_groups_axo_map WHERE group_id = ?';
    debug("Query is $query");

    my @members;
    eval {
        my $sth = $dbh->prepare($query);
        $sth->execute($groupid);
        # Every row has exactly one column: the member's axo_id.
        @members = map { $_->[0] } @{ $sth->fetchall_arrayref() };
    };
    print "ERROR Could not get group members: $DBI::errstr\n" if $@;
    return @members;
}

Properties

Name Value
svn:executable *
svn:keywords Id

  ViewVC Help
Powered by ViewVC 1.1.20