implement aggregate_locking design
authorJoey Hess <joey@kodama.kitenet.net>
Sun, 3 Feb 2008 21:48:26 +0000 (16:48 -0500)
committerJoey Hess <joey@kodama.kitenet.net>
Sun, 3 Feb 2008 21:48:26 +0000 (16:48 -0500)
Now aggregation will not lock the wiki. Any changes made during aggregaton are
merged in with the changed state accumulated while aggregating. A separate
lock file prevents multiple concurrent aggregators. Garbage collection
of orphaned guids is much improved. loadstate() is only called once
per process, so tricky support for reloading wiki state is not needed.

(Tested fairly thuroughly.)

IkiWiki/Plugin/aggregate.pm
debian/changelog
doc/todo/aggregate_locking.mdwn
po/ikiwiki.pot

index cfc4ec955328e2dfa812b04b4cacd683ba336884..ba40ee6bcb42f57f2f3929db25a5f0fa75aeeb34 100644 (file)
@@ -33,33 +33,62 @@ sub getopt () { #{{{
 sub checkconfig () { #{{{
        if ($config{aggregate} && ! ($config{post_commit} && 
                                     IkiWiki::commit_hook_enabled())) {
-               if (! IkiWiki::lockwiki(0)) {
-                       debug("wiki is locked by another process, not aggregating");
-                       exit 1;
-               }
-       
+               # See if any feeds need aggregation.
                loadstate();
-               IkiWiki::loadindex();
-               aggregate();
-               expire();
-               savestate();
-               clearstate();
+               my @feeds=needsaggregate();
+               return unless @feeds;
+               if (! lockaggregate()) {
+                       debug("an aggregation process is already running");
+                       return;
+               }
+               # force a later rebuild of source pages
+               $IkiWiki::forcerebuild{$_->{sourcepage}}=1
+                       foreach @feeds;
+
+               # Fork a child process to handle the aggregation.
+               # The parent process will then handle building the
+               # result. This avoids messy code to clear state
+               # accumulated while aggregating.
+               defined(my $pid = fork) or error("Can't fork: $!");
+               if (! $pid) {
+                       IkiWiki::loadindex();
+
+                       # Aggregation happens without the main wiki lock
+                       # being held. This allows editing pages etc while
+                       # aggregation is running.
+                       aggregate(@feeds);
+
+                       IkiWiki::lockwiki;
+                       # Merge changes, since aggregation state may have
+                       # changed on disk while the aggregation was happening.
+                       mergestate();
+                       expire();
+                       savestate();
+                       IkiWiki::unlockwiki;
+                       exit 0;
+               }
+               waitpid($pid,0);
+               if ($?) {
+                       error "aggregation failed with code $?";
+               }
 
-               IkiWiki::unlockwiki();
+               clearstate();
+               unlockaggregate();
        }
 } #}}}
 
 sub needsbuild (@) { #{{{
        my $needsbuild=shift;
        
-       loadstate(); # if not already loaded
+       loadstate();
 
        foreach my $feed (values %feeds) {
                if (exists $pagesources{$feed->{sourcepage}} && 
                    grep { $_ eq $pagesources{$feed->{sourcepage}} } @$needsbuild) {
-                       # Mark all feeds originating on this page as removable;
-                       # preprocess will unmark those that still exist.
-                       remove_feeds($feed->{sourcepage});
+                       # Mark all feeds originating on this page as 
+                       # not yet seen; preprocess will unmark those that
+                       # still exist.
+                       markunseen($feed->{sourcepage});
                }
        }
 } # }}}
@@ -92,8 +121,7 @@ sub preprocess (@) { #{{{
        $feed->{updateinterval}=defined $params{updateinterval} ? $params{updateinterval} * 60 : 15 * 60;
        $feed->{expireage}=defined $params{expireage} ? $params{expireage} : 0;
        $feed->{expirecount}=defined $params{expirecount} ? $params{expirecount} : 0;
-       delete $feed->{remove};
-       delete $feed->{expired};
+       delete $feed->{unseen};
        $feed->{lastupdate}=0 unless defined $feed->{lastupdate};
        $feed->{numposts}=0 unless defined $feed->{numposts};
        $feed->{newposts}=0 unless defined $feed->{newposts};
@@ -123,16 +151,27 @@ sub delete (@) { #{{{
        # Remove feed data for removed pages.
        foreach my $file (@files) {
                my $page=pagename($file);
-               remove_feeds($page);
+               markunseen($page);
+       }
+} #}}}
+
+sub markunseen ($) { #{{{
+       my $page=shift;
+
+       foreach my $id (keys %feeds) {
+               if ($feeds{$id}->{sourcepage} eq $page) {
+                       $feeds{$id}->{unseen}=1;
+               }
        }
 } #}}}
 
 my $state_loaded=0;
+
 sub loadstate () { #{{{
        return if $state_loaded;
        $state_loaded=1;
        if (-e "$config{wikistatedir}/aggregate") {
-               open(IN, "<", "$config{wikistatedir}/aggregate") ||
+               open(IN, "$config{wikistatedir}/aggregate") ||
                        die "$config{wikistatedir}/aggregate: $!";
                while (<IN>) {
                        $_=IkiWiki::possibly_foolish_untaint($_);
@@ -166,32 +205,13 @@ sub loadstate () { #{{{
 
 sub savestate () { #{{{
        return unless $state_loaded;
+       garbage_collect();
        eval q{use HTML::Entities};
        error($@) if $@;
        my $newfile="$config{wikistatedir}/aggregate.new";
        my $cleanup = sub { unlink($newfile) };
-       open (OUT, ">", $newfile) || error("open $newfile: $!", $cleanup);
+       open (OUT, ">$newfile") || error("open $newfile: $!", $cleanup);
        foreach my $data (values %feeds, values %guids) {
-               if ($data->{remove}) {
-                       if ($data->{name}) {
-                               foreach my $guid (values %guids) {
-                                       if ($guid->{feed} eq $data->{name}) {
-                                               $guid->{remove}=1;
-                                       }
-                               }
-                       }
-                       else {
-                               unlink pagefile($data->{page})
-                                       if exists $data->{page};
-                       }
-                       next;
-               }
-               elsif ($data->{expired} && exists $data->{page}) {
-                       unlink pagefile($data->{page});
-                       delete $data->{page};
-                       delete $data->{md5};
-               }
-
                my @line;
                foreach my $field (keys %$data) {
                        if ($field eq "name" || $field eq "feed" ||
@@ -212,6 +232,63 @@ sub savestate () { #{{{
                error("rename $newfile: $!", $cleanup);
 } #}}}
 
+sub garbage_collect () { #{{{
+       foreach my $name (keys %feeds) {
+               # remove any feeds that were not seen while building the pages
+               # that used to contain them
+               if ($feeds{$name}->{unseen}) {
+                       delete $feeds{$name};
+               }
+       }
+
+       foreach my $guid (values %guids) {
+               # any guid whose feed is gone should be removed
+               if (! exists $feeds{$guid->{feed}}) {
+                       unlink pagefile($guid->{page})
+                               if exists $guid->{page};
+                       delete $guids{$guid->{guid}};
+               }
+               # handle expired guids
+               elsif ($guid->{expired} && exists $guid->{page}) {
+                       unlink pagefile($guid->{page});
+                       delete $guid->{page};
+                       delete $guid->{md5};
+               }
+       }
+} #}}}
+
+sub mergestate () { #{{{
+       # Load the current state in from disk, and merge into it
+       # values from the state in memory that might have changed
+       # during aggregation.
+       my %myfeeds=%feeds;
+       my %myguids=%guids;
+       clearstate();
+       loadstate();
+
+       # All that can change in feed state during aggregation is a few
+       # fields.
+       foreach my $name (keys %myfeeds) {
+               if (exists $feeds{$name}) {
+                       foreach my $field (qw{message lastupdate numposts
+                                             newposts error}) {
+                               $feeds{$name}->{$field}=$myfeeds{$name}->{$field};
+                       }
+               }
+       }
+
+       # New guids can be created during aggregation.
+       # It's also possible that guids were removed from the on-disk state
+       # while the aggregation was in process. That would only happen if
+       # their feed was also removed, so any removed guids added back here
+       # will be garbage collected later.
+       foreach my $guid (keys %myguids) {
+               if (! exists $guids{$guid}) {
+                       $guids{$guid}=$myguids{$guid};
+               }
+       }
+} #}}}
+
 sub clearstate () { #{{{
        %feeds=();
        %guids=();
@@ -249,7 +326,12 @@ sub expire () { #{{{
        }
 } #}}}
 
-sub aggregate () { #{{{
+sub needsaggregate () { #{{{
+       return values %feeds if $config{rebuild};
+       return grep { time - $_->{lastupdate} >= $_->{updateinterval} } values %feeds;
+} #}}}
+
+sub aggregate (@) { #{{{
        eval q{use XML::Feed};
        error($@) if $@;
        eval q{use URI::Fetch};
@@ -257,15 +339,12 @@ sub aggregate () { #{{{
        eval q{use HTML::Entities};
        error($@) if $@;
 
-       foreach my $feed (values %feeds) {
-               next unless $config{rebuild} || 
-                       time - $feed->{lastupdate} >= $feed->{updateinterval};
+       foreach my $feed (@_) {
                $feed->{lastupdate}=time;
                $feed->{newposts}=0;
                $feed->{message}=sprintf(gettext("processed ok at %s"),
                        displaytime($feed->{lastupdate}));
                $feed->{error}=0;
-               $IkiWiki::forcerebuild{$feed->{sourcepage}}=1;
 
                debug(sprintf(gettext("checking feed %s ..."), $feed->{name}));
 
@@ -473,18 +552,6 @@ sub htmlabs ($$) { #{{{
        return $ret;
 } #}}}
 
-sub remove_feeds () { #{{{
-       my $page=shift;
-
-       my %removed;
-       foreach my $id (keys %feeds) {
-               if ($feeds{$id}->{sourcepage} eq $page) {
-                       $feeds{$id}->{remove}=1;
-                       $removed{$id}=1;
-               }
-       }
-} #}}}
-
 sub pagefile ($) { #{{{
        my $page=shift;
 
index 99ee5cdc2b362da57db7e0ce02ad3c4ef3fb4626..1266666e3b4a40b7c062e037944137b87f7d92db 100644 (file)
@@ -6,19 +6,22 @@ ikiwiki (2.31) UNRELEASED; urgency=low
     that contributes to a page's content and using the youngest of them all,
     as well as special cases for things like the version plugin, and it's just
     too complex to do.
+  * aggregate: Forking a child broke the one state that mattered: Forcing
+    the aggregating page to be rebuilt. Fix this.
   * cgi hooks are now run before ikiwiki state is loaded.
   * This allows locking the wiki before loading state, which avoids some
     tricky locking code when saving a web edit.
   * poll: This plugin turns out to have edited pages w/o doing any locking.
     Oops. Convert it from a cgi to a sessioncgi hook, which will work
     much better.
-  * aggregate: Revert use of forking to not save state, that was not the right
-    approach.
   * recentchanges: Improve handling of links on the very static changes pages
     by thunking to the CGI, which can redirect to the page, or allow it to be
     created if it doesn't exist.
   * recentchanges: Exipre all *._change pages, even if the directory
     they're in has changed.
+  * aggregate: Lots of changes; aggregation can now run without locking the
+    wiki, and there is a separate aggregatelock to prevent multiple concurrent
+    aggregation runs.
 
  -- Joey Hess <joeyh@debian.org>  Sat, 02 Feb 2008 23:36:31 -0500
 
index 91df662a7125f85c1f0253f83cbdef9b5ddd648a..b6c82e923758789316aa3c17c3e10e71aeb3edad 100644 (file)
@@ -46,16 +46,12 @@ would be loaded, and there would be no reason to worry about aggregating.
 
 Or aggregation could be kept in checkconfig, like so:
 
-* lock wiki
 * load aggregation state
-* unlock wiki
 * get list of feeds needing aggregation
 * exit if none
 * attempt to take aggregation lock, exit if another aggregation is happening
 * fork a child process to do the aggregation
-  * lock wiki
   * load wiki state (needed for aggregation to run)
-  * unlock wiki
   * aggregate
   * lock wiki
   * reload aggregation state
@@ -64,3 +60,5 @@ Or aggregation could be kept in checkconfig, like so:
 * drop aggregation lock
 * force rebuild of sourcepages of feeds that were aggregated
 * exit checkconfig and continue with usual refresh process
+
+[[done]]
index 69e231ada2f1d975d46fefd4076c883aba1f42fc..62810a687c68546049d56ab983be6659f38b0ce2 100644 (file)
@@ -8,7 +8,7 @@ msgid ""
 msgstr ""
 "Project-Id-Version: PACKAGE VERSION\n"
 "Report-Msgid-Bugs-To: \n"
-"POT-Creation-Date: 2008-02-03 14:52-0500\n"
+"POT-Creation-Date: 2008-02-03 16:05-0500\n"
 "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
 "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
 "Language-Team: LANGUAGE <LL@li.org>\n"
@@ -67,67 +67,67 @@ msgstr ""
 msgid "You are banned."
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:72
+#: ../IkiWiki/Plugin/aggregate.pm:100
 #, perl-format
 msgid "missing %s parameter"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:100
+#: ../IkiWiki/Plugin/aggregate.pm:127
 msgid "new feed"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:114
+#: ../IkiWiki/Plugin/aggregate.pm:141
 msgid "posts"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:116
+#: ../IkiWiki/Plugin/aggregate.pm:143
 msgid "new"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:232
+#: ../IkiWiki/Plugin/aggregate.pm:307
 #, perl-format
 msgid "expiring %s (%s days old)"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:239
+#: ../IkiWiki/Plugin/aggregate.pm:314
 #, perl-format
 msgid "expiring %s"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:265
+#: ../IkiWiki/Plugin/aggregate.pm:343
 #, perl-format
 msgid "processed ok at %s"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:270
+#: ../IkiWiki/Plugin/aggregate.pm:347
 #, perl-format
 msgid "checking feed %s ..."
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:275
+#: ../IkiWiki/Plugin/aggregate.pm:352
 #, perl-format
 msgid "could not find feed at %s"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:290
+#: ../IkiWiki/Plugin/aggregate.pm:367
 msgid "feed not found"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:301
+#: ../IkiWiki/Plugin/aggregate.pm:378
 #, perl-format
 msgid "(invalid UTF-8 stripped from feed)"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:307
+#: ../IkiWiki/Plugin/aggregate.pm:384
 #, perl-format
 msgid "(feed entities escaped)"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:313
+#: ../IkiWiki/Plugin/aggregate.pm:390
 msgid "feed crashed XML::Feed!"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:387
+#: ../IkiWiki/Plugin/aggregate.pm:464
 #, perl-format
 msgid "creating new page %s"
 msgstr ""